Batchbehandling med REPLACE WHERE-flöden

Important

REPLACE-flöden WHERE finns i Beta.

Den här sidan beskriver hur du använder REPLACE WHERE-flöden i Lakeflow Spark Declarative Pipelines för att beräkna om och skriva över en angiven delmängd av en tabell utan att behöva bearbeta om tabellens hela historik. REPLACE-flöden WHERE hanterar sent ankommande data, uppströms ombearbetning, schemautveckling och återfyllnad.

Med ett REPLACE-flöde WHERE definierar du ett predikat i måltabellen. Alla rader som matchar predikatet tas bort och ersätts genom omvärdering av källfrågan för samma predikatintervall. Rader som inte matchar predikatet lämnas orörda.

Requirements

REPLACE-flöden WHERE har följande krav:

  • Din pipeline måste använda PREVIEW kanalen.
  • Databricks rekommenderar Unity Catalog och serverlös beräkning. Inkrementell uppdatering stöds endast vid serverlös beräkning.

När du ska använda REPLACE-flöden WHERE

Använd REPLACE-flöden WHERE för följande scenarier:

  • Inkrementell batchbearbetning utan strömmande semantik: Bearbeta nya rader i batchar utan att hantera strömningsbegrepp som vattenstämplar.
  • Selektiv ombearbetning: Räkna bara om rader som matchar ett predikat och lämna alla andra rader orörda.
  • Scenarier utöver standardfunktioner för materialiserad vy:
    • Måltabeller med längre lagringstid än källtabellen
    • Förhindra omkomputation när en dimensionstabell ändras
    • Schemautveckling utan att omberäkna hela historiken

Skapa ett REPLACE-flöde WHERE

Definiera REPLACE WHERE-flöden i SQL eller Python.

SQL

Använd FLOW REPLACE WHERE satsen i samma rad som CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Du kan också använda långformssyntaxen CREATE FLOW :

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

I Python definieras tabellen och flödet i en enda instruktion. Flödet ärver samma namn som tabellen:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

Parametern replace_where accepterar antingen ett PySpark-kolumnuttryck eller ett strängpredikat.

I de här exemplen tas alla rader från de senaste 7 dagarna bort från orders_enriched och omberäknas med hjälp av källfrågan. Du behöver inte lägga till predikatet i källfrågan. Pipeline-motorn tillämpar den automatiskt när den läser från källan.

Note

BY NAME krävs i SQL. Den matchar kolumner efter namn i stället för position.

Fyll i historiska data

Om du vill skriva historiska eller korrigerade rader i måltabellen utanför schemalagda uppdateringar väljer du mellan två mekanismer baserat på var historiska data finns:

  • Åsidosättning av predikat: Kör flödets källfråga igen för ett engångspredikatintervall. Använd när historiska data kommer från samma källa som inkrementella data.
  • DML-instruktioner: Infoga direkt i måltabellen och kringgå flödet. Använd när historiska data finns i en annan källa än inkrementella data.

Åsidosättande av predikat

Åsidosätt REPLACE-predikatet WHERE för en enskild pipelineuppdatering utan att ändra pipelinedefinitionen. Åsidosättningar av predikat är engångsåtgärder, gäller bara den aktuella uppdateringen och påverkar inte framtida körningar.

Exempel: Inledande historisk belastning

Så här utför du en engångsåterfyllning av historiska data när du först konfigurerar en pipeline:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Exempel: Korrigera en kolumn för en viss period

När du har uppdaterat en kolumndefinition fyller du på ändringen för ett målhistorikintervall:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Kombinera flera dimensioner i en enda åsidosättning av predikat:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Hjälpfunktion: start_update_with_replace_where

Använd API:et för pipelineuppdatering från en notebook-fil för att skicka åsidosättningar för predikat:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

DML-instruktioner

Kör DML-instruktioner direkt i måltabellen utanför pipelinen för att utföra inledande inläsningar eller korrigeringar, till exempel inläsning från en äldre tabell:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Rader som infogas via DML omfattas inte av REPLACE-predikatet WHERE och bevaras över schemalagda uppdateringar om de inte faller inom predikatintervallet för en framtida körning.

Fullständigt uppdateringsbeteende

En fullständig uppdatering av ett REPLACE-flöde WHERE kör källfrågan igen med endast det aktuella predikatet. Rader som infogades genom åsidosättanden av predikat eller DML-satser utanför det aktuella predikatområdet tas bort permanent.

Varning

En fullständig uppdatering rensar alla befintliga data och kör flödet igen med endast dess definierade predikat. Om en pipeline har körts i ett år med ett 7-dagars predikat resulterar en fullständig uppdatering i att tabellen endast innehåller data från de senaste 7 dagarna. Alla äldre rader tas bort permanent.

Om du vill förhindra fullständiga uppdateringar i en tabell anger du tabellegenskapen pipelines.reset.allowed till false. Se Referens för pipelineegenskaper.

Inkrementell uppdatering

REPLACE-flöden WHERE använder inkrementell uppdatering när det är möjligt och ombearbetar endast de källdata som har ändrats sedan den senaste uppdateringen i stället för att omberäkna hela ersättningsfönstret. Inkrementell uppdatering kräver serverlös beräkning.

När inkrementell uppdatering gäller

