使用控件表驱动 For each 作业

可能需要从多个源引入。 当该列表发生更改时,在作业配置中对其进行硬编码意味着更改代码并重新部署。 通过在运行时读取和使用的表中存储源列表,使用 元数据 来解决此问题。 将一个源添加为新行,下一次作业运行时将选取该源,而不会对作业本身进行更改。

本教程介绍如何使用此方法生成作业。 SQL 任务读取控制表,For each 任务则并行迭代处理每一行。

工作原理

该模式按顺序使用三种连接在一起的任务类型:

任务 类型 它的作用是什么
read_markets SQL 查询配置表并将结果捕获为行数组
process_markets 为每个 遍历 {{tasks.read_markets.output.rows}},对每行执行一次嵌套任务
run_market_analysis_iteration 笔记本或 SQL (嵌套在 For each 中) 每行运行一次,使用作为参数传递的行值来执行业务逻辑

SQL 任务的输出(行对象的 JSON 数组)使用动态值引用For each直接流入任务的{{tasks.read_markets.output.rows}}”字段。 然后,该 For each 任务将每一行作为参数传递给嵌套任务,并作为 {{input.market}}{{input.currency}}提供。

先决条件

  • 拥有创建作业和笔记本权限的 Databricks 工作区
  • 在 Unity 目录中创建表的权限
  • Unity 目录架构,可在其中创建配置表(例如) config
  • 用于运行 SQL 任务的 SQL 仓库

步骤 1:创建配置表

配置表是控制平面。 它保存作业进程的值列表。 需要添加或删除工作时,请更新此表,而不是该作业。

运行以下 SQL,在 markets 架构中创建 config 表:

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

可以使用 Databricks 笔记本、SQL 编辑器或任何 SQL 任务来运行此语句。 此步骤后, config.markets 包含三行,每个市场一行,每个行都有其货币代码。

步骤 2:编写处理代码

嵌套在 For each 任务中的任务每行运行一次。 根据业务逻辑选择笔记本任务或 SQL 任务。

笔记本任务

在路径中创建新笔记本,例如/Workspace/Users/<username>/process_market。 此笔记本在For each任务的每次迭代中运行一次,每次接收不同的市场价值。

将以下代码添加到笔记本:

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

调用 dbutils.widgets.text() 设置默认值,以便可以直接在工作区中运行笔记本,而无需将其连接到作业。 当笔记本作为For each任务中的一个嵌套任务运行时,作业将使用该迭代的实际参数值来覆盖默认值。

dbutils.widgets.text()之前调用dbutils.widgets.get()。 如果在 get 之前调用 text,则笔记本在作业外运行时会引发 InputWidgetNotDefined 错误。

使用默认值可以测试作业外部的笔记本,但请注意需要权衡的点:如果 For each 任务配置错误且未传递参数,笔记本将使用默认值并默默成功而不是失败——这可能使得错误配置更难以被发现。

SQL 任务

SQL 任务支持使用 :param_name 语法命名参数。 在查询中任何需要使用迭代值的地方,引用 :market:currency

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

直接在步骤 5 中的任务编辑器中配置此查询。 该For each任务在运行时将当前迭代的值传递给:market:currency参数。 与笔记本任务不同,SQL 命名参数不支持默认值 - 如果未传递参数,查询将失败并显示参数解析错误。 若要在查询运行之前验证或默认参数,请改用笔记本任务。

步骤 3:创建作业

在 Databricks 工作区中,单击边栏中的 “工作流 ”,然后单击“ 创建作业”。 为作业指定描述性名称,例如 Market Analysis

步骤 4:配置 SQL 查找任务

SQL 任务运行配置查询,并使其输出可用于下游任务。

  1. 在作业编辑器中,单击“ 添加任务”。

  2. 任务名称 设置为 read_markets.

  3. 类型 设置为 SQL

  4. SQL 字段中,输入以下查询:

    SELECT market, currency FROM config.markets
    
  5. SQL 仓库 设置为工作区中的仓库。

  6. 单击“创建任务”。

此任务运行时,Databricks 将运行查询,并将结果捕获为 JSON 数组。tasks.read_markets.output.rows SQL 任务输出始终作为 JSON 数组返回 , 无需其他配置。 此引用的泛型形式与tasks.<task-name>.output.rows<task-name>在作业编辑器中设置的任务键匹配。 输出如下所示:

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

步骤 5:为每个任务配置

For each 任务读取 SQL 输出并启动每个行的一个嵌套任务运行。

  1. 单击添加任务并将依赖设置为read_markets

  2. 任务名称 设置为 process_markets.

  3. 类型 设置为 “For each”。

  4. “输入 ”字段中,输入:

    {{tasks.read_markets.output.rows}}
    

    这会引用 SQL 任务捕获的行数组。

  5. 并发 设置为 2 允许两次并行运行迭代。 增加此值以提高吞吐量,或者嵌套任务是否支持更高的并行度。

  6. 单击添加循环任务,然后根据在步骤2中选择的类型配置嵌套任务:

笔记本任务

  1. 任务名称 设置为 run_market_analysis_iteration.

  2. “类型 ”设置为 “笔记本”。

  3. 路径 设置为在步骤 2 中创建的笔记本的路径。

  4. 单击“ 参数”,然后单击“ 添加” 以添加以下每个参数:

    • market{{input.market}}
    • currency{{input.currency}}

    每个 {{input.<key>}} 引用都会解析成当前循环中行对象的相应字段。

  5. 单击“创建任务”。

SQL 任务

  1. 任务名称 设置为 run_market_analysis_iteration.

  2. 类型 设置为 SQL

  3. SQL 字段中,输入具有命名参数的查询,例如:

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. SQL 仓库 设置为工作区中的仓库。

  5. 单击“ 参数”,然后单击“ 添加” 以添加以下每个参数:

    • market{{input.market}}
    • currency{{input.currency}}

    每个 {{input.<key>}} 引用都会解析成当前循环中行对象的相应字段。

  6. 单击“创建任务”。

作业 DAG 现在显示 read_markets 流向 process_marketsFor each 节点内嵌套任务可见。

步骤 6:运行作业并验证

  1. 单击“ 立即运行 ”以触发作业。
  2. 在作业运行页上,单击 process_markets 节点以展开 For each 任务。
  3. 作业运行页显示一个迭代表(每个市场值一行),每个表显示其状态、开始时间和持续时间。
  4. 单击任何迭代行以打开任务运行输出,并确认它收到了正确的市场价值。

如果特定迭代失败,则只能从作业运行页重新运行该迭代,而无需重新运行整个作业。

扩展模式

若要添加新市场,请将行插入配置表中:

INSERT INTO config.markets VALUES ('DE', 'EUR');

下一个作业运行自动包括德国,无需更改作业配置或笔记本编辑。

此模式适用于希望数据驱动迭代的任何用例:

  • 按客户处理:每个客户 ID 对应一行;笔记本应用客户特定的转换,或将数据交付到客户指定的目标。
  • 表引入:每个源表名一行;笔记本程序读取并引入每个表。
  • 回填处理:每个日期分区一行;笔记本将重新处理该分区的历史数据。
  • 功能标志驱动执行:每个已启用的功能或试验一行;笔记本激活相应的逻辑。

若要从处理中删除项,请删除其行或添加 active 标志列并在 SQL 查询中进行筛选:

SELECT market, currency FROM config.markets WHERE active = TRUE

后续步骤