Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Important
REPLACE-flöden WHERE för fristående strömningstabeller finns i Beta.
REPLACE WHERE-flöden beräknar om och skriver över en specifik delmängd av en fristående strömmande tabell utan att hela tabellens historik behöver bearbetas på nytt. De hanterar sent ankommande data, uppströms ombearbetning, schemautveckling och återfyllnad.
Med ett REPLACE-flöde WHERE definierar du ett predikat i måltabellen. Alla rader som matchar predikatet tas bort och ersätts genom omvärdering av källfrågan för samma predikatintervall. Rader som inte matchar predikatet lämnas orörda.
Requirements
REPLACE-flöden WHERE har följande krav:
- Din streamingtabell måste använda
PREVIEW-kanalen. Sechanneli Pipelinekonfigurationer. - Databricks rekommenderar Unity Catalog och serverlös beräkning. Inkrementell uppdatering stöds endast vid serverlös beräkning.
När du ska använda REPLACE-flöden WHERE
Använd REPLACE-flöden WHERE för följande scenarier:
- Inkrementell batchbearbetning utan strömmande semantik: Bearbeta nya rader i batchar utan att hantera strömningsbegrepp som vattenstämplar.
- Selektiv ombearbetning: Räkna bara om rader som matchar ett predikat och lämna alla andra rader orörda.
-
Scenarier utöver standardfunktioner för materialiserad vy:
- Måltabeller med längre lagringstid än källtabellen
- Förhindra omkomputation när en dimensionstabell ändras
- Schemautveckling utan att omberäkna hela historiken
Skapa ett REPLACE-flöde WHERE
Använd FLOW REPLACE WHERE satsen i samma rad som CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Under uppdateringen tas alla rader i måltabellen som matchar predikatet bort, källfrågan omberäknas för samma predikatintervall och de nya resultaten infogas. I det här exemplet tas alla rader från de senaste 7 dagarna bort från orders_enriched och omberäknas med hjälp av källfrågan.
Du behöver inte lägga till predikatet i källfrågan. Pipeline-motorn tillämpar den automatiskt när den läser från källan.
Note
BY NAME måste anges. Det säkerställer att kolumner matchas med namn i stället för position.
Fyll i historiska data
Kör DML-instruktioner direkt i måltabellen för att utföra återfyllnad:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Fullständigt uppdateringsbeteende
En fullständig uppdatering av ett REPLACE-flöde WHERE kör källfrågan igen med endast det aktuella predikatet. Rader som infogades av DML-uttryck utanför det aktuella predikatintervallet tas bort permanent.
Varning
En fullständig uppdatering rensar alla befintliga data och kör flödet igen med endast dess definierade predikat. Om en pipeline har körts i ett år med ett 7-dagars predikat resulterar en fullständig uppdatering i att tabellen endast innehåller data från de senaste 7 dagarna. Alla äldre rader tas bort permanent.
REFRESH STREAMING TABLE orders_enriched FULL;
Om du vill förhindra fullständiga uppdateringar i en tabell anger du tabellegenskapen pipelines.reset.allowed till false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Inkrementell uppdatering
REPLACE-flöden WHERE använder inkrementell uppdatering när det är möjligt och ombearbetar endast de källdata som har ändrats sedan den senaste uppdateringen i stället för att omberäkna hela ersättningsfönstret. Inkrementell uppdatering kräver serverlös beräkning.
När inkrementell uppdatering gäller
Allt följande måste vara sant:
- Pipelinen körs på serverlös beräkning.
- Frågeformen stöds. Se Incremental refresh för den operatoruppsättning som stöds.
- Predikatet refererar till baskolumner från en källtabell. Predikat på härledda värden, till exempel utdata från aggregat- eller fönsterfunktioner, kan inte skickas vidare till en källa, vilket förhindrar inkrementell uppdatering.
- Ingen extern DML har ändrat rader i det aktuella ersättningsfönstret. DML som ändrar rader utanför det aktuella fönstret påverkas inte.
- Det aktuella ersättningsfönstret innehåller inte rader som det tidigare predikatet exkluderade. Om du breddar predikatet så att det täcker ett intervall som inte tidigare bearbetats återgår den uppdateringen till fullständig omkomputation. Efterföljande uppdateringar är berättigade till inkrementell uppdatering igen.
- Predikatet är deterministiskt. Predikat med icke-deterministiska funktioner, till exempel
rand()inaktivera inkrementell uppdatering. Temporala funktioner somcurrent_date()är tillåtna.
Den första uppdateringen av ett flöde är alltid en fullständig beräkning. Om något villkor inte uppfylls, går uppdateringen tillbaka till fullständig omberäkning av det aktuella ersättningsfönstret.
Metodtips för inkrementell uppdatering
Följ dessa riktlinjer så att REPLACE-flöden WHERE fortfarande är berättigade till inkrementell uppdatering.
Använd en rörlig nedre gräns
Predikat med en rörlig lägre gräns är fortfarande berättigade till inkrementell uppdatering på obestämd tid.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
En flyttande övre gräns, till exempel date BETWEEN date_add(current_date(), -7) AND current_date(), kan flytta fönstret så att det inkluderar tidigare exkluderade rader, vilket utlöser en engångsåterställning till fullständig omkomputation.
Inkludera predikatkolumnen i GROUP BY
När du aggregerar ska du inkludera predikatkolumnen i GROUP BY så att motorn kan flytta ned predikatet under aggregationen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Om predikatkolumnen saknas i GROUP BY kan predikatet inte flyttas ned under aggregeringen och källan skannas i sin helhet.
Inkludera predikatkolumnen i sammanslagningsnycklarna
Inkludera predikatkolumnen i kopplingsvillkoret så att motorn kan rensa alla anslutna källor.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Om en ansluten tabell inte exponerar predikatkolumnen genomsöks tabellen i sin helhet vid varje uppdatering.
Diagnostisera övergång till fullständig omberäkning
När en uppdatering faller tillbaka på en fullständig omberäkning rapporteras orsaken i händelsen planning_information för flödet. Se Övervaka händelseloggar för pipeline. I följande tabell visas de orsaker som rapporterats i händelsen:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
En extern DML ändrade rader i det aktuella ersättningsfönstret. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predikatet använder icke-deterministiska uttryck. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Den tidigare uppdateringen använde ett icke-deterministiskt predikat. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predikatet kan inte flyttas ned till någon källa, det aktuella fönstret omfattar rader som inte har bearbetats av det föregående predikatet, eller så använder körningen en åsidosättning av predikatet. |
Exempel
I följande exempel visas vanliga REPLACE-flödesmönster WHERE .
Exempel 1: Behåll historiska aggregeringar från en källa med begränsad kvarhållning
Det här exemplet behåller dagliga aggregeringar utan tidsbegränsning, även efter att rådata har gallrats bort ur källtabellen (3 dagars lagringstid):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Exempel 2: Förhindra omberäkning när en dimensionstabell ändras
I det här exemplet förblir historiska faktarader oförändrade när dimensionsattribut ändras:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Om en användares region ändras omberäknas endast de senaste raderna. Historiska rader behåller regionvärdet när de skrevs.
Exempel 3: Lägg till ett nytt mått utan att omberäkna fullständig historik
Det här exemplet visar hur du utvecklar en tabelldefinition och återfyller endast ett målintervall:
Definiera den inledande tabellen:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Uppdatera frågan för att lägga till
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Rader som är äldre än 7-dagarsfönstret innehåller
NULLföruniq_users.
Exempel 4: Iterera i ett litet fönster innan du fyller i hela historiken
Det här exemplet visar hur du validerar frågelogik i ett litet datafönster innan du bearbetar hela det historiska intervallet.
Börja med ett kort fönster för att validera mått och iterera affärslogik med lägre beräkningskostnader:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Ett kort fönster beräknar endast de senaste 7 dagarna på varje uppdatering, så ändra frågan så många gånger som behövs innan du genomför en fullständig historisk körning.
När frågan har slutförts använder du DML för att fylla i hela det historiska intervallet:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;