Uso de la fuente de distribución de datos modificados en Azure Databricks

La fuente de cambios de datos (CDF) realiza un seguimiento de los cambios a nivel de fila entre versiones de una tabla de Delta Lake o de una tabla Apache Iceberg v3.

Azure Databricks admite dos enfoques:

Puede utilizar la fuente de cambios de datos para casos de uso habituales de datos, entre los que se incluyen:

  • Canales ETL incrementales que procesan solo las filas que han cambiado desde la última ejecución del canal.
  • Registros de auditoría que registran los cambios en los datos para fines de cumplimiento normativo y gobernanza.
  • Cargas de trabajo de replicación de datos que sincronizan los cambios con tablas, cachés o sistemas externos de destino.

Flujo automático de datos de cambios

Importante

Esta característica está en versión preliminar pública. Los administradores del área de trabajo pueden controlar el acceso a esta característica desde la página Vistas previas . Consulte Administrar versiones preliminares de Azure Databricks.

La fuente automática de datos de cambios calcula los cambios a nivel de fila en el momento de la consulta, en lugar de en el momento de la escritura, mediante el seguimiento de fila para Delta Lake y el linaje de fila para Apache Iceberg v3. A diferencia de la fuente de distribución de datos de cambios heredada, la fuente de distribución de datos de cambios automática no requiere una configuración de tabla individual y funciona en tablas de Delta Lake y tablas de Apache Iceberg v3.

Dado que los cambios no se calculan en cada operación de escritura de MERGE INTO y UPDATE, la fuente de datos de cambios automática mejora el rendimiento de escritura y reduce los costes de almacenamiento frente a la fuente de datos de cambios heredada.

La fuente automática de datos de cambios usa las mismas API de table_changes() y readChangeFeed que la fuente heredada de datos de cambios, y funciona con consultas por lotes, Structured Streaming y Databricks-to-Databricks Delta Lake Sharing. Consulte Leer cambios en consultas por lotes y Procesar de forma incremental los datos de cambios.

Requisitos

  • Databricks Runtime 18 o superior
  • Un formato de tabla compatible que está registrado en el catálogo de Unity:
    • Una tabla administrada en formato Delta Lake con seguimiento de filas habilitado o en formato Iceberg v3.
    • Una tabla externa en formato Delta Lake con el seguimiento de filas habilitado.

Consulte los tipos de tablas de Unity Catalog de Databricks.

Nota

La fuente de datos de cambios no forma parte de la especificación de Apache Iceberg. Los lectores de Azure Databricks pueden consultar la fuente automática de datos de cambios para las tablas Apache Iceberg v3, pero los lectores externos de Iceberg no pueden. Consulte las especificaciones de la tabla de Iceberg.

En Delta Lake, solo los lectores de Azure Databricks pueden consultar la fuente de datos de cambios automática.

Usar la fuente de cambios de datos

Para usar la fuente de cambios de datos, compruebe que está usando una tabla que cumpla los requisitos. Vea Requisitos.

Para leer por lotes la fuente de datos de cambios, haga lo siguiente:

Python

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("<table_name>")

Scala

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("<table_name>")

SQL

SELECT * FROM table_changes('<table_name>', 0)

Para más información sobre las lecturas por lotes para la fuente de distribución de datos modificados, consulte Lectura de cambios en consultas por lotes.

Para leer la fuente de cambios de datos en streaming, haga lo siguiente:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")

Para obtener más información sobre las lecturas en streaming de la fuente de datos de cambios, consulte Procesar de forma incremental los datos de cambios.

Migrar desde la fuente de cambios de datos heredada

Para migrar una tabla de Delta Lake desde la fuente de distribución de datos de cambios heredada a la fuente de distribución de datos de cambios automática, haga lo siguiente:

  1. Compruebe que la tabla cumple los requisitos.
  2. Para desactivar la fuente de datos de cambios heredada, ejecute el siguiente comando:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');

No se pueden usar a la vez canales de datos de cambios heredados y automáticos.

Esquema de fuente de distribución de datos modificados

