Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Quando si gestiscono grandi quantità di dati, è necessaria una pipeline in grado di elaborare solo i record nuovi e modificati anziché rielaborare l'intero set di dati. Questa operazione è denominata ETL incrementale. In Databricks SQL è possibile creare pipeline ETL incrementali usando tabelle di streaming e viste materializzate, senza scrivere codice procedurale o pianificare aggiornamenti manuali.
Questa esercitazione illustra un modello comune: tenere traccia delle modifiche apportate al prodotto nel tempo. Si crea una tabella di origine, si acquisiscono gli eventi di modifica, si crea una tabella delle dimensioni che mantiene la cronologia completa di ogni prodotto e si aggiunge un livello di report aggregato sopra.
La funzionalità chiave di questa esercitazione è AUTO CDC. In un warehouse tradizionale si scriverebbero istruzioni complesse MERGE INTO per riconciliare gli eventi di inserimento, aggiornamento ed eliminazione in una tabella di destinazione. Questo approccio è soggetto a errori, soprattutto quando gli eventi arrivano fuori ordine.
AUTO CDC gestisce questo per te. Si definisce la chiave business, la colonna di ordinamento e se si vuole impostare il SCD Type 1 (solo valore più recente) o il SCD Type 2 (cronologia completa) e Azure Databricks applica automaticamente la logica di unione corretta. Per una panoramica di CDC, vedere Le API AUTO CDC: semplificare il Change Data Capture con le pipeline.
Al termine di questa esercitazione, si avrà:
- È stata creata una tabella di origine che tiene traccia delle modifiche con il feed di dati delle modifiche.
- Esaminare i dati delle modifiche non elaborati per comprendere il flusso di eventi CDC.
- Consente di
AUTO CDCcompilare una tabella delle dimensioni scD Di tipo 2 da tali eventi. - Eventi di eliminazione elaborati in modo incrementale tramite la pipeline.
- È stata creata una vista materializzata che mantiene in modo incrementale un report aggregato.
- Configurata
SCHEDULE REFRESH EVERY 1 DAYin modo che le modifiche vengano propagate automaticamente tramite la pipeline.
Requisiti
Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:
- Un'area di lavoro Azure Databricks con Unity Catalog abilitata.
- Sql Warehouse (serverless o pro).
- Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
- Calcolo serverless abilitato per il tuo account. Consulta Funzionalità con disponibilità regionale limitata.
Passaggio 1: Configurare il catalogo e lo schema
Aprire l'editor SQL di Databricks e impostare il catalogo e lo schema di lavoro. È necessario disporre dell'autorizzazione per USE il catalogo e lo schema selezionati:
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Passaggio 2: Creare una tabella di origine e caricare i dati
Creare una tabella products con Usare il feed di dati delle modifiche delta Lake in Azure Databricks (CDF) abilitato. CDF è una funzionalità Delta Lake che registra ogni inserimento, aggiornamento ed eliminazione come log delle modifiche su cui è possibile eseguire query. È simile a un flusso CDC da un sistema di origine transazionale, ad eccezione delle modifiche acquisite direttamente all'interno della tabella Delta anziché da un log esterno. In questo caso si usa CDF per generare gli eventi di modifica che verranno usati dalla pipeline downstream.
Creare la tabella e caricare i record iniziali:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Simulare le modifiche upstream, inclusi i nuovi prodotti, lo spostamento del magazzino e la riassegnazione di una categoria:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Passaggio 3: Eseguire una query sul feed di dati delle modifiche
Prima di costruire la pipeline downstream, è utile esaminare gli eventi di modifica non elaborati per comprendere meglio cosa AUTO CDC verrà elaborato. La table_changes() funzione legge il log CDF e restituisce ogni operazione acquisita insieme alle colonne di metadati:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Ad esempio, il Cucchiaio ha tre eventi: un insert (Seattle), un update_preimage (Seattle) e un update_postimage (Los Angeles).
Si noti che una singola modifica logica (ad esempio, lo spostamento dello Spoon in un magazzino diverso) produce più eventi: una preimmagine e una postimmagine. In un warehouse tradizionale si scrive un'istruzione MERGE per riconciliare tutti questi eventi in una tabella di destinazione, gestendo inserimenti, aggiornamenti ed eliminazioni con logica separata e assicurandosi che gli eventi vengano applicati nell'ordine corretto. Questa è esattamente la complessità che AUTO CDC elimina nel passaggio successivo.
Passaggio 4: Creare una dimensione scD di tipo 2 con AUTO CDC
Importante
AUTO CDC è in beta. Richiede Databricks Runtime 17.3 o versione successiva.
Una tabella di streaming elabora i dati in modo incrementale. In ogni aggiornamento legge solo le nuove righe dall'ultima esecuzione, quindi non è necessario rielaborare il set di dati completo. In questo modo è particolarmente adatto per fonti ad alto volume o a modifica frequente.
AUTO CDC aggiunge l'elaborazione di Change Data Capture sopra una tabella di streaming. Anziché scrivere un'istruzione MERGE INTO che gestisce manualmente inserimenti, aggiornamenti ed eliminazioni, dichiarare la chiave business e la colonna di sequenziazione e consentire Azure Databricks applicare la logica corretta.
AUTO CDC gestisce automaticamente gli eventi non ordinati, un problema comune quando si usa MERGE INTO per gestire gli eventi in arrivo da sistemi distribuiti o carichi batch con timestamp sovrapposti.
L'istruzione seguente crea una tabella SCD Type 2 che mantiene la cronologia completa delle versioni di ogni prodotto. Ogni versione riceve __START_AT e __END_AT timestamp. Un NULL in __END_AT indica la versione corrente.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: aggiorna la tabella in base a una pianificazione giornaliera. -
FLOW AUTO CDC: dichiara che si tratta di un flusso CDC. Azure Databricks applica automaticamente la semantica di inserimento, aggiornamento ed eliminazione. -
KEYS (product_id): chiave aziendale. Gli eventi con la stessa chiave vengono uniti in righe versionate. -
APPLY AS DELETE WHEN _change_type = 'delete': chiude la versione corrente all'arrivo di un evento di eliminazione. In questo modo è possibile definire la condizione che identifica un evento di eliminazione. -
SEQUENCE BY _commit_timestamp: stabilisce l'ordinamento degli eventi. Gestisce correttamente gli arrivi non in ordine. -
STORED AS SCD TYPE 2: mantiene la cronologia completa.AUTO CDCsupporta sia il tipo scD 1 che il tipo 2.
Eseguire una query sulla tabella delle dimensioni:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Cucchiaio: due versioni. Seattle (chiuso,
__END_ATimpostato) e Los Angeles (attuale,__END_AT = NULL). - Fork: due versioni. Categoria delle posate (chiusa) e categoria pranzo (attuale).
- Napkin e Coaster: una versione ciascuna (appena inserita,
__END_AT = NULL). - Tutti gli altri prodotti: una versione ogni (
__END_AT = NULL).
Passaggio 5: Elaborazione delle eliminazioni tramite la pipeline
Simulare ora due prodotti sospesi eliminandoli dalla tabella di origine:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Questi eventi di eliminazione vengono registrati nel log CDF, ma la tabella di streaming non li ha ancora visti. Aggiornare la tabella di streaming per elaborare i nuovi eventi:
REFRESH STREAMING TABLE products_history;
Eseguire una query sulla tabella delle dimensioni per verificare che siano state applicate le eliminazioni:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Ciotola e Vetro sono ora chiusi con __END_AT set, contrassegnandoli come sospesi. Tutti gli altri prodotti correnti rimangono invariati. La tabella di streaming ha elaborato solo i nuovi eventi di eliminazione senza rielaborare gli inserimenti e gli aggiornamenti dall'aggiornamento precedente.
Passaggio 6: Creare una vista materializzata aggregata
Ora che è presente una tabella delle dimensioni che rimane aggiornata con le modifiche all'origine, è possibile aggiungere un livello di report in alto.
Una vista materializzata archivia i risultati delle query pre-calcolati come tabella fisica. A differenza di una visualizzazione regolare, che esegue nuovamente la query ogni volta che si legge da essa, una vista materializzata rende persistenti i risultati e ricompila solo le righe interessate dalle modifiche upstream in ogni aggiornamento. In questo modo è particolarmente adatto per dashboard e report in cui le prestazioni delle query sono importanti.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY indica che questa visualizzazione viene aggiornata in base a una pianificazione giornaliera. In combinazione con la stessa pianificazione nella tabella di streaming, ora hai una pipeline a tre fasi in cui le modifiche alla tabella di origine si propagano lungo la dimensione e nell'aggregato in ogni ciclo di aggiornamento. Non è disponibile alcun aggiornamento manuale da eseguire.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Passaggio 7: Verificare la cascata da capo a capo
Per verificare la propagazione completa della pipeline, apportare una modifica alla tabella di origine:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Il coltello si sposta da Denver a Seattle. Questa singola modifica DML attiva la propagazione completa della pipeline, dimostrando come interagiscono le tre fasi:
-
productsregistra l'evento di modifica tramite CDF. -
products_historyelabora l'evento e aggiunge una nuova versione del Knife. -
products_by_categoryricompila solo la riga interessata Distolleria.
Verificare:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Pulizia
Per pulire le risorse create da questa esercitazione, usare il codice SQL seguente:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;