Processamento em lote com fluxos REPLACE WHERE

Importante

Os fluxos REPLACE WHERE estão em Beta.

Esta página descreve como usar fluxos REPLACE WHERE no Lakeflow Spark Declarative Pipelines para recomputar e sobrescrever um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE , você define um predicado na tabela de destino. Todas as linhas correspondentes ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicados. As linhas que não correspondem ao predicado são deixadas intocadas.

Requirements

Os fluxos REPLACE WHERE têm os seguintes requisitos:

  • O pipeline deve usar o canal PREVIEW.
  • O Databricks recomenda o Catálogo do Unity e a computação sem servidor. A atualização incremental só tem suporte na computação sem servidor.

Quando usar os fluxos REPLACE WHERE

Use os fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento em lote incremental sem semântica de streaming: processe novas linhas em lotes sem gerenciar conceitos de streaming, como marcas d'água.
  • Reprocessamento seletivo: recompute apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intocadas.
  • Cenários além das capacidades padrão de visão materializada:
    • Tabelas de destino com retenção mais longa do que a da origem
    • Impedindo a recomputação quando uma tabela de dimensão é alterada
    • Evolução do esquema sem recompusar todo o histórico

Criar um fluxo REPLACE WHERE

Defina os fluxos REPLACE WHERE em SQL ou Python.

SQL

Use a FLOW REPLACE WHERE cláusula embutida com CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Como alternativa, use a sintaxe de forma CREATE FLOW longa:

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

Em Python, a tabela e o fluxo são definidos em uma única instrução. O fluxo herda o mesmo nome da tabela:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

O replace_where parâmetro aceita uma expressão de coluna PySpark ou um predicado de cadeia de caracteres.

Nesses exemplos, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem. Você não precisa adicionar o predicado à consulta de origem. O mecanismo de pipeline o aplica automaticamente ao fazer a leitura da origem.

Note

BY NAME é necessário no SQL. Faz a correspondência das colunas pelo nome em vez da posição.

Preencher dados históricos

Para gravar linhas históricas ou corrigidas na tabela de destino fora das atualizações agendadas, escolha entre dois mecanismos com base no local em que os dados históricos residem:

  • Substituições de predicado: execute novamente a consulta de origem do fluxo para um intervalo de predicado único. Use quando os dados históricos forem provenientes da mesma fonte que os dados incrementais.
  • Instruções DML: insira diretamente na tabela de destino, ignorando o fluxo. Use quando os dados históricos residem em uma fonte diferente dos dados incrementais.

Substituições de predicados

Substitua o predicado REPLACE WHERE em uma única atualização do pipeline sem modificar a definição do pipeline. As substituições de predicados são de uso único, aplicam-se apenas na atualização atual e não afetam execuções futuras.

Exemplo: carga histórica inicial

Para realizar um preenchimento retroativo único de dados históricos durante a configuração inicial de um pipeline:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Exemplo: corrigir uma coluna para um período específico

Depois de atualizar uma definição de coluna, faça o backup da alteração para um intervalo histórico de destino:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Combine várias dimensões em uma única substituição de predicado:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Função auxiliar: start_update_with_replace_where

Use a API de atualização do pipeline em um notebook para enviar substituições de predicado:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

Instruções DML

Execute instruções DML diretamente na tabela de destino, fora do pipeline, para realizar cargas iniciais ou correções, como carregar dados de uma tabela legada:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem entre as atualizações agendadas, a menos que estejam dentro do intervalo de predicados de uma execução futura.

Comportamento de atualização completa

Uma atualização completa de um fluxo REPLACE WHERE executa novamente a consulta de origem usando apenas o predicado atual. As linhas inseridas por substituições de predicado ou instruções DML fora do intervalo de predicado atual são excluídas permanentemente.

Warning

Uma atualização completa limpa todos os dados existentes e executa novamente o fluxo usando apenas seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa fará com que a tabela contenha somente os dados dos últimos 7 dias. Todas as linhas mais antigas são excluídas permanentemente.

Para evitar atualizações completas em uma tabela, defina a propriedade pipelines.reset.allowed da tabela como false. Consulte a referência das propriedades do pipeline.

Atualização incremental

Os fluxos REPLACE WHERE usam a atualização incremental quando possível, reprocessando apenas os dados de origem que foram alterados desde a última atualização em vez de recompusar toda a janela de substituição. A atualização incremental requer computação sem servidor.

Quando a atualização incremental se aplica

