独立流式表的 REPLACE WHERE 流

Important

独立流式表的 REPLACE WHERE 流目前处于测试版

REPLACE WHERE 流会重新计算并覆盖独立流式表中的目标子集,而无需重新处理整个表的历史记录。 它们处理后期到达的数据、上游重新处理、架构演变和回填。

使用 REPLACE WHERE 流,可以在目标表上定义谓词。 所有符合谓词的行都会被删除,并通过针对同一谓词范围重新评估源查询来替换。 与谓词不匹配的行保持不变。

Requirements

REPLACE WHERE 流程具有以下要求:

  • 您的流式表必须使用 PREVIEW 通道。 请参阅channel管道配置
  • Databricks 建议使用 Unity 目录和无服务器计算。 增量刷新仅支持在无服务器计算上使用。

何时使用 REPLACE WHERE 流

将 REPLACE WHERE 流用于以下方案:

  • 无流式处理语义的增量批处理:在不管理流式处理概念(如水印)的情况下批量处理新行。
  • 选择性重新处理:仅重新计算与谓词匹配的行,同时使所有其他行保持不变。
  • 超出标准物化视图能力的场景
    • 目标表的保留时间长于源表
    • 防止在维度表更改时重新计算
    • 架构演变,无需重新计算整个历史记录

创建 REPLACE WHERE 流

FLOW REPLACE WHERE 子句与 CREATE OR REFRESH STREAMING TABLE 内联使用:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

刷新期间,将删除与谓词匹配的目标表中的所有行,将为同一谓词范围重新计算源查询,并插入新结果。 在此示例中,从过去 7 天内删除 orders_enriched 所有行,并使用源查询重新计算。

无需将谓词添加到源查询。 管道引擎在从源端读取时会自动应用它。

注释

BY NAME 必需。 它确保按列名而非位置进行列匹配。

回填历史数据

若要执行回填,请直接在目标表上运行 DML 语句:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

完全刷新时的行为

对 REPLACE WHERE 流进行完全刷新时,将仅使用当前谓词重新执行源查询。 由 DML 语句插入且位于当前谓词范围之外的行将被永久删除。

Warning

完全刷新会清除所有现有数据,并仅使用其定义的谓词重新执行流。 如果一条管道已运行一年,且使用 7 天的谓词,则完全刷新后,表中将仅包含最近 7 天的数据。 所有旧行都会永久删除。

REFRESH STREAMING TABLE orders_enriched FULL;

若要防止对表进行完全刷新,请将表属性 pipelines.reset.allowed 设置为 false

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

增量刷新

REPLACE WHERE 流尽可能使用增量刷新,只处理自上次刷新以来已更改的源数据,而不是重新计算整个替换窗口。 增量刷新需要无服务器计算。

增量刷新何时适用

以下各项都必须为真:

  • 管道在无服务器计算上运行。
  • 该查询形状受支持。 有关受支持的运算符集,请参阅 增量刷新
  • 谓词引用源表中的基列。 基于派生值(如聚合或窗口函数输出)的谓词无法推送到源,这将禁用增量刷新。
  • 在当前替换窗口中,没有任何行被外部 DML 修改。 修改当前窗口外行记录的 DML 操作不受影响。
  • 当前替换窗口不包含前一个谓词所排除的行。 如果将谓词范围扩大到之前未处理的区间,该次刷新将回退为全量重新计算。 后续刷新将再次支持增量刷新。
  • 谓词是确定性的。 使用非确定性函数(例如 rand())的谓词会禁用增量刷新。 允许使用诸如 current_date() 这样的时间函数。

任何流的第一次刷新始终是完整计算。 如果任何条件未满足,该次刷新将回退为对当前替换窗口的全量重新计算。

增量刷新的最佳做法

遵循以下准则,以确保 REPLACE WHERE 流可继续进行增量刷新。

使用可移动下限

具有移动下限的谓词仍有资格无限期地进行增量刷新。

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

移动上界(例如 date BETWEEN date_add(current_date(), -7) AND current_date())可能会使窗口范围发生变化,从而包含先前被排除的行,从而触发一次性的回退到全量重新计算。

在 GROUP BY 中包含谓词列

进行聚合时,请将谓词列包含在 GROUP BY 中,以便引擎能将谓词推送到聚合下方。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

如果 GROUP BY 中缺少谓词列,则无法将谓词推送到聚合下方,系统将对源数据进行全量扫描。

在联接键中包含谓词列

请将谓词列包含在连接条件中,以便引擎能够修剪所有连接的数据源。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

如果联接表未提供谓词列,则每次刷新时都会对该表执行全表扫描。

诊断回退到完全重新计算的情况

当刷新操作回退到完全重新计算时,原因会在该流的 planning_information 事件中报告。 请参阅 监视管道事件日志。 下表列出了事件中报告的原因:

原因 Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW 外部 DML 操作修改了当前替换窗口中的行。
REPLACE_WHERE_NOT_DETERMINISTIC 谓词使用非确定性表达式。
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC 上一次刷新使用了非确定性谓词。
UNSUPPORTED_REPLACE_WHERE_PREDICATE 原因可能是:谓词无法推送到任何数据源;当前窗口包含未被先前谓词处理过的行;或者该运行使用了谓词覆盖。

示例

以下示例显示了常见的 REPLACE WHERE 流模式。

示例 1:从受限保留源中保留历史聚合

此示例会无限期保留每日聚合数据,即使原始数据在源表中超过保留期后被删除(保留期为 3 天):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

示例 2:当维度表发生更改时防止重新计算

本示例在维度属性更改时保持历史事实行不变:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

如果用户的区域发生更改,则仅重新计算最近的行。 历史行保留其写入时的区域值。

示例 3:添加新指标而不重新计算完整历史记录

此示例演示如何改进表定义并仅回填目标范围:

  1. 定义初始表:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. 更新查询以添加 uniq_users

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    超过 7 天窗口期的行,其 uniq_users 字段将显示为 NULL

示例 4:先在小时间窗口上迭代,再回填全部历史数据

此示例演示如何在处理完整历史范围之前验证小型数据窗口中的查询逻辑。

从较短的时间窗口开始,验证指标并迭代业务逻辑,以降低计算成本:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

短窗口在每次刷新时仅重新计算最近 7 天的数据,因此请在执行完整历史数据运行之前,根据需要多次修改查询。

完成查询后,使用 DML 回填整个历史范围:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;