Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Note
La fonctionnalité de flux de données de modification de Lakebase est disponible en préversion publique.
Configurez lakebase Change Data Feed (CDF) sur une table Postgres, puis regardez les modifications au niveau des lignes qui apparaissent dans la table Delta de destination.
Étapes : ① Activer la capture des modifications → ② Démarrer le flux → ③ Suivre une ligne de données dans le lakehouse → ④ Modifier la ligne de données et observer son cheminement
Note
Il s’agit d’un guide de démarrage rapide. Pour obtenir une documentation complète, consultez le flux de données modifiées Lakebase.
Avant de commencer
- Assurez-vous que vous avez terminé d’obtenir une base de données Postgres. Vous avez besoin d’un projet Lakebase avec la table d’exemple
playing_with_lakebase. - Un catalogue Unity Catalog et un schéma pour lesquels vous disposez de l’autorisation
CREATE TABLE.
Étape 1 : Activer la capture des modifications
Postgres a besoin de données de ligne complètes dans le journal en écriture anticipée pour que CDF fonctionne. Définir l’identité de réplica sur FULL demande à Postgres d’enregistrer à la fois l’ancien et le nouvel état de la ligne pour chaque modification.
Dans l’Éditeur SQL Lakebase, exécutez :
ALTER TABLE playing_with_lakebase REPLICA IDENTITY FULL;
En savoir plus : Définir l’identité de réplica sur toutes les tables d’un schéma et l’appliquer automatiquement à de nouvelles tables
Étape 2 : Démarrer le flux
Lakebase CDF est configuré au niveau du schéma. Chaque table actuelle et future du schéma source est incluse automatiquement. Vous ne choisissez donc pas de tables individuelles.
À partir de votre branche de production, ouvrez l’onglet Flux de données modifiées , puis cliquez sur Démarrer. Choisissez public comme schéma source, puis choisissez un catalogue et un schéma de destination dans Unity Catalog. L’instantané initial commence immédiatement et lb_playing_with_lakebase_history apparaît sous la forme d’une table Delta dans votre destination.
En savoir plus : Démarrer le flux de données modifiées
Étape 3 : Suivre une ligne dans la lakehouse
Sélectionnez une ligne dans Lakebase. Examinez la ligne id=2:
SELECT * FROM playing_with_lakebase WHERE id = 2;
Trouvez maintenant la même ligne dans la table d’historique Delta. Basculez vers un entrepôt Databricks SQL ou un notebook et exécutez :
SELECT * FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2;
Remplacez <catalog> et <schema> par la destination que vous avez choisie à l’étape 2. Vous verrez la ligne id=2 avec les mêmes name et value que dans Lakebase, ainsi que des colonnes supplémentaires. L’instantané initial a écrit chaque ligne existante dans Delta en tant qu’événement insert , c’est-à-dire ce que représente cette ligne.
Ces colonnes supplémentaires décrivent le type d’événement que chaque ligne représente (_pg_change_type), quand elle s’est produite (_timestamp) et les informations de classement Postgres (_pg_lsn, _pg_xid).
En savoir plus : Schéma de la table de destination | Mappage des types de données
Étape 4 : Modifier la ligne, voir la modification se propager
De retour dans l’Éditeur SQL Lakebase, mettez à jour la ligne id=2:
UPDATE playing_with_lakebase SET value = 55.5 WHERE id = 2;
Attendez quelques secondes pour que la modification apparaisse dans le flux, puis interrogez à nouveau la table d’historique :
SELECT id, value, _pg_change_type, _timestamp
FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2
ORDER BY _pg_lsn DESC;
La ligne id=2 apparaît désormais trois fois : l’original insert, un update_preimage avec l’ancienne valeur et une update_postimage nouvelle valeur. Chaque modification apportée à la ligne devient une nouvelle ligne d’historique, de sorte que vous disposez toujours d’une piste d’audit complète. Les suppressions fonctionnent de la même façon, en ajoutant une ligne avec _pg_change_type = 'delete'.
En savoir plus : Modèles courants de modification | Créer des pipelines en aval
Étapes suivantes
- Créez un pipeline en aval : Transformez la table d’historique en agrégat en direct avec une vue matérialisée, SDP ou Structured Streaming.
- Exécuter l’analytique : Interrogez vos tables d’historique Delta avec Databricks SQL.
- Utilisez la couche bronze : Intégrez la table d’historique dans une architecture en médaillon.
- Passez en revue les limites de production : Consultez les limitations et la résolution des problèmes et la gestion des modifications de schéma.
- Explorer Lakebase :Core concepts | Lakebase