Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Importante
Os fluxos REPLACE WHERE estão em Beta.
Esta página descreve como utilizar os fluxos REPLACE WHERE nos Pipelines Declarativos do Lakeflow Spark para recalcular e substituir um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados de chegada tardia, reprocessamento a montante, evolução de esquemas e preenchimentos retroativos.
Com um fluxo REPLACE WHERE , defines um predicado na tabela alvo. Todas as linhas que correspondem ao predicado são eliminadas e substituídas pela reavaliação da consulta de origem para esse mesmo intervalo de predicados. As linhas que não correspondem ao predicado ficam intocadas.
Requirements
Os fluxos REPLACE WHERE têm os seguintes requisitos:
- O seu pipeline tem de usar o canal
PREVIEW. - Databricks recomenda o Unity Catalog e a computação sem servidor. A atualização incremental só é suportada em computação serverless.
Quando usar os fluxos REPLACE WHERE
Use os fluxos REPLACE WHERE para os seguintes cenários:
- Processamento incremental em lote sem semântica de streaming: Processar novas linhas em lotes sem ter de gerir conceitos de streaming, tais como marcas de água.
- Reprocessamento seletivo: Recalcule apenas as linhas que correspondam a um predicado, deixando todas as outras linhas intocadas.
-
Cenários para além das capacidades padrão de visualização materializada:
- Tabelas alvo com maior retenção do que a fonte
- Evitar recomputação quando uma tabela de dimensões muda
- Evolução de esquemas sem recalcular toda a história
Criar um fluxo REPLACE WHERE
Defina fluxos REPLACE WHERE em SQL ou Python.
SQL
Utilize a cláusula FLOW REPLACE WHERE na mesma linha que CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternativamente, use a sintaxe de formato longo CREATE FLOW :
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
Em Python, a tabela e o fluxo são definidos numa única instrução. O fluxo herda o mesmo nome da tabela:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
O replace_where parâmetro aceita uma expressão de coluna PySpark ou um predicado de cadeia.
Nestes exemplos, todas as linhas dos últimos 7 dias são eliminadas de orders_enriched e recalculadas usando a consulta de origem. Não precisas de adicionar o predicado à consulta de origem. O motor de pipeline aplica-o automaticamente ao ler da fonte.
Note
BY NAME é exigido em SQL. Faz a correspondência das colunas com base no nome e não pela posição.
Preencher dados históricos
Para escrever linhas históricas ou corrigidas na tabela alvo fora das atualizações programadas, escolha entre dois mecanismos com base na localização dos dados históricos:
- Substituições de predicados: Execute novamente a consulta de origem do fluxo para um intervalo de predicados único. Use quando os dados históricos provêm da mesma fonte que os dados incrementais.
- Instruções DML: Insira diretamente na tabela de destino, ignorando o fluxo. Use quando os dados históricos estão numa fonte diferente dos dados incrementais.
Substituições de predicados
Substitua o predicado REPLACE WHERE para uma única atualização do pipeline sem modificar a definição do pipeline. As substituições de predicados são de utilização única, aplicam-se apenas à atualização em curso e não afetam execuções futuras.
Exemplo: Carga histórica inicial
Para realizar um preenchimento único de dados históricos ao configurar um pipeline pela primeira vez:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Exemplo: Corrigir uma coluna para um período específico
Após atualizar a definição de uma coluna, preencha a alteração para um intervalo histórico alvo:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combine múltiplas dimensões numa única substituição de predicado:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Função auxiliar: start_update_with_replace_where
Utilize a API de atualização do pipeline a partir de um bloco de notas para submeter substituições de predicados:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Instruções DML
Execute instruções DML diretamente na tabela de destino a partir de fora do pipeline para realizar cargas ou correções iniciais, como carregar a partir de uma tabela legada:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
As linhas inseridas através do DML não estão sujeitas ao predicado REPLACE WHERE e persistem através das atualizações programadas, a menos que estejam dentro do intervalo de predicados de uma execução futura.
Comportamento de atualização completo
Uma atualização completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas que foram inseridas por sobreposições de predicados ou instruções DML fora do intervalo atual de predicados são eliminadas permanentemente.
Advertência
Uma atualização completa limpa todos os dados existentes e reexecuta o fluxo usando apenas o seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa faz com que a tabela contenha apenas os dados dos últimos 7 dias. Todas as linhas antigas são eliminadas permanentemente.
Para evitar atualizações completas numa tabela, defina a propriedade pipelines.reset.allowed da tabela para false. Consulte Referência das propriedades de pipeline.
Atualização incremental
Os fluxos REPLACE WHERE utilizam atualização incremental sempre que possível, reprocessando apenas os dados de origem que mudaram desde a última atualização em vez de recalcular toda a janela de substituição. A atualização incremental requer capacidade de computação sem servidor.
Quando se aplica a atualização incremental
Tudo o seguinte deve ser verdade:
- O fluxo de processamento é executado em computação sem servidor.
- A forma da consulta é suportada. Ver Atualização incremental para o conjunto de operadores suportado.
- O predicado faz referência às colunas base de uma tabela de origem. Predicados sobre valores derivados, como saídas agregadas ou de funções janela, não podem ser enviados para uma fonte, o que desabilita a atualização incremental.
- Nenhum DML externo modificou linhas na janela de substituição atual. DML que modifica linhas fora da janela atual não é afetado.
- A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se ampliar o predicado para abranger um intervalo que não tenha sido processado anteriormente, essa atualização recorre novamente a uma recomputação completa. As atualizações subsequentes podem voltar a beneficiar da atualização incremental.
- O predicado é determinístico. Predicados que usam funções não determinísticas, como
rand(), desativam a atualização incremental. Funções temporais comocurrent_date()são permitidas.
A primeira atualização de qualquer fluxo é sempre um cálculo completo. Se alguma condição não for cumprida, essa atualização volta ao recálculo total da janela de substituição atual.
Boas práticas para renovação incremental
Siga estas orientações para que os fluxos REPLACE WHERE continuem elegíveis para atualização incremental.
Utilize um limite inferior móvel
Predicados com limite inferior móvel continuam elegíveis para atualização incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas anteriormente excluídas, desencadeando um recurso pontual ao recálculo completo.
Inclua a coluna de predicado em GROUP BY
Ao agregar, inclua a coluna do predicado em GROUP BY para que o motor possa colocar o predicado abaixo da agregação.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se a coluna do predicado estiver em falta em GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é varrida na totalidade.
Incluir a coluna de predicados nas chaves de junção
Inclua a coluna de predicado na condição de junção para que o motor possa eliminar todas as origens associadas na junção.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se uma tabela associada não disponibilizar a coluna do predicado, essa tabela é integralmente analisada em cada atualização.
Diagnosticar o recurso à recomputação completa
Quando uma atualização recorre à recomputação completa, o motivo é indicado no evento planning_information do fluxo. Ver Monitorizar registos de eventos do pipeline. A tabela seguinte lista as razões reportadas no evento:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Um DML externo modificava as linhas na janela de substituição atual. |
REPLACE_WHERE_NOT_DETERMINISTIC |
O predicado utiliza expressões não determinísticas. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
A atualização anterior usou um predicado não determinístico. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
O predicado não pode ser aplicado a nenhuma origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução utiliza uma substituição do predicado. |
Limitações
Os fluxos REPLACE WHERE têm as seguintes limitações:
- A tabela alvo deve ser criada dentro do pipeline.
- Apenas um fluxo REPLACE WHERE é permitido por tabela alvo.
- Uma tabela direcionada por um fluxo REPLACE WHERE não pode também ser direcionada por outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de adição.
- As expectativas não são suportadas em tabelas direcionadas pelos fluxos REPLACE (SUBSTITUI WHERE ).
- Para tabelas de streaming criadas em Databricks SQL, veja fluxos REPLACE WHERE para tabelas de streaming independentes para diferenças de sintaxe e preenchimento.
Exemplos
Os exemplos seguintes mostram padrões de fluxo REPLACE comuns WHERE.
Exemplo 1: Manter agregados históricos provenientes de uma fonte de retenção limitada
Este exemplo mantém agregados diários indefinidamente, mesmo depois de os dados brutos deixarem de constar da tabela de origem (retenção de 3 dias):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Exemplo 2: Evitar a recomputação quando uma tabela de dimensões muda
Este exemplo mantém as linhas históricas de factos inalteradas quando os atributos das dimensões mudam:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Se a região de um utilizador mudar, apenas as linhas recentes são recalculadas. As linhas históricas mantêm o valor da região no momento em que foram escritas. Para corrigir linhas históricas, execute um preenchimento retroativo direcionado usando substituições de predicados.
Exemplo 3: Adicionar uma nova métrica sem recalcular o histórico completo
Este exemplo mostra como evoluir uma definição de tabela e preencher apenas um intervalo alvo:
Defina a tabela inicial:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Atualize a consulta para acrescentar
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Preencha a nova métrica dos últimos 30 dias:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )As linhas mais antigas do que o intervalo preenchido contêm
NULLparauniq_users.
Exemplo 4: Itera numa janela pequena antes de preencher o histórico completo
Este exemplo mostra como validar a lógica de consulta numa pequena janela de dados antes de processar todo o intervalo histórico.
Comece com uma janela curta para que cada atualização recalcule apenas os últimos 7 dias enquanto revê a consulta:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Depois de a consulta estar finalizada, utilize uma substituição de predicado para realizar um preenchimento histórico retroativo pontual:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)