Ajouter une source CDC de la base de données PostgreSQL à un Eventstream

Cet article explique comment ajouter une source de capture de données modifiées (CDC) de la base de données PostgreSQL à un Eventstream.

Le connecteur source PostgreSQL Database Change Data Capture (CDC) pour les flux d’événements de Microsoft Fabric vous permet de capturer un instantané des données actuelles dans une base de données PostgreSQL. Actuellement, postgreSQL Database Change Data Capture (CDC) est pris en charge à partir des services suivants où les bases de données sont accessibles publiquement :

  • Base de données Azure pour PostgreSQL
  • Amazon RDS pour PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL pour PostgreSQL

Une fois la source CDC de base de données PostgreSQL ajoutée au flux d’événements, elle capture les modifications au niveau des lignes apportées aux tables spécifiées. Ces modifications peuvent ensuite être traitées en temps réel et envoyées à différentes destinations pour une analyse plus approfondie.

Remarque

Avec DeltaFlow (Aperçu), vous pouvez transformer des événements de capture de données modifiées bruts de Debezium en flux prêts pour l'analytique qui reflètent la structure de votre table source. DeltaFlow automatise l’inscription de schéma, la gestion des tables de destination et la gestion de l’évolution des schémas. Pour utiliser DeltaFlow, choisissez les événements prêts pour l’analytique et le schéma mis à jour automatiquement pendant l’étape de gestion du schéma.

Prérequis

Activer la capture de données modifiées dans votre base de données PostgreSQL

Cette section utilise Azure Database for PostgreSQL comme exemple.

Pour activer la capture de données modifiées dans votre serveur flexible Azure Database for PostgreSQL, procédez comme suit :

  1. Dans votre page serveur flexible Azure Database for PostgreSQL dans le portail Azure, sélectionnez Paramètres du serveur dans le menu de navigation.

  2. Sur la page Paramètres du serveur :

    • Définir wal_level sur logique.
    • Mettez à jour le max_worker_processes sur au moins 16.

    Capture d’écran de l’activation de la CDC pour un déploiement de serveur flexible.

  3. Enregistrez les modifications et redémarrez le serveur.

  4. Vérifiez que votre instance de serveur flexible Azure Database for PostgreSQL autorise le trafic réseau public.

  5. Accordez les autorisations de réplication de l’utilisateur administrateur en exécutant l’instruction SQL suivante. Si vous souhaitez utiliser un autre compte utilisateur pour connecter votre base de données PostgreSQL (DB) afin de récupérer les données de changement (CDC), vérifiez que l’utilisateur est le propriétaire de la table.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    

Lancer l'assistant de sélection d'une source de données

Si vous n’avez pas encore ajouté de source à votre flux d’événements, sélectionnez la vignette Connecter des sources de données . Vous pouvez également sélectionner Ajouter des sources>Connecter des sources de données dans le ruban.

Capture d’écran montrant la sélection de la vignette pour l’utilisation d’une source externe.

Si vous ajoutez la source à un flux d’événements déjà publié, basculez vers le mode Édition. Sur le ruban, sélectionnez Ajouter une source>Connecter des sources de données.

Capture d’écran montrant les sélections pour l’ajout de sources externes.

Dans la page Sélectionner une source de données, recherchez et sélectionnez Se connecter dans la vignette Base de données PostgreSQL (CDC).

Screenshot qui montre la sélection de Azure Database (DB) pour PostgreSQL (CDC) comme type source dans l’Assistant Get events.

Configurer et se connecter à postgreSQL Database CDC

Ingérer des données modifiées à partir de bases de données PostgreSQL avec inscription automatique de schéma de table via CDC dans Eventstream.

Remarque

