Usare il feed di dati delle modifiche in Azure Databricks

Il change data feed (CDF) monitora le modifiche a livello di riga tra le versioni di una tabella Delta Lake o di una tabella Apache Iceberg v3.

Azure Databricks supporta due approcci:

  • Feed automatico dei dati di modifica: calcola le modifiche durante la lettura delle tabelle usando i metadati di provenienza delle righe. Questa operazione non richiede una singola configurazione di tabella e funziona nelle tabelle Delta Lake e Apache Iceberg v3. Vedere Feed automatico dei dati delle modifiche.
  • Feed di dati delle modifiche legacy: materializza le modifiche durante le scritture delle tabelle. Supporta solo le tabelle Delta Lake. Richiede una singola configurazione di tabella. Vedere Feed di dati delle modifiche legacy per Delta Lake.

È possibile usare il feed di dati delle modifiche per casi d'uso comuni dei dati, tra cui:

  • Pipeline ETL incrementali che elaborano solo le righe modificate dall'ultima esecuzione della pipeline.
  • Tracce di controllo che registrano le modifiche ai dati ai fini della conformità e della governance.
  • Carichi di lavoro di replica dei dati che sincronizzano le modifiche apportate a tabelle downstream, cache o sistemi esterni.

Feed automatico dei dati di modifica

Important

Questa funzionalità è in Anteprima Pubblica. Gli amministratori dell'area di lavoro possono controllare l'accesso a questa funzionalità dalla pagina Anteprime . Vedere Gestire le anteprime di Azure Databricks.

Il feed automatico dei dati di modifica calcola le modifiche a livello di riga al momento della query, anziché in fase di scrittura, utilizzando il rilevamento delle righe per Delta Lake e la tracciabilità delle righe per Apache Iceberg v3. A differenza del feed di dati delle modifiche legacy, il feed di dati delle modifiche automatico non richiede la configurazione di singole tabelle e funziona nelle tabelle Delta Lake e nelle tabelle Apache Iceberg v3.

Poiché le modifiche non vengono calcolate a ogni scrittura per le operazioni MERGE INTO e UPDATE, il feed automatico dei dati sulle modifiche migliora le prestazioni di scrittura e riduce i costi di archiviazione rispetto al feed dei dati sulle modifiche precedente.

Il feed automatico dei dati di modifica utilizza le stesse API table_changes() e readChangeFeed del feed dei dati di modifica precedente e supporta query batch, Structured Streaming e Databricks-to-Databricks Delta Lake Sharing. Vedi Leggere le modifiche nelle query in batch e Elaborare in modo incrementale i dati di modifica.

Requisiti

  • Databricks Runtime 18 o versione successiva
  • Formato di tabella supportato registrato in Unity Catalog:
    • Tabella gestita in formato Delta Lake con rilevamento delle righe abilitato o in formato Iceberg v3.
    • Tabella esterna in formato Delta Lake con il rilevamento delle righe abilitato.

Vedi i tipi di tabella di Databricks Unity Catalog.

Note

Il feed dei dati di modifica non fa parte della specifica Apache Iceberg. I lettori di Azure Databricks possono eseguire query sul feed automatico dei dati di modifica per le tabelle Apache Iceberg v3, ma i lettori Iceberg esterni non possono interrogarlo. Consulta la specifica della tabella Iceberg.

Per Delta Lake, solo i lettori di Azure Databricks possono eseguire query del feed automatico dei dati di modifica.

Usa il feed delle modifiche ai dati

Per usare il feed delle modifiche ai dati, verifica di usare una tabella che soddisfi i requisiti. Vedere Requisiti.

Per leggere in batch il feed dei dati di modifica, procedere come segue:

Python

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("<table_name>")

Scala

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("<table_name>")

SQL

SELECT * FROM table_changes('<table_name>', 0)

Per altre informazioni sulle letture in batch per il feed dei dati di modifica, vedere Leggere le modifiche nelle query in batch.

Per leggere il feed dei dati di modifica in streaming, procedere come segue:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")

Per altre informazioni sulla lettura in streaming per il feed dei dati di modifica, vedere Elaborare in modo incrementale i dati delle modifiche.

Migrare dal feed dei dati di modifica precedente

