Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le flux de données de modifications (CDF) suit les modifications ligne par ligne entre les versions d’une table Delta Lake ou d’une table Apache Iceberg v3.
Azure Databricks prend en charge deux approches :
- Flux de données de modification automatique : calcule les modifications pendant les lectures de table à l’aide des métadonnées de traçabilité des lignes. Cela ne nécessite pas de configuration de table individuelle et fonctionne sur des tables Delta Lake et Apache Iceberg v3. Consultez le flux de données de modification automatique.
- Flux de données de modification hérité : matérialise les modifications pendant les écritures de table. Prend uniquement en charge les tables Delta Lake. Nécessite une configuration de table individuelle. Consultez le flux de données de modification hérité pour Delta Lake.
Vous pouvez utiliser le flux de données modifiées pour les cas d’usage courants des données, notamment :
- Pipelines ETL incrémentaux qui traitent uniquement les lignes qui ont changé depuis la dernière exécution du pipeline.
- Pistes d’audit qui suivent les modifications de données pour les exigences de conformité et de gouvernance.
- Charges de travail de réplication de données qui synchronisent les modifications apportées aux tables, caches ou systèmes externes en aval.
Flux automatique de données des modifications
Important
Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent contrôler l’accès à cette fonctionnalité à partir de la page Aperçus . Consultez Gérer les préversions d’Azure Databricks.
Le flux de données de modification automatique calcule les modifications au niveau des lignes au moment de la requête, plutôt qu’au moment de l’écriture, en utilisant le suivi des lignes pour Delta Lake et la traçabilité des lignes pour Apache Iceberg v3. Contrairement au flux de données de modification hérité, le flux de données modifiées automatique ne nécessite pas de configuration de table individuelle et fonctionne sur des tables Delta Lake et Apache Iceberg v3.
Comme les modifications ne sont pas calculées à chaque écriture pour les opérations MERGE INTO et UPDATE, le flux de données de modification automatique améliore les performances d’écriture et réduit les coûts de stockage par rapport à l’ancien flux de données de modification.
Le flux de données de modification automatique utilise les mêmes API table_changes() et readChangeFeed que le flux de données de modification hérité et fonctionne avec des requêtes par lots, Structured Streaming et Databricks-to-Databricks Delta Lake Sharing. Consultez Lire les modifications dans les requêtes par lot et Traiter les données modifiées de manière incrémentielle.
Exigences
- Databricks Runtime 18 ou version ultérieure
- Format de tableau pris en charge inscrit dans le catalogue Unity :
- Table gérée au format Delta Lake avec suivi des lignes activé ou au format Iceberg v3.
- Table externe au format Delta Lake avec suivi des lignes activé.
Consultez les types de tables de catalogue Databricks Unity.
Note
Le flux de données de modification ne fait pas partie de la spécification Apache Iceberg. Les lecteurs Azure Databricks peuvent interroger automatiquement le flux de données de modification pour les tables Apache Iceberg v3, mais les lecteurs Iceberg externes ne le peuvent pas. Consultez la spécification de la table Iceberg.
Pour Delta Lake, seuls les lecteurs Azure Databricks peuvent interroger le flux de données de modification automatique.
Utiliser le flux de données modifiées
Pour utiliser le flux de données modifiées, vérifiez que vous utilisez une table qui répond aux exigences. Consultez Spécifications.
Pour lire par lots le flux de données de modification, procédez comme suit :
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Pour plus d’informations sur les lectures par lots pour le flux de données modifiées, consultez Lire les modifications dans les requêtes batch.
Pour diffuser en continu le flux de données modifiées, procédez comme suit :
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Pour plus d’informations sur les lectures en continu du flux de données modifiées, consultez Traiter les données modifiées de manière incrémentielle.
Migrer à partir du flux de données de modification hérité
Pour migrer une table Delta Lake du flux de données de modification hérité vers le flux de données de modification automatique, procédez comme suit :
- Vérifiez que votre table répond aux exigences.
- Désactivez le flux de données de modification hérité en exécutant la commande suivante :
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
Vous ne pouvez pas utiliser les flux de données de modification hérités et automatiques ensemble.
Modifier le schéma du flux de données
Lorsque vous lisez le flux de données de modification d’une table, la requête utilise le schéma pour la dernière version de la table. Azure Databricks prend en charge la plupart des opérations de modification de schéma et d’évolution, mais les tables avec mappage de colonnes ont des limitations. Consultez Tables avec mappage de colonnes.
Outre les colonnes de données du schéma de la table Delta Lake, le flux de données modifiées contient des colonnes de métadonnées qui identifient le type d’événement de modification :
| Nom de la colonne | Type | Values |
|---|---|---|
_change_type |
String | Contient : insert, , update_preimage, update_postimagedelete.preimage est la valeur avant la mise à jour, postimage est la valeur après la mise à jour. |
_commit_version |
Long | Contient : la version du journal Delta ou de la table contenant la modification. |
_commit_timestamp |
Timestamp | Contient : l’horodatage associé à la création de la validation. |
Si le schéma contient des colonnes portant le même nom que ces colonnes de métadonnées, vous ne pouvez pas utiliser le flux de données modifiées sur une table. Avant d’activer le flux de données modifiées, renommez les colonnes de votre table pour résoudre ce conflit.
Traiter de façon incrémentielle les données modifiées
Databricks vous recommande d’utiliser le flux de données de modification en combinaison avec Structured Streaming pour traiter de manière incrémentielle les modifications des tables. Vous devez utiliser Structured Streaming pour Azure Databricks pour suivre automatiquement les versions du flux de données de modification de votre table. Pour le traitement CDC avec des tables SCD de type 1 ou de type 2, consultez les API AUTO CDC : Simplifiez la capture de données modifiées avec des pipelines.
Lorsque le flux démarre pour la première fois, le flux de données de changement renvoie le dernier instantané de la table sous forme d’enregistrements INSERT, puis renvoie les modifications ultérieures sous forme de données de changement. Les flux de données de modification enregistrent à la fois les données modifiées et les nouvelles lignes de données dans le journal des transactions de la table, au même moment.
Pour configurer un flux pour lire le flux de données modifiées d’une table, définissez l’option readChangeFeedtrue comme suit :
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Limites de taux
Azure Databricks prend en charge les limites de débit (maxFilesPerTrigger, maxBytesPerTrigger) et excludeRegex lors de la lecture des données modifiées. Pour obtenir la liste complète des options delta Lake de streaming, consultez Delta Lake.
Si vous le souhaitez, vous pouvez spécifier une version de départ, consultez Spécifier une version de départ. Pour les versions autres que l’instantané initial, les limites de taux s’appliquent de manière atomique à des commits entiers. Soit le lot actuel inclut l’intégralité de la validation, soit le lot actuel reporte la validation au lot suivant.
Revoir l’historique des tables
Un flux de données modifiées n’est pas destiné à servir d’enregistrement permanent de toutes les modifications apportées à une table. Il enregistre uniquement les modifications qui se produisent après l’activation du flux de données de modification. Vous pouvez démarrer une nouvelle lecture de diffusion en continu pour capturer la version actuelle et toutes les modifications suivantes.
Les enregistrements du flux de données modifiées sont temporaires et accessibles uniquement pour une fenêtre de rétention spécifiée. Les journaux des transactions suppriment les versions de table et leurs versions de flux de données modifiées correspondantes à intervalles réguliers. Lorsqu’une version est supprimée, vous ne pouvez plus lire le flux de données de modification pour cette version.
Archiver les données modifiées pour l’historique permanent
Si votre cas d’usage vous oblige à conserver un historique permanent de toutes les modifications apportées à une table, utilisez la logique incrémentielle pour écrire des enregistrements du flux de données de modification vers une nouvelle table.
L’exemple suivant montre comment utiliser trigger.AvailableNow pour traiter les données disponibles en tant que traitement par lots pour l’audit ou les réexécutions complètes des modifications :
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Spécifier une version de départ
Pour lire les modifications à partir d’un point spécifique, spécifiez une version de départ à l’aide d’un horodatage ou d’un numéro de version. Les versions de départ sont requises pour les lectures par lots. Si vous le souhaitez, vous pouvez spécifier une version de fin pour limiter la plage. Pour en savoir plus sur l’historique des tables, consultez Qu’est-ce que le voyage dans le temps ?.
Lorsque vous configurez des charges de travail Structured Streaming qui utilisent le flux de données modifiées, la spécification d’une version de départ peut affecter les performances de traitement :
- Les nouveaux pipelines de traitement des données bénéficient généralement du comportement par défaut, qui enregistre tous les enregistrements existants dans la table en tant qu’opérations
INSERTau démarrage du flux. - Si votre table cible contient déjà tous les enregistrements avec des modifications appropriées jusqu’à un certain point, indiquez une version de départ pour éviter de traiter l’état de la table source en tant qu’événements
INSERT.
L’exemple suivant montre comment récupérer à partir d’une défaillance de streaming avec un point de contrôle endommagé. Dans cet exemple, supposons les conditions suivantes :
- Le flux de changements de données a été activé sur la table source lors de la création de la table.
- La table cible en aval a traité toutes les modifications jusqu’à la version 75 incluse.
- L’historique des versions de la table source est disponible pour les versions 70 et ultérieures.
Lorsque vous définissez le flux d’écriture dans la table cible existante, vous devez spécifier un nouvel emplacement de point de contrôle :
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Important
Si vous spécifiez une version de départ et que cette version n’est pas disponible dans l’historique des tables, le flux ne parvient pas à démarrer à partir d’un nouveau point de contrôle. Étant donné que les tables managées nettoient automatiquement les versions historiques, toutes les versions de départ spécifiées sont finalement supprimées.
Consultez l’historique des tables de replay.
Lire les changements dans des requêtes par lots
Vous pouvez utiliser la syntaxe de requête par lot pour lire toutes les modifications à partir d’une version particulière ou pour lire les modifications dans une plage de versions spécifiée comme suit :
- Spécifiez les versions sous forme d’entiers et d’horodatages sous forme de chaînes au format
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - Les versions de début et de fin sont inclusives. Pour lire une version de départ vers la dernière version, spécifiez uniquement la version de départ.
- Si vous spécifiez une version avant l’activation du flux de données de modification, une erreur s’affiche.
Pour utiliser les lectures par lots avec les options de version de début et de fin, procédez comme suit :
SQL
Pour lire de la version 0 à 10, procédez comme suit :
SELECT * FROM table_changes('tableName', 0, 10)
Pour lire entre deux versions d’horodatage, procédez comme suit :
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Pour lire une version de départ vers la dernière version, procédez comme suit :
SELECT * FROM table_changes('tableName', 0)
Pour lire les modifications d’une table avec des caractères spéciaux dans le nom, procédez comme suit :
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Consultez table_changesTVF.
Python
Pour lire de la version 0 à 10, procédez comme suit :
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Pour lire entre deux horodatages, procédez comme suit :
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Pour lire une version de départ vers la dernière version, procédez comme suit :
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Pour lire de la version 0 à 10, procédez comme suit :
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Pour effectuer une lecture entre deux horodatages, procédez comme suit :
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Pour lire une version de départ vers la dernière version, procédez comme suit :
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Gérer les versions hors limites
Par défaut, si vous spécifiez une version ou un horodatage qui dépasse la dernière validation, la requête retourne l’erreur timestampGreaterThanLatestCommit.
Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez activer la tolérance pour les versions hors plage comme suit :
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Lorsque cette configuration est activée, la requête retourne des résultats différents comme suit :
- Une version de départ ou un horodatage au-delà de la dernière validation retourne un résultat vide.
- Une version de fin ou un horodatage au-delà de la dernière validation retourne toutes les modifications du début à la dernière validation.
Flux de données de modification hérité pour Delta Lake
Le flux de données de modification hérité nécessite une configuration manuelle pour les tables Delta Lake individuelles. Étant donné que le flux de données modifiées n’est pas inclus dans la spécification Apache Iceberg, les tables Apache Iceberg ne sont pas prises en charge. Databricks vous recommande de migrer vers le flux de données de modification automatique. Consultez Migrer depuis l'ancien flux de données de modification.
Lorsque le flux de données modifiées hérité est activé, le runtime enregistre les événements de modification pour toutes les données écrites dans la table. Cela inclut les données de ligne ainsi que les métadonnées qui indiquent si la ligne spécifiée a été insérée, supprimée ou mise à jour.
Le flux de données de modification historique utilise les mêmes readChangeFeed API de lecture table_changes() que le flux de données de modification automatique. Consultez Traiter les données modifiées de manière incrémentielle et Lire les modifications dans des requêtes par lot.
Activer le flux de données de modification hérité
Vous devez activer explicitement l’ancienne version du flux de données de modification pour chaque table. Utilisez l’une des méthodes suivantes :
Nouvelle table
Définissez la propriété delta.enableChangeDataFeed = true de table dans la CREATE TABLE commande.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Si vous désactivez le flux de données de modification hérité pour un intervalle de temps et que vous l’activez à nouveau, l’intervalle ne sera pas interrogeable. Utilisez le flux de données de modification automatique pour interroger les modifications pendant l’intervalle. Consultez le flux de données de modification automatique.
Table existante
Définissez la propriété delta.enableChangeDataFeed = true de table dans la ALTER TABLE commande.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Considérations relatives au stockage
Les tables gérées enregistrent efficacement les modifications de données et peuvent utiliser d’autres fonctionnalités pour optimiser la disposition du stockage.
Avec le flux de données de modification hérité, vous devez prendre en compte le comportement de stockage suivant :
- Vous pouvez constater une faible augmentation des coûts de stockage, car les modifications peuvent être enregistrées dans des fichiers distincts.
- Certaines opérations, telles que les suppressions d’insertion uniquement ou de partition complète, ne génèrent pas de fichiers de données modifiées. Azure Databricks calcule le flux de données modifiées directement à partir du journal des transactions.
- Les fichiers de données modifiées utilisent la stratégie de rétention de la table. La
VACUUMcommande supprime les fichiers de données modifiées et les modifications du journal des transactions utilisent la stratégie de rétention de point de contrôle.
Databricks recommande de ne pas tenter de reconstruire le flux de données modifiées en interrogeant directement les fichiers de données modifiées. Utilisez toujours les API Delta Lake et Apache Iceberg.
Limitations
Tenez compte des limitations suivantes pour les flux de données modifiées :
Tables avec mappage de colonnes
Avec le mappage de colonnes activé sur une table Delta Lake, vous pouvez supprimer ou renommer des colonnes sans réécrire des fichiers de données. Voir Renommer et supprimer des colonnes avec le mappage de colonnes Delta Lake.
Toutefois, les flux de données de modification présentent des limites après des modifications non additives du schéma. Les modifications de schéma non additifs incluent les opérations suivantes :
- Renommez ou supprimez des colonnes.
- Modifier les types de données de colonne.
- Modifier le caractère nullable d’une colonne, par exemple avec
ALTER COLUMN ... SET NOT NULL. Consultez Définir uneNOT NULLcontrainte dans Azure Databricks.
Vous ne pouvez pas lire les flux de données modifiées pour une transaction ou une plage dans laquelle une modification de schéma non additive se produit.
Pour autoriser les modifications de schéma non additif avant ou après la plage spécifiée de lectures de lots, les requêtes utilisent le schéma de la version finale de la plage plutôt que la dernière version de la table. Les requêtes échouent toujours si la plage de versions s’étend sur une modification de schéma non additif.
Flux automatique des modifications de données
- Étant donné que le flux de données modifiées n’est pas pris en charge dans la spécification Apache Iceberg, les clients Iceberg externes ne peuvent pas interroger le flux de données de modification automatique. Consultez la spécification de la table Iceberg.
- Pour les transactions à plusieurs instructions, si la table source a été modifiée pendant la transaction, le flux de données de modification automatique n’est pas pris en charge.
- Le flux automatique des données de modification n’est pas pris en charge pour les tables comportant des filtres de lignes ou des masques de colonnes. Consultez les filtres de lignes et les masques de colonne.
- Les requêtes de flux de données modifiées ne peuvent pas s’étendre sur les versions de table où une modification de schéma non additive s’est produite, telle qu’un changement de nom de colonne, de suppression ou de type de données. Fractionnez la requête en plages avant et après la modification du schéma.