将参数与管道配合使用

使用管道参数可以跨环境或数据集重复使用相同的管道源代码。 例如,可以针对 devprod 目录运行相同的转换,或者在每次运行时从不同的源路径引入。 在管道上定义参数(或在启动更新时重写它们),并从 SQL 源代码引用参数。

重要

此功能在 Beta 版中。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版

本页介绍可用于 SQL 源代码的管道参数功能。 若要在流水线中参数化 Python 源代码,请继续使用 Configuration 字段,如 使用配置字段引用参数 中所述。 配置还用于设置管道在运行时读取的 Spark 配置值。 有关 Spark 配置设置的详细信息,请参阅 管道属性参考

什么是管道参数?

管道参数是键值对,可以:

  • 在管道设置中声明为默认值。
  • 从管道 UI、开始更新 API使用不同设置对话框启动更新时进行覆盖。
  • 对作业中的管道任务进行覆盖,并可选择下推 作业级参数
  • 使用 命名参数 语法引用 SQL 源代码中的内容。

参数值始终是字符串。 键可以包含字母数字字符、下划线(_)、连字符(-)和句点(.)。

管道参数和 配置 字段用于不同的用途:

使用 参数 ... 使用配置来……
更新之间更改的值(目标目录、源路径、日期范围)。 用于控制管道行为的 Spark 配置 (pipelines.enzyme.enabledpipelines.clusterLabelsV2Enabled)。
要从作业或任务向下推送的值。 静态结构管道属性。
使用命名参数语法在 SQL 中引用的值。 在 SQL 中使用 ${key} 语法或 Python 中的 spark.conf.get("key") 引用的值。

定义管道参数

可以在管道设置中定义默认参数值。 在没有替代的情况下运行更新时,管道会使用这些默认值。

使用流水线用户界面

  1. 在工作区中,点击侧边栏中的工作流图标作业和流水线,然后选择你的流水线。
  2. 单击“设置”。
  3. “管道设置” 边栏中,找到“ 参数 ”部分,然后单击“ 编辑”。
  4. 添加 条目,然后单击“ 保存”。

使用 JSON 或 REST API

向管道定义中添加一个 parameters 映射:

{
  "name": "Sales pipeline",
  "parameters": {
    "source_catalog": "dev_catalog",
    "source_schema": "sales",
    "start_date": "2026-01-01"
  }
}

有关完整的管道 JSON 参考,请参阅 管道配置

SQL 源代码中的引用参数

通过在键名前加上冒号来引用参数。 Azure Databricks在更新时将值绑定为字符串:

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id,
  COUNT(txn_id) AS txn_count,
  SUM(txn_amount) AS account_revenue
FROM :source_catalog.sales.transactions
WHERE txn_date >= :start_date
GROUP BY account_id

若要在标识符位置(如目录、架构或表名)中使用参数,请将其包装在 IDENTIFIER()以下位置:

USE CATALOG IDENTIFIER(:source_catalog);
USE SCHEMA IDENTIFIER(:source_schema);

CREATE OR REFRESH MATERIALIZED VIEW daily_sales AS
SELECT date(timestamp) AS date,
  SUM(price) AS total_sales
FROM transactions
GROUP BY date;

如果源代码引用了在更新时没有值的参数,则更新将失败并显示错误。 管道忽略代码未引用的额外参数。

在更新时重写参数

可以替代单个更新的参数值,而无需更改保存的默认值。

  • 在流水线界面中,单击使用不同设置运行,然后编辑参数部分。
  • 在作业中的管道任务中,在任务的 “参数 ”字段中设置参数替代。 请参阅参数
  • 通过 API,在开始更新请求中传递 parameters映射。

Azure Databricks记录更新历史记录中特定更新的参数,并在管道运行列表的 Run parameters 列中显示这些参数。

参数优先级

在多个位置定义相同的键时,优先级最高的值将获胜。 从高到低:

  1. 作业运行参数:为单次作业运行提供的值(覆盖项)。
  2. 作业参数:父作业上定义的默认值。
  3. 管道任务参数:在管道任务上设置的值。
  4. 管道参数:管道设置中定义的默认值。

这与其他 作业参数任务类型使用的优先级匹配。

Lakeflow Jobs 中的管道参数

在作业中将管道计划为 管道任务 时,该任务可以提供替代管道默认值的参数。 参数值可以使用 动态值引用 来注入作业运行时值,例如 {{job.trigger.time.iso_date}}{{job.parameters.region}}

Lakeflow 作业还会自动将所有作业参数传递给管道任务,其方式与传递给笔记本和 SQL 任务的方式相同。 管道源代码可以使用命名参数语法引用任何下推值。 在管道设置中声明参数是可选的,并且仅设置不带替代的运行的默认参数。

注意事项和已知限制

  • 管道一次运行一次更新:管道一次只能运行单个更新。 为防止作业因多个更新原本会发生重叠而失败,Azure Databricks 在两种情况下会将并发数上限设为 1:

    • 包含管道任务并配置了多个 max_concurrent_runs 的作业。
    • 一个封装在 for-each task 中的管道任务,不论迭代次数是多少。

    作业 UI 显示此上限生效时的通知。 设计想要使用多个参数组合运行的参数化管道时,请围绕上限进行规划。

  • 日期筛选器可以触发完全刷新:常见的参数化用例是按日期筛选数据。 注意谓词的使用:在日期范围的 端进行筛选会导致物化视图的增量处理失效,并在每次更新时触发全量刷新。

    -- Triggers a full refresh on each update
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date AND order_date < :end_date;
    
    -- Processes incrementally
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date;
    
  • 命名参数仅限 SQL:在此 Beta 版本中,命名参数语法只能在 SQL 源代码中使用。 若要参数化Python源代码,请继续使用 Configuration 字段和 spark.conf.get()。 请参阅 使用配置字段的参考参数

使用配置字段引用参数

管道上的 “配置 ”字段接受作为 Spark 配置值公开的任意键值对。 这是旧参数化机制,可以继续与管道参数一起工作。 将其用于 Python 源代码,以及那些你希望使用 spark.conf.get() 而非命名参数语法来读取其值的键。

以下示例使用 mypipeline.start_date 配置值将开发管道限制为输入数据的子集:

SQL

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM source_table WHERE date > '${mypipeline.start_date}';

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.start_date")
  return spark.read.table("source_table").where(col("date") > start_date)

在管道设置的 “配置 ”部分或 configuration 管道 JSON 字段中设置配置值。 避免与保留管道或 Apache Spark 配置值冲突的密钥。