Todos os seguintes devem ser verdadeiros:

  • O pipeline é executado na computação sem servidor.
  • Há suporte para a forma de consulta. Consulte Atualização incremental para ver o conjunto de operadores compatíveis.
  • O predicado faz referência a colunas base de uma tabela de origem. Predicados sobre valores derivados, como resultados de funções de agregação ou de janela, não podem ser aplicados a uma fonte de dados, o que desativa a atualização incremental.
  • Nenhum DML externo modificou linhas na janela de substituição atual. O DML que modifica linhas fora da janela atual não é afetado.
  • A janela de substituição atual não inclui linhas excluídas pelo predicado anterior. Se você ampliar o predicado para cobrir um intervalo não processado anteriormente, essa atualização retornará à recomputação completa. As atualizações subsequentes se qualificam para atualização incremental novamente.
  • O predicado é determinístico. Predicados que usam funções não determinísticas, como rand(), desativam a atualização incremental. Funções temporais, como current_date() são permitidas.

A primeira atualização de qualquer fluxo é sempre uma computação completa. Se qualquer condição não for atendida, essa atualização retornará à recomputação completa da janela de substituição atual.

Práticas recomendadas para atualização incremental

Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam aptos à atualização incremental.

Usar um limite inferior móvel

Predicados com um limite inferior móvel permanecem elegíveis para atualização incremental indefinidamente.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas excluídas anteriormente, acionando um retorno pontual à recomputação completa.

Incluir a coluna de predicado em GROUP BY

Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa aplicar o predicado antes da agregação.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna de predicado estiver ausente em GROUP BY, o predicado não poderá ser deslocado para baixo da agregação, e a origem será lida por completo.

Incluir a coluna de predicados nas chaves de junção

Inclua a coluna de predicados na condição de junção para que o mecanismo possa podar todas as fontes associadas na junção.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela associada não expuser a coluna de predicado, essa tabela será verificada por completo em cada atualização.

Diagnosticar o recurso de recomputação completa

Quando uma atualização recorre à recomputação completa, o motivo é informado no evento planning_information do fluxo. Consulte os logs de eventos de pipeline do Monitor. A tabela a seguir lista os motivos relatados no evento:

Motivo Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Um DML externo modificou linhas na janela de substituição atual.
REPLACE_WHERE_NOT_DETERMINISTIC O predicado usa expressões não determinísticas.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC A atualização anterior usou um predicado não determinístico.
UNSUPPORTED_REPLACE_WHERE_PREDICATE O predicado não pode ser enviado por push para qualquer origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado.

Limitações

Os fluxos REPLACE WHERE têm as seguintes limitações:

  • A tabela de destino deve ser criada dentro do pipeline.
  • Somente um fluxo REPLACE WHERE é permitido por tabela de destino.
  • Uma tabela usada como destino por um fluxo do tipo REPLACE WHERE também não pode ser usada como destino por outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de anexação.
  • Não há suporte para expectativas em tabelas direcionadas por fluxos REPLACE WHERE .
  • Para tabelas de streaming criadas no Databricks SQL, consulte fluxos com REPLACE WHERE para tabelas de streaming independentes para ver as diferenças de sintaxe e de backfill.

Exemplos

Os exemplos a seguir mostram padrões comuns de fluxo REPLACE WHERE .

Exemplo 1: Manter agregações históricas de uma fonte de retenção limitada

Este exemplo mantém agregações diárias indefinidamente, mesmo após os dados brutos serem retirados da tabela de origem (retenção de 3 dias):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Exemplo 2: impedir a recomputação quando uma tabela de dimensão for alterada

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos de dimensão mudam:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Se a região de um usuário for alterada, somente as linhas recentes serão recomputadas. As linhas históricas mantêm o valor da região no momento em que foram gravadas. Para corrigir linhas históricas, execute um backfill direcionado usando substituições de predicado.

Exemplo 3: Adicionar uma nova métrica sem recompusar o histórico completo

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Atualize a consulta para adicionar uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Faça o backup da nova métrica nos últimos 30 dias:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Linhas mais antigas que o intervalo preenchido retroativamente contêm NULL em uniq_users.

Exemplo 4: Iterar em uma janela pequena antes de fazer backup do histórico completo

Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar o intervalo histórico completo.

Comece com uma janela curta para que cada atualização recomputa apenas os últimos 7 dias enquanto você revisa a consulta:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Depois que a consulta for finalizada, use uma substituição de predicado para executar um backfill histórico único:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)