Elaborazione in batch con i flussi REPLACE WHERE

Importante

I flussi REPLACE WHERE sono in versione beta.

Questa pagina descrive come usare i flussi REPLACE WHERE nelle pipeline dichiarative di Lakeflow Spark per ricompilare e sovrascrivere un subset di destinazione di una tabella senza rielaborare l'intera cronologia delle tabelle. I flussi REPLACE WHERE gestiscono i dati in arrivo in ritardo, la rielaborazione upstream, l'evoluzione dello schema e i backfill.

Con un flusso REPLACE WHERE si definisce un predicato nella tabella di destinazione. Tutte le righe corrispondenti al predicato vengono eliminate e sostituite rivalutando la query di origine per lo stesso intervallo di predicati. Le righe che non corrispondono al predicato vengono lasciate invariate.

Requirements

I flussi REPLACE WHERE hanno i requisiti seguenti:

  • La pipeline deve usare il canale PREVIEW.
  • Databricks consiglia il catalogo Unity e il calcolo serverless. L'aggiornamento incrementale è supportato solo nel calcolo serverless.

Quando usare i flussi REPLACE WHERE

Usare i flussi REPLACE WHERE per gli scenari seguenti:

  • Elaborazione batch incrementale senza semantica di streaming: elaborare nuove righe in batch senza gestire concetti di streaming come filigrane.
  • Rielaborazione selettiva: ricompilare solo le righe che corrispondono a un predicato lasciando invariate tutte le altre righe.
  • Scenari che superano le funzionalità di visualizzazione materializzate standard:
    • Tabelle di destinazione con un periodo di conservazione più lungo rispetto alla sorgente
    • Impedire la ricompilazione quando viene modificata una tabella delle dimensioni
    • Evoluzione dello schema senza ricompilare l'intera cronologia

Creare un flusso REPLACE WHERE

Definisci i flussi REPLACE WHERE in SQL oppure in Python.

SQL

Usa la clausola FLOW REPLACE WHERE in linea con CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

In alternativa, usare la sintassi long-form CREATE FLOW :

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

In Python la tabella e il flusso vengono definiti in una singola istruzione. Il flusso eredita lo stesso nome della tabella:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

Il replace_where parametro accetta un'espressione di colonna PySpark o un predicato stringa.

In questi esempi, tutte le righe degli ultimi 7 giorni vengono eliminate da orders_enriched e ricalcolate utilizzando la query di origine. Non è necessario aggiungere il predicato alla query di origine. Il motore della pipeline lo applica automaticamente durante la lettura dalla sorgente.

Note

BY NAME è obbligatorio in SQL. Corrisponde alle colonne in base al nome anziché alla posizione.

Eseguire il backfill dei dati cronologici

Per scrivere righe cronologiche o corrette nella tabella di destinazione al di fuori degli aggiornamenti pianificati, scegliere tra due meccanismi in base alla posizione in cui si trovano i dati cronologici:

  • Sostituzioni del predicato: Riesegui la query di origine del flusso per un intervallo del predicato valido per una sola volta. Usare quando i dati cronologici provengono dalla stessa origine dei dati incrementali.
  • Istruzioni DML: inserire direttamente nella tabella di destinazione, ignorando il flusso. Usare quando i dati cronologici si trovano in un'origine diversa rispetto ai dati incrementali.

Sovrascritture del predicato

Eseguire l'override del predicato REPLACE WHERE per un singolo aggiornamento della pipeline senza modificare la definizione della pipeline. Gli override del predicato sono una tantum, si applicano solo all'aggiornamento corrente e non influiscono sulle esecuzioni future.

Esempio: Caricamento cronologico iniziale

Per eseguire un backfill una tantum dei dati storici quando si configura per la prima volta una pipeline:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Esempio: Correggere una colonna per un periodo specifico

Dopo aver aggiornato una definizione di colonna, applicare retroattivamente la modifica a un intervallo cronologico storico specifico:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Combinare più dimensioni in un unico override del predicato:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Funzione di supporto: start_update_with_replace_where

Usare l'API di aggiornamento della pipeline da un notebook per inviare gli override del predicato:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

Istruzioni DML

Eseguire istruzioni DML direttamente nella tabella di destinazione dall'esterno della pipeline per eseguire caricamenti o correzioni iniziali, ad esempio il caricamento da una tabella legacy:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Le righe inserite tramite DML non sono soggette al predicato REPLACE WHERE e vengono mantenute tra gli aggiornamenti pianificati, a meno che non rientrano nell'intervallo di predicati di un'esecuzione futura.

Comportamento di aggiornamento completo

Un aggiornamento completo di un flusso REPLACE WHERE esegue nuovamente la query di origine usando solo il predicato corrente. Le righe inserite da sostituzioni del predicato o da istruzioni DML al di fuori dell'intervallo corrente del predicato vengono eliminate definitivamente.

Avvertimento

Un aggiornamento completo cancella tutti i dati esistenti ed esegue nuovamente il flusso usando solo il predicato definito. Se una pipeline è stata eseguita per un anno con un predicato di 7 giorni, un aggiornamento completo restituisce la tabella contenente solo gli ultimi 7 giorni di dati. Tutte le righe precedenti vengono eliminate definitivamente.

Per evitare aggiornamenti completi in una tabella, impostare la proprietà pipelines.reset.allowed table su false. Vedere Informazioni di riferimento sulle proprietà della pipeline.

Aggiornamento incrementale

I flussi REPLACE WHERE usano l'aggiornamento incrementale quando possibile, rielaborando solo i dati di origine modificati dopo l'ultimo aggiornamento anziché ricompilare l'intera finestra di sostituzione. L'aggiornamento incrementale richiede capacità di calcolo serverless.

Quando si applica l'aggiornamento incrementale

