Processamento em lote com fluxos de REPLACE WHERE

Importante

Os fluxos REPLACE WHERE estão em Beta.

Esta página descreve como utilizar os fluxos REPLACE WHERE nos Pipelines Declarativos do Lakeflow Spark para recalcular e substituir um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados de chegada tardia, reprocessamento a montante, evolução de esquemas e preenchimentos retroativos.

Com um fluxo REPLACE WHERE , defines um predicado na tabela alvo. Todas as linhas que correspondem ao predicado são eliminadas e substituídas pela reavaliação da consulta de origem para esse mesmo intervalo de predicados. As linhas que não correspondem ao predicado ficam intocadas.

Requirements

Os fluxos REPLACE WHERE têm os seguintes requisitos:

  • O seu pipeline tem de usar o canal PREVIEW.
  • Databricks recomenda o Unity Catalog e a computação sem servidor. A atualização incremental só é suportada em computação serverless.

Quando usar os fluxos REPLACE WHERE

Use os fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento incremental em lote sem semântica de streaming: Processar novas linhas em lotes sem ter de gerir conceitos de streaming, tais como marcas de água.
  • Reprocessamento seletivo: Recalcule apenas as linhas que correspondam a um predicado, deixando todas as outras linhas intocadas.
  • Cenários para além das capacidades padrão de visualização materializada:
    • Tabelas alvo com maior retenção do que a fonte
    • Evitar recomputação quando uma tabela de dimensões muda
    • Evolução de esquemas sem recalcular toda a história

Criar um fluxo REPLACE WHERE

Defina fluxos REPLACE WHERE em SQL ou Python.

SQL

Utilize a cláusula FLOW REPLACE WHERE na mesma linha que CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Alternativamente, use a sintaxe de formato longo CREATE FLOW :

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

Em Python, a tabela e o fluxo são definidos numa única instrução. O fluxo herda o mesmo nome da tabela:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

O replace_where parâmetro aceita uma expressão de coluna PySpark ou um predicado de cadeia.

Nestes exemplos, todas as linhas dos últimos 7 dias são eliminadas de orders_enriched e recalculadas usando a consulta de origem. Não precisas de adicionar o predicado à consulta de origem. O motor de pipeline aplica-o automaticamente ao ler da fonte.

Note

BY NAME é exigido em SQL. Faz a correspondência das colunas com base no nome e não pela posição.

Preencher dados históricos

Para escrever linhas históricas ou corrigidas na tabela alvo fora das atualizações programadas, escolha entre dois mecanismos com base na localização dos dados históricos:

  • Substituições de predicados: Execute novamente a consulta de origem do fluxo para um intervalo de predicados único. Use quando os dados históricos provêm da mesma fonte que os dados incrementais.
  • Instruções DML: Insira diretamente na tabela de destino, ignorando o fluxo. Use quando os dados históricos estão numa fonte diferente dos dados incrementais.

Substituições de predicados

Substitua o predicado REPLACE WHERE para uma única atualização do pipeline sem modificar a definição do pipeline. As substituições de predicados são de utilização única, aplicam-se apenas à atualização em curso e não afetam execuções futuras.

Exemplo: Carga histórica inicial

Para realizar um preenchimento único de dados históricos ao configurar um pipeline pela primeira vez:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Exemplo: Corrigir uma coluna para um período específico

Após atualizar a definição de uma coluna, preencha a alteração para um intervalo histórico alvo:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Combine múltiplas dimensões numa única substituição de predicado:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Função auxiliar: start_update_with_replace_where

Utilize a API de atualização do pipeline a partir de um bloco de notas para submeter substituições de predicados:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

Instruções DML

Execute instruções DML diretamente na tabela de destino a partir de fora do pipeline para realizar cargas ou correções iniciais, como carregar a partir de uma tabela legada:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

As linhas inseridas através do DML não estão sujeitas ao predicado REPLACE WHERE e persistem através das atualizações programadas, a menos que estejam dentro do intervalo de predicados de uma execução futura.

Comportamento de atualização completo

Uma atualização completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas que foram inseridas por sobreposições de predicados ou instruções DML fora do intervalo atual de predicados são eliminadas permanentemente.

Advertência

Uma atualização completa limpa todos os dados existentes e reexecuta o fluxo usando apenas o seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa faz com que a tabela contenha apenas os dados dos últimos 7 dias. Todas as linhas antigas são eliminadas permanentemente.

Para evitar atualizações completas numa tabela, defina a propriedade pipelines.reset.allowed da tabela para false. Consulte Referência das propriedades de pipeline.

Atualização incremental

Os fluxos REPLACE WHERE utilizam atualização incremental sempre que possível, reprocessando apenas os dados de origem que mudaram desde a última atualização em vez de recalcular toda a janela de substituição. A atualização incremental requer capacidade de computação sem servidor.