DeltaFlow (préversion) : lorsque vous sélectionnez des événements prêts pour l’analytique et un schéma mis à jour automatiquement à l’étape de gestion des schémas, DeltaFlow transforme les événements CDC bruts de Debezium en flux prêts pour l’analytique qui reflètent votre structure de table source. DeltaFlow automatise également la création de tables de destination et la gestion de l’évolution du schéma.

  1. Sur la page Connecter, sélectionnez Nouvelle connexion.

    Capture d’écran montrant la page Se connecter pour une base de données PostgreSQL avec le lien Nouvelle connexion mis en surbrillance.

  2. Dans la section Paramètres de connexion, saisissez les informations suivantes :

    • Serveur : adresse du serveur de votre base de données PostgreSQL, par exemple my-pgsql-server.postgres.database.azure.com.

    • Base de données : nom de la base de données, par exemple my_database.

      Capture d’écran montrant la section Paramètres de connexion pour le connecteur de base de données PostgreSQL.

    • Nom de la connexion : saisissez un nom pour la connexion.

    • Type d’authentification, sélectionnez Basic et entrez votre nom d’utilisateur et votre mot de passe pour la base de données.

      Remarque

      Actuellement, les flux d’événements Fabric prennent uniquement en charge l’authentification Basic.

    • Sélectionnez Se connecter pour terminer les paramètres de connexion. Capture d’écran montrant la section Informations d’identification de connexion pour le connecteur de base de données PostgreSQL.

  3. Port : entrez le numéro de port de votre serveur. La valeur par défaut est 5432. Si votre connexion cloud sélectionnée est configurée dans Gérer les connexions et les passerelles, vérifiez que le numéro de port correspond à celui défini ici. S’ils ne correspondent pas, le numéro de port dans la connexion cloud dans Gérer les connexions et les passerelles est prioritaire.

  4. Vous pouvez choisir entre deux options lors de la capture des modifications à partir de tables de base de données :

    • Toutes les tables : capturez les modifications de chaque table de la base de données.
    • Entrez le ou les noms de table : vous permet de spécifier un sous-ensemble de tables à l’aide d’une liste séparée par des virgules. Vous pouvez utiliser soit : identificateurs de table complets au format schemaName.tableName ou expressions régulières valides. Exemples:
    • dbo.test.*: sélectionnez toutes les tables dont les noms commencent par test dans le schéma dbo.
    • dbo\.(test1|test2): Sélectionner dbo.test1 et dbo.test2.

    Vous pouvez combiner les deux formats dans la liste. La limite de caractères totale de l’entrée entière est de 102 400 caractères.

  5. Nom de l’emplacement (facultatif) : entrez le nom de l’emplacement de décodage logique PostgreSQL créé pour la diffusion en continu des modifications à partir d’un plug-in particulier pour une base de données/un schéma particulier. Le serveur utilise cet emplacement pour diffuser des événements vers le connecteur de streaming Eventstream. Il doit contenir uniquement des lettres minuscules, des chiffres et des traits de soulignement.

    • S’il n’est pas spécifié, un GUID est utilisé pour créer l’emplacement, nécessitant les autorisations de base de données appropriées.
    • Si un nom d’emplacement spécifié existe, le connecteur l’utilise directement.
  6. Développez les paramètres avancés pour accéder à des options de configuration supplémentaires pour la source CDC de la base de données PostgreSQL :

    • Nom de la publication : spécifie le nom de la publication de réplication logique PostgreSQL à utiliser. Cette valeur doit correspondre à une publication existante dans la base de données, ou elle est automatiquement créée en fonction du mode de création automatique. Valeur par défaut : dbz_publication.

      Remarque

      L’utilisateur du connecteur doit disposer des autorisations de superutilisateur pour créer la publication. Nous vous recommandons de créer la publication manuellement avant de démarrer le connecteur pour la première fois afin d’éviter les problèmes liés aux autorisations.

    • Mode de création automatique de publication : contrôle si et comment la publication est créée automatiquement. Les options sont les suivantes :

      • Filtered (valeur par défaut) : si la publication spécifiée n'existe pas, le connecteur en crée une qui inclut uniquement les tables sélectionnées (comme spécifié dans la liste des tables à inclure).
      • AllTables: si la publication spécifiée existe, le connecteur l’utilise. S’il n’existe pas, le connecteur en crée un qui inclut toutes les tables de la base de données.
      • Disabled: le connecteur ne crée pas de publication. Si la publication spécifiée est manquante, le connecteur lève une exception et s’arrête. Dans ce cas, la publication doit être créée manuellement dans la base de données.

      Pour plus d’informations, consultez la documentation Debezium sur le mode de création automatique de publication

    • Mode de gestion décimal : spécifie la façon dont le connecteur gère les valeurs postgreSQL DECIMAL et NUMERIC de colonne :

      • Precise : représente des valeurs utilisant des types décimaux exacts (par exemple, Java BigDecimal) pour garantir une précision et une précision complètes dans la représentation des données.
      • Double: convertit les valeurs en nombres à virgule flottante de double précision. Cette option améliore la facilité d’utilisation et les performances, mais peut entraîner une perte de précision.
      • String: encode les valeurs sous forme de chaînes mises en forme. Cette option facilite leur consommation dans les systèmes en aval, mais perd des informations sémantiques sur le type numérique d’origine.
    • Mode instantané : spécifiez les critères d’exécution d’un instantané au démarrage du connecteur :

      • Initial: Le connecteur exécute un instantané uniquement lorsqu'aucun décalage n’a été enregistré pour le nom du serveur logique, ou s’il détecte qu’un instantané antérieur n’a pas été achevé avec succès. Une fois l’instantané terminé, le connecteur commence à diffuser en continu les enregistrements d’événements pour les modifications de base de données suivantes.
      • InitialOnly: Le connecteur effectue un instantané uniquement lorsque aucun décalage n’a été enregistré pour le nom du serveur logique. Une fois que l’instantané est terminé, le connecteur s’arrête. Il ne passe pas en mode streaming pour lire les événements de modification du binlog.
      • NoData: le connecteur exécute un instantané qui capture uniquement le schéma, mais pas les données de table. Définissez cette option si vous n'avez pas besoin d'un instantané cohérent des données, mais que vous avez uniquement besoin des modifications depuis le démarrage du connecteur.
    • Requête de pulsation : définit une requête que le connecteur exécute sur la base de données source lors de l'envoi d'un message de pulsation par le connecteur.

    • Surcharge de l’instruction SELECT pour l’instantané : spécifie les lignes de la table à inclure dans un instantané. Utilisez la propriété si vous souhaitez qu’un instantané inclue uniquement un sous-ensemble des lignes d’une table. Cette propriété affecte uniquement les instantanés. Cela ne s’applique pas aux événements que le connecteur lit dans le journal.