Tutte le condizioni seguenti devono essere vere:

  • La pipeline viene eseguita su un'infrastruttura serverless.
  • La forma di query è supportata. Vedere Aggiornamento incrementale per il set di operatori supportato.
  • Il predicato fa riferimento alle colonne di base di una tabella di origine. I predicati sui valori derivati, ad esempio output di funzioni aggregate o finestra, non possono essere inseriti in un'origine, che disabilita l'aggiornamento incrementale.
  • Nessun DML esterno ha modificato righe nella finestra di sostituzione corrente. Il DML che modifica le righe al di fuori della finestra corrente non viene interessato.
  • La finestra di sostituzione corrente non include righe escluse dal predicato precedente. Se si estende il predicato per coprire un intervallo non elaborato in precedenza, tale aggiornamento torna alla ricompilazione completa. Gli aggiornamenti successivi possono nuovamente beneficiare dell'aggiornamento incrementale.
  • Il predicato è deterministico. I predicati che usano funzioni non deterministiche come rand() disabilitano l'aggiornamento incrementale. Sono consentite funzioni temporali, current_date() ad esempio .

Il primo aggiornamento di qualsiasi flusso è sempre un calcolo completo. Se una qualsiasi condizione non è soddisfatta, l'aggiornamento ripiega sul ricalcolo completo della finestra di sostituzione corrente.

Procedure consigliate per l'aggiornamento incrementale

Seguire queste linee guida in modo che i flussi REPLACE WHERE rimangano idonei per l'aggiornamento incrementale.

Usa un limite inferiore variabile

I predicati con un limite inferiore mobile rimangono idonei per l'aggiornamento incrementale a tempo indeterminato.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Un limite superiore mobile, come date BETWEEN date_add(current_date(), -7) AND current_date(), può spostare la finestra fino a includere righe precedentemente escluse, innescando un fallback una tantum alla ricomputazione completa.

Includere la colonna del predicato in GROUP BY

Durante l'aggregazione, includere la colonna del predicato in GROUP BY in modo che il motore possa spostare il predicato al di sotto dell'aggregazione.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se la colonna del predicato non è presente in GROUP BY, il predicato non può essere inserito sotto l'aggregazione e l'origine viene analizzata completamente.

Includere la colonna del predicato nelle chiavi di join

Includi la colonna del predicato nella condizione di join in modo che il motore possa scartare tutte le origini incluse nel join.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se una tabella unita non espone la colonna del predicato, tale tabella viene scansionata per intero a ogni aggiornamento.

Diagnostica del ripiego al ricalcolo completo

Quando un aggiornamento ricorre al ricalcolo completo, il motivo viene segnalato nell'evento planning_information del flusso. Vedi Monitorare i log degli eventi della pipeline. Nella tabella seguente sono elencati i motivi segnalati nell'evento :

Motivo Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Un'operazione DML esterna ha modificato delle righe nella finestra di sostituzione corrente.
REPLACE_WHERE_NOT_DETERMINISTIC Il predicato usa espressioni non deterministiche.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC L'aggiornamento precedente usava un predicato non deterministico.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Non è possibile eseguire il push del predicato in qualsiasi origine, la finestra corrente include righe non elaborate dal predicato precedente oppure l'esecuzione usa un override del predicato.

Limitations

I flussi REPLACE WHERE presentano le limitazioni seguenti:

  • La tabella di destinazione deve essere creata all'interno della pipeline.
  • È consentito un solo flusso REPLACE WHERE per ogni tabella di destinazione.
  • Una tabella interessata da un flusso REPLACE WHERE non può essere interessata anche da un altro tipo di flusso, ad esempio un flusso AUTO CDC o un flusso di append.
  • Le aspettative non sono supportate nelle tabelle destinate ai flussi REPLACE WHERE .
  • Per le tabelle di streaming create in Databricks SQL, consulta REPLACE WHERE flussi per tabelle di streaming autonome per la sintassi e le differenze di backfill.

Examples

Negli esempi seguenti vengono illustrati modelli di flusso REPLACE WHERE comuni.

Esempio 1: Mantenere le aggregazioni cronologiche da un'origine di conservazione limitata

Questo esempio mantiene le aggregazioni giornaliere per un periodo illimitato, anche dopo che i dati non elaborati escono dalla tabella di origine (conservazione di 3 giorni):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Esempio 2: Impedire la ricompilazione quando viene modificata una tabella delle dimensioni

Questo esempio mantiene invariate le righe dei fatti cronologici quando gli attributi della dimensione cambiano:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Se l'area di un utente cambia, vengono ricalcolate solo le righe recenti. Le righe storiche mantengono il valore della regione al momento in cui sono state scritte. Per correggere le righe storiche, eseguire un backfill mirato usando override di predicato.

Esempio 3: Aggiungere una nuova metrica senza ricompilare la cronologia completa

In questo esempio viene illustrato come evolvere una definizione di tabella ed eseguire il riempimento retroattivo solo per un intervallo specifico:

  1. Definire la tabella iniziale:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Aggiornare la query per aggiungere uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Eseguire il riempimento retroattivo della nuova metrica per gli scorsi 30 giorni:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Le righe più vecchie dell'intervallo sottoposto a backfill contengono NULL per uniq_users.

Esempio 4: Eseguire l'iterazione in una piccola finestra prima di riempire la cronologia completa

In questo esempio viene illustrato come convalidare la logica di query in una finestra di dati di piccole dimensioni prima di elaborare l'intervallo cronologico completo.

Iniziare con una finestra breve in modo che ogni aggiornamento ricompila solo gli ultimi 7 giorni mentre si modifica la query:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Dopo aver finalizzato la query, utilizzare una sovrascrittura del predicato per eseguire un backfill storico una tantum:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)