Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Ändra dataflöde (CDF) spårar ändringar på radnivå mellan versioner av en Delta Lake-tabell eller Apache Iceberg v3-tabell.
Azure Databricks har stöd för två metoder:
- Automatisk ändringsdataflöde: Beräknar ändringar under tabellläsningar med hjälp av radradsmetadata. Detta kräver inte individuell tabellkonfiguration och fungerar på Delta Lake- och Apache Iceberg v3-tabeller. Se Automatisk ändringsdataflöde.
- Äldre ändringsdataflöde: Materialiserar ändringar under tabellskrivningar. Stöder endast Delta Lake-tabeller. Kräver individuell tabellkonfiguration. Se Äldre ändringsdataflöde för Delta Lake.
Du kan använda ändringsdataflöde för vanliga dataanvändningsfall, inklusive:
- Inkrementella ETL-pipelines som endast bearbetar de rader som har ändrats sedan den senaste pipelinekörningen.
- Revisionsloggar som registrerar dataändringar för krav på efterlevnad och styrning.
- Arbetsbelastningar för datareplikering som synkroniserar ändringar i underordnade tabeller, cacheminnen eller externa system.
Automatisk ändringsdataflöde
Viktig
Den här funktionen finns som allmänt tillgänglig förhandsversion. Arbetsyteadministratörer kan styra åtkomsten till den här funktionen från sidan Förhandsversioner . Se Hantera förhandsversioner av Azure Databricks.
Automatisk ändringsdataflöde beräknar ändringar på radnivå vid frågetid, i stället för vid skrivtid, med hjälp av radspårning för Delta Lake och rad härkomst för Apache Iceberg v3. Till skillnad från äldre ändringsdataflöde kräver automatisk ändringsdatafeed inte individuell tabellkonfiguration och fungerar på Delta Lake-tabeller och Apache Iceberg v3-tabeller.
Eftersom ändringar inte beräknas på varje skrivning för MERGE INTO och UPDATE åtgärder, förbättrar automatisk ändringsdataflöde skrivprestanda och minskar lagringskostnaderna jämfört med äldre ändringsdataflöde.
Automatiskt flöde för ändringsdata använder samma table_changes()-API:er och readChangeFeed-API:er som det äldre flödet för ändringsdata och fungerar med batchfrågor, Structured Streaming och Delta Lake-delning från Databricks till Databricks. Se Läsa ändringar i batchfrågor och Bearbeta ändringsdata stegvis.
krav
- Databricks Runtime 18 eller senare
- Ett tabellformat som stöds som är registrerat i Unity Catalog:
- En hanterad tabell i Delta Lake-format med radspårning aktiverat eller i Iceberg v3-format.
- En extern tabell i Delta Lake-format med radspårning aktiverat.
Se tabelltyper i Databricks Unity Catalog.
Note
Ändringsdataflöde är inte en del av Apache Iceberg-specifikationen. Azure Databricks läsare kan fråga automatiskt ändringsdataflöde för Apache Iceberg v3-tabeller, men externa Iceberg-läsare kan inte göra det. Se specifikationen för isbergstabellen.
För Delta Lake är det endast Azure Databricks-läsare som kan fråga det automatiska flödet för ändringsdata.
Använda ändringsdataflöde
Om du vill använda ändringsdataflöde kontrollerar du att du använder en tabell som uppfyller kraven. Se kraven.
Gör följande för att batchläsa ändringsdataflöde:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Mer information om batchläsningar för ändringsdataflöde finns i Läsa ändringar i batchfrågor.
Gör följande om du vill läsa ändringsdataflödet som en ström:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Mer information om strömmande läsningar för ändringsdataflöde finns i Stegvis bearbeta ändringsdata.
Migrera från äldre ändringsdataflöde
Om du vill migrera en Delta Lake-tabell från äldre ändringsdataflöde till automatisk ändringsdatafeed gör du följande:
- Kontrollera att tabellen uppfyller kraven.
- Inaktivera äldre ändringsdataflöde genom att köra följande kommando:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
Du kan inte använda både äldre och automatiska ändringsdataflöden tillsammans.
Ändra dataflödesschema
När du läser från ändringsdataflödet för en tabell använder frågan schemat för den senaste tabellversionen. Azure Databricks stöder de flesta schemaändringar och utvecklingsåtgärder, men tabeller med kolumnmappning har begränsningar. Se Tabeller med kolumnmappning.
Förutom datakolumnerna från schemat i Delta Lake-tabellen innehåller ändringsdataflödet metadatakolumner som identifierar typen av ändringshändelse:
| Kolumnnamn | Type | Värden |
|---|---|---|
_change_type |
String | Innehåller: insert, update_preimage, update_postimage, delete.preimage är värdet före uppdateringen, postimage är värdet efter uppdateringen. |
_commit_version |
Long | Innehåller: Delta-loggen eller tabellversionen som innehåller ändringen. |
_commit_timestamp |
Tidsstämpel | Innehåller: tidsstämpeln som är kopplad till när incheckningen skapades. |
Om schemat innehåller kolumner med samma namn som dessa metadatakolumner kan du inte använda ändringsdataflöde i en tabell. Innan du aktiverar ändringsdataflöde byter du namn på kolumner i tabellen för att lösa konflikten.
Bearbeta ändringsdata stegvis
Databricks rekommenderar att du använder ändringsdataflöde i kombination med Strukturerad direktuppspelning för att stegvis bearbeta ändringar från tabeller. Du måste använda Structured Streaming för Azure Databricks för att automatiskt spåra versioner för tabellens ändringsdataflöde. För CDC-bearbetning med SCD-tabeller av typ 1 eller typ 2, se API:er för AUTOMATISK CDC: Förenkla insamling av ändringsdata med pipelines.
När dataströmmen startar returnerar flödet för ändringsdata först den senaste ögonblicksbilden av tabellen i form av INSERT-poster och därefter efterföljande ändringar som ändringsdata. Ändringsdataflöden skriver både ändringsdata och nya datarader till tabellens transaktionslogg samtidigt.
Om du vill konfigurera en dataström för att läsa ändringsdataflödet för en tabell anger du alternativet readChangeFeed enligt true följande:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Hastighetsbegränsningar
Azure Databricks stöder hastighetsbegränsningar (maxFilesPerTrigger, maxBytesPerTrigger) och excludeRegex vid läsning av ändringsdata. En fullständig lista över alternativ för strömning av Delta Lake finns i Delta Lake.
Du kan också ange en startversion, se Ange en startversion. För andra versioner än den ursprungliga ögonblicksbilden tillämpas frekvensbegränsningar atomiskt på hela commitar. Antingen innehåller den aktuella batchen hela incheckningen, eller så defersar den aktuella batchen incheckningen till nästa batch.
Spela upp tabellhistorik
Ett dataflöde för ändringar är inte avsett att fungera som ett permanent register över alla ändringar i en tabell. Den registrerar bara ändringar som inträffar efter att ändringsdataflödet har aktiverats. Du kan starta en ny strömmande läsning för att fånga upp den aktuella versionen och alla efterföljande ändringar.
Poster i ändringsdataflödet är tillfälliga och är endast tillgängliga för ett angivet kvarhållningsfönster. Transaktionsloggar tar bort tabellversioner och deras motsvarande ändringsdataflödesversioner med jämna mellanrum. När en version tas bort kan du inte längre läsa ändringsdataflödet för den versionen.
Arkivera ändringsdata för permanent historik
Om ditt användningsfall kräver att du har en permanent historik över alla ändringar i en tabell använder du inkrementell logik för att skriva poster från ändringsdataflödet till en ny tabell.
Följande exempel visar hur du använder trigger.AvailableNow för att bearbeta tillgängliga data som en batchprocess för granskning eller fullständig återuppspelning av ändringar:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Ange en startversion
Om du vill läsa ändringar från en viss punkt anger du en startversion med antingen en tidsstämpel eller ett versionsnummer. Startversioner krävs för batchläsningar. Du kan också ange en slutversion för att begränsa intervallet. Mer information om tabellhistorik finns i Vad är tidsresor?.
När du konfigurerar strukturerade strömningsarbetsbelastningar som använder ändringsdataflöde kan det påverka bearbetningsprestanda om du anger en startversion:
- Nya databehandlingspipelines drar vanligtvis nytta av standardbeteendet, som registrerar alla befintliga poster i tabellen som
INSERToperationer när dataströmmen startar för första gången. - Om måltabellen redan innehåller alla poster med lämpliga ändringar upp till en viss punkt anger du en startversion för att undvika att bearbeta källtabelltillståndet som
INSERThändelser.
I följande exempel visas hur du återställer från ett strömningsfel med en skadad kontrollpunkt. I det här exemplet förutsätter du följande villkor:
- Ändringsdataflöde aktiverades i källtabellen när tabellen skapades.
- I den underordnade måltabellen bearbetas alla ändringar upp till och med version 75.
- Versionshistorik för källtabellen är tillgänglig för version 70 och senare.
När du definierar skrivströmmen till den befintliga måltabellen måste du ange en ny kontrollpunktsplats:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Viktig
Om du anger en startversion och den versionen inte är tillgänglig i tabellhistoriken kan strömmen inte starta från en ny kontrollpunkt. Eftersom hanterade tabeller rensar historiska versioner automatiskt tas alla angivna startversioner så småningom bort.
Läs ändringar i batchfrågor
Du kan använda batchfrågesyntax för att läsa alla ändringar från en viss version eller för att läsa ändringar inom ett angivet versionsintervall enligt följande:
- Ange versioner som heltal och tidsstämplar som strängar i formatet
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - Start- och slutversioner är inkluderande. Om du vill läsa från en startversion till den senaste versionen anger du endast startversionen.
- Om du anger en version innan dataflödet för ändring aktiverades uppstår ett fel.
Om du vill använda batchläsningar med alternativ för start och slutversion gör du följande:
SQL
Om du vill läsa från version 0 till 10gör du följande:
SELECT * FROM table_changes('tableName', 0, 10)
Gör följande för att läsa mellan två tidsstämpelversioner:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Gör följande för att läsa från en startversion till den senaste:
SELECT * FROM table_changes('tableName', 0)
Om du vill läsa ändringar för en tabell med specialtecken i namnet gör du följande:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Se table_changes tabellvärdesfunktion.
Python
Om du vill läsa från version 0 till 10gör du följande:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Gör följande för att läsa mellan två tidsstämplar:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Gör följande för att läsa från en startversion till den senaste:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Om du vill läsa från version 0 till 10gör du följande:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Gör följande för att läsa mellan två tidsstämplar:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Gör följande för att läsa från en startversion till den senaste:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Hantera out-of-range-versioner
Som standard, om du anger en version eller tidsstämpel som är senare än den senaste incheckningen, returnerar frågan felet timestampGreaterThanLatestCommit.
I Databricks Runtime 11.3 LTS och senare kan du aktivera tolerans för versioner utanför intervallet på följande sätt:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
När den här konfigurationen är aktiverad returnerar frågan olika resultat enligt följande:
- En startversion eller tidsstämpel som är senare än den senaste incheckningen returnerar ett tomt resultat.
- En slutlig version eller tidsstämpel som ligger efter den senaste committen returnerar alla ändringar från startversionen till den senaste committen.
Äldre ändringsdataflöde för Delta Lake
Äldre ändringsdataflöde kräver manuell konfiguration för enskilda Delta Lake-tabeller. Eftersom ändringsdataflöde inte ingår i Apache Iceberg-specifikationen stöds inte Apache Iceberg-tabeller. Databricks rekommenderar att du migrerar till automatisk ändringsdatafeed. Se Migrera från äldre ändringsdataflöde.
När det äldre flödet för ändringsdata är aktiverat registrerar körmiljön ändringshändelser för all data som skrivs till tabellen. Detta inkluderar raddata tillsammans med metadata som anger om den angivna raden infogades, togs bort eller uppdaterades.
Det äldre ändringsdataflödet använder samma readChangeFeed- och table_changes()-API:er för läsning som automatiskt ändringsdataflöde. Se Bearbeta ändringsdata stegvis och Läs ändringar i batchfrågor.
Aktivera äldre ändringsdataflöde
Du måste uttryckligen aktivera äldre ändringsdataflöde i enskilda tabeller. Använd någon av följande metoder:
Ny tabell
Ange tabellegenskapen delta.enableChangeDataFeed = trueCREATE TABLE i kommandot .
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Om du inaktiverar äldre ändringsdataflöde under ett tidsintervall och sedan aktiverar det igen, kommer intervallet inte att vara frågebart. Använd automatisk ändringsdatafeed för att fråga efter ändringar under intervallet. Se Automatisk ändringsdataflöde.
Befintlig tabell
Ange tabellegenskapen delta.enableChangeDataFeed = trueALTER TABLE i kommandot .
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Överväganden för lagring
Hanterade tabeller registrerar dataändringar effektivt och kan använda andra funktioner för att optimera lagringslayouten.
Med äldre ändringsdataflöde måste du tänka på följande lagringsbeteende:
- Du kan se en liten ökning av lagringskostnaderna eftersom ändringar kan registreras i separata filer.
- Vissa åtgärder, till exempel borttagningar med endast infogning eller fullständig partition, genererar inte ändringsdatafiler. Azure Databricks beräknar ändringsdataflödet direkt från transaktionsloggen.
- Ändringsdatafiler använder tabellens bevarandeprincip. Kommandot
VACUUMtar bort ändringsdatafiler och ändringar från transaktionsloggen använder kvarhållningsprincipen för kontrollpunkter.
Databricks rekommenderar att du inte försöker rekonstruera ändringsdataflödet genom att fråga ändringsdatafiler direkt. Använd alltid Delta Lake- och Apache Iceberg-API:er.
Limitations
Överväg följande begränsningar för ändringsdataflöden:
Tabeller med kolumnmappning
Med kolumnmappning aktiverat i en Delta Lake-tabell kan du släppa eller byta namn på kolumner utan att skriva om datafiler. Se Byt namn på och ta bort kolumner med kolumnmappning i Delta Lake.
Ändringsdataflöden har dock begränsningar efter icke-additiva schemaändringar. Icke-additiva schemaändringar omfattar följande åtgärder:
- Byt namn på eller släpp kolumner.
- Ändra kolumndatatyper.
- Ändra kolumnens nullabilitet, till exempel med
ALTER COLUMN ... SET NOT NULL. Se Ange enNOT NULLbegränsning i Azure Databricks.
Du kan inte läsa ändringsdataflöden för en transaktion eller ett intervall där en icke-additiv schemaändring sker.
För att tillåta icke-additiva schemaändringar före eller efter det angivna intervallet av batchläsningar använder frågor schemat för intervallets slutversion i stället för den senaste tabellversionen. Frågor misslyckas fortfarande om versionsintervallet sträcker sig över en icke-additiv schemaändring.
Automatisk ändringsdataflöde
- Eftersom ändringsdataflöde inte stöds i Apache Iceberg-specifikationen kan externa Iceberg-klienter inte köra frågor mot automatisk ändringsdatafeed. Se specifikationen för isbergstabellen.
- För transaktioner med flera satser stöds inte automatiskt flöde för ändringsdata om källtabellen modifierades under transaktionen.
- Automatisk ändringsdatafeed stöds inte i tabeller med radfilter eller kolumnmasker. Se Radfilter och kolumnmasker.
- Frågor mot ändringsdataflödet kan inte omfatta tabellversioner där en icke-additiv schemaändring har inträffat, till exempel namnbyte på en kolumn, borttagning av en kolumn eller ändring av datatyp. Dela upp frågan i intervall före och efter schemaändringen.