Ersätt WHERE flöden för fristående tabeller för strömning

Important

REPLACE-flöden WHERE för fristående strömningstabeller finns i Beta.

REPLACE WHERE-flöden beräknar om och skriver över en specifik delmängd av en fristående strömmande tabell utan att hela tabellens historik behöver bearbetas på nytt. De hanterar sent ankommande data, uppströms ombearbetning, schemautveckling och återfyllnad.

Med ett REPLACE-flöde WHERE definierar du ett predikat i måltabellen. Alla rader som matchar predikatet tas bort och ersätts genom omvärdering av källfrågan för samma predikatintervall. Rader som inte matchar predikatet lämnas orörda.

Requirements

REPLACE-flöden WHERE har följande krav:

När du ska använda REPLACE-flöden WHERE

Använd REPLACE-flöden WHERE för följande scenarier:

  • Inkrementell batchbearbetning utan strömmande semantik: Bearbeta nya rader i batchar utan att hantera strömningsbegrepp som vattenstämplar.
  • Selektiv ombearbetning: Räkna bara om rader som matchar ett predikat och lämna alla andra rader orörda.
  • Scenarier utöver standardfunktioner för materialiserad vy:
    • Måltabeller med längre lagringstid än källtabellen
    • Förhindra omkomputation när en dimensionstabell ändras
    • Schemautveckling utan att omberäkna hela historiken

Skapa ett REPLACE-flöde WHERE

Använd FLOW REPLACE WHERE satsen i samma rad som CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Under uppdateringen tas alla rader i måltabellen som matchar predikatet bort, källfrågan omberäknas för samma predikatintervall och de nya resultaten infogas. I det här exemplet tas alla rader från de senaste 7 dagarna bort från orders_enriched och omberäknas med hjälp av källfrågan.

Du behöver inte lägga till predikatet i källfrågan. Pipeline-motorn tillämpar den automatiskt när den läser från källan.

Note

BY NAME måste anges. Det säkerställer att kolumner matchas med namn i stället för position.

Fyll i historiska data

Kör DML-instruktioner direkt i måltabellen för att utföra återfyllnad:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Fullständigt uppdateringsbeteende

En fullständig uppdatering av ett REPLACE-flöde WHERE kör källfrågan igen med endast det aktuella predikatet. Rader som infogades av DML-uttryck utanför det aktuella predikatintervallet tas bort permanent.

Varning

En fullständig uppdatering rensar alla befintliga data och kör flödet igen med endast dess definierade predikat. Om en pipeline har körts i ett år med ett 7-dagars predikat resulterar en fullständig uppdatering i att tabellen endast innehåller data från de senaste 7 dagarna. Alla äldre rader tas bort permanent.

REFRESH STREAMING TABLE orders_enriched FULL;

Om du vill förhindra fullständiga uppdateringar i en tabell anger du tabellegenskapen pipelines.reset.allowed till false:

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Inkrementell uppdatering

REPLACE-flöden WHERE använder inkrementell uppdatering när det är möjligt och ombearbetar endast de källdata som har ändrats sedan den senaste uppdateringen i stället för att omberäkna hela ersättningsfönstret. Inkrementell uppdatering kräver serverlös beräkning.

När inkrementell uppdatering gäller

Allt följande måste vara sant:

  • Pipelinen körs på serverlös beräkning.
  • Frågeformen stöds. Se Incremental refresh för den operatoruppsättning som stöds.
  • Predikatet refererar till baskolumner från en källtabell. Predikat på härledda värden, till exempel utdata från aggregat- eller fönsterfunktioner, kan inte skickas vidare till en källa, vilket förhindrar inkrementell uppdatering.
  • Ingen extern DML har ändrat rader i det aktuella ersättningsfönstret. DML som ändrar rader utanför det aktuella fönstret påverkas inte.
  • Det aktuella ersättningsfönstret innehåller inte rader som det tidigare predikatet exkluderade. Om du breddar predikatet så att det täcker ett intervall som inte tidigare bearbetats återgår den uppdateringen till fullständig omkomputation. Efterföljande uppdateringar är berättigade till inkrementell uppdatering igen.
  • Predikatet är deterministiskt. Predikat med icke-deterministiska funktioner, till exempel rand() inaktivera inkrementell uppdatering. Temporala funktioner som current_date() är tillåtna.