Flux ou détails de la source

  1. Dans la page Se connecter , suivez l’une de ces étapes en fonction de l’utilisation d’Eventstream ou de Real-Time hub.

    • Flux d’événements :

      Dans le volet Détails de la source à droite, procédez comme suit :

      1. Pour le nom de la source, sélectionnez l'icône du crayon pour modifier le nom.

      2. Notez que le nom eventstream et le nom stream sont en lecture seule.

    • Hub en temps réel :

      Dans la section Détails du flux à droite, procédez comme suit :

      1. Sélectionnez l’espace de travail Fabric où vous souhaitez créer le flux d’événements.

      2. Pour le flux d'événements, sélectionnez le bouton Crayon et entrez un nom pour le flux d'événements.

      3. La valeur du nom du flux est générée automatiquement pour vous en ajoutant -stream au nom de l’événementstream. Ce flux s’affiche sur la page Tous les flux de données du hub en temps réel une fois que l’Assistant est terminé.

  2. Sélectionnez Suivant en bas de la page Configurer .

Vérifier et se connecter

Dans l’écran Vérifier + se connecter , passez en revue le résumé, puis sélectionnez Ajouter (Eventstream) ou Se connecter (Real-Time hub).

Page Gestion des schémas

  1. Dans l’étape de gestion des schémas , choisissez l’une des options suivantes :

    • Événements prêts à l’analytique et schéma mis à jour automatiquement (préversion DeltaFlow) : le connecteur transforme les événements CDC bruts en flux prêts pour l’analytique qui reflètent la structure de votre table source. DeltaFlow enrichit les événements avec des métadonnées telles que le type de modification (insertion, mise à jour ou suppression) et les horodatages, et gère automatiquement les tables de destination et l’évolution du schéma.
    • Événements CDC bruts : le connecteur ingère et rend disponibles les événements CDC bruts. Si vous le souhaitez, le connecteur peut découvrir automatiquement des schémas de table et les inscrire dans le registre de schémas. Utilisez cette option lorsque vous souhaitez connaître le schéma sans transformation DeltaFlow.

    Remarque

    La capture d’écran suivante montre Azure SQL Database capture de données modifiées. Les options de gestion des schémas sont identiques pour tous les connecteurs sources CDC pris en charge.

    Capture d’écran montrant l’étape de gestion de schéma avec les options d’événement DeltaFlow et CDC brutes d’un connecteur source CDC.

  2. Activer l’association de schéma d’événement.

  3. Pour Workspace, sélectionnez un espace de travail Fabric pour le jeu de schémas.

  4. Pour le jeu de schémas, + Créer est sélectionné par défaut, ce qui crée un jeu de schémas. Vous pouvez le modifier pour sélectionner un jeu de schémas d’événements existant.

  5. Si vous avez sélectionné l’option + Créer à l’étape précédente, entrez un nom pour le jeu de schémas.

  6. Dans la page Vérifier + se connecter , passez en revue le résumé, puis sélectionnez Ajouter (Eventstream) ou Se connecter (Real-Time hub).

    Capture d’écran montrant la page Vérifier et créer pour le connecteur de base de données PostgreSQL avec des fonctionnalités étendues.

    Pour toutes les tables ou tables sélectionnées dans la base de données PostgreSQL, le connecteur effectue une découverte automatique et crée des schémas et les inscrit auprès du registre de schémas.

