Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Important
REPLACE-flöden WHERE finns i Beta.
Den här sidan beskriver hur du använder REPLACE WHERE-flöden i Lakeflow Spark Declarative Pipelines för att beräkna om och skriva över en angiven delmängd av en tabell utan att behöva bearbeta om tabellens hela historik. REPLACE-flöden WHERE hanterar sent ankommande data, uppströms ombearbetning, schemautveckling och återfyllnad.
Med ett REPLACE-flöde WHERE definierar du ett predikat i måltabellen. Alla rader som matchar predikatet tas bort och ersätts genom omvärdering av källfrågan för samma predikatintervall. Rader som inte matchar predikatet lämnas orörda.
Requirements
REPLACE-flöden WHERE har följande krav:
- Din pipeline måste använda
PREVIEWkanalen. - Databricks rekommenderar Unity Catalog och serverlös beräkning. Inkrementell uppdatering stöds endast vid serverlös beräkning.
När du ska använda REPLACE-flöden WHERE
Använd REPLACE-flöden WHERE för följande scenarier:
- Inkrementell batchbearbetning utan strömmande semantik: Bearbeta nya rader i batchar utan att hantera strömningsbegrepp som vattenstämplar.
- Selektiv ombearbetning: Räkna bara om rader som matchar ett predikat och lämna alla andra rader orörda.
-
Scenarier utöver standardfunktioner för materialiserad vy:
- Måltabeller med längre lagringstid än källtabellen
- Förhindra omkomputation när en dimensionstabell ändras
- Schemautveckling utan att omberäkna hela historiken
Skapa ett REPLACE-flöde WHERE
Definiera REPLACE WHERE-flöden i SQL eller Python.
SQL
Använd FLOW REPLACE WHERE satsen i samma rad som CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Du kan också använda långformssyntaxen CREATE FLOW :
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
I Python definieras tabellen och flödet i en enda instruktion. Flödet ärver samma namn som tabellen:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
Parametern replace_where accepterar antingen ett PySpark-kolumnuttryck eller ett strängpredikat.
I de här exemplen tas alla rader från de senaste 7 dagarna bort från orders_enriched och omberäknas med hjälp av källfrågan. Du behöver inte lägga till predikatet i källfrågan. Pipeline-motorn tillämpar den automatiskt när den läser från källan.
Note
BY NAME krävs i SQL. Den matchar kolumner efter namn i stället för position.
Fyll i historiska data
Om du vill skriva historiska eller korrigerade rader i måltabellen utanför schemalagda uppdateringar väljer du mellan två mekanismer baserat på var historiska data finns:
- Åsidosättning av predikat: Kör flödets källfråga igen för ett engångspredikatintervall. Använd när historiska data kommer från samma källa som inkrementella data.
- DML-instruktioner: Infoga direkt i måltabellen och kringgå flödet. Använd när historiska data finns i en annan källa än inkrementella data.
Åsidosättande av predikat
Åsidosätt REPLACE-predikatet WHERE för en enskild pipelineuppdatering utan att ändra pipelinedefinitionen. Åsidosättningar av predikat är engångsåtgärder, gäller bara den aktuella uppdateringen och påverkar inte framtida körningar.
Exempel: Inledande historisk belastning
Så här utför du en engångsåterfyllning av historiska data när du först konfigurerar en pipeline:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Exempel: Korrigera en kolumn för en viss period
När du har uppdaterat en kolumndefinition fyller du på ändringen för ett målhistorikintervall:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Kombinera flera dimensioner i en enda åsidosättning av predikat:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Hjälpfunktion: start_update_with_replace_where
Använd API:et för pipelineuppdatering från en notebook-fil för att skicka åsidosättningar för predikat:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML-instruktioner
Kör DML-instruktioner direkt i måltabellen utanför pipelinen för att utföra inledande inläsningar eller korrigeringar, till exempel inläsning från en äldre tabell:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Rader som infogas via DML omfattas inte av REPLACE-predikatet WHERE och bevaras över schemalagda uppdateringar om de inte faller inom predikatintervallet för en framtida körning.
Fullständigt uppdateringsbeteende
En fullständig uppdatering av ett REPLACE-flöde WHERE kör källfrågan igen med endast det aktuella predikatet. Rader som infogades genom åsidosättanden av predikat eller DML-satser utanför det aktuella predikatområdet tas bort permanent.
Varning
En fullständig uppdatering rensar alla befintliga data och kör flödet igen med endast dess definierade predikat. Om en pipeline har körts i ett år med ett 7-dagars predikat resulterar en fullständig uppdatering i att tabellen endast innehåller data från de senaste 7 dagarna. Alla äldre rader tas bort permanent.
Om du vill förhindra fullständiga uppdateringar i en tabell anger du tabellegenskapen pipelines.reset.allowed till false. Se Referens för pipelineegenskaper.
Inkrementell uppdatering
REPLACE-flöden WHERE använder inkrementell uppdatering när det är möjligt och ombearbetar endast de källdata som har ändrats sedan den senaste uppdateringen i stället för att omberäkna hela ersättningsfönstret. Inkrementell uppdatering kräver serverlös beräkning.
När inkrementell uppdatering gäller
Allt följande måste vara sant:
- Pipelinen körs på serverlös beräkning.
- Frågeformen stöds. Se Incremental refresh för den operatoruppsättning som stöds.
- Predikatet refererar till baskolumner från en källtabell. Predikat på härledda värden, till exempel utdata från aggregat- eller fönsterfunktioner, kan inte skickas vidare till en källa, vilket förhindrar inkrementell uppdatering.
- Ingen extern DML har ändrat rader i det aktuella ersättningsfönstret. DML som ändrar rader utanför det aktuella fönstret påverkas inte.
- Det aktuella ersättningsfönstret innehåller inte rader som det tidigare predikatet exkluderade. Om du breddar predikatet så att det täcker ett intervall som inte tidigare bearbetats återgår den uppdateringen till fullständig omkomputation. Efterföljande uppdateringar är berättigade till inkrementell uppdatering igen.
- Predikatet är deterministiskt. Predikat med icke-deterministiska funktioner, till exempel
rand()inaktivera inkrementell uppdatering. Temporala funktioner somcurrent_date()är tillåtna.
Den första uppdateringen av ett flöde är alltid en fullständig beräkning. Om något villkor inte uppfylls, går uppdateringen tillbaka till fullständig omberäkning av det aktuella ersättningsfönstret.
Metodtips för inkrementell uppdatering
Följ dessa riktlinjer så att REPLACE-flöden WHERE fortfarande är berättigade till inkrementell uppdatering.
Använd en rörlig nedre gräns
Predikat med en rörlig lägre gräns är fortfarande berättigade till inkrementell uppdatering på obestämd tid.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
En flyttande övre gräns, till exempel date BETWEEN date_add(current_date(), -7) AND current_date(), kan flytta fönstret så att det inkluderar tidigare exkluderade rader, vilket utlöser en engångsåterställning till fullständig omkomputation.
Inkludera predikatkolumnen i GROUP BY
När du aggregerar ska du inkludera predikatkolumnen i GROUP BY så att motorn kan flytta ned predikatet under aggregationen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Om predikatkolumnen saknas i GROUP BY kan predikatet inte flyttas ned under aggregeringen och källan skannas i sin helhet.
Inkludera predikatkolumnen i sammanslagningsnycklarna
Inkludera predikatkolumnen i kopplingsvillkoret så att motorn kan rensa alla anslutna källor.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Om en ansluten tabell inte exponerar predikatkolumnen genomsöks tabellen i sin helhet vid varje uppdatering.
Diagnostisera övergång till fullständig omberäkning
När en uppdatering faller tillbaka på en fullständig omberäkning rapporteras orsaken i händelsen planning_information för flödet. Se Övervaka händelseloggar för pipeline. I följande tabell visas de orsaker som rapporterats i händelsen:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
En extern DML ändrade rader i det aktuella ersättningsfönstret. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predikatet använder icke-deterministiska uttryck. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Den tidigare uppdateringen använde ett icke-deterministiskt predikat. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predikatet kan inte flyttas ned till någon källa, det aktuella fönstret omfattar rader som inte har bearbetats av det föregående predikatet, eller så använder körningen en åsidosättning av predikatet. |
Limitations
REPLACE-flöden WHERE har följande begränsningar:
- Måltabellen måste skapas inom pipelinen.
- Endast ett REPLACE-flöde WHERE tillåts per måltabell.
- En tabell som är mål för ett REPLACE-flöde WHERE kan inte också riktas mot en annan flödestyp, till exempel ett AUTO CDC-flöde eller ett tilläggsflöde.
- Förväntningar stöds inte för tabeller som används som mål av REPLACE-WHEREflöden.
- För strömmande tabeller som skapats i Databricks SQL, se REPLACE WHERE-flöden för fristående strömmande tabeller för information om skillnader i syntax och tillbakafyllnad.
Exempel
I följande exempel visas vanliga REPLACE-flödesmönster WHERE .
Exempel 1: Behåll historiska aggregeringar från en källa med begränsad kvarhållning
Det här exemplet behåller dagliga aggregeringar utan tidsbegränsning, även efter att rådata har gallrats bort ur källtabellen (3 dagars lagringstid):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Exempel 2: Förhindra omberäkning när en dimensionstabell ändras
I det här exemplet förblir historiska faktarader oförändrade när dimensionsattribut ändras:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Om en användares region ändras omberäknas endast de senaste raderna. Historiska rader behåller regionvärdet när de skrevs. För att korrigera historiska rader, kör en riktad återfyllnad med åsidosättningar av predikat.
Exempel 3: Lägg till ett nytt mått utan att omberäkna fullständig historik
Det här exemplet visar hur du utvecklar en tabelldefinition och återfyller endast ett målintervall:
Definiera den inledande tabellen:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Uppdatera frågan för att lägga till
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Fyll i det nya måttet för de senaste 30 dagarna:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Rader som är äldre än det ifyllda intervallet innehåller
NULLföruniq_users.
Exempel 4: Iterera i ett litet fönster innan du fyller i hela historiken
Det här exemplet visar hur du validerar frågelogik i ett litet datafönster innan du bearbetar hela det historiska intervallet.
Börja med ett kort fönster så att varje uppdatering bara beräknas om de senaste 7 dagarna medan du ändrar frågan:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
När frågan har fastställts använder du en åsidosättning av villkor för att utföra en engångsbakåtfyllning av historiska data:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)