Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Importante
Os fluxos Replace WHERE para tabelas de streaming autónomas estão em beta.
Esta página descreve como usar fluxos REPLACE WHERE para recalcular e sobrescrever um subconjunto alvo de uma tabela de streaming autónoma sem reprocessar todo o histórico da sua tabela. Os fluxos REPLACE WHERE lidam com dados de chegada tardia, reprocessamento a montante, evolução de esquemas e preenchimentos retroativos.
Com um fluxo REPLACE WHERE , defines um predicado na tabela alvo. Todas as linhas que correspondem ao predicado são eliminadas e substituídas pela reavaliação da consulta de origem para esse mesmo intervalo de predicados. As linhas que não correspondem ao predicado ficam intocadas.
Requisitos
Os fluxos REPLACE WHERE têm os seguintes requisitos:
- A sua tabela de streaming tem de utilizar o canal
PREVIEW. Vejachannelem Configurações do oleoduto. - Databricks recomenda o Unity Catalog e a computação sem servidor. A atualização incremental só é suportada em computação serverless.
Quando usar os fluxos REPLACE WHERE
Use os fluxos REPLACE WHERE para os seguintes cenários:
- Processamento incremental em lote sem semântica de streaming: Processar novas linhas em lotes sem ter de gerir conceitos de streaming, tais como marcas de água.
- Reprocessamento seletivo: Recalcule apenas as linhas que correspondam a um predicado, deixando todas as outras linhas intocadas.
-
Cenários para além das capacidades padrão de visualização materializada:
- Tabelas alvo com maior retenção do que a fonte
- Evitar recomputação quando uma tabela de dimensões muda
- Evolução de esquemas sem recalcular toda a história
Criar um fluxo REPLACE WHERE
Utilize a cláusula FLOW REPLACE WHERE na mesma linha que CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Durante a atualização, todas as linhas da tabela de destino que correspondem ao predicado são eliminadas, a consulta de origem é recalculada para esse mesmo intervalo de predicados e os novos resultados são inseridos. Neste exemplo, todas as linhas dos últimos 7 dias são eliminadas de orders_enriched e recalculadas com base na consulta de origem.
Não precisas de adicionar o predicado à consulta de origem. O motor de pipeline aplica-o automaticamente ao ler da fonte.
Note
BY NAME é obrigatório. Garante que as colunas são associadas pelo nome e não pela posição.
Preencher dados históricos
Para realizar preenchimentos, execute instruções DML diretamente na tabela de destino:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Comportamento de atualização completo
Uma atualização completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas inseridas por instruções DML fora do intervalo atual de predicados são permanentemente eliminadas.
Advertência
Uma atualização completa limpa todos os dados existentes e reexecuta o fluxo usando apenas o seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa faz com que a tabela contenha apenas os dados dos últimos 7 dias. Todas as linhas antigas são eliminadas permanentemente.
REFRESH STREAMING TABLE orders_enriched FULL;
Para evitar atualizações completas numa tabela, defina a propriedade pipelines.reset.allowed da tabela para false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Atualização incremental
Os fluxos REPLACE WHERE utilizam atualização incremental sempre que possível, reprocessando apenas os dados de origem que mudaram desde a última atualização em vez de recalcular toda a janela de substituição. A atualização incremental requer capacidade de computação sem servidor.
Quando se aplica a atualização incremental
Tudo o seguinte deve ser verdade:
- O fluxo de processamento é executado em computação sem servidor.
- A forma da consulta é suportada. Ver Atualização incremental para o conjunto de operadores suportado.
- O predicado faz referência às colunas base de uma tabela de origem. Predicados sobre valores derivados, como saídas agregadas ou de funções janela, não podem ser enviados para uma fonte, o que desabilita a atualização incremental.
- Nenhum DML externo modificou linhas na janela de substituição atual. DML que modifica linhas fora da janela atual não é afetado.
- A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se ampliar o predicado para abranger um intervalo que não tenha sido processado anteriormente, essa atualização recorre novamente a uma recomputação completa. As atualizações subsequentes podem voltar a beneficiar da atualização incremental.
- O predicado é determinístico. Predicados que usam funções não determinísticas, como
rand(), desativam a atualização incremental. Funções temporais comocurrent_date()são permitidas.
A primeira atualização de qualquer fluxo é sempre um cálculo completo. Se alguma condição não for cumprida, essa atualização volta ao recálculo total da janela de substituição atual.
Boas práticas para renovação incremental
Siga estas orientações para que os fluxos REPLACE WHERE continuem elegíveis para atualização incremental.
Utilize um limite inferior móvel
Predicados com limite inferior móvel continuam elegíveis para atualização incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas anteriormente excluídas, desencadeando um recurso pontual ao recálculo completo.
Inclua a coluna de predicado em GROUP BY
Ao agregar, inclua a coluna do predicado em GROUP BY para que o motor possa colocar o predicado abaixo da agregação.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se a coluna do predicado estiver em falta em GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é varrida na totalidade.
Incluir a coluna de predicados nas chaves de junção
Inclua a coluna de predicado na condição de junção para que o motor possa eliminar todas as origens associadas na junção.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se uma tabela associada não disponibilizar a coluna do predicado, essa tabela é integralmente analisada em cada atualização.
Diagnosticar o recurso à recomputação completa
Quando uma atualização recorre à recomputação completa, o motivo é indicado no evento planning_information do fluxo. Ver Monitorizar registos de eventos do pipeline. A tabela seguinte lista as razões reportadas no evento:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Um DML externo modificava as linhas na janela de substituição atual. |
REPLACE_WHERE_NOT_DETERMINISTIC |
O predicado utiliza expressões não determinísticas. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
A atualização anterior usou um predicado não determinístico. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
O predicado não pode ser aplicado a nenhuma origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução utiliza uma substituição do predicado. |
Exemplos
Os exemplos seguintes mostram padrões de fluxo REPLACE comuns WHERE.
Exemplo 1: Manter agregados históricos provenientes de uma fonte de retenção limitada
Este exemplo mantém agregados diários indefinidamente, mesmo depois de os dados brutos deixarem de constar da tabela de origem (retenção de 3 dias):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Exemplo 2: Evitar a recomputação quando uma tabela de dimensões muda
Este exemplo mantém as linhas históricas de factos inalteradas quando os atributos das dimensões mudam:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Se a região de um utilizador mudar, apenas as linhas recentes são recalculadas. As linhas históricas mantêm o valor da região no momento em que foram escritas.
Exemplo 3: Adicionar uma nova métrica sem recalcular o histórico completo
Este exemplo mostra como evoluir uma definição de tabela e preencher apenas um intervalo alvo:
Defina a tabela inicial:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Atualize a consulta para acrescentar
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;As linhas anteriores ao período de 7 dias contêm
NULLparauniq_users.
Exemplo 4: Itera numa janela pequena antes de preencher o histórico completo
Este exemplo mostra como validar a lógica de consulta numa pequena janela de dados antes de processar todo o intervalo histórico.
Comece com uma janela curta para validar métricas e itere na lógica de negócio com custos de computação mais baixos:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Uma janela curta recalcula apenas os últimos 7 dias em cada atualização, por isso reveja a consulta quantas vezes for necessário antes de se comprometer com uma execução histórica completa.
Uma vez finalizada a consulta, use DML para preencher todo o intervalo histórico:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;