Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
I flussi REPLACE WHERE sono in versione beta.
Questa pagina descrive come usare i flussi REPLACE WHERE nelle pipeline dichiarative di Lakeflow Spark per ricompilare e sovrascrivere un subset di destinazione di una tabella senza rielaborare l'intera cronologia delle tabelle. I flussi REPLACE WHERE gestiscono i dati in arrivo in ritardo, la rielaborazione upstream, l'evoluzione dello schema e i backfill.
Con un flusso REPLACE WHERE si definisce un predicato nella tabella di destinazione. Tutte le righe corrispondenti al predicato vengono eliminate e sostituite rivalutando la query di origine per lo stesso intervallo di predicati. Le righe che non corrispondono al predicato vengono lasciate invariate.
Requirements
I flussi REPLACE WHERE hanno i requisiti seguenti:
- La pipeline deve usare il canale
PREVIEW. - Databricks consiglia il catalogo Unity e il calcolo serverless. L'aggiornamento incrementale è supportato solo nel calcolo serverless.
Quando usare i flussi REPLACE WHERE
Usare i flussi REPLACE WHERE per gli scenari seguenti:
- Elaborazione batch incrementale senza semantica di streaming: elaborare nuove righe in batch senza gestire concetti di streaming come filigrane.
- Rielaborazione selettiva: ricompilare solo le righe che corrispondono a un predicato lasciando invariate tutte le altre righe.
-
Scenari che superano le funzionalità di visualizzazione materializzate standard:
- Tabelle di destinazione con un periodo di conservazione più lungo rispetto alla sorgente
- Impedire la ricompilazione quando viene modificata una tabella delle dimensioni
- Evoluzione dello schema senza ricompilare l'intera cronologia
Creare un flusso REPLACE WHERE
Definisci i flussi REPLACE WHERE in SQL oppure in Python.
SQL
Usa la clausola FLOW REPLACE WHERE in linea con CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
In alternativa, usare la sintassi long-form CREATE FLOW :
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
In Python la tabella e il flusso vengono definiti in una singola istruzione. Il flusso eredita lo stesso nome della tabella:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
Il replace_where parametro accetta un'espressione di colonna PySpark o un predicato stringa.
In questi esempi, tutte le righe degli ultimi 7 giorni vengono eliminate da orders_enriched e ricalcolate utilizzando la query di origine. Non è necessario aggiungere il predicato alla query di origine. Il motore della pipeline lo applica automaticamente durante la lettura dalla sorgente.
Note
BY NAME è obbligatorio in SQL. Corrisponde alle colonne in base al nome anziché alla posizione.
Eseguire il backfill dei dati cronologici
Per scrivere righe cronologiche o corrette nella tabella di destinazione al di fuori degli aggiornamenti pianificati, scegliere tra due meccanismi in base alla posizione in cui si trovano i dati cronologici:
- Sostituzioni del predicato: Riesegui la query di origine del flusso per un intervallo del predicato valido per una sola volta. Usare quando i dati cronologici provengono dalla stessa origine dei dati incrementali.
- Istruzioni DML: inserire direttamente nella tabella di destinazione, ignorando il flusso. Usare quando i dati cronologici si trovano in un'origine diversa rispetto ai dati incrementali.
Sovrascritture del predicato
Eseguire l'override del predicato REPLACE WHERE per un singolo aggiornamento della pipeline senza modificare la definizione della pipeline. Gli override del predicato sono una tantum, si applicano solo all'aggiornamento corrente e non influiscono sulle esecuzioni future.
Esempio: Caricamento cronologico iniziale
Per eseguire un backfill una tantum dei dati storici quando si configura per la prima volta una pipeline:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Esempio: Correggere una colonna per un periodo specifico
Dopo aver aggiornato una definizione di colonna, applicare retroattivamente la modifica a un intervallo cronologico storico specifico:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combinare più dimensioni in un unico override del predicato:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Funzione di supporto: start_update_with_replace_where
Usare l'API di aggiornamento della pipeline da un notebook per inviare gli override del predicato:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Istruzioni DML
Eseguire istruzioni DML direttamente nella tabella di destinazione dall'esterno della pipeline per eseguire caricamenti o correzioni iniziali, ad esempio il caricamento da una tabella legacy:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Le righe inserite tramite DML non sono soggette al predicato REPLACE WHERE e vengono mantenute tra gli aggiornamenti pianificati, a meno che non rientrano nell'intervallo di predicati di un'esecuzione futura.
Comportamento di aggiornamento completo
Un aggiornamento completo di un flusso REPLACE WHERE esegue nuovamente la query di origine usando solo il predicato corrente. Le righe inserite da sostituzioni del predicato o da istruzioni DML al di fuori dell'intervallo corrente del predicato vengono eliminate definitivamente.
Avvertimento
Un aggiornamento completo cancella tutti i dati esistenti ed esegue nuovamente il flusso usando solo il predicato definito. Se una pipeline è stata eseguita per un anno con un predicato di 7 giorni, un aggiornamento completo restituisce la tabella contenente solo gli ultimi 7 giorni di dati. Tutte le righe precedenti vengono eliminate definitivamente.
Per evitare aggiornamenti completi in una tabella, impostare la proprietà pipelines.reset.allowed table su false. Vedere Informazioni di riferimento sulle proprietà della pipeline.
Aggiornamento incrementale
I flussi REPLACE WHERE usano l'aggiornamento incrementale quando possibile, rielaborando solo i dati di origine modificati dopo l'ultimo aggiornamento anziché ricompilare l'intera finestra di sostituzione. L'aggiornamento incrementale richiede capacità di calcolo serverless.
Quando si applica l'aggiornamento incrementale
Tutte le condizioni seguenti devono essere vere:
- La pipeline viene eseguita su un'infrastruttura serverless.
- La forma di query è supportata. Vedere Aggiornamento incrementale per il set di operatori supportato.
- Il predicato fa riferimento alle colonne di base di una tabella di origine. I predicati sui valori derivati, ad esempio output di funzioni aggregate o finestra, non possono essere inseriti in un'origine, che disabilita l'aggiornamento incrementale.
- Nessun DML esterno ha modificato righe nella finestra di sostituzione corrente. Il DML che modifica le righe al di fuori della finestra corrente non viene interessato.
- La finestra di sostituzione corrente non include righe escluse dal predicato precedente. Se si estende il predicato per coprire un intervallo non elaborato in precedenza, tale aggiornamento torna alla ricompilazione completa. Gli aggiornamenti successivi possono nuovamente beneficiare dell'aggiornamento incrementale.
- Il predicato è deterministico. I predicati che usano funzioni non deterministiche come
rand()disabilitano l'aggiornamento incrementale. Sono consentite funzioni temporali,current_date()ad esempio .
Il primo aggiornamento di qualsiasi flusso è sempre un calcolo completo. Se una qualsiasi condizione non è soddisfatta, l'aggiornamento ripiega sul ricalcolo completo della finestra di sostituzione corrente.
Procedure consigliate per l'aggiornamento incrementale
Seguire queste linee guida in modo che i flussi REPLACE WHERE rimangano idonei per l'aggiornamento incrementale.
Usa un limite inferiore variabile
I predicati con un limite inferiore mobile rimangono idonei per l'aggiornamento incrementale a tempo indeterminato.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Un limite superiore mobile, come date BETWEEN date_add(current_date(), -7) AND current_date(), può spostare la finestra fino a includere righe precedentemente escluse, innescando un fallback una tantum alla ricomputazione completa.
Includere la colonna del predicato in GROUP BY
Durante l'aggregazione, includere la colonna del predicato in GROUP BY in modo che il motore possa spostare il predicato al di sotto dell'aggregazione.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se la colonna del predicato non è presente in GROUP BY, il predicato non può essere inserito sotto l'aggregazione e l'origine viene analizzata completamente.
Includere la colonna del predicato nelle chiavi di join
Includi la colonna del predicato nella condizione di join in modo che il motore possa scartare tutte le origini incluse nel join.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se una tabella unita non espone la colonna del predicato, tale tabella viene scansionata per intero a ogni aggiornamento.
Diagnostica del ripiego al ricalcolo completo
Quando un aggiornamento ricorre al ricalcolo completo, il motivo viene segnalato nell'evento planning_information del flusso. Vedi Monitorare i log degli eventi della pipeline. Nella tabella seguente sono elencati i motivi segnalati nell'evento :
| Motivo | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Un'operazione DML esterna ha modificato delle righe nella finestra di sostituzione corrente. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Il predicato usa espressioni non deterministiche. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
L'aggiornamento precedente usava un predicato non deterministico. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Non è possibile eseguire il push del predicato in qualsiasi origine, la finestra corrente include righe non elaborate dal predicato precedente oppure l'esecuzione usa un override del predicato. |
Limitations
I flussi REPLACE WHERE presentano le limitazioni seguenti:
- La tabella di destinazione deve essere creata all'interno della pipeline.
- È consentito un solo flusso REPLACE WHERE per ogni tabella di destinazione.
- Una tabella interessata da un flusso REPLACE WHERE non può essere interessata anche da un altro tipo di flusso, ad esempio un flusso AUTO CDC o un flusso di append.
- Le aspettative non sono supportate nelle tabelle destinate ai flussi REPLACE WHERE .
- Per le tabelle di streaming create in Databricks SQL, consulta REPLACE WHERE flussi per tabelle di streaming autonome per la sintassi e le differenze di backfill.
Examples
Negli esempi seguenti vengono illustrati modelli di flusso REPLACE WHERE comuni.
Esempio 1: Mantenere le aggregazioni cronologiche da un'origine di conservazione limitata
Questo esempio mantiene le aggregazioni giornaliere per un periodo illimitato, anche dopo che i dati non elaborati escono dalla tabella di origine (conservazione di 3 giorni):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Esempio 2: Impedire la ricompilazione quando viene modificata una tabella delle dimensioni
Questo esempio mantiene invariate le righe dei fatti cronologici quando gli attributi della dimensione cambiano:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Se l'area di un utente cambia, vengono ricalcolate solo le righe recenti. Le righe storiche mantengono il valore della regione al momento in cui sono state scritte. Per correggere le righe storiche, eseguire un backfill mirato usando override di predicato.
Esempio 3: Aggiungere una nuova metrica senza ricompilare la cronologia completa
In questo esempio viene illustrato come evolvere una definizione di tabella ed eseguire il riempimento retroattivo solo per un intervallo specifico:
Definire la tabella iniziale:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Aggiornare la query per aggiungere
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Eseguire il riempimento retroattivo della nuova metrica per gli scorsi 30 giorni:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Le righe più vecchie dell'intervallo sottoposto a backfill contengono
NULLperuniq_users.
Esempio 4: Eseguire l'iterazione in una piccola finestra prima di riempire la cronologia completa
In questo esempio viene illustrato come convalidare la logica di query in una finestra di dati di piccole dimensioni prima di elaborare l'intervallo cronologico completo.
Iniziare con una finestra breve in modo che ogni aggiornamento ricompila solo gli ultimi 7 giorni mentre si modifica la query:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Dopo aver finalizzato la query, utilizzare una sovrascrittura del predicato per eseguire un backfill storico una tantum:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)