Zelfstudie: Transacties coördineren tussen tabellen

Belangrijk

Transacties die naar beheerde Delta-tabellen van Unity Catalog schrijven, bevinden zich in openbare preview.

Transacties die naar beheerde Iceberg-tabellen in Unity Catalog schrijven, bevinden zich in private preview. Als u wilt deelnemen aan deze preview, dient u het voorbeeldformulier voor beheerde Iceberg-tabellen in.

In deze zelfstudie gebruikt u beide transactiemodi om updates in meerdere statements en tabellen op Azure Databricks te coördineren: niet-interactief (BEGIN ATOMIC), waarbij wijzigingen automatisch worden vastgelegd, en interactief (BEGIN TRANSACTION), waarbij u expliciete controle hebt. Deze zelfstudie toont ook hoe u transacties kunt gebruiken met opgeslagen procedures en SQL Scripting.

Requirements

  • Omgeving: Toegang tot een Azure Databricks werkruimte.
  • Compute: Ondersteunde rekentypen variëren per transactiemodus:
    • Een klassiek of serverloos SQL Warehouse ondersteunt beide transactiemodi.
    • Serverloze compute ondersteunt alleen niet-interactieve transacties.
    • Klassieke clusters met Databricks Runtime 18.0 of hoger ondersteunen alleen niet-interactieve transacties.
  • Bevoegdheden: CREATE TABLE in een Unity Catalog-schema .

Voorbeeldtabellen instellen

Alle tabellen waarnaar in transacties met meerdere statements en meerdere tabellen is geschreven, moeten:

Maak twee voorbeeldtabellen in de SQL-editor of een notebook:

-- Account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

Note

Als u transacties in een bestaande tabel wilt inschakelen, voert u het volgende uit:

ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

Voeg voorbeeldgegevens in beide tabellen in:

INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Controleer de installatie:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Uitvoer:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Niet-interactieve transacties

Niet-interactieve transacties gebruiken BEGIN ATOMIC ... END; syntaxis. Alle instructies worden uitgevoerd als één atomische eenheid. Als elke instructie slaagt, committeert Azure Databricks automatisch. Als er een instructie mislukt, draait Azure Databricks alle wijzigingen automatisch terug. Zie niet-interactieve transacties voor gedetailleerde syntaxis en gebruikspatronen.

Een geslaagde transactie uitvoeren

Werk beide tabellen atomisch bij:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Controleer of het saldo van Alice nu 1100,00 is:

SELECT * FROM sample_accounts WHERE id = 1;

Controleer of er nu twee transactierecords bestaan:

SELECT * FROM sample_transactions;

Zowel de saldo-update als de transactierecord zijn samen gemaakt. Als een van beide instructies zou falen, dan zou er geen enkele wijziging zijn doorgevoerd, en zou Databricks de transactie zonder neveneffecten hebben beëindigd.

SIGNAL gebruiken om een transactie te laten mislukken op een voorwaarde

U kunt SIGNAL binnen een BEGIN ATOMIC ... END; blok gebruiken om de transactie te mislukken wanneer niet aan een door de gebruiker gedefinieerde voorwaarde wordt voldaan. In dit voorbeeld wordt een account met een negatief saldo toegevoegd, waarna SIGNAL wordt gebruikt om de transactie te laten mislukken als de saldocontrole faalt:

BEGIN ATOMIC
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

Er wordt een fout opgeworpen SIGNAL, waardoor de hele transactie automatisch wordt teruggedraaid. Dit levert nul rijen op omdat de invoeging is teruggedraaid:

SELECT * FROM sample_accounts WHERE id = 3;

Automatische terugdraaiactie bij fout bekijken

Voer een transactie uit waarbij de eerste instructie geldig is, maar de tweede verwijst naar een tabel die niet bestaat:

BEGIN ATOMIC
  -- Valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
  -- Invalid
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

De transactie mislukt met een fout. Dit retourneert 0 rijen omdat de hele transactie is teruggedraaid:

SELECT * FROM sample_accounts WHERE id = 4;

Hoewel de eerste INSERT instructie geldig was, werd deze teruggedraaid omdat de tweede instructie is mislukt. Dit demonstreert de alles-of-niets garantie van transacties.

Interactieve transacties

Interactieve transacties geven u expliciet controle over wanneer u wilt doorvoeren of terugdraaien. Gebruik BEGIN TRANSACTION om te beginnen, dan COMMIT om wijzigingen op te slaan of ROLLBACK om ze te negeren.

Wijzigingen doorvoeren

Een transactie starten:

BEGIN TRANSACTION;

Wijzigingen aanbrengen (nog niet doorgevoerd):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Voer door om wijzigingen definitief te maken.

COMMIT;

Controleer of het account van Eve nu zichtbaar is:

SELECT * FROM sample_accounts WHERE id = 5;

Controleer of het saldo van Bob nu 550.00is:

SELECT * FROM sample_accounts WHERE id = 2;

Wijzigingen terugdraaien

Een nieuwe transactie starten:

BEGIN TRANSACTION;

Breng een wijziging aan:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Controleer of de wijziging zichtbaar is in uw sessie (de rij is pas zichtbaar voor andere sessies als deze zijn doorgevoerd):

SELECT * FROM sample_accounts WHERE id = 6;

Terugdraaien om de wijziging te negeren:

ROLLBACK;

Dit retourneert nul rijen omdat de insert werd teruggedraaid:

SELECT * FROM sample_accounts WHERE id = 6;

Gebruiken met opgeslagen procedures en SQL-scripts

U kunt transacties combineren met opgeslagen procedures om herbruikbare transactielogica te maken. Dit patroon is handig voor complexe bewerkingen die u regelmatig uitvoert.

  1. Tabellen maken met ingeschakelde catalogusdoorvoeringen

    CREATE SCHEMA IF NOT EXISTS main.retail;
    
    CREATE TABLE IF NOT EXISTS main.retail.orders (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2),
      batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
      customer_id STRING,
      total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
  2. De opgeslagen procedure definiëren

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. De transactie definiëren

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Als een deel van de transactie mislukt, worden alle wijzigingen automatisch teruggedraaid door Databricks.

Opschonen

Verwijder de voorbeeldtabellen:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

Aanvullende informatiebronnen