Per eseguire la migrazione di una tabella Delta Lake dal feed di dati delle modifiche legacy al feed di dati delle modifiche automatico, eseguire le operazioni seguenti:

  1. Verificare che la tabella soddisfi i requisiti.
  2. Disattivare il feed di dati delle modifiche legacy eseguendo il comando seguente:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');

Non è possibile usare insieme feed di dati di modifica legacy e automatici.

Schema del feed di dati delle modifiche

Quando si legge dal feed delle modifiche ai dati di una tabella, la query usa lo schema della versione più recente della tabella. Azure Databricks supporta la maggior parte delle operazioni di modifica ed evoluzione dello schema, ma le tabelle con mapping delle colonne presentano limitazioni. Vedi Tabelle con mappatura delle colonne.

Oltre alle colonne di dati dello schema della tabella Delta Lake, il feed di dati delle modifiche contiene colonne di metadati che identificano il tipo di evento di modifica:

Nome della colonna Tipo Values
_change_type Stringa Contiene: insert, update_preimage, update_postimage, delete.
preimage è il valore prima dell'aggiornamento, postimage è il valore dopo l'aggiornamento.
_commit_version Long Contiene: la versione del log o della tabella Delta contenente la modifica.
_commit_timestamp Timestamp Contiene: timestamp associato al momento della creazione del commit.

Se lo schema contiene colonne con gli stessi nomi di queste colonne di metadati, non è possibile usare il feed di dati delle modifiche in una tabella. Prima di attivare il feed di dati delle modifiche, rinominare le colonne nella tabella per risolvere il conflitto.

Elaborare in modo incrementale i dati delle modifiche

Databricks consiglia di usare il feed di dati delle modifiche in combinazione con Structured Streaming per elaborare in modo incrementale le modifiche dalle tabelle. È necessario usare Structured Streaming per Azure Databricks per tenere traccia automatica delle versioni per il feed di dati delle modifiche della tabella. Per l'elaborazione CDC con le tabelle SCD di tipo 1 o di tipo 2, vedere Le API AUTO CDC: Semplificare la Change Data Capture con le pipeline.

All'avvio del flusso, il feed dei dati di modifica restituisce l'istantanea più recente della tabella sotto forma di record INSERT e successivamente le modifiche future come dati di modifica. I feed di modifica dei dati registrano contemporaneamente nel log delle transazioni della tabella sia i dati modificati sia le nuove righe.

Per configurare un flusso per leggere il feed di dati delle modifiche di una tabella, impostare l'opzione readChangeFeed su true come segue:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")

Limitazioni di velocità

