Enregistrer les modifications Postgres dans un lakehouse

Note

La fonctionnalité de flux de données de modification de Lakebase est disponible en préversion publique.

Configurez lakebase Change Data Feed (CDF) sur une table Postgres, puis regardez les modifications au niveau des lignes qui apparaissent dans la table Delta de destination.

Étapes :Activer la capture des modifications → ② Démarrer le flux → ③ Suivre une ligne de données dans le lakehouse → ④ Modifier la ligne de données et observer son cheminement

Note

Il s’agit d’un guide de démarrage rapide. Pour obtenir une documentation complète, consultez le flux de données modifiées Lakebase.

Avant de commencer

  • Assurez-vous que vous avez terminé d’obtenir une base de données Postgres. Vous avez besoin d’un projet Lakebase avec la table d’exemple playing_with_lakebase.
  • Un catalogue Unity Catalog et un schéma pour lesquels vous disposez de l’autorisation CREATE TABLE.

Étape 1 : Activer la capture des modifications

Postgres a besoin de données de ligne complètes dans le journal en écriture anticipée pour que CDF fonctionne. Définir l’identité de réplica sur FULL demande à Postgres d’enregistrer à la fois l’ancien et le nouvel état de la ligne pour chaque modification.

Dans l’Éditeur SQL Lakebase, exécutez :

ALTER TABLE playing_with_lakebase REPLICA IDENTITY FULL;

En savoir plus : Définir l’identité de réplica sur toutes les tables d’un schéma et l’appliquer automatiquement à de nouvelles tables

Étape 2 : Démarrer le flux

Lakebase CDF est configuré au niveau du schéma. Chaque table actuelle et future du schéma source est incluse automatiquement. Vous ne choisissez donc pas de tables individuelles.

À partir de votre branche de production, ouvrez l’onglet Flux de données modifiées , puis cliquez sur Démarrer. Choisissez public comme schéma source, puis choisissez un catalogue et un schéma de destination dans Unity Catalog. L’instantané initial commence immédiatement et lb_playing_with_lakebase_history apparaît sous la forme d’une table Delta dans votre destination.

Boîte de dialogue de démarrage avec sélection de la source et de la destination.

En savoir plus : Démarrer le flux de données modifiées

Étape 3 : Suivre une ligne dans la lakehouse

Sélectionnez une ligne dans Lakebase. Examinez la ligne id=2:

SELECT * FROM playing_with_lakebase WHERE id = 2;

Trouvez maintenant la même ligne dans la table d’historique Delta. Basculez vers un entrepôt Databricks SQL ou un notebook et exécutez :

SELECT * FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2;

Remplacez <catalog> et <schema> par la destination que vous avez choisie à l’étape 2. Vous verrez la ligne id=2 avec les mêmes name et value que dans Lakebase, ainsi que des colonnes supplémentaires. L’instantané initial a écrit chaque ligne existante dans Delta en tant qu’événement insert , c’est-à-dire ce que représente cette ligne.

Ces colonnes supplémentaires décrivent le type d’événement que chaque ligne représente (_pg_change_type), quand elle s’est produite (_timestamp) et les informations de classement Postgres (_pg_lsn, _pg_xid).

En savoir plus : Schéma de la table de destination | Mappage des types de données

Étape 4 : Modifier la ligne, voir la modification se propager

De retour dans l’Éditeur SQL Lakebase, mettez à jour la ligne id=2:

UPDATE playing_with_lakebase SET value = 55.5 WHERE id = 2;

Attendez quelques secondes pour que la modification apparaisse dans le flux, puis interrogez à nouveau la table d’historique :

SELECT id, value, _pg_change_type, _timestamp
FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2
ORDER BY _pg_lsn DESC;

Tableau d’historique delta montrant trois lignes pour id=2 : update_preimage, update_postimage et insertion

La ligne id=2 apparaît désormais trois fois : l’original insert, un update_preimage avec l’ancienne valeur et une update_postimage nouvelle valeur. Chaque modification apportée à la ligne devient une nouvelle ligne d’historique, de sorte que vous disposez toujours d’une piste d’audit complète. Les suppressions fonctionnent de la même façon, en ajoutant une ligne avec _pg_change_type = 'delete'.

En savoir plus : Modèles courants de modification | Créer des pipelines en aval

Étapes suivantes