环境版本兼容性

Important

SDP 的环境版本为 Beta 版

通过 vironment 版本集的管道通过 Spark Connect 运行Python代码。 本页介绍不兼容的内容、行为不同、如何扫描管道中受影响的模式以及如何迁移现有管道。

局限性

环境版本尚未与所有管道功能兼容。 如果管道Python代码执行以下任一操作,则具有环境版本集的管道运行将失败:

  • 在用管道修饰器修饰的函数内改变 Spark 会话状态。 示例包括 spark.conf.set(...)spark.sql("USE CATALOG ...")createOrReplaceTempView
  • 使用 Spark Connect 中不可用的 PySpark API,包括 SparkContextRDDSQLContext任何 Py4J API。 请参阅 Spark Connect 中支持的内容

如果在管道上启用环境版本会导致它失败,则禁用环境版本会将管道返回到其以前的状态。

行为变更

Spark Connect 与经典 PySpark 运行时存在少量行为差异。 有关完整参考,请参阅 Spark Connect 与经典 Spark兼容性扫描会提前检测这些模式,并阻止启用这些模式,直到解决这些模式,以便在它们影响生产数据之前找到并修复它们。

在管道中,行为可能有所不同的最常见情况如下:

交错的数据帧构造和会话突变

当管道构造数据帧时,会改变 Spark 会话状态(例如,更改默认目录或架构、设置配置、替换临时视图或重新注册 UDF),然后使用 DataFrame:

  • 如果没有环境版本,DataFrame 将使用 预突变 会话状态。
  • 借助环境版本,DataFrame 使用 突变后的 会话状态。

例如:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

如果没有环境版本, mytable 则包含 [(1, "Original Row")]。 使用环境版本时, mytable 包含 [(2, "Replaced Row")]

引用可变Python状态的 UDF

当 UDF 引用Python定义 UDF 后其值更改的全局变量时:

  • 如果没有环境版本,UDF 将使用变量 的最新 值。
  • 使用环境版本,UDF 在 定义 UDF 时使用值

例如:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

如果没有环境版本, my_mv 则包含 [("alex_b",)]。 使用环境版本时, my_mv 包含 [("alex_a",)]

如果管道依赖于任一模式,在启用环境版本之前对其进行审核。

兼容性扫描

兼容性扫描有助于在启用环境版本之前在管道中找到代码模式,这些模式会在环境版本下生成不同的结果。 扫描是选择加入的。 在管道上启用扫描时:

  • 每个管道运行根据检测到的模式在管道事件日志中发出一个 BehaviorChangeInSparkConnectWARN 事件。
  • 在解决上一次成功更新的所有兼容性警告之前,不能在管道上启用环境版本。

如果未启用扫描,则不会发出任何事件,并且 environment_version 不会阻止启用。 Databricks 建议在管道上启用环境版本之前启用扫描并解析任何检测到的模式。

在管道上启用扫描

可以通过添加 pipelines.environmentVersion.enableCompatibilityScan 管道配置来启用兼容性扫描,可以通过管道编辑器 UI 或向管道配置 JSON 添加条目来添加配置。

通过 UI

  1. 在管道编辑器中,单击 “设置”。
  2. 在管道设置中查找 “配置 ”部分。
  3. 单击 “加号”图标。添加配置
  4. 输入 pipelines.environmentVersion.enableCompatibilityScan 为键和 true 值。
  5. 保存管道设置。

在管道 JSON 中

将以下条目添加到 configuration 块:

"configuration": {
  "pipelines.environmentVersion.enableCompatibilityScan": "true"
}
  1. 在管道上启用扫描。
  2. 触发管道运行。
  3. 查询事件的管道事件日志BehaviorChangeInSparkConnectWARN。 有关问题代码的完整列表、示例模式和建议的修补程序,请参阅 兼容性事件参考
  4. 更新管道代码以删除检测到的模式并再次运行管道,直到不再发出任何事件。
  5. 使用管道上的“启用环境版本”中的某个方法添加到environment_version管道。