Azure Databricks supporta limiti di frequenza (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex durante la lettura dei dati delle modifiche. Per un elenco completo delle opzioni delta Lake di streaming, vedere Delta Lake.

Facoltativamente, è possibile specificare una versione iniziale, vedere Specificare una versione iniziale. Per le versioni diverse dallo snapshot iniziale, i limiti di frequenza si applicano in modo atomico a interi commit. Il batch corrente include l'intero commit o il batch corrente rinvia il commit al batch successivo.

Cronologia delle tabelle rivisitate

Un feed di dati delle modifiche non deve fungere da record permanente di tutte le modifiche apportate a una tabella. Registra solo le modifiche apportate dopo l'abilitazione del feed di dati delle modifiche. È possibile avviare una nuova lettura di streaming per acquisire la versione corrente e tutte le modifiche successive.

I record nel feed di dati delle modifiche sono temporanei e sono accessibili solo per una finestra di conservazione specificata. I log delle transazioni rimuovono le versioni delle tabelle e le corrispondenti versioni del feed dei dati di modifica a intervalli regolari. Quando una versione viene rimossa, non è più possibile leggere il feed di dati delle modifiche per tale versione.

Archiviare i dati delle modifiche per la cronologia permanente

Se il caso d'uso richiede di mantenere una cronologia permanente di tutte le modifiche apportate a una tabella, usare la logica incrementale per scrivere record dal feed di dati delle modifiche a una nuova tabella.

L'esempio seguente mostra come usare trigger.AvailableNow per elaborare i dati disponibili come carico di lavoro batch per l'audit o per i replay completi delle modifiche:

Python
(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
Scala
spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Specificare una versione iniziale

Per leggere le modifiche da un punto specifico, specificare una versione iniziale usando un timestamp o un numero di versione. Le versioni iniziali sono necessarie per le letture batch. Facoltativamente, è possibile specificare una versione finale per limitare l'intervallo. Per altre informazioni sulla cronologia delle tabelle, vedere Che cos'è il viaggio temporale?.

Quando si configurano carichi di lavoro Structured Streaming che usano feed di dati delle modifiche, la specifica di una versione iniziale potrebbe influire sulle prestazioni di elaborazione:

  • Le nuove pipeline di elaborazione dati in genere traggono vantaggio dal comportamento predefinito, che registra tutti i record esistenti nella tabella come INSERT operazioni all'avvio del flusso.
  • Se la tabella di destinazione contiene già tutti i record con modifiche appropriate fino a un determinato punto, specificare una versione iniziale per evitare di elaborare lo stato della tabella di origine come INSERT eventi.

L'esempio seguente illustra come eseguire il ripristino da un errore di streaming con un checkpoint danneggiato. In questo esempio si presuppongono le condizioni seguenti:

  1. Il feed di dati delle modifiche è stato abilitato nella tabella di origine al momento della creazione della tabella.
  2. La tabella di destinazione downstream ha elaborato tutte le modifiche fino alla versione 75 inclusa.
  3. La cronologia delle versioni per la tabella di origine è disponibile per le versioni 70 e successive.

Quando si definisce il flusso di scrittura nella tabella di destinazione esistente, è necessario specificare una nuova posizione del checkpoint:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")

Important

Se si specifica una versione iniziale e tale versione non è disponibile nella cronologia delle tabelle, il flusso non viene avviato da un nuovo checkpoint. Poiché le tabelle gestite puliscono automaticamente le versioni cronologiche, tutte le versioni iniziali specificate vengono eliminate.

Vedere Cronologia delle tabelle di riproduzione.

Leggere le modifiche nelle query batch

È possibile usare la sintassi delle query batch per leggere tutte le modifiche a partire da una determinata versione o per leggere le modifiche all'interno di un intervallo specificato di versioni come indicato di seguito:

  • Specificare le versioni come numeri interi e timestamp come stringhe nel formato yyyy-MM-dd[ HH:mm:ss[.SSS]].
  • Le versioni iniziali e finali sono inclusive. Per leggere da una versione iniziale alla versione più recente, specificare solo la versione iniziale.
  • Se si specifica una versione prima dell'abilitazione del feed di dati delle modifiche, viene generato un errore.

Per usare le letture batch con le opzioni della versione iniziale e finale, eseguire le operazioni seguenti:

SQL

Per leggere dalla versione 0 a 10, eseguire le operazioni seguenti:

SELECT * FROM table_changes('tableName', 0, 10)

Per leggere tra due versioni di timestamp, eseguire le operazioni seguenti:

--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

Per leggere da una versione iniziale alla più recente, eseguire le operazioni seguenti:

SELECT * FROM table_changes('tableName', 0)

Per leggere le modifiche per una tabella con caratteri speciali nel nome, eseguire le operazioni seguenti:

SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')

Vedere table_changes funzione con valori di tabella.

Python

Per leggere dalla versione 0 a 10, eseguire le operazioni seguenti:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

Per leggere tra due timestamp, eseguire le operazioni seguenti:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

Per leggere da una versione iniziale alla più recente, eseguire le operazioni seguenti:

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

Per leggere dalla versione 0 a 10, eseguire le operazioni seguenti:

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

Per leggere tra due timestamp, eseguire le operazioni seguenti:

spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

Per leggere da una versione iniziale alla più recente, eseguire le operazioni seguenti:

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Gestire le versioni fuori dall'intervallo

Per impostazione predefinita, se si specifica una versione o un timestamp che supera l'ultimo commit, la query restituisce l'errore timestampGreaterThanLatestCommit.

In Databricks Runtime 11.3 LTS e versioni successive è possibile abilitare la tolleranza per le versioni fuori intervallo come indicato di seguito:

SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Quando questa configurazione è abilitata, la query restituisce risultati diversi nel modo seguente:

  • Una versione iniziale o un timestamp oltre l'ultimo commit restituisce un risultato vuoto.
  • Una versione finale o un timestamp oltre l'ultimo commit restituisce tutte le modifiche dall'inizio all'ultimo commit.

Feed di dati delle modifiche legacy per Delta Lake

Il flusso di dati delle modifiche legacy richiede una configurazione manuale delle singole tabelle Delta Lake. Poiché il feed di dati delle modifiche non è incluso nella specifica Apache Iceberg, le tabelle Apache Iceberg non sono supportate. Databricks consiglia di eseguire la migrazione al change data feed automatico. Vedi Eseguire la migrazione dal feed dei dati di modifica precedente.

Quando il feed di dati delle modifiche legacy è attivato, il runtime registra gli eventi di modifica per tutti i dati scritti nella tabella. Sono inclusi i dati di riga insieme ai metadati che indicano se la riga specificata è stata inserita, eliminata o aggiornata.

Il feed di dati di modifica legacy usa le stesse API di lettura readChangeFeed e table_changes() del feed di dati di modifica automatico. Vedi Elaborare in modo incrementale i dati di modifica e Leggere le modifiche nelle query in batch.

Abilitare il feed dei dati di modifica precedente

È necessario attivare in modo esplicito il feed di dati delle modifiche legacy in singole tabelle. Usa uno dei seguenti metodi:

Nuova tabella

Impostare la proprietà delta.enableChangeDataFeed = true table nel CREATE TABLE comando .

CREATE TABLE student (id INT, name STRING, age INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true)

Note

Se si disattivi il change data feed legacy per un certo periodo di tempo e quindi lo si riattivi, non sarà possibile eseguire query sull'intervallo. Usare il feed di dati delle modifiche automatico per eseguire query sulle modifiche durante l'intervallo. Vedere Feed automatico dei dati delle modifiche.

Tabella esistente

Impostare la proprietà delta.enableChangeDataFeed = true table nel ALTER TABLE comando .

ALTER TABLE myDeltaTable
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Considerazioni sull'archiviazione

Le tabelle gestite registrano le modifiche dei dati in modo efficiente e potrebbero usare altre funzionalità per ottimizzare il layout di archiviazione.

Con il feed dei dati di modifica precedente, è necessario considerare il seguente comportamento di archiviazione:

  • È possibile che si verifichi un piccolo aumento dei costi di archiviazione perché le modifiche potrebbero essere registrate in file separati.
  • Alcune operazioni, ad esempio operazioni di solo inserimento o eliminazioni di intere partizioni, non generano file dei dati di modifica. Azure Databricks calcola il feed di dati delle modifiche direttamente dal log delle transazioni.
  • I file di dati di modifica utilizzano i criteri di conservazione della tabella. Il comando VACUUM elimina i file di dati di modifica e fa sì che le modifiche del log delle transazioni usino il criterio di conservazione dei checkpoint.

Databricks consiglia di non tentare di ricostruire il feed di dati delle modifiche eseguendo direttamente query sui file di dati delle modifiche. Usare sempre le API Delta Lake e Apache Iceberg.

Limitations

Considera le seguenti limitazioni per i feed dei dati di modifica:

Tabelle con mappatura delle colonne

Con il mapping delle colonne abilitato in una tabella Delta Lake, è possibile eliminare o rinominare colonne senza riscrivere i file di dati. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.

Tuttavia, i feed dei dati di modifica presentano limitazioni in seguito a modifiche dello schema non additive. Le modifiche dello schema non additive includono le operazioni seguenti:

Non è possibile leggere i feed dei dati di modifica per una transazione o un intervallo in cui si verifica una modifica dello schema non additiva.

Per consentire modifiche dello schema non additive prima o dopo l'intervallo specificato di letture batch, le query usano lo schema della versione finale dell'intervallo anziché la versione più recente della tabella. Le interrogazioni falliscono ancora se l'intervallo di versioni si estende su una modifica dello schema non additiva.

Feed automatico dei dati delle modifiche

  • Poiché il feed di dati delle modifiche non è supportato nella specifica Apache Iceberg, i client Iceberg esterni non possono eseguire query sul feed di dati delle modifiche automatico. Consulta la specifica della tabella Iceberg.
  • Nelle transazioni con più istruzioni, se la tabella di origine è stata modificata durante la transazione, il feed automatico dei dati di modifica non è supportato.
  • Il feed automatico dei dati di modifica non è supportato nelle tabelle con filtri di riga o maschere di colonna. Vedere Filtri di riga e maschere di colonna.
  • Le query del feed dei dati di modifica non possono estendersi tra versioni di una tabella in cui si è verificata una modifica dello schema non additiva, ad esempio la rinomina o l'eliminazione di una colonna oppure una modifica del tipo di dati. Suddividere la query in intervalli prima e dopo la modifica dello schema.