Cuando se leen datos de la fuente de datos de cambios de una tabla, la consulta usa el esquema de la versión más reciente de la tabla. Azure Databricks admite la mayoría de las operaciones de cambio y evolución del esquema, pero las tablas con asignación de columnas tienen limitaciones. Consulte Tablas con asignación de columnas.

Además de las columnas de datos del esquema de la tabla delta Lake, la fuente de distribución de datos modificados contiene columnas de metadatos que identifican el tipo de evento de cambio:

Nombre de la columna Tipo Values
_change_type Cadena Contiene: insert, update_preimage, update_postimage, delete.
preimage es el valor antes de la actualización, postimage es el valor después de la actualización.
_commit_version Long Contiene: el registro Delta o la versión de la tabla que contiene el cambio.
_commit_timestamp Timestamp Contiene: la marca de tiempo asociada al crear la confirmación.

Si el esquema contiene columnas con los mismos nombres que estas columnas de metadatos, no se puede usar la fuente de cambios de datos en la tabla. Antes de activar la fuente de cambios de datos, cambie el nombre de las columnas en la tabla para resolver este conflicto.

Procesa de forma incremental los datos de cambios

Databricks recomienda que utilice la fuente de datos de cambios en combinación con Structured Streaming para procesar de forma incremental los cambios de las tablas. Debe usar Structured Streaming para Azure Databricks para realizar un seguimiento automático de las versiones de la fuente de datos de cambios de la tabla. Para el procesamiento CDC con tablas de tipo 1 o tipo 2 de SCD, consulte Las API CDC automáticas: Simplificación de la captura de datos modificados con canalizaciones.

Cuando el flujo se inicia por primera vez, la fuente de cambios de datos devuelve la instantánea más reciente de la tabla en forma de registros INSERT y, a continuación, devuelve los cambios posteriores como datos de cambios. Los feeds de cambios de datos registran tanto los cambios en los datos como las nuevas filas de datos en el registro de transacciones de la tabla al mismo tiempo.

Para configurar un flujo para leer la fuente de datos de cambios de una tabla, establezca la opción readChangeFeed en true de la siguiente manera:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")

Límites de tarifas

Azure Databricks admite límites de velocidad (maxFilesPerTrigger, maxBytesPerTrigger) y excludeRegex al leer datos modificados. Para obtener una lista completa de las opciones de streaming de Delta Lake, consulte Delta Lake.

Opcionalmente, puede especificar una versión inicial; consulte Especificar una versión inicial. En el caso de las versiones distintas de la instantánea inicial, los límites de velocidad se aplican de forma atómica a las confirmaciones completas. El lote actual incluye toda la confirmación o el lote actual aplaza la confirmación al siguiente lote.

Ver historial de tabla

Un flujo de cambios de datos no está pensado para ser un registro permanente de todos los cambios de una tabla. Solo registra los cambios que se producen después de que se haya habilitado la fuente de cambios de datos. Puede iniciar una nueva lectura de streaming para capturar la versión actual y todos los cambios posteriores.

Los registros de la fuente de distribución de datos modificados son transitorios y solo son accesibles para una ventana de retención especificada. Los registros de transacciones eliminan periódicamente las versiones de la tabla y sus correspondientes versiones de la fuente de cambios de datos. Cuando se quita una versión, ya no se puede leer el flujo de datos de cambios de esa versión.

Archivar los datos de cambios para mantener un historial permanente

Si el caso de uso requiere que mantenga un historial permanente de todos los cambios en una tabla, use lógica incremental para escribir registros de la fuente de distribución de datos modificados en una nueva tabla.

En el ejemplo siguiente se muestra cómo usar trigger.AvailableNow para procesar los datos disponibles como una carga de trabajo por lotes para auditar o reproducir cambios completos:

