SUBSTITUIR WHERE fluxos para tabelas de streaming autónomas

Importante

Os fluxos Replace WHERE para tabelas de streaming autónomas estão em beta.

Esta página descreve como usar fluxos REPLACE WHERE para recalcular e sobrescrever um subconjunto alvo de uma tabela de streaming autónoma sem reprocessar todo o histórico da sua tabela. Os fluxos REPLACE WHERE lidam com dados de chegada tardia, reprocessamento a montante, evolução de esquemas e preenchimentos retroativos.

Com um fluxo REPLACE WHERE , defines um predicado na tabela alvo. Todas as linhas que correspondem ao predicado são eliminadas e substituídas pela reavaliação da consulta de origem para esse mesmo intervalo de predicados. As linhas que não correspondem ao predicado ficam intocadas.

Requisitos

Os fluxos REPLACE WHERE têm os seguintes requisitos:

Quando usar os fluxos REPLACE WHERE

Use os fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento incremental em lote sem semântica de streaming: Processar novas linhas em lotes sem ter de gerir conceitos de streaming, tais como marcas de água.
  • Reprocessamento seletivo: Recalcule apenas as linhas que correspondam a um predicado, deixando todas as outras linhas intocadas.
  • Cenários para além das capacidades padrão de visualização materializada:
    • Tabelas alvo com maior retenção do que a fonte
    • Evitar recomputação quando uma tabela de dimensões muda
    • Evolução de esquemas sem recalcular toda a história

Criar um fluxo REPLACE WHERE

Utilize a cláusula FLOW REPLACE WHERE na mesma linha que CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Durante a atualização, todas as linhas da tabela de destino que correspondem ao predicado são eliminadas, a consulta de origem é recalculada para esse mesmo intervalo de predicados e os novos resultados são inseridos. Neste exemplo, todas as linhas dos últimos 7 dias são eliminadas de orders_enriched e recalculadas com base na consulta de origem.

Não precisas de adicionar o predicado à consulta de origem. O motor de pipeline aplica-o automaticamente ao ler da fonte.

Note

BY NAME é obrigatório. Garante que as colunas são associadas pelo nome e não pela posição.

Preencher dados históricos

Para realizar preenchimentos, execute instruções DML diretamente na tabela de destino:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Comportamento de atualização completo

Uma atualização completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas inseridas por instruções DML fora do intervalo atual de predicados são permanentemente eliminadas.

Advertência

Uma atualização completa limpa todos os dados existentes e reexecuta o fluxo usando apenas o seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa faz com que a tabela contenha apenas os dados dos últimos 7 dias. Todas as linhas antigas são eliminadas permanentemente.

REFRESH STREAMING TABLE orders_enriched FULL;

Para evitar atualizações completas numa tabela, defina a propriedade pipelines.reset.allowed da tabela para false:

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Atualização incremental

Os fluxos REPLACE WHERE utilizam atualização incremental sempre que possível, reprocessando apenas os dados de origem que mudaram desde a última atualização em vez de recalcular toda a janela de substituição. A atualização incremental requer capacidade de computação sem servidor.

Quando se aplica a atualização incremental

Tudo o seguinte deve ser verdade:

  • O fluxo de processamento é executado em computação sem servidor.
  • A forma da consulta é suportada. Ver Atualização incremental para o conjunto de operadores suportado.
  • O predicado faz referência às colunas base de uma tabela de origem. Predicados sobre valores derivados, como saídas agregadas ou de funções janela, não podem ser enviados para uma fonte, o que desabilita a atualização incremental.
  • Nenhum DML externo modificou linhas na janela de substituição atual. DML que modifica linhas fora da janela atual não é afetado.
  • A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se ampliar o predicado para abranger um intervalo que não tenha sido processado anteriormente, essa atualização recorre novamente a uma recomputação completa. As atualizações subsequentes podem voltar a beneficiar da atualização incremental.
  • O predicado é determinístico. Predicados que usam funções não determinísticas, como rand(), desativam a atualização incremental. Funções temporais como current_date() são permitidas.

A primeira atualização de qualquer fluxo é sempre um cálculo completo. Se alguma condição não for cumprida, essa atualização volta ao recálculo total da janela de substituição atual.

Boas práticas para renovação incremental

Siga estas orientações para que os fluxos REPLACE WHERE continuem elegíveis para atualização incremental.

Utilize um limite inferior móvel

Predicados com limite inferior móvel continuam elegíveis para atualização incremental indefinidamente.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas anteriormente excluídas, desencadeando um recurso pontual ao recálculo completo.

Inclua a coluna de predicado em GROUP BY

Ao agregar, inclua a coluna do predicado em GROUP BY para que o motor possa colocar o predicado abaixo da agregação.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna do predicado estiver em falta em GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é varrida na totalidade.

Incluir a coluna de predicados nas chaves de junção

Inclua a coluna de predicado na condição de junção para que o motor possa eliminar todas as origens associadas na junção.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela associada não disponibilizar a coluna do predicado, essa tabela é integralmente analisada em cada atualização.

Diagnosticar o recurso à recomputação completa

Quando uma atualização recorre à recomputação completa, o motivo é indicado no evento planning_information do fluxo. Ver Monitorizar registos de eventos do pipeline. A tabela seguinte lista as razões reportadas no evento:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Um DML externo modificava as linhas na janela de substituição atual.
REPLACE_WHERE_NOT_DETERMINISTIC O predicado utiliza expressões não determinísticas.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC A atualização anterior usou um predicado não determinístico.
UNSUPPORTED_REPLACE_WHERE_PREDICATE O predicado não pode ser aplicado a nenhuma origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução utiliza uma substituição do predicado.

Exemplos

Os exemplos seguintes mostram padrões de fluxo REPLACE comuns WHERE.

Exemplo 1: Manter agregados históricos provenientes de uma fonte de retenção limitada

Este exemplo mantém agregados diários indefinidamente, mesmo depois de os dados brutos deixarem de constar da tabela de origem (retenção de 3 dias):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Exemplo 2: Evitar a recomputação quando uma tabela de dimensões muda

Este exemplo mantém as linhas históricas de factos inalteradas quando os atributos das dimensões mudam:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Se a região de um utilizador mudar, apenas as linhas recentes são recalculadas. As linhas históricas mantêm o valor da região no momento em que foram escritas.

Exemplo 3: Adicionar uma nova métrica sem recalcular o histórico completo

Este exemplo mostra como evoluir uma definição de tabela e preencher apenas um intervalo alvo:

  1. Defina a tabela inicial:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Atualize a consulta para acrescentar uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    As linhas anteriores ao período de 7 dias contêm NULL para uniq_users.

Exemplo 4: Itera numa janela pequena antes de preencher o histórico completo

Este exemplo mostra como validar a lógica de consulta numa pequena janela de dados antes de processar todo o intervalo histórico.

Comece com uma janela curta para validar métricas e itere na lógica de negócio com custos de computação mais baixos:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Uma janela curta recalcula apenas os últimos 7 dias em cada atualização, por isso reveja a consulta quantas vezes for necessário antes de se comprometer com uma execução histórica completa.

Uma vez finalizada a consulta, use DML para preencher todo o intervalo histórico:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;