Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Os fluxos REPLACE WHERE estão em Beta.
Esta página descreve como usar fluxos REPLACE WHERE no Lakeflow Spark Declarative Pipelines para recomputar e sobrescrever um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.
Com um fluxo REPLACE WHERE , você define um predicado na tabela de destino. Todas as linhas correspondentes ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicados. As linhas que não correspondem ao predicado são deixadas intocadas.
Requirements
Os fluxos REPLACE WHERE têm os seguintes requisitos:
- O pipeline deve usar o canal
PREVIEW. - O Databricks recomenda o Catálogo do Unity e a computação sem servidor. A atualização incremental só tem suporte na computação sem servidor.
Quando usar os fluxos REPLACE WHERE
Use os fluxos REPLACE WHERE para os seguintes cenários:
- Processamento em lote incremental sem semântica de streaming: processe novas linhas em lotes sem gerenciar conceitos de streaming, como marcas d'água.
- Reprocessamento seletivo: recompute apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intocadas.
-
Cenários além das capacidades padrão de visão materializada:
- Tabelas de destino com retenção mais longa do que a da origem
- Impedindo a recomputação quando uma tabela de dimensão é alterada
- Evolução do esquema sem recompusar todo o histórico
Criar um fluxo REPLACE WHERE
Defina os fluxos REPLACE WHERE em SQL ou Python.
SQL
Use a FLOW REPLACE WHERE cláusula embutida com CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Como alternativa, use a sintaxe de forma CREATE FLOW longa:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
Em Python, a tabela e o fluxo são definidos em uma única instrução. O fluxo herda o mesmo nome da tabela:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
O replace_where parâmetro aceita uma expressão de coluna PySpark ou um predicado de cadeia de caracteres.
Nesses exemplos, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem. Você não precisa adicionar o predicado à consulta de origem. O mecanismo de pipeline o aplica automaticamente ao fazer a leitura da origem.
Note
BY NAME é necessário no SQL. Faz a correspondência das colunas pelo nome em vez da posição.
Preencher dados históricos
Para gravar linhas históricas ou corrigidas na tabela de destino fora das atualizações agendadas, escolha entre dois mecanismos com base no local em que os dados históricos residem:
- Substituições de predicado: execute novamente a consulta de origem do fluxo para um intervalo de predicado único. Use quando os dados históricos forem provenientes da mesma fonte que os dados incrementais.
- Instruções DML: insira diretamente na tabela de destino, ignorando o fluxo. Use quando os dados históricos residem em uma fonte diferente dos dados incrementais.
Substituições de predicados
Substitua o predicado REPLACE WHERE em uma única atualização do pipeline sem modificar a definição do pipeline. As substituições de predicados são de uso único, aplicam-se apenas na atualização atual e não afetam execuções futuras.
Exemplo: carga histórica inicial
Para realizar um preenchimento retroativo único de dados históricos durante a configuração inicial de um pipeline:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Exemplo: corrigir uma coluna para um período específico
Depois de atualizar uma definição de coluna, faça o backup da alteração para um intervalo histórico de destino:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combine várias dimensões em uma única substituição de predicado:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Função auxiliar: start_update_with_replace_where
Use a API de atualização do pipeline em um notebook para enviar substituições de predicado:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Instruções DML
Execute instruções DML diretamente na tabela de destino, fora do pipeline, para realizar cargas iniciais ou correções, como carregar dados de uma tabela legada:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem entre as atualizações agendadas, a menos que estejam dentro do intervalo de predicados de uma execução futura.
Comportamento de atualização completa
Uma atualização completa de um fluxo REPLACE WHERE executa novamente a consulta de origem usando apenas o predicado atual. As linhas inseridas por substituições de predicado ou instruções DML fora do intervalo de predicado atual são excluídas permanentemente.
Warning
Uma atualização completa limpa todos os dados existentes e executa novamente o fluxo usando apenas seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa fará com que a tabela contenha somente os dados dos últimos 7 dias. Todas as linhas mais antigas são excluídas permanentemente.
Para evitar atualizações completas em uma tabela, defina a propriedade pipelines.reset.allowed da tabela como false. Consulte a referência das propriedades do pipeline.
Atualização incremental
Os fluxos REPLACE WHERE usam a atualização incremental quando possível, reprocessando apenas os dados de origem que foram alterados desde a última atualização em vez de recompusar toda a janela de substituição. A atualização incremental requer computação sem servidor.
Quando a atualização incremental se aplica
Todos os seguintes devem ser verdadeiros:
- O pipeline é executado na computação sem servidor.
- Há suporte para a forma de consulta. Consulte Atualização incremental para ver o conjunto de operadores compatíveis.
- O predicado faz referência a colunas base de uma tabela de origem. Predicados sobre valores derivados, como resultados de funções de agregação ou de janela, não podem ser aplicados a uma fonte de dados, o que desativa a atualização incremental.
- Nenhum DML externo modificou linhas na janela de substituição atual. O DML que modifica linhas fora da janela atual não é afetado.
- A janela de substituição atual não inclui linhas excluídas pelo predicado anterior. Se você ampliar o predicado para cobrir um intervalo não processado anteriormente, essa atualização retornará à recomputação completa. As atualizações subsequentes se qualificam para atualização incremental novamente.
- O predicado é determinístico. Predicados que usam funções não determinísticas, como
rand(), desativam a atualização incremental. Funções temporais, comocurrent_date()são permitidas.
A primeira atualização de qualquer fluxo é sempre uma computação completa. Se qualquer condição não for atendida, essa atualização retornará à recomputação completa da janela de substituição atual.
Práticas recomendadas para atualização incremental
Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam aptos à atualização incremental.
Usar um limite inferior móvel
Predicados com um limite inferior móvel permanecem elegíveis para atualização incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas excluídas anteriormente, acionando um retorno pontual à recomputação completa.
Incluir a coluna de predicado em GROUP BY
Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa aplicar o predicado antes da agregação.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se a coluna de predicado estiver ausente em GROUP BY, o predicado não poderá ser deslocado para baixo da agregação, e a origem será lida por completo.
Incluir a coluna de predicados nas chaves de junção
Inclua a coluna de predicados na condição de junção para que o mecanismo possa podar todas as fontes associadas na junção.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se uma tabela associada não expuser a coluna de predicado, essa tabela será verificada por completo em cada atualização.
Diagnosticar o recurso de recomputação completa
Quando uma atualização recorre à recomputação completa, o motivo é informado no evento planning_information do fluxo. Consulte os logs de eventos de pipeline do Monitor. A tabela a seguir lista os motivos relatados no evento:
| Motivo | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Um DML externo modificou linhas na janela de substituição atual. |
REPLACE_WHERE_NOT_DETERMINISTIC |
O predicado usa expressões não determinísticas. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
A atualização anterior usou um predicado não determinístico. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
O predicado não pode ser enviado por push para qualquer origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado. |
Limitações
Os fluxos REPLACE WHERE têm as seguintes limitações:
- A tabela de destino deve ser criada dentro do pipeline.
- Somente um fluxo REPLACE WHERE é permitido por tabela de destino.
- Uma tabela usada como destino por um fluxo do tipo REPLACE WHERE também não pode ser usada como destino por outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de anexação.
- Não há suporte para expectativas em tabelas direcionadas por fluxos REPLACE WHERE .
- Para tabelas de streaming criadas no Databricks SQL, consulte fluxos com REPLACE WHERE para tabelas de streaming independentes para ver as diferenças de sintaxe e de backfill.
Exemplos
Os exemplos a seguir mostram padrões comuns de fluxo REPLACE WHERE .
Exemplo 1: Manter agregações históricas de uma fonte de retenção limitada
Este exemplo mantém agregações diárias indefinidamente, mesmo após os dados brutos serem retirados da tabela de origem (retenção de 3 dias):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Exemplo 2: impedir a recomputação quando uma tabela de dimensão for alterada
Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos de dimensão mudam:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Se a região de um usuário for alterada, somente as linhas recentes serão recomputadas. As linhas históricas mantêm o valor da região no momento em que foram gravadas. Para corrigir linhas históricas, execute um backfill direcionado usando substituições de predicado.
Exemplo 3: Adicionar uma nova métrica sem recompusar o histórico completo
Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:
Defina a tabela inicial:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Atualize a consulta para adicionar
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Faça o backup da nova métrica nos últimos 30 dias:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Linhas mais antigas que o intervalo preenchido retroativamente contêm
NULLemuniq_users.
Exemplo 4: Iterar em uma janela pequena antes de fazer backup do histórico completo
Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar o intervalo histórico completo.
Comece com uma janela curta para que cada atualização recomputa apenas os últimos 7 dias enquanto você revisa a consulta:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Depois que a consulta for finalizada, use uma substituição de predicado para executar um backfill histórico único:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)