DeltaFlow : transformation d'événements prête pour l'analyse (aperçu)

Lorsque vous activez les événements prêts pour l’analytique et le schéma mis à jour automatiquement (DeltaFlow), le connecteur fournit les fonctionnalités suivantes :

  • Forme d’événement prête pour l’analytique : les événements bruts de capture de données modifiées de Debezium sont transformés en un format tabulaire qui reflète la structure de la table source. Les événements sont enrichis avec des colonnes de métadonnées, notamment le type de modification (insert, updateou delete) et l’horodatage des événements.
  • Gestion automatique des tables de destination : lorsque vous routez des flux deltaFlow vers une destination prise en charge comme un eventhouse, les tables de destination sont automatiquement créées pour correspondre au schéma de table source. Vous n’avez pas besoin de créer ou de configurer manuellement des tables de destination.
  • Gestion de l’évolution du schéma : lorsque les tables de base de données sources changent (par exemple, les nouvelles colonnes sont ajoutées ou des tables sont créées), DeltaFlow détecte automatiquement les modifications, met à jour les schémas inscrits et ajuste les tables de destination en conséquence. Ce comportement réduit l’intervention manuelle causée par les modifications de schéma.

Remarque

DeltaFlow (préversion) est actuellement supporté par Azure SQL Database CDC, Azure SQL Managed Instance CDC, SQL Server sur machine virtuelle CDC, et PostgreSQL CDC.

Pour plus d’informations sur la façon dont DeltaFlow transforme les événements CDC bruts en sortie prête pour l’analytique, y compris les types d’opérations et les colonnes de métadonnées, consultez la transformation de sortie DeltaFlow.

Afficher l’Eventstream mis à jour

  1. Vous pouvez voir la source CDC de la base de données PostgreSQL ajoutée à votre Eventstream en mode Édition.

    Capture d’écran de la source cdc de la base de données PostgreSQL en continu en mode Édition avec des fonctionnalités étendues.

  2. Pour implémenter cette source CDC nouvellement ajoutée pour la BD PostgreSQL, sélectionnez Publier. Une fois ces étapes terminées, votre source CDC de la base de données PostgreSQL est disponible pour la visualisation dans l’affichage en direct.

    Une capture d'écran de la source CDC de la base de données PostgreSQL en streaming en vue Live avec des fonctionnalités étendues.

Configurer des destinations Eventstream pour utiliser des schémas