Den första uppdateringen av ett flöde är alltid en fullständig beräkning. Om något villkor inte uppfylls, går uppdateringen tillbaka till fullständig omberäkning av det aktuella ersättningsfönstret.

Metodtips för inkrementell uppdatering

Följ dessa riktlinjer så att REPLACE-flöden WHERE fortfarande är berättigade till inkrementell uppdatering.

Använd en rörlig nedre gräns

Predikat med en rörlig lägre gräns är fortfarande berättigade till inkrementell uppdatering på obestämd tid.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

En flyttande övre gräns, till exempel date BETWEEN date_add(current_date(), -7) AND current_date(), kan flytta fönstret så att det inkluderar tidigare exkluderade rader, vilket utlöser en engångsåterställning till fullständig omkomputation.

Inkludera predikatkolumnen i GROUP BY

När du aggregerar ska du inkludera predikatkolumnen i GROUP BY så att motorn kan flytta ned predikatet under aggregationen.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Om predikatkolumnen saknas i GROUP BY kan predikatet inte flyttas ned under aggregeringen och källan skannas i sin helhet.

Inkludera predikatkolumnen i sammanslagningsnycklarna

Inkludera predikatkolumnen i kopplingsvillkoret så att motorn kan rensa alla anslutna källor.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Om en ansluten tabell inte exponerar predikatkolumnen genomsöks tabellen i sin helhet vid varje uppdatering.

Diagnostisera övergång till fullständig omberäkning

När en uppdatering faller tillbaka på en fullständig omberäkning rapporteras orsaken i händelsen planning_information för flödet. Se Övervaka händelseloggar för pipeline. I följande tabell visas de orsaker som rapporterats i händelsen:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW En extern DML ändrade rader i det aktuella ersättningsfönstret.
REPLACE_WHERE_NOT_DETERMINISTIC Predikatet använder icke-deterministiska uttryck.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Den tidigare uppdateringen använde ett icke-deterministiskt predikat.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Predikatet kan inte flyttas ned till någon källa, det aktuella fönstret omfattar rader som inte har bearbetats av det föregående predikatet, eller så använder körningen en åsidosättning av predikatet.

Exempel

I följande exempel visas vanliga REPLACE-flödesmönster WHERE .

Exempel 1: Behåll historiska aggregeringar från en källa med begränsad kvarhållning

Det här exemplet behåller dagliga aggregeringar utan tidsbegränsning, även efter att rådata har gallrats bort ur källtabellen (3 dagars lagringstid):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Exempel 2: Förhindra omberäkning när en dimensionstabell ändras

I det här exemplet förblir historiska faktarader oförändrade när dimensionsattribut ändras:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Om en användares region ändras omberäknas endast de senaste raderna. Historiska rader behåller regionvärdet när de skrevs.

Exempel 3: Lägg till ett nytt mått utan att omberäkna fullständig historik

Det här exemplet visar hur du utvecklar en tabelldefinition och återfyller endast ett målintervall:

  1. Definiera den inledande tabellen:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Uppdatera frågan för att lägga till uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Rader som är äldre än 7-dagarsfönstret innehåller NULL för uniq_users.

Exempel 4: Iterera i ett litet fönster innan du fyller i hela historiken

Det här exemplet visar hur du validerar frågelogik i ett litet datafönster innan du bearbetar hela det historiska intervallet.

Börja med ett kort fönster för att validera mått och iterera affärslogik med lägre beräkningskostnader:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Ett kort fönster beräknar endast de senaste 7 dagarna på varje uppdatering, så ändra frågan så många gånger som behövs innan du genomför en fullständig historisk körning.

När frågan har slutförts använder du DML för att fylla i hela det historiska intervallet:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;