Tutoriel : Coordonner les transactions entre les tables

Important

Les transactions qui écrivent dans les tables Delta gérées par Unity Catalog sont en Aperçu public.

Les transactions qui écrivent dans les tables Iceberg gérées par le catalogue Unity sont en préversion privée. Pour rejoindre cette préversion, envoyez le formulaire d’inscription en préversion des tables Iceberg managées.

Dans ce tutoriel, vous utilisez les deux modes de transaction pour coordonner les mises à jour entre plusieurs instructions et tables sur Azure Databricks : non interactif (BEGIN ATOMIC), qui valide automatiquement et interactive (BEGIN TRANSACTION), ce qui vous donne un contrôle explicite. Le didacticiel montre également comment utiliser des transactions avec des procédures stockées et un script SQL.

Exigences

  • Environment : accès à un espace de travail Azure Databricks.
  • Calcul : Les types de calcul pris en charge varient selon le mode transactionnel :
    • Un entrepôt SQL classique ou serverless prend en charge les deux modes de transaction.
    • Le calcul serverless prend uniquement en charge les transactions non interactives.
    • Les clusters classiques exécutant Databricks Runtime 18.0 ou version ultérieure prennent uniquement en charge les transactions non interactives.
  • Privilèges : CREATE TABLE dans un schéma de Unity Catalog.

Configurer des exemples de tables

Toutes les tables écrites dans une transaction multi-instruction, multi-table doivent :

Créez deux exemples de tables dans l’Éditeur SQL ou un notebook :

-- Account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

Note

Pour activer les transactions sur une table existante, exécutez :

ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

Insérez des exemples de données dans les deux tables :

INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Vérifiez la configuration :

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Output:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Transactions non interactives

Les transactions non interactives utilisent la syntaxe BEGIN ATOMIC ... END;. Toutes les instructions s’exécutent en tant qu’unité atomique unique. Si chaque instruction réussit, Azure Databricks commite automatiquement. Si une instruction échoue, Azure Databricks annule automatiquement toutes les modifications. Pour obtenir des modèles détaillés de syntaxe et d’utilisation, consultez les transactions non interactives.

Exécuter une transaction réussie

Mettez à jour les deux tables de manière atomique :

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Vérifiez que le solde d’Alice est maintenant 1100.00 :

SELECT * FROM sample_accounts WHERE id = 1;

Vérifiez que deux enregistrements de transaction existent maintenant :

SELECT * FROM sample_transactions;

La mise à jour du solde et le journal de transaction ont été créés ensemble. Si l’une ou l’autre instruction a échoué, aucune modification n’aurait été validée et Databricks aurait arrêté la transaction sans effets secondaires.

Utiliser SIGNAL pour échouer une transaction sur une condition

Vous pouvez utiliser SIGNAL à l’intérieur d’un BEGIN ATOMIC ... END; bloc pour échouer la transaction lorsqu’une condition définie par l’utilisateur n’est pas remplie. Cet exemple insère un compte avec un solde négatif, puis utilise SIGNAL pour échouer la transaction si la vérification du solde échoue :

BEGIN ATOMIC
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

Le SIGNAL déclenche une erreur, ce qui entraîne le retour en arrière automatique de l’intégralité de la transaction. Cette requête ne renvoie aucune ligne, car l’insertion a été annulée :

SELECT * FROM sample_accounts WHERE id = 3;

Afficher la restauration automatique en cas d’échec

Exécutez une transaction où la première instruction est valide, mais la deuxième fait référence à une table qui n’existe pas :

BEGIN ATOMIC
  -- Valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
  -- Invalid
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

La transaction échoue avec une erreur. Cela renvoie 0 ligne, car la transaction entière a été annulée :

SELECT * FROM sample_accounts WHERE id = 4;

Même si la première INSERT instruction était valide, elle a été restaurée parce que la deuxième instruction a échoué. Cela illustre la garantie tout ou rien des transactions.

Transactions interactives

Les transactions interactives vous donnent un contrôle explicite sur le moment où confirmer ou annuler. Utilisez BEGIN TRANSACTION pour démarrer, puis COMMIT pour enregistrer les modifications ou ROLLBACK pour les ignorer.

Valider les modifications

Démarrez une transaction :

BEGIN TRANSACTION;

Apportez des modifications (pas encore validées) :

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Confirmez des modifications permanentes :

COMMIT;

Vérifiez que le compte d’Eve est désormais visible :

SELECT * FROM sample_accounts WHERE id = 5;

Vérifiez que le solde de Bob est maintenant 550.00:

SELECT * FROM sample_accounts WHERE id = 2;

Restaurer les modifications

Démarrez une nouvelle transaction :

BEGIN TRANSACTION;

Apportez une modification :

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Vérifiez que la modification est visible dans votre session (la ligne n’est pas visible par d’autres sessions tant qu’elle n’est pas validée) :

SELECT * FROM sample_accounts WHERE id = 6;

Annulez pour supprimer la modification :

ROLLBACK;

Cela ne renvoie aucune ligne, car l’insertion a été annulée :

SELECT * FROM sample_accounts WHERE id = 6;

Utiliser avec des procédures stockées et un script SQL

Vous pouvez combiner des transactions avec des procédures stockées pour créer une logique de transaction réutilisable. Ce modèle est utile pour les opérations complexes que vous exécutez fréquemment.

  1. Créer les tables avec des validations de catalogue activées

    CREATE SCHEMA IF NOT EXISTS main.retail;
    
    CREATE TABLE IF NOT EXISTS main.retail.orders (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2),
      batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
      customer_id STRING,
      total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
  2. Définir une procédure stockée

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Définir la transaction

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Si une partie de la transaction échoue, Databricks restaure automatiquement toutes les modifications.

Nettoyer

Supprimez les exemples de tables :

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

Ressources supplémentaires