使用控件表驱动
可能需要从多个源引入。 当该列表发生更改时,在作业配置中对其进行硬编码意味着更改代码并重新部署。 通过在运行时读取和使用的表中存储源列表,使用 元数据 来解决此问题。 将一个源添加为新行,下一次作业运行时将选取该源,而不会对作业本身进行更改。
本教程介绍如何使用此方法生成作业。 SQL 任务读取控制表,For each 任务则并行迭代处理每一行。
工作原理
该模式按顺序使用三种连接在一起的任务类型:
| 任务 | 类型 | 它的作用是什么 |
|---|---|---|
read_markets |
SQL | 查询配置表并将结果捕获为行数组 |
process_markets |
为每个 | 遍历 {{tasks.read_markets.output.rows}},对每行执行一次嵌套任务 |
run_market_analysis_iteration |
笔记本或 SQL (嵌套在 For each 中) | 每行运行一次,使用作为参数传递的行值来执行业务逻辑 |
SQL 任务的输出(行对象的 JSON 数组)使用动态值引用For each直接流入任务的{{tasks.read_markets.output.rows}}”字段。 然后,该 For each 任务将每一行作为参数传递给嵌套任务,并作为 {{input.market}} 和 {{input.currency}}提供。
先决条件
- 拥有创建作业和笔记本权限的 Databricks 工作区
- 在 Unity 目录中创建表的权限
- Unity 目录架构,可在其中创建配置表(例如)
config - 用于运行 SQL 任务的 SQL 仓库
步骤 1:创建配置表
配置表是控制平面。 它保存作业进程的值列表。 需要添加或删除工作时,请更新此表,而不是该作业。
运行以下 SQL,在 markets 架构中创建 config 表:
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
可以使用 Databricks 笔记本、SQL 编辑器或任何 SQL 任务来运行此语句。 此步骤后, config.markets 包含三行,每个市场一行,每个行都有其货币代码。
步骤 2:编写处理代码
嵌套在 For each 任务中的任务每行运行一次。 根据业务逻辑选择笔记本任务或 SQL 任务。
笔记本任务
在路径中创建新笔记本,例如/Workspace/Users/<username>/process_market。 此笔记本在For each任务的每次迭代中运行一次,每次接收不同的市场价值。
将以下代码添加到笔记本:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
调用 dbutils.widgets.text() 设置默认值,以便可以直接在工作区中运行笔记本,而无需将其连接到作业。 当笔记本作为For each任务中的一个嵌套任务运行时,作业将使用该迭代的实际参数值来覆盖默认值。
dbutils.widgets.text()之前调用dbutils.widgets.get()。 如果在 get 之前调用 text,则笔记本在作业外运行时会引发 InputWidgetNotDefined 错误。
使用默认值可以测试作业外部的笔记本,但请注意需要权衡的点:如果 For each 任务配置错误且未传递参数,笔记本将使用默认值并默默成功而不是失败——这可能使得错误配置更难以被发现。
SQL 任务
SQL 任务支持使用 :param_name 语法命名参数。 在查询中任何需要使用迭代值的地方,引用 :market 和 :currency。
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
直接在步骤 5 中的任务编辑器中配置此查询。 该For each任务在运行时将当前迭代的值传递给:market和:currency参数。 与笔记本任务不同,SQL 命名参数不支持默认值 - 如果未传递参数,查询将失败并显示参数解析错误。 若要在查询运行之前验证或默认参数,请改用笔记本任务。
步骤 3:创建作业
在 Databricks 工作区中,单击边栏中的 “工作流 ”,然后单击“ 创建作业”。 为作业指定描述性名称,例如 Market Analysis。
步骤 4:配置 SQL 查找任务
SQL 任务运行配置查询,并使其输出可用于下游任务。
在作业编辑器中,单击“ 添加任务”。
将 任务名称 设置为
read_markets.将 类型 设置为 SQL。
在 SQL 字段中,输入以下查询:
SELECT market, currency FROM config.markets将 SQL 仓库 设置为工作区中的仓库。
单击“创建任务”。
此任务运行时,Databricks 将运行查询,并将结果捕获为 JSON 数组。tasks.read_markets.output.rows SQL 任务输出始终作为 JSON 数组返回 , 无需其他配置。 此引用的泛型形式与tasks.<task-name>.output.rows<task-name>在作业编辑器中设置的任务键匹配。 输出如下所示:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
步骤 5:为每个任务配置
该 For each 任务读取 SQL 输出并启动每个行的一个嵌套任务运行。
单击添加任务并将依赖设置为
read_markets。将 任务名称 设置为
process_markets.将 类型 设置为 “For each”。
在 “输入 ”字段中,输入:
{{tasks.read_markets.output.rows}}这会引用 SQL 任务捕获的行数组。
将 并发 设置为
2允许两次并行运行迭代。 增加此值以提高吞吐量,或者嵌套任务是否支持更高的并行度。单击添加循环任务,然后根据在步骤2中选择的类型配置嵌套任务:
笔记本任务
将 任务名称 设置为
run_market_analysis_iteration.将 “类型 ”设置为 “笔记本”。
将 路径 设置为在步骤 2 中创建的笔记本的路径。
单击“ 参数”,然后单击“ 添加” 以添加以下每个参数:
-
键:
market, 值:{{input.market}} -
键:
currency, 值:{{input.currency}}
每个
{{input.<key>}}引用都会解析成当前循环中行对象的相应字段。-
键:
单击“创建任务”。
SQL 任务
将 任务名称 设置为
run_market_analysis_iteration.将 类型 设置为 SQL。
在 SQL 字段中,输入具有命名参数的查询,例如:
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currency将 SQL 仓库 设置为工作区中的仓库。
单击“ 参数”,然后单击“ 添加” 以添加以下每个参数:
-
键:
market, 值:{{input.market}} -
键:
currency, 值:{{input.currency}}
每个
{{input.<key>}}引用都会解析成当前循环中行对象的相应字段。-
键:
单击“创建任务”。
作业 DAG 现在显示 read_markets 流向 process_markets,For each 节点内嵌套任务可见。
步骤 6:运行作业并验证
- 单击“ 立即运行 ”以触发作业。
- 在作业运行页上,单击
process_markets节点以展开For each任务。 - 作业运行页显示一个迭代表(每个市场值一行),每个表显示其状态、开始时间和持续时间。
- 单击任何迭代行以打开任务运行输出,并确认它收到了正确的市场价值。
如果特定迭代失败,则只能从作业运行页重新运行该迭代,而无需重新运行整个作业。
扩展模式
若要添加新市场,请将行插入配置表中:
INSERT INTO config.markets VALUES ('DE', 'EUR');
下一个作业运行自动包括德国,无需更改作业配置或笔记本编辑。
此模式适用于希望数据驱动迭代的任何用例:
- 按客户处理:每个客户 ID 对应一行;笔记本应用客户特定的转换,或将数据交付到客户指定的目标。
- 表引入:每个源表名一行;笔记本程序读取并引入每个表。
- 回填处理:每个日期分区一行;笔记本将重新处理该分区的历史数据。
- 功能标志驱动执行:每个已启用的功能或试验一行;笔记本激活相应的逻辑。
若要从处理中删除项,请删除其行或添加 active 标志列并在 SQL 查询中进行筛选:
SELECT market, currency FROM config.markets WHERE active = TRUE
后续步骤
-
使用
For each任务在循环中运行另一个任务 —For each任务的完整配置参考,包括参数类型和并发选项 -
对任务中的
For each大型参数数组使用查阅表 — 如何处理超过 48 KB 任务值限制的大型参数数组 - 从任务访问参数值 — 在笔记本、Python脚本和 SQL 任务中访问参数值的所有方法