Allt följande måste vara sant:

  • Pipelinen körs på serverlös beräkning.
  • Frågeformen stöds. Se Incremental refresh för den operatoruppsättning som stöds.
  • Predikatet refererar till baskolumner från en källtabell. Predikat på härledda värden, till exempel utdata från aggregat- eller fönsterfunktioner, kan inte skickas vidare till en källa, vilket förhindrar inkrementell uppdatering.
  • Ingen extern DML har ändrat rader i det aktuella ersättningsfönstret. DML som ändrar rader utanför det aktuella fönstret påverkas inte.
  • Det aktuella ersättningsfönstret innehåller inte rader som det tidigare predikatet exkluderade. Om du breddar predikatet så att det täcker ett intervall som inte tidigare bearbetats återgår den uppdateringen till fullständig omkomputation. Efterföljande uppdateringar är berättigade till inkrementell uppdatering igen.
  • Predikatet är deterministiskt. Predikat med icke-deterministiska funktioner, till exempel rand() inaktivera inkrementell uppdatering. Temporala funktioner som current_date() är tillåtna.

Den första uppdateringen av ett flöde är alltid en fullständig beräkning. Om något villkor inte uppfylls, går uppdateringen tillbaka till fullständig omberäkning av det aktuella ersättningsfönstret.

Metodtips för inkrementell uppdatering

Följ dessa riktlinjer så att REPLACE-flöden WHERE fortfarande är berättigade till inkrementell uppdatering.

Använd en rörlig nedre gräns

Predikat med en rörlig lägre gräns är fortfarande berättigade till inkrementell uppdatering på obestämd tid.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

En flyttande övre gräns, till exempel date BETWEEN date_add(current_date(), -7) AND current_date(), kan flytta fönstret så att det inkluderar tidigare exkluderade rader, vilket utlöser en engångsåterställning till fullständig omkomputation.

Inkludera predikatkolumnen i GROUP BY

När du aggregerar ska du inkludera predikatkolumnen i GROUP BY så att motorn kan flytta ned predikatet under aggregationen.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Om predikatkolumnen saknas i GROUP BY kan predikatet inte flyttas ned under aggregeringen och källan skannas i sin helhet.

Inkludera predikatkolumnen i sammanslagningsnycklarna

Inkludera predikatkolumnen i kopplingsvillkoret så att motorn kan rensa alla anslutna källor.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Om en ansluten tabell inte exponerar predikatkolumnen genomsöks tabellen i sin helhet vid varje uppdatering.

Diagnostisera övergång till fullständig omberäkning

När en uppdatering faller tillbaka på en fullständig omberäkning rapporteras orsaken i händelsen planning_information för flödet. Se Övervaka händelseloggar för pipeline. I följande tabell visas de orsaker som rapporterats i händelsen:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW En extern DML ändrade rader i det aktuella ersättningsfönstret.
REPLACE_WHERE_NOT_DETERMINISTIC Predikatet använder icke-deterministiska uttryck.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Den tidigare uppdateringen använde ett icke-deterministiskt predikat.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Predikatet kan inte flyttas ned till någon källa, det aktuella fönstret omfattar rader som inte har bearbetats av det föregående predikatet, eller så använder körningen en åsidosättning av predikatet.

Limitations

REPLACE-flöden WHERE har följande begränsningar:

  • Måltabellen måste skapas inom pipelinen.
  • Endast ett REPLACE-flöde WHERE tillåts per måltabell.
  • En tabell som är mål för ett REPLACE-flöde WHERE kan inte också riktas mot en annan flödestyp, till exempel ett AUTO CDC-flöde eller ett tilläggsflöde.
  • Förväntningar stöds inte för tabeller som används som mål av REPLACE-WHEREflöden.
  • För strömmande tabeller som skapats i Databricks SQL, se REPLACE WHERE-flöden för fristående strömmande tabeller för information om skillnader i syntax och tillbakafyllnad.

Exempel

I följande exempel visas vanliga REPLACE-flödesmönster WHERE .

Exempel 1: Behåll historiska aggregeringar från en källa med begränsad kvarhållning

Det här exemplet behåller dagliga aggregeringar utan tidsbegränsning, även efter att rådata har gallrats bort ur källtabellen (3 dagars lagringstid):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Exempel 2: Förhindra omberäkning när en dimensionstabell ändras

I det här exemplet förblir historiska faktarader oförändrade när dimensionsattribut ändras:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Om en användares region ändras omberäknas endast de senaste raderna. Historiska rader behåller regionvärdet när de skrevs. För att korrigera historiska rader, kör en riktad återfyllnad med åsidosättningar av predikat.

Exempel 3: Lägg till ett nytt mått utan att omberäkna fullständig historik

Det här exemplet visar hur du utvecklar en tabelldefinition och återfyller endast ett målintervall:

  1. Definiera den inledande tabellen:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Uppdatera frågan för att lägga till uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Fyll i det nya måttet för de senaste 30 dagarna:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Rader som är äldre än det ifyllda intervallet innehåller NULL för uniq_users.

Exempel 4: Iterera i ett litet fönster innan du fyller i hela historiken

Det här exemplet visar hur du validerar frågelogik i ett litet datafönster innan du bearbetar hela det historiska intervallet.

Börja med ett kort fönster så att varje uppdatering bara beräknas om de senaste 7 dagarna medan du ändrar frågan:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

När frågan har fastställts använder du en åsidosättning av villkor för att utföra en engångsbakåtfyllning av historiska data:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)