Actuellement, seules les destinations de type Eventhouse, de point de terminaison personnalisé et de flux dérivés sont prises en charge pour les flux d'événements avec des schémas associés. Cette section vous montre comment ajouter et configurer une destination Eventhouse lorsque des fonctionnalités étendues (comme la prise en charge du schéma) sont activées pour le flux d’événements.

Remarque

Lorsque vous utilisez DeltaFlow (préversion) avec une source de capture de données modifiées (CDC) prise en charge, les tables de destination dans Eventhouse sont créées et gérées automatiquement pour correspondre à la structure de la table source. Vous n’avez pas besoin de configurer manuellement le schéma de table de destination. DeltaFlow gère également l’évolution du schéma automatiquement lorsque les tables sources changent.

Configurer un schéma pour une destination de point de terminaison personnalisée

  1. Sélectionnez Transformer des événements ou ajouter une destination, puis sélectionnez CustomEndpoint.

  2. Dans le volet Point de terminaison personnalisé , spécifiez un nom pour la destination.

  3. Pour le schéma d’entrée, sélectionnez le schéma pour les événements. Vous effectuez une sélection dans cette zone lorsque vous activez la prise en charge du schéma pour un flux d’événements.

Capture d’écran montrant le volet de configuration d’un point de terminaison personnalisé.

Pour obtenir des instructions détaillées sur la configuration d’une destination de point de terminaison personnalisée, consultez Ajouter un point de terminaison personnalisé ou une destination d’application personnalisée à un flux d’événements.

Configurer des schémas pour une destination Eventhouse

  1. Sélectionnez Transformer des événements ou ajouter une destination, puis sélectionnez Eventhouse.

  2. Dans le volet Eventhouse , configurez les paramètres liés au schéma suivants :

    1. Pour le schéma d’entrée, sélectionnez un ou plusieurs schémas dans la liste déroulante.

      Capture d’écran montrant le volet de configuration eventhouse avec un schéma d’entrée sélectionné.

      Remarque

      Si vous avez sélectionné le schéma dynamique via l’option d’en-têtes lors de la configuration d’une source Event Hubs, vous avez peut-être configuré plusieurs schémas pour la source et les mappés à différentes propriétés et à leurs valeurs.

    2. Pour la méthode de création de table, sélectionnez Une table unique avec tous les schémas combinés ou tables distinctes pour chaque schéma, en fonction de vos besoins.

      Capture d’écran montrant le volet de configuration eventhouse avec les méthodes de création de table.

    3. Pour écrire des données avec, sélectionnez l’une des options suivantes :

      • Charge utile uniquement : écrivez les données de charge utile extraites dans la table. S’il existe plusieurs schémas d’entrée, les données sont envoyées à plusieurs tables.
      • Métadonnées et charge utile : écrivez des métadonnées et des données de charge utile dans une table unique. Les exemples de colonnes incluent source , subject, typeet data.

      Capture d’écran montrant le volet de configuration eventhouse avec les options d’écriture de données.

Pour obtenir des instructions détaillées sur la configuration d’une destination eventhouse, consultez Ajouter une destination eventhouse à un flux d’événements.

Afficher la sortie prête pour l'analyse DeltaFlow (aperçu)

Si vous avez activé les événements prêts pour l’analytique et le schéma mis à jour automatiquement (DeltaFlow), les tables de destination sont automatiquement créées dans une forme qui reflète vos tables de base de données source. Chaque table inclut les colonnes d’origine ainsi que les colonnes de métadonnées pour le type de modification et l’horodatage.

Remarque

La capture d’écran suivante montre Azure SQL Database capture de données modifiées. La sortie de la table de destination DeltaFlow est la même pour tous les connecteurs de source CDC pris en charge.

Capture d’écran montrant les tables de destination Eventhouse créées par DeltaFlow en forme prête pour l’analytique.

Vous pouvez interroger ces tables à l’aide du langage de requête Kusto (KQL) ou d’autres outils d’analyse, sans avoir à analyser les charges utiles brutes de CDC (capture de données modifiées) Debezium.

Autres connecteurs :