Quando se aplica a atualização incremental

Tudo o seguinte deve ser verdade:

  • O fluxo de processamento é executado em computação sem servidor.
  • A forma da consulta é suportada. Ver Atualização incremental para o conjunto de operadores suportado.
  • O predicado faz referência às colunas base de uma tabela de origem. Predicados sobre valores derivados, como saídas agregadas ou de funções janela, não podem ser enviados para uma fonte, o que desabilita a atualização incremental.
  • Nenhum DML externo modificou linhas na janela de substituição atual. DML que modifica linhas fora da janela atual não é afetado.
  • A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se ampliar o predicado para abranger um intervalo que não tenha sido processado anteriormente, essa atualização recorre novamente a uma recomputação completa. As atualizações subsequentes podem voltar a beneficiar da atualização incremental.
  • O predicado é determinístico. Predicados que usam funções não determinísticas, como rand(), desativam a atualização incremental. Funções temporais como current_date() são permitidas.

A primeira atualização de qualquer fluxo é sempre um cálculo completo. Se alguma condição não for cumprida, essa atualização volta ao recálculo total da janela de substituição atual.

Boas práticas para renovação incremental

Siga estas orientações para que os fluxos REPLACE WHERE continuem elegíveis para atualização incremental.

Utilize um limite inferior móvel

Predicados com limite inferior móvel continuam elegíveis para atualização incremental indefinidamente.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas anteriormente excluídas, desencadeando um recurso pontual ao recálculo completo.

Inclua a coluna de predicado em GROUP BY

Ao agregar, inclua a coluna do predicado em GROUP BY para que o motor possa colocar o predicado abaixo da agregação.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna do predicado estiver em falta em GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é varrida na totalidade.

Incluir a coluna de predicados nas chaves de junção

Inclua a coluna de predicado na condição de junção para que o motor possa eliminar todas as origens associadas na junção.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela associada não disponibilizar a coluna do predicado, essa tabela é integralmente analisada em cada atualização.

Diagnosticar o recurso à recomputação completa

Quando uma atualização recorre à recomputação completa, o motivo é indicado no evento planning_information do fluxo. Ver Monitorizar registos de eventos do pipeline. A tabela seguinte lista as razões reportadas no evento:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Um DML externo modificava as linhas na janela de substituição atual.
REPLACE_WHERE_NOT_DETERMINISTIC O predicado utiliza expressões não determinísticas.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC A atualização anterior usou um predicado não determinístico.
UNSUPPORTED_REPLACE_WHERE_PREDICATE O predicado não pode ser aplicado a nenhuma origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução utiliza uma substituição do predicado.

Limitações

Os fluxos REPLACE WHERE têm as seguintes limitações:

  • A tabela alvo deve ser criada dentro do pipeline.
  • Apenas um fluxo REPLACE WHERE é permitido por tabela alvo.
  • Uma tabela direcionada por um fluxo REPLACE WHERE não pode também ser direcionada por outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de adição.
  • As expectativas não são suportadas em tabelas direcionadas pelos fluxos REPLACE (SUBSTITUI WHERE ).
  • Para tabelas de streaming criadas em Databricks SQL, veja fluxos REPLACE WHERE para tabelas de streaming independentes para diferenças de sintaxe e preenchimento.

Exemplos

Os exemplos seguintes mostram padrões de fluxo REPLACE comuns WHERE.

Exemplo 1: Manter agregados históricos provenientes de uma fonte de retenção limitada

Este exemplo mantém agregados diários indefinidamente, mesmo depois de os dados brutos deixarem de constar da tabela de origem (retenção de 3 dias):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Exemplo 2: Evitar a recomputação quando uma tabela de dimensões muda

Este exemplo mantém as linhas históricas de factos inalteradas quando os atributos das dimensões mudam:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Se a região de um utilizador mudar, apenas as linhas recentes são recalculadas. As linhas históricas mantêm o valor da região no momento em que foram escritas. Para corrigir linhas históricas, execute um preenchimento retroativo direcionado usando substituições de predicados.

Exemplo 3: Adicionar uma nova métrica sem recalcular o histórico completo

Este exemplo mostra como evoluir uma definição de tabela e preencher apenas um intervalo alvo:

  1. Defina a tabela inicial:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Atualize a consulta para acrescentar uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Preencha a nova métrica dos últimos 30 dias:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    As linhas mais antigas do que o intervalo preenchido contêm NULL para uniq_users.

Exemplo 4: Itera numa janela pequena antes de preencher o histórico completo

Este exemplo mostra como validar a lógica de consulta numa pequena janela de dados antes de processar todo o intervalo histórico.

Comece com uma janela curta para que cada atualização recalcule apenas os últimos 7 dias enquanto revê a consulta:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Depois de a consulta estar finalizada, utilize uma substituição de predicado para realizar um preenchimento histórico retroativo pontual:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)