Python
(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
Scala
spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Especificar una versión inicial

Para leer los cambios de un punto específico, especifique una versión inicial mediante una marca de tiempo o un número de versión. Las versiones iniciales son necesarias para las lecturas por lotes. Opcionalmente, puede especificar una versión final para limitar el intervalo. Para obtener más información sobre el historial de tablas, consulte ¿Qué es el viaje en el tiempo?.

Al configurar cargas de trabajo de Structured Streaming que usan la fuente de distribución de datos modificados, especificar una versión inicial podría afectar al rendimiento del procesamiento:

  • Las nuevas canalizaciones de procesamiento de datos suelen beneficiarse del comportamiento predeterminado, que registra todos los registros existentes de la tabla como INSERT operaciones cuando se inicia la secuencia por primera vez.
  • Si la tabla de destino ya contiene todos los registros con los cambios adecuados hasta un punto determinado, especifique una versión inicial para evitar procesar el estado de la tabla de origen como eventos de INSERT.

En el ejemplo siguiente se muestra cómo recuperarse de un error de streaming con un punto de control dañado. En este ejemplo, supongamos las condiciones siguientes:

  1. La alimentación de datos de cambios fue habilitada en la tabla de origen cuando se creó la tabla.
  2. La tabla de destino aguas abajo procesó todos los cambios hasta la versión 75, incluida.
  3. El historial de versiones de la tabla de origen está disponible para las versiones 70 y posteriores.

Al definir la secuencia de escritura en la tabla de destino existente, debe especificar una nueva ubicación de punto de control:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")

Importante

Si especifica una versión inicial y esa versión no está disponible en el historial de tablas, la secuencia no se inicia desde un nuevo punto de control. Dado que las tablas administradas limpian automáticamente las versiones históricas, todas las versiones iniciales especificadas se eliminan finalmente.

Consulte Historial de tablas de reproducción.

Leer cambios en las consultas por lotes

Puede usar la sintaxis de consulta por lotes para leer todos los cambios a partir de una versión determinada o para leer los cambios dentro de un intervalo especificado de versiones de la siguiente manera:

  • Especifique versiones como enteros y marcas de tiempo como cadenas con el formato yyyy-MM-dd[ HH:mm:ss[.SSS]].
  • Las versiones iniciales y finales son inclusivas. Para leer desde una versión inicial a la versión más reciente, especifique solo la versión inicial.
  • Si especifica una versión anterior a la habilitación de la fuente de cambios de datos, se produce un error.

Para usar lecturas por lotes con opciones de versión inicial y final, haga lo siguiente:

SQL

Para leer de la versión 0 a 10, haga lo siguiente:

SELECT * FROM table_changes('tableName', 0, 10)

Para leer entre dos versiones con marcas temporales, haga lo siguiente:

--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

Para leer de una versión inicial a la más reciente, haga lo siguiente:

SELECT * FROM table_changes('tableName', 0)

Para leer los cambios de una tabla con caracteres especiales en el nombre, haga lo siguiente:

SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')

Consulte table_changes función con valores de tabla.

Python

Para leer de la versión 0 a 10, haga lo siguiente:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

Para leer entre dos marcas de tiempo, haga lo siguiente:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

Para leer de una versión inicial a la más reciente, haga lo siguiente:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

Para leer de la versión 0 a 10, haga lo siguiente:

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

Para leer entre dos marcas temporales, haga lo siguiente:

spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

Para leer de una versión inicial a la más reciente, haga lo siguiente:

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Manejar versiones fuera de rango

De forma predeterminada, si especifica una versión o marca de tiempo que supera la última confirmación, la consulta devuelve el error timestampGreaterThanLatestCommit.

En Databricks Runtime 11.3 LTS y versiones posteriores, puede habilitar la tolerancia para las versiones fuera del rango de la siguiente manera:

SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Cuando esta configuración está habilitada, la consulta devuelve resultados diferentes como se indica a continuación:

  • Una versión inicial o marca de tiempo más allá de la última confirmación devuelve un resultado vacío.
  • Una versión final o marca de tiempo más allá de la última confirmación devuelve todos los cambios del inicio a la última confirmación.

Fuente de distribución de datos de cambios heredada para Delta Lake

La fuente de distribución de datos de cambios heredada requiere una configuración manual para tablas individuales de Delta Lake. Dado que la fuente de datos de cambios no está incluida en la especificación de Apache Iceberg, las tablas de Apache Iceberg no son compatibles. Databricks recomienda que migre a la fuente de cambios de datos automática. Consulte Migrar desde la fuente de cambios de datos heredada.

Cuando está activado el canal de cambios de datos antiguo, el entorno de ejecución registra eventos de cambio para todos los datos que se escriben en la tabla. Esto incluye los datos de fila junto con los metadatos que indican si la fila especificada se insertó, eliminó o actualizó.

El feed de datos de cambios antiguo usa las mismas API de lectura readChangeFeed y table_changes() que el feed de datos de cambios automático. Consulte Procesamiento incremental de los datos modificados y Lectura de los cambios en las consultas por lotes.

Activa la fuente de datos de cambios heredada

Debe habilitar explícitamente la fuente heredada de captura de datos modificados para cada tabla. Utilice uno de los métodos siguientes:

Tabla nueva

Establezca la propiedad de tabla delta.enableChangeDataFeed = true en el comando CREATE TABLE.

CREATE TABLE student (id INT, name STRING, age INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true)

Nota

Si desactiva la fuente de distribución de datos de cambios heredada durante cualquier intervalo de tiempo y, a continuación, vuelve a activarla, el intervalo no será consultable. Use la fuente de distribución automática de datos de cambios para consultar los cambios durante el intervalo. Consulte Fuente de distribución automática de datos de cambios.

Tabla existente

Establezca la propiedad de tabla delta.enableChangeDataFeed = true en el comando ALTER TABLE.

ALTER TABLE myDeltaTable
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Consideraciones sobre el almacenamiento

Las tablas administradas registran los cambios de datos de forma eficaz y pueden usar otras características para optimizar el diseño de almacenamiento.

Con la fuente de datos de cambios anterior, debe tener en cuenta el siguiente comportamiento de almacenamiento:

  • Es posible que vea un pequeño aumento de los costos de almacenamiento, ya que es posible que los cambios se registren en archivos independientes.
  • Algunas operaciones, como las de solo inserción o las eliminaciones de partición completa, no generan archivos de datos de cambios. Azure Databricks calcula la fuente de cambios de datos directamente a partir del registro de transacciones.
  • Los archivos de datos modificados usan la directiva de retención de la tabla. El VACUUM comando elimina los archivos de datos modificados y los cambios del registro de transacciones usan la directiva de retención de puntos de control.

Databricks recomienda no intentar reconstruir la fuente de distribución de datos modificados consultando directamente los archivos de datos modificados. Use siempre las API de Delta Lake y Apache Iceberg.

Limitaciones

Tenga en cuenta las siguientes limitaciones para las fuentes de distribución de datos modificados:

Tablas con asignación de columnas

Con la asignación de columnas habilitada en una tabla de Delta Lake, puede quitar o cambiar el nombre de las columnas sin volver a escribir archivos de datos. Consulte Renombrar y eliminar columnas con el mapeo de columnas de Delta Lake.

Sin embargo, las fuentes de distribución de datos modificados tienen limitaciones después de cambios de esquema no aditivos. Los cambios de esquema no aditivos incluyen las siguientes operaciones:

No se pueden leer fuentes de distribución de datos modificados para una transacción o intervalo en el que se produce un cambio de esquema no aditivo.

Para permitir cambios de esquema no aditivos antes o después del intervalo especificado de lecturas por lotes, las consultas usan el esquema de la versión final del intervalo en lugar de la versión de tabla más reciente. Las consultas siguen produciendo un error si el intervalo de versiones abarca un cambio de esquema no aditivo.

Flujo automático de datos de cambios

  • Dado que el flujo de datos de cambios no es compatible con la especificación de Apache Iceberg, los clientes externos de Iceberg no pueden consultar el flujo automático de datos de cambios. Consulte las especificaciones de la tabla de Iceberg.
  • En el caso de las transacciones de múltiples instrucciones, si la tabla de origen se modificó durante la transacción, no se admite la fuente automática de datos de cambios.
  • La fuente automática de cambios de datos no es compatible con tablas con filtros por fila o máscaras de columna. Consulte Filtros de fila y máscaras de columna.
  • Las consultas del flujo de datos de cambios no pueden abarcar versiones de la tabla en las que se haya producido un cambio de esquema no aditivo, como el cambio de nombre de una columna, la eliminación de una columna o un cambio en el tipo de datos. Divida la consulta en intervalos antes y después del cambio del esquema.