Important
SDP 的环境版本为 Beta 版。
通过 vironment 版本集的管道通过 Spark Connect 运行Python代码。 本页介绍不兼容的内容、行为不同、如何扫描管道中受影响的模式以及如何迁移现有管道。
局限性
环境版本尚未与所有管道功能兼容。 如果管道Python代码执行以下任一操作,则具有环境版本集的管道运行将失败:
- 在用管道修饰器修饰的函数内改变 Spark 会话状态。 示例包括
spark.conf.set(...)、spark.sql("USE CATALOG ...")和createOrReplaceTempView。 - 使用 Spark Connect 中不可用的 PySpark API,包括
SparkContext、RDD和SQLContext任何 Py4J API。 请参阅 Spark Connect 中支持的内容。
如果在管道上启用环境版本会导致它失败,则禁用环境版本会将管道返回到其以前的状态。
行为变更
Spark Connect 与经典 PySpark 运行时存在少量行为差异。 有关完整参考,请参阅 Spark Connect 与经典 Spark 。 兼容性扫描会提前检测这些模式,并阻止启用这些模式,直到解决这些模式,以便在它们影响生产数据之前找到并修复它们。
在管道中,行为可能有所不同的最常见情况如下:
- 交错的数据帧构造和会话突变
- 引用可变Python状态的UDF
交错的数据帧构造和会话突变
当管道构造数据帧时,会改变 Spark 会话状态(例如,更改默认目录或架构、设置配置、替换临时视图或重新注册 UDF),然后使用 DataFrame:
- 如果没有环境版本,DataFrame 将使用 预突变 会话状态。
- 借助环境版本,DataFrame 使用 突变后的 会话状态。
例如:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
如果没有环境版本, mytable 则包含 [(1, "Original Row")]。 使用环境版本时, mytable 包含 [(2, "Replaced Row")]。
引用可变Python状态的 UDF
当 UDF 引用Python定义 UDF 后其值更改的全局变量时:
- 如果没有环境版本,UDF 将使用变量 的最新 值。
- 使用环境版本,UDF 在 定义 UDF 时使用值。
例如:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
如果没有环境版本, my_mv 则包含 [("alex_b",)]。 使用环境版本时, my_mv 包含 [("alex_a",)]。
如果管道依赖于任一模式,在启用环境版本之前对其进行审核。
兼容性扫描
兼容性扫描有助于在启用环境版本之前在管道中找到代码模式,这些模式会在环境版本下生成不同的结果。 扫描是选择加入的。 在管道上启用扫描时:
- 每个管道运行根据检测到的模式在管道事件日志中发出一个
BehaviorChangeInSparkConnectWARN事件。 - 在解决上一次成功更新的所有兼容性警告之前,不能在管道上启用环境版本。
如果未启用扫描,则不会发出任何事件,并且 environment_version 不会阻止启用。 Databricks 建议在管道上启用环境版本之前启用扫描并解析任何检测到的模式。
在管道上启用扫描
可以通过添加 pipelines.environmentVersion.enableCompatibilityScan 管道配置来启用兼容性扫描,可以通过管道编辑器 UI 或向管道配置 JSON 添加条目来添加配置。
通过 UI:
- 在管道编辑器中,单击 “设置”。
- 在管道设置中查找 “配置 ”部分。
- 单击
添加配置。
- 输入
pipelines.environmentVersion.enableCompatibilityScan为键和true值。 - 保存管道设置。
在管道 JSON 中:
将以下条目添加到 configuration 块:
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
建议的工作流
- 在管道上启用扫描。
- 触发管道运行。
-
查询事件的管道事件日志
BehaviorChangeInSparkConnectWARN。 有关问题代码的完整列表、示例模式和建议的修补程序,请参阅 兼容性事件参考 。 - 更新管道代码以删除检测到的模式并再次运行管道,直到不再发出任何事件。
- 使用管道上的“启用环境版本”中的某个方法添加到
environment_version管道。
如果认为兼容性警告为误报,并且想要 environment_version 启用,请从管道配置中删除 pipelines.environmentVersion.enableCompatibilityScan 该条目以绕过检查。 (不允许将值设置为 false - 必须完全删除条目。
预检检查不会在没有以前更新的管道上运行,也不会在已设置环境版本的管道上运行。
将现有管道迁移到环境版本
若要迁移尚未使用环境版本的现有管道,请遵循此端到端工作流。 它引导你查找在 Spark Connect 下可能表现不同的代码模式、修复这些模式并安全地推出环境版本。
在管道上启用兼容性扫描。 按照 兼容性扫描中所述在管道上启用扫描。 这是导致检测到的模式在事件日志中浮出水面的原因,以及启用预检检查来防止启用尝试的原因。
触发管道运行并查看兼容性事件。 触发常规管道更新。 成功完成后,查询管道事件日志中的
BehaviorChangeInSparkConnectWARN事件。 每个事件报告一个检测到的模式。 有关问题代码的完整列表、示例模式和建议的修补程序,请参阅 兼容性事件参考 。更新管道代码以解决检测到的模式。 对于每个检测到的模式,请根据建议的修补程序更新管道代码。 每次更改后,触发另一个管道更新并验证相应的事件不再出现。 重复,直到事件日志不再显示成功更新的任何兼容性事件。
在管道上启用环境版本。 在最近的成功更新没有兼容性事件后,使用 UI、API 或捆绑包添加到
environment_version管道,如 在管道上启用环境版本中所述。 下一次更新使用 Spark Connect 和固定Python语言版本和预安装的库运行。如果更新失败,因为兼容性警告仍然存在,请删除
environment_version,返回到步骤 2,并在重试之前解决剩余的警告。验证迁移。 使用环境版本进行第一次更新后,请验证:
-
create_update事件日志中的事件显示environment_version设置为预期值。 - 管道生成预期数据,并且不会显示新的错误事件。
- 对下游表进行抽查,了解行为 更改中所述的任何细微行为差异。
-
回 滚
如果迁移后管道错误,请从管道设置中删除 environment_version 该管道。 下一个更新使用以前的Python运行时配置运行。 使用回滚运行进行调试,然后在识别并修复问题后重复步骤 2 的迁移。
兼容性事件参考
在管道上启用兼容性扫描时,SDP 会根据检测到的模式在管道事件日志中发出一个BehaviorChangeInSparkConnectWARN事件。 启用扫描并检测到以前的成功更新检测到任何模式时,SDP 还会阻止 environment_version 启用,直到解决模式。
每个事件报告单个问题代码,用于标识检测到的内容。 若要查找代码,请在 问题代码 表中找到它 — 每行链接到包含示例模式和建议的修补程序的类别部分。
事件形状
BehaviorChangeInSparkConnect 事件遵循标准 管道事件日志架构:
-
event_type是behavior_change_in_spark_connect。 -
level是WARN。 -
details包含behavior_change_in_spark_connect具有单个issue字段的对象。 问题值是下面列出的代码之一。 -
message是检测到的模式的人工可读说明。
问题代码
| 类别 | 问题代码 | Description |
|---|---|---|
| 数据库和目录突变 | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
创建 DataFrame 后,默认目录已更改。 现有的 DataFrame 可以使用新的默认目录解析表。 |
| 数据库和目录突变 | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG 在由管道修饰器修饰的函数外部调用。 对于后续操作,默认目录可能会意外更改。 |
| 数据库和目录突变 | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
创建 DataFrame 后,默认数据库已更改。 现有的 DataFrame 可以使用新的默认数据库解析表。 |
| 数据库和目录突变 | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE 在由管道修饰器修饰的函数外部调用。 对于后续操作,默认数据库可能会意外更改。 |
| 流函数中的预先执行 | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数调用检查点命令。 |
| 流函数中的预先执行 | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数急切地创建数据帧视图(createOrReplaceTempView 或类似)。 |
| 流函数中的预先执行 | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数创建资源配置文件。 |
| 流函数中的预先执行 | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数调用 spark.resources 或相关的资源 API。 |
| 流函数中的预先执行 | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数对目标表执行预先配置 MERGE INTO 。 |
| 流函数中的预先执行 | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数执行急切的 Spark ML 操作。 |
| 流函数中的预先执行 | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数注册Python数据源。 |
| 流函数中的预先执行 | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数对活动流式处理查询句柄进行操作。 |
| 流函数中的预先执行 | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数注册或删除流式处理查询侦听器。 |
| 流函数中的预先执行 | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数调用 spark.streams 管理流式处理查询。 |
| 流函数中的预先执行 | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数执行预先 DataFrameWriterV2 操作。 |
| 流函数中的预先执行 | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数执行预先 DataFrame.write 操作。 |
| 流函数中的预先执行 | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
流函数启动流式处理查询(writeStream.start())。 |
| Spark 配置突变 | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() 或 spark.conf.unset() 被管道修饰器修饰的函数内调用。 环境版本不支持此操作。 |
| Spark 配置突变 | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() 是在创建 DataFrame 后由管道修饰器修饰的函数外部调用的。 配置更改可能会影响执行时现有的 DataFrame。 |
| Spark 配置突变 | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() 是在创建 DataFrame 后由管道修饰器修饰的函数外部调用的。 配置更改可能会影响执行时现有的 DataFrame。 |
| 临时视图替换 | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
引用全局临时视图的数据帧创建后,将替换全局临时视图。 替换内容可能反映在现有的 DataFrame 中。 |
| 临时视图替换 | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
在引用临时视图的 DataFrame 创建后替换了临时视图。 替换内容可能反映在现有的 DataFrame 中。 |
| UDF 和 UDTF 突变 | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
在引用 UDF 的数据帧创建后,已使用同名重新注册 UDF。 现有数据帧可以使用新的 UDF 定义。 |
| UDF 和 UDTF 突变 | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
在引用 UDTF 的 DataFrame 创建后,使用同名重新注册 UDTF。 现有数据帧可以使用新的 UDTF 定义。 |
| UDF 和 UDTF 突变 | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
UDF 引用全局可变Python变量。 使用环境版本时,UDF 在定义 UDF 时使用变量的值,而不是在调用时使用。 |
| UDF 和 UDTF 突变 | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
UDTF 引用全局可变Python变量。 使用环境版本时,UDTF 在定义 UDTF 时使用变量的值,而不是调用时的值。 |
数据库和目录突变
当管道代码改变默认数据库或目录时,将发出这些问题。 使用环境版本时,在突变之前构造的 DataFrame 可以使用新的数据库或目录解析表。
触发事件的示例模式:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
如果没有环境版本,df请从marketing目录中解析events。 使用环境版本,df可从sales目录解析events。
建议的修复: 完全限定表名,因此解析不依赖于默认目录或数据库,并避免在创建和使用数据帧之间更改默认目录或数据库。
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Spark 配置突变
当管道代码以在环境版本下更改 DataFrame 行为的方式改变 Spark 配置时,会发出这些问题。
触发事件的示例模式:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
如果没有环境版本,强制转换会在数据帧创建时使用 conf 值。 使用环境版本时,强制转换会使用 spark.sql.ansi.enabled=true 无效输入,并且可能会失败。
建议的修复: 在创建任何 DataFrame 之前,在管道文件顶部设置所有必需的 Spark 配置。 对于按查询配置,请使用管道规范中的管道 configuration 设置。
临时视图替换
当管道代码在创建数据帧引用该视图后替换临时视图时,将发出这些问题。 使用环境版本时,现有 DataFrame 可能会反映新的视图内容。
触发事件的示例模式:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
如果没有环境版本, mytable 则包含 [(1, "Original Row")]。 使用环境版本时, mytable 包含 [(2, "Replaced Row")]。
建议的修复: 每次创建一个临时视图,不替换它。 如果需要具有相关数据的多个视图,请为每个视图提供不同的名称。
UDF 和 UDTF 突变
当管道代码以更改环境版本下的行为的方式改变 UDF 或 UDTF 时,会发出这些问题。
触发事件的示例模式:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
如果没有环境版本, my_mv 则包含 [("alex_b",)]。 使用环境版本时, my_mv 包含 [("alex_a",)]。
Suggested fix:将值作为参数传入 UDF,而不是从Python全局捕获它们,或在定义 UDF 之前设置全局值,然后不对其进行改变。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
流函数中的预先执行
当管道代码在由管道修饰器(@table@materialized_view等)修饰的函数内执行预先 Spark 命令时,将发出这些问题。 流函数应定义并返回数据帧;不允许在具有环境版本集的流函数内编写数据、管理流查询、注册资源或运行 ML 操作的预先命令。
建议的修复: 将热切操作移到流函数外部,并改为从流函数返回 DataFrame。 向表写入或启动流式处理查询等副作用属于管道定义之外;管道引擎处理流函数返回的数据帧的具体化。
在事件日志中查找兼容性事件
以下查询返回管道的所有兼容性事件,这些事件是按最新顺序排序的:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
若要按最近更新中的问题代码对事件进行计数:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
有关如何查询事件日志,请参阅 查询事件日志。