Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gegevenswijzigingsfeed (CDF) houdt wijzigingen op regelniveau bij tussen versies van een Delta Lake-tabel of Apache Iceberg v3-tabel.
Azure Databricks ondersteunt twee benaderingen:
- Automatische feed voor wijzigingsgegevens: Berekent wijzigingen tijdens het lezen van tabellen met behulp van metadata over de herkomst van rijen. Hiervoor is geen afzonderlijke tabelconfiguratie vereist en werkt deze op Delta Lake- en Apache Iceberg v3-tabellen. Zie Automatische wijzigingsgegevensfeed.
- Oude feed voor wijzigingsgegevens: Legt wijzigingen vast tijdens schrijfbewerkingen naar tabellen. Ondersteunt alleen Delta Lake-tabellen. Vereist afzonderlijke tabelconfiguratie. Zie de verouderde wijzigingsgegevensfeed voor Delta Lake.
U kunt wijzigingenfeed gebruiken voor veelvoorkomende gebruiksvoorbeelden voor gegevens, waaronder:
- Incrementele ETL-pijplijnen die alleen de rijen verwerken die zijn gewijzigd sinds de laatste pijplijnuitvoering.
- Audittrails die gegevenswijzigingen bijhouden voor nalevings- en governancevereisten.
- Workloads voor gegevensreplicatie die wijzigingen in downstreamtabellen, caches of externe systemen synchroniseren.
Gegevensfeed automatisch wijzigen
Important
Deze functie bevindt zich in openbare preview-versie. Werkruimtebeheerders kunnen de toegang tot deze functie beheren vanaf de pagina Previews . Zie Azure Databricks previews beheren.
De automatische feed met wijzigingsgegevens berekent wijzigingen op rijniveau tijdens het uitvoeren van query's, in plaats van tijdens het schrijven, met behulp van rijtracering voor Delta Lake en rijherkomst voor Apache Iceberg v3. In tegenstelling tot de verouderde wijzigingsgegevensfeed vereist de automatische wijzigingsgegevensfeed geen configuratie per tabel en werkt deze met Delta Lake-tabellen en Apache Iceberg v3-tabellen.
Omdat wijzigingen niet bij elke schrijfbewerking voor MERGE INTO- en UPDATE-bewerkingen worden berekend, verbetert de automatische feed voor wijzigingsgegevens de schrijfprestaties en verlaagt deze de opslagkosten in vergelijking met de verouderde feed voor wijzigingsgegevens.
Automatische feed voor wijzigingsgegevens maakt gebruik van dezelfde table_changes()- en readChangeFeed-API's als de verouderde feed voor wijzigingsgegevens en werkt met batchquery's, Structured Streaming en Databricks-to-Databricks Delta Lake Sharing. Zie Wijzigingen lezen in batchquery's en incrementeel wijzigingsgegevens verwerken.
Eisen
- Databricks Runtime 18 of hoger
- Een ondersteunde tabelindeling die is geregistreerd in Unity Catalog:
- Een beheerde tabel in Delta Lake-indeling met rijtracering ingeschakeld of in Iceberg v3-indeling.
- Een externe tabel in Delta Lake-indeling waarvoor rijtracering is ingeschakeld.
Zie databricks Unity Catalog-tabeltypen.
Note
Gegevensfeed wijzigen maakt geen deel uit van de Apache Iceberg-specificatie. Azure Databricks lezers kunnen een query uitvoeren op automatische gegevensfeed voor Apache Iceberg v3-tabellen, maar externe Iceberg-lezers niet. Zie de Iceberg-tabelspecificatie.
Voor Delta Lake kunnen alleen Azure Databricks lezers een query uitvoeren op automatische wijzigingengegevensfeed.
Wijzigingenfeed gebruiken
Als u wijzigingenfeed wilt gebruiken, controleert u of u een tabel gebruikt die voldoet aan de vereisten. Raadpleeg Vereisten.
Ga als volgt te werk om de wijzigingenfeed batchgewijs te lezen:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Zie Wijzigingen lezen in batchquery's voor meer informatie over batchlezingen voor change data feed.
Ga als volgt te werk om de feed met wijzigingsgegevens via streaming te lezen:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Zie Wijzigingsgegevens incrementeel verwerken voor meer informatie over streamingleesbewerkingen voor de feed voor wijzigingsgegevens.
Migreren vanuit oude feed voor wijzigingsgegevens
Ga als volgt te werk om een Delta Lake-tabel te migreren van verouderde wijzigingengegevensfeed naar automatische wijzigingenfeed:
- Controleer of uw tabel voldoet aan de vereisten.
- Schakel verouderde wijzigingsgegevensfeed uit door de volgende opdracht uit te voeren:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
U kunt niet zowel verouderde als automatische wijzigingsgegevensfeeds gebruiken.
Schema voor gegevensfeed wijzigen
Wanneer u uit de wijzigingengegevensfeed voor een tabel leest, gebruikt de query het schema voor de nieuwste tabelversie. Azure Databricks ondersteunt de meeste bewerkingen voor schemawijziging en evolutie, maar tabellen met kolomtoewijzing hebben beperkingen. Zie Tabellen met kolomtoewijzing.
Naast de gegevenskolommen uit het schema van de Delta Lake-tabel bevat de wijzigingenfeed metagegevenskolommen waarmee het type wijzigingsevenement wordt geïdentificeerd:
| Kolomnaam | Typ | Values |
|---|---|---|
_change_type |
String | Bevat: insert, update_preimage, update_postimage, . deletepreimage is de waarde vóór de update, postimage is de waarde na de update. |
_commit_version |
Long | Bevat: het Delta-logboek of de tabelversie met de wijziging. |
_commit_timestamp |
Timestamp | Bevat: de tijdstempel die is gekoppeld aan het maken van de doorvoer. |
Als het schema kolommen bevat met dezelfde namen als deze metagegevenskolommen, kunt u de feed voor wijzigingsgegevens voor een tabel niet gebruiken. Voordat u de gegevensfeed wijzigt, wijzigt u de naam van kolommen in de tabel om dit conflict op te lossen.
Wijzigingsgegevens incrementeel verwerken
Databricks raadt u aan wijzigingenfeeds te gebruiken in combinatie met Structured Streaming om wijzigingen uit tabellen incrementeel te verwerken. U moet Structured Streaming voor Azure Databricks gebruiken om automatisch versies bij te houden voor de wijzigingengegevensfeed van uw tabel. Voor CDC-verwerking met SCD-type 1- of type 2-tabellen raadpleegt u de AUTO CDC-API's: Het vastleggen van wijzigingsgegevens vereenvoudigen met pijplijnen.
Wanneer de stream voor het eerst start, retourneert de feed voor wijzigingsgegevens de meest recente momentopname van de tabel in de vorm van INSERT-records en geeft deze vervolgens latere wijzigingen terug als wijzigingsgegevens. Met wijzigingen in gegevensfeeds worden zowel wijzigingsgegevens als nieuwe gegevensrijen tegelijkertijd doorgevoerd in het transactielogboek van de tabel.
Als u een stream wilt configureren om de wijzigingengegevensfeed van een tabel te lezen, stelt u de optie readChangeFeedtrue als volgt in:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Limieten voor tarieven
Azure Databricks ondersteunt frequentielimieten (maxFilesPerTrigger, maxBytesPerTrigger) en excludeRegex bij het lezen van wijzigingsgegevens. Zie Delta Lake voor een volledige lijst met opties voor het streamen van Delta Lake.
U kunt desgewenst een beginversie opgeven. Zie Een beginversie opgeven. Voor andere versies dan de oorspronkelijke snapshot gelden de ratelimieten atomisch voor volledige commits. De huidige batch bevat de volledige doorvoer of de huidige batch zorgt ervoor dat de doorvoer wordt uitgesteld naar de volgende batch.
Tabelgeschiedenis opnieuw afspelen
Een wijzigingsgegevensfeed is niet bedoeld als een permanent record van alle wijzigingen in een tabel. Er worden alleen wijzigingen vastgelegd die optreden nadat de wijzigingenfeed is ingeschakeld. U kunt een nieuwe streaming-leesbewerking starten om de huidige versie en alle volgende wijzigingen vast te leggen.
Records in de wijzigingengegevensfeed zijn tijdelijk en alleen toegankelijk voor een opgegeven bewaarvenster. Transactielogboeken verwijderen tabelversies en de bijbehorende versies van de feed met wijzigingsgegevens op regelmatige tijdstippen. Wanneer een versie wordt verwijderd, kunt u de wijzigingengegevensfeed voor die versie niet meer lezen.
Wijzigingsgegevens archiveren voor permanente geschiedenis
Als voor uw use-case een permanente geschiedenis van alle wijzigingen in een tabel moet worden bijgehouden, gebruikt u incrementele logica om records van de wijzigingengegevensfeed naar een nieuwe tabel te schrijven.
In het volgende voorbeeld wordt getoond hoe u met trigger.AvailableNow beschikbare gegevens verwerkt als batchtaak voor controles of het volledig opnieuw afspelen van wijzigingen:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Een beginversie opgeven
Als u wijzigingen vanaf een bepaald punt wilt lezen, geeft u een beginversie op met behulp van een tijdstempel of versienummer. Beginversies zijn vereist voor batchleesbewerkingen. U kunt desgewenst een eindversie opgeven om het bereik te beperken. Meer informatie over tabelgeschiedenis vindt u in Wat is tijdreizen?.
Wanneer u Structured Streaming-workloads configureert die gebruikmaken van wijzigingenfeed, kan het opgeven van een beginversie van invloed zijn op de verwerking van de prestaties:
- Nieuwe pijplijnen voor gegevensverwerking profiteren doorgaans van het standaardgedrag, dat alle bestaande records in de tabel registreert als
INSERTbewerkingen wanneer de stream voor het eerst wordt gestart. - Als uw doeltabel al alle records met de juiste wijzigingen tot een bepaald punt bevat, geeft u een beginversie op om te voorkomen dat de status van de brontabel als
INSERTgebeurtenissen wordt verwerkt.
In het volgende voorbeeld ziet u hoe u herstelt van een streamingfout met een beschadigd controlepunt. In dit voorbeeld wordt uitgegaan van de volgende voorwaarden:
- De wijzigingsgegevensfeed is bij het aanmaken van de brontabel ingeschakeld.
- De downstream-doeltabel verwerkt alle wijzigingen tot en met versie 75.
- Versiegeschiedenis voor de brontabel is beschikbaar voor versies 70 en hoger.
Wanneer u de schrijfstroom naar de bestaande doeltabel definieert, moet u een nieuwe controlepuntlocatie opgeven:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Important
Als u een beginversie opgeeft en die versie niet beschikbaar is in de tabelgeschiedenis, kan de stream niet worden gestart vanaf een nieuw controlepunt. Omdat beheerde tabellen historische versies automatisch opschonen, worden alle opgegeven beginversies uiteindelijk verwijderd.
Zie Tabelgeschiedenis opnieuw afspelen.
Lees wijzigingen in batchqueries
U kunt de syntaxis van batchquery's gebruiken om alle wijzigingen te lezen die beginnen met een bepaalde versie of om wijzigingen binnen een bepaald bereik van versies als volgt te lezen:
- Geef versies op als gehele getallen en tijdstempels als tekenreeksen in de notatie
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - Begin- en eindversies zijn inclusief. Als u wilt lezen van een beginversie naar de nieuwste versie, geeft u alleen de beginversie op.
- Als u een versie opgeeft van voordat de feed voor wijzigingsgegevens was ingeschakeld, wordt er een fout weergegeven.
Ga als volgt te werk om batchleesbewerkingen te gebruiken met opties voor de begin- en eindversie:
SQL
Ga als volgt te werk om te lezen van versie 0 naar 10:
SELECT * FROM table_changes('tableName', 0, 10)
Ga als volgt te werk om gegevens tussen twee tijdstempelversies te lezen:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Ga als volgt te werk om te lezen van een beginversie naar de meest recente versie:
SELECT * FROM table_changes('tableName', 0)
Ga als volgt te werk om wijzigingen voor een tabel met speciale tekens in de naam te lezen:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Zie table_changes tabelwaardefunctie.
Python
Ga als volgt te werk om te lezen van versie 0 naar 10:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Ga als volgt te werk om tussen twee tijdstempels te lezen:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Ga als volgt te werk om te lezen van een beginversie naar de meest recente versie:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Ga als volgt te werk om te lezen van versie 0 naar 10:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Ga als volgt te werk om tussen twee tijdstempels te lezen:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Ga als volgt te werk om te lezen van een beginversie naar de meest recente versie:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Out-of-range versies verwerken
Als u standaard een versie of tijdstempel opgeeft die de laatste doorvoering overschrijdt, retourneert de query de fout timestampGreaterThanLatestCommit.
In Databricks Runtime 11.3 LTS en hoger kunt u als volgt tolerantie inschakelen voor buitenbereikversies:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Wanneer deze configuratie is ingeschakeld, retourneert de query als volgt verschillende resultaten:
- Een beginversie of tijdstempel na de laatste commit geeft een leeg resultaat.
- Een eindversie of tijdstempel na de laatste doorvoering retourneert alle wijzigingen vanaf het begin tot de laatste doorvoering.
Oude feed met wijzigingsgegevens voor Delta Lake
Verouderde wijzigingsgegevensfeed vereist handmatige configuratie voor afzonderlijke Delta Lake-tabellen. Omdat de wijzigingenfeed niet is opgenomen in de Apache Iceberg-specificatie, worden Apache Iceberg-tabellen niet ondersteund. Databricks raadt u aan om te migreren naar de gegevensfeed voor automatische wijzigingen. Zie Migreren van verouderde wijzigingsgegevensfeed.
Wanneer verouderde wijzigingsgegevensfeed is ingeschakeld, registreert de runtime wijzigingengebeurtenissen voor alle gegevens die in de tabel zijn geschreven. Dit omvat de rijgegevens samen met metagegevens die aangeven of de opgegeven rij is ingevoegd, verwijderd of bijgewerkt.
De oude feed voor wijzigingsgegevens gebruikt dezelfde readChangeFeed- en table_changes()-lees-API's als de automatische feed voor wijzigingsgegevens. Zie Wijzigingsgegevens incrementeel verwerken en Wijzigingen in batchquery's lezen.
Schakel de legacy-feed voor wijzigingsgegevens in
U moet de legacy feed voor wijzigingsgegevens expliciet inschakelen voor afzonderlijke tabellen. Hanteer één van de volgende methoden:
Nieuwe tabel
Stel de tabeleigenschap delta.enableChangeDataFeed = true in de CREATE TABLE opdracht in.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Als u verouderde wijzigingsgegevensfeed voor een bepaald tijdsinterval uitschakelt en deze vervolgens weer inschakelt, kan het interval niet worden opgevraagd. Gebruik de automatische feed voor wijzigingsgegevens om gedurende het interval wijzigingen op te vragen. Zie Automatische wijzigingsgegevensfeed.
Bestaande tabel
Stel de tabeleigenschap delta.enableChangeDataFeed = true in de ALTER TABLE opdracht in.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Overwegingen voor opslag
Beheerde tabellen registreren gegevens efficiënt en kunnen andere functies gebruiken om de opslagindeling te optimaliseren.
Met verouderde wijzigingsgegevensfeed moet u rekening houden met het volgende opslaggedrag:
- Mogelijk ziet u een kleine toename van de opslagkosten, omdat wijzigingen mogelijk worden vastgelegd in afzonderlijke bestanden.
- Sommige bewerkingen, zoals alleen-invoegbewerkingen of het verwijderen van volledige partities, genereren geen change data-bestanden. Azure Databricks de wijzigingsgegevensfeed rechtstreeks vanuit het transactielogboek berekent.
- Wijzigingen in gegevensbestanden maken gebruik van het bewaarbeleid van de tabel. De
VACUUMopdracht verwijdert bestanden met wijzigingsgegevens, en voor wijzigingen uit het transactielogboek geldt het bewaarbeleid voor controlepunten.
Databricks raadt u aan om de wijzigingenfeed niet opnieuw te reconstrueren door rechtstreeks query's uit te voeren op wijzigingsgegevensbestanden. Gebruik altijd Delta Lake- en Apache Iceberg-API's.
Limitations
Houd rekening met de volgende beperkingen voor wijzigingen in gegevensfeeds:
Tabellen met kolomtoewijzing
Als kolomtoewijzing is ingeschakeld voor een Delta Lake-tabel, kunt u kolommen verwijderen of een andere naam geven zonder gegevensbestanden te herschrijven. Zie Kolommen hernoemen en verwijderen met kolomtoewijzing van Delta Lake.
Wijzigingen in gegevensfeeds hebben echter beperkingen na niet-additieve schemawijzigingen. Niet-additieve schemawijzigingen omvatten de volgende bewerkingen:
- De naam van kolommen wijzigen of verwijderen.
- Kolomgegevenstypen wijzigen.
- De null-waarde van de kolom wijzigen, bijvoorbeeld met
ALTER COLUMN ... SET NOT NULL. Zie Een beperking instellenNOT NULLin Azure Databricks.
U kunt wijzigingenfeeds niet lezen voor een transactie of bereik waarin een niet-additieve schemawijziging plaatsvindt.
Om niet-additieve schemawijzigingen vóór of na het opgegeven bereik van batchleesbewerkingen toe te staan, gebruiken query's het schema van de eindversie van het bereik in plaats van de meest recente tabelversie. Query's mislukken nog steeds als het versiebereik een niet-additieve schemawijziging omvat.
Gegevensfeed automatisch wijzigen
- Omdat wijzigingenfeed niet wordt ondersteund in de Apache Iceberg-specificatie, kunnen externe Iceberg-clients geen query's uitvoeren op automatische wijzigingengegevensfeed. Zie de Iceberg-tabelspecificatie.
- Voor transacties met meerdere instructies wordt de automatische feed voor wijzigingsgegevens niet ondersteund als de brontabel tijdens de transactie is gewijzigd.
- Automatische wijzigingsgegevensfeed wordt niet ondersteund voor tabellen met rijfilters of kolommaskers. Zie rijfilters en kolommaskers.
- Query's voor gegevensfeeds kunnen geen tabelversies omvatten waarin een niet-additief schema is gewijzigd, zoals de naam van een kolom wijzigen, neerzetten of gegevenstype wijzigen. Splits de query op in bereiken voor en na de schemawijziging.