如果认为兼容性警告为误报,并且想要 environment_version 启用,请从管道配置中删除 pipelines.environmentVersion.enableCompatibilityScan 该条目以绕过检查。 (不允许将值设置为 false - 必须完全删除条目。

预检检查不会在没有以前更新的管道上运行,也不会在已设置环境版本的管道上运行。

将现有管道迁移到环境版本

若要迁移尚未使用环境版本的现有管道,请遵循此端到端工作流。 它引导你查找在 Spark Connect 下可能表现不同的代码模式、修复这些模式并安全地推出环境版本。

  1. 在管道上启用兼容性扫描。 按照 兼容性扫描中所述在管道上启用扫描。 这是导致检测到的模式在事件日志中浮出水面的原因,以及启用预检检查来防止启用尝试的原因。

  2. 触发管道运行并查看兼容性事件。 触发常规管道更新。 成功完成后,查询管道事件日志中的 BehaviorChangeInSparkConnectWARN 事件。 每个事件报告一个检测到的模式。 有关问题代码的完整列表、示例模式和建议的修补程序,请参阅 兼容性事件参考

  3. 更新管道代码以解决检测到的模式。 对于每个检测到的模式,请根据建议的修补程序更新管道代码。 每次更改后,触发另一个管道更新并验证相应的事件不再出现。 重复,直到事件日志不再显示成功更新的任何兼容性事件。

  4. 在管道上启用环境版本。 在最近的成功更新没有兼容性事件后,使用 UI、API 或捆绑包添加到 environment_version 管道,如 在管道上启用环境版本中所述。 下一次更新使用 Spark Connect 和固定Python语言版本和预安装的库运行。

    如果更新失败,因为兼容性警告仍然存在,请删除 environment_version,返回到步骤 2,并在重试之前解决剩余的警告。

  5. 验证迁移。 使用环境版本进行第一次更新后,请验证:

    • create_update事件日志中的事件显示environment_version设置为预期值。
    • 管道生成预期数据,并且不会显示新的错误事件。
    • 对下游表进行抽查,了解行为 更改中所述的任何细微行为差异。

回 滚

如果迁移后管道错误,请从管道设置中删除 environment_version 该管道。 下一个更新使用以前的Python运行时配置运行。 使用回滚运行进行调试,然后在识别并修复问题后重复步骤 2 的迁移。

兼容性事件参考

在管道上启用兼容性扫描时,SDP 会根据检测到的模式在管道事件日志中发出一个BehaviorChangeInSparkConnectWARN事件。 启用扫描并检测到以前的成功更新检测到任何模式时,SDP 还会阻止 environment_version 启用,直到解决模式。

每个事件报告单个问题代码,用于标识检测到的内容。 若要查找代码,请在 问题代码 表中找到它 — 每行链接到包含示例模式和建议的修补程序的类别部分。

事件形状

BehaviorChangeInSparkConnect 事件遵循标准 管道事件日志架构

  • event_typebehavior_change_in_spark_connect
  • levelWARN
  • details 包含 behavior_change_in_spark_connect 具有单个 issue 字段的对象。 问题值是下面列出的代码之一。
  • message 是检测到的模式的人工可读说明。

问题代码

类别 问题代码 Description
数据库和目录突变 USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 创建 DataFrame 后,默认目录已更改。 现有的 DataFrame 可以使用新的默认目录解析表。
数据库和目录突变 USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE CATALOG 在由管道修饰器修饰的函数外部调用。 对于后续操作,默认目录可能会意外更改。
数据库和目录突变 USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 创建 DataFrame 后,默认数据库已更改。 现有的 DataFrame 可以使用新的默认数据库解析表。
数据库和目录突变 USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE DATABASE 在由管道修饰器修饰的函数外部调用。 对于后续操作,默认数据库可能会意外更改。
流函数中的预先执行 CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数调用检查点命令。
流函数中的预先执行 CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数急切地创建数据帧视图(createOrReplaceTempView 或类似)。
流函数中的预先执行 CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数创建资源配置文件。
流函数中的预先执行 GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数调用 spark.resources 或相关的资源 API。
流函数中的预先执行 MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数对目标表执行预先配置 MERGE INTO
流函数中的预先执行 ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数执行急切的 Spark ML 操作。
流函数中的预先执行 REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数注册Python数据源。
流函数中的预先执行 STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数对活动流式处理查询句柄进行操作。
流函数中的预先执行 STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数注册或删除流式处理查询侦听器。
流函数中的预先执行 STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数调用 spark.streams 管理流式处理查询。
流函数中的预先执行 WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数执行预先 DataFrameWriterV2 操作。
流函数中的预先执行 WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数执行预先 DataFrame.write 操作。
流函数中的预先执行 WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED 流函数启动流式处理查询(writeStream.start())。
Spark 配置突变 CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED spark.conf.set()spark.conf.unset() 被管道修饰器修饰的函数内调用。 环境版本不支持此操作。
Spark 配置突变 SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.set() 是在创建 DataFrame 后由管道修饰器修饰的函数外部调用的。 配置更改可能会影响执行时现有的 DataFrame。
Spark 配置突变 UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.unset() 是在创建 DataFrame 后由管道修饰器修饰的函数外部调用的。 配置更改可能会影响执行时现有的 DataFrame。
临时视图替换 REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 引用全局临时视图的数据帧创建后,将替换全局临时视图。 替换内容可能反映在现有的 DataFrame 中。
临时视图替换 REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 在引用临时视图的 DataFrame 创建后替换了临时视图。 替换内容可能反映在现有的 DataFrame 中。
UDF 和 UDTF 突变 OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 在引用 UDF 的数据帧创建后,已使用同名重新注册 UDF。 现有数据帧可以使用新的 UDF 定义。
UDF 和 UDTF 突变 OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR 在引用 UDTF 的 DataFrame 创建后,使用同名重新注册 UDTF。 现有数据帧可以使用新的 UDTF 定义。
UDF 和 UDTF 突变 UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR UDF 引用全局可变Python变量。 使用环境版本时,UDF 在定义 UDF 时使用变量的值,而不是在调用时使用。
UDF 和 UDTF 突变 UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR UDTF 引用全局可变Python变量。 使用环境版本时,UDTF 在定义 UDTF 时使用变量的值,而不是调用时的值。

数据库和目录突变

当管道代码改变默认数据库或目录时,将发出这些问题。 使用环境版本时,在突变之前构造的 DataFrame 可以使用新的数据库或目录解析表。

触发事件的示例模式:

from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

如果没有环境版本,df请从marketing目录中解析events。 使用环境版本,df可从sales目录解析events

建议的修复: 完全限定表名,因此解析不依赖于默认目录或数据库,并避免在创建和使用数据帧之间更改默认目录或数据库。

from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Spark 配置突变

当管道代码以在环境版本下更改 DataFrame 行为的方式改变 Spark 配置时,会发出这些问题。

触发事件的示例模式:

from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
  return df.selectExpr("CAST(price AS INT) AS price")

如果没有环境版本,强制转换会在数据帧创建时使用 conf 值。 使用环境版本时,强制转换会使用 spark.sql.ansi.enabled=true 无效输入,并且可能会失败。

建议的修复: 在创建任何 DataFrame 之前,在管道文件顶部设置所有必需的 Spark 配置。 对于按查询配置,请使用管道规范中的管道 configuration 设置。

临时视图替换

当管道代码在创建数据帧引用该视图后替换临时视图时,将发出这些问题。 使用环境版本时,现有 DataFrame 可能会反映新的视图内容。

触发事件的示例模式:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

如果没有环境版本, mytable 则包含 [(1, "Original Row")]。 使用环境版本时, mytable 包含 [(2, "Replaced Row")]

建议的修复: 每次创建一个临时视图,不替换它。 如果需要具有相关数据的多个视图,请为每个视图提供不同的名称。

UDF 和 UDTF 突变

当管道代码以更改环境版本下的行为的方式改变 UDF 或 UDTF 时,会发出这些问题。

触发事件的示例模式:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

如果没有环境版本, my_mv 则包含 [("alex_b",)]。 使用环境版本时, my_mv 包含 [("alex_a",)]

Suggested fix:将值作为参数传入 UDF,而不是从Python全局捕获它们,或在定义 UDF 之前设置全局值,然后不对其进行改变。

from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
  return s + suffix

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

流函数中的预先执行

当管道代码在由管道修饰器(@table@materialized_view等)修饰的函数内执行预先 Spark 命令时,将发出这些问题。 流函数应定义并返回数据帧;不允许在具有环境版本集的流函数内编写数据、管理流查询、注册资源或运行 ML 操作的预先命令。

建议的修复: 将热切操作移到流函数外部,并改为从流函数返回 DataFrame。 向表写入或启动流式处理查询等副作用属于管道定义之外;管道引擎处理流函数返回的数据帧的具体化。

在事件日志中查找兼容性事件

以下查询返回管道的所有兼容性事件,这些事件是按最新顺序排序的:

SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

若要按最近更新中的问题代码对事件进行计数:

SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

有关如何查询事件日志,请参阅 查询事件日志

另请参阅