使用 REPLACE WHERE 流进行批处理

Important

REPLACE WHERE 流处于 Beta 版中。

本页介绍如何在 Lakeflow Spark Declarative Pipelines 中使用 REPLACE WHERE flows,对表中的指定子集进行重新计算并覆盖,而无需重新处理整个表的历史数据。 REPLACE WHERE 流处理后期到达的数据、上游重新处理、架构演变和回填。

使用 REPLACE WHERE 流,可以在目标表上定义谓词。 删除与该谓词匹配的所有行,并用针对同一谓词范围内重新求值后的源查询结果替换这些行。 与谓词不匹配的行保持不变。

Requirements

REPLACE WHERE 流程具有以下要求:

  • 您的流水线必须使用 PREVIEW 通道。
  • Databricks 建议使用 Unity 目录和无服务器计算。 增量刷新仅支持在无服务器计算上使用。

何时使用 REPLACE WHERE 流程

将 REPLACE WHERE 流用于以下方案:

  • 无流式处理语义的增量批处理:在不管理流式处理概念(如水印)的情况下批量处理新行。
  • 选择性重新处理:仅重新计算与谓词匹配的行,同时使所有其他行保持不变。
  • 超出标准物化视图能力的场景
    • 保留期长于源表的目标表
    • 防止在维度表更改时重新计算
    • 架构演变,无需重新计算整个历史记录

创建 REPLACE WHERE 流程

在 SQL 或 Python 中定义 REPLACE WHERE 流。

SQL

FLOW REPLACE WHERE 子句与 CREATE STREAMING TABLE 内联使用:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

或者,使用长格式 CREATE FLOW 语法:

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

在Python中,表和流在单个语句中定义。 流的名称与表相同:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

replace_where 参数接受 PySpark 列表达式或字符串谓词。

在这些示例中,从过去 7 天内删除 orders_enriched 所有行,并使用源查询重新计算。 无需将谓词添加到源查询。 管道引擎在从源端读取时会自动应用它。

注释

BY NAME 在 SQL 中是必需的。 它按名称而不是位置匹配列。

回填历史数据

若要在计划刷新之外将历史数据行或已更正的数据行写入目标表,请根据历史数据的存放位置在两种机制中选择:

  • 谓词覆盖:针对一次性谓词范围重新运行流的源查询。 当历史数据来自与增量数据相同的源时使用。
  • DML 语句:直接插入目标表中,绕过流。 当历史数据位于与增量数据不同的源中时使用。

谓词重写

在不修改管道定义的情况下,覆盖单次管道更新的 REPLACE WHERE 谓词。 谓词替代是一次性的,仅适用于当前更新,不会影响将来的运行。

示例:初始历史负载

若要在首次设置管道时执行历史数据的一次性回填:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

示例:更正特定时间段的列

更新列定义后,对指定的历史范围回填该更改:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

在单个谓词重写中合并多个维度:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
辅助函数: start_update_with_replace_where

使用笔记本中的管道更新 API 提交谓词替代:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

DML 语句

直接从管道外部对目标表运行 DML 语句以执行初始加载或更正,例如从旧表加载:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

通过 DML 插入的行不受 REPLACE WHERE 谓词的约束,在计划刷新中保留,除非它们属于将来运行的谓词范围。

完全刷新时的行为

对 REPLACE WHERE 流进行完全刷新时,将仅使用当前谓词重新执行源查询。 由谓词覆盖或 DML 语句插入且位于当前谓词范围之外的行将被永久删除。

Warning

全量刷新会清除所有现有数据,并仅依据其已定义的谓词条件重新执行该流程。 如果某个管道已运行一年并使用 7 天谓词条件,则执行一次完全刷新后,表中将仅包含最近 7 天的数据。 所有旧行都会永久删除。

若要防止对表进行完全刷新,请将表属性 pipelines.reset.allowed 设置为 false。 请参阅 管道属性参考

增量刷新

REPLACE WHERE 流尽可能使用增量刷新,只处理自上次刷新以来已更改的源数据,而不是重新计算整个替换窗口。 增量刷新需要无服务器计算。

增量刷新何时适用

以下各项都必须为真:

  • 管道在无服务器计算上运行。
  • 该查询形状受支持。 有关受支持的运算符集,请参阅 增量刷新
  • 谓词引用源表中的基列。 无法将针对派生值(如聚合结果或窗口函数输出)的谓词推送到源,这会从而禁用增量刷新。
  • 在当前替换窗口中,没有任何行被外部 DML 修改。 修改当前窗口之外行的 DML 语句不受影响。
  • 当前替换窗口不包含前一个谓词所排除的行。 如果将谓词扩展到先前未处理的范围,那一次刷新就会退回为完整重新计算。 后续刷新仍可再次进行增量刷新。
  • 谓词是确定性的。 使用非确定性函数(例如 rand())的谓词会禁用增量刷新。 允许使用诸如 current_date() 这样的时间函数。

任何流的第一次刷新始终是完整计算。 如果未满足任何条件,则刷新会回退到当前替换窗口的完整重新计算。

增量刷新的最佳做法

遵循以下准则,以确保 REPLACE WHERE 流可继续进行增量刷新。

使用可移动下限

具有移动下限的谓词仍有资格无限期地进行增量刷新。

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

可变上界(例如 date BETWEEN date_add(current_date(), -7) AND current_date())可能会使窗口发生移动,从而纳入先前被排除的行,并触发一次性回退为完整重算。

在 GROUP BY 中包含谓词列

进行聚合时,在 GROUP BY 中包含谓词列,以便引擎能够将谓词下推到聚合之下。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

如果 GROUP BY 中缺少谓词列,则无法将谓词下推到聚合下方,并且将对源进行全量扫描。

在联接键中包含谓词列

在联接条件中包含谓词列,以便引擎能够裁剪所有参与联接的数据源。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

如果联接表未提供谓词列,则每次刷新时都会对该表执行全表扫描。

诊断回退到完全重新计算

当刷新回退到完全重新计算时,原因会在该流的 planning_information 事件中报告。 请参阅 监视管道事件日志。 下表列出了事件中报告的原因:

原因 Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW 外部 DML 操作修改了当前替换窗口中的行。
REPLACE_WHERE_NOT_DETERMINISTIC 谓词使用非确定性表达式。
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC 上一次刷新使用了非确定性谓词。
UNSUPPORTED_REPLACE_WHERE_PREDICATE 谓词无法下推到任何数据源,当前窗口包含前一个谓词未处理的行,或者该运行使用了谓词覆盖。

局限性

REPLACE WHERE 流具有以下限制:

  • 必须在管道内创建目标表。
  • 每个目标表只允许一个 REPLACE WHERE 流。
  • 被 REPLACE WHERE 流作为目标的表,也不能同时被其他流类型作为目标,例如 AUTO CDC 流或追加流。
  • REPLACE WHERE 流面向的表不支持预期。
  • 有关在 Databricks SQL 中创建的流式表的语法和回填差异,请参阅 独立流式表的 REPLACE WHERE 流

示例

以下示例显示了常见的 REPLACE WHERE 流模式。

示例 1:从受限保留源中保留历史聚合

此示例会无限期保留每日聚合数据,即使原始数据在源表中超过保留期后被删除(保留期为 3 天):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

示例 2:当维度表发生更改时防止重新计算

本示例在维度属性更改时保持历史事实行不变:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

如果用户的区域发生更改,则仅重新计算最近的行。 历史行在写入时保留区域值。 若要更正历史数据行,请使用 谓词覆盖执行有针对性的回填。

示例 3:添加新指标而不重新计算完整历史记录

此示例演示如何改进表定义并仅回填目标范围:

  1. 定义初始表:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. 更新查询以添加 uniq_users

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. 回填过去 30 天的新指标:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    早于回填范围的行对于 uniq_users 的值为 NULL

示例 4:先在小时间窗口上迭代,再回填全部历史数据

此示例演示如何在处理完整历史范围之前验证小型数据窗口中的查询逻辑。

先从较短的时间窗口开始,这样在你修改查询时,每次刷新都只会重新计算最近 7 天的数据:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

完成查询后,使用谓词替代执行一次性历史回填:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)