变更数据馈送(CDF)用于跟踪 Delta Lake 表或 Apache Iceberg v3 表不同版本之间的行级更改。
Azure Databricks 支持两种方法:
- 自动变更数据馈送:在读取表时使用行沿袭元数据计算变更。 这不需要单独的表配置,并且适用于 Delta Lake 和 Apache Iceberg v3 表。 请参阅 自动更改数据馈送。
- 旧版变更数据馈送:在表写入期间将更改实体化。 仅支持 Delta Lake 表。 需要单独配置每个表。 请参阅 Delta Lake 的旧更改数据馈送。
您可以将变更数据馈送用于常见的数据使用场景,包括:
- 仅处理自上次管道运行以来更改的行的增量 ETL 管道。
- 用于满足合规和治理要求、跟踪数据修改的审计跟踪。
- 将更改同步到下游表、缓存或外部系统的数据复制工作负载。
自动更改数据馈送
重要
此功能目前以公共预览版提供。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版。
自动变更数据馈送借助 Delta Lake 的行跟踪以及 Apache Iceberg v3 的行沿袭,在查询时而非写入时计算行级变更。 与传统变更数据馈送不同,自动变更数据馈送无需为每个表单独进行配置,并且可用于 Delta Lake 表和 Apache Iceberg v3 表。
由于对于 MERGE INTO 和 UPDATE 操作,并非在每次写入时都计算更改,因此与旧版变更数据馈送相比,自动变更数据馈送可提高写入性能并降低存储成本。
自动变更数据馈送使用与旧版变更数据馈送相同的 table_changes() 和 readChangeFeed API,并且可与批处理查询、结构化流处理以及 Databricks 到 Databricks 的 Delta Lake 共享配合使用。 请参阅 批量查询中的读取更改 ,并 增量处理更改数据。
要求
- Databricks Runtime 18 或更高版本
- 在 Unity Catalog 中注册的受支持的表格式:
- 采用 Delta Lake 格式且启用了行跟踪的托管表,或采用 Iceberg v3 格式的托管表。
- 采用 Delta Lake 格式且已启用行跟踪的外部表。
注意
更改数据馈送不属于 Apache Iceberg 规范。Azure Databricks读者可以查询 Apache Iceberg v3 表的自动更改数据馈送,但外部 Iceberg 读取器无法查询。 请参阅 冰山表规格。
对于 Delta Lake,只有 Azure Databricks 读取端可以查询自动变更数据馈送。
使用更改数据馈送
若要使用变更数据馈送,请确认所使用的表满足相关要求。 请参阅 要求。
若要批量读取更改数据馈送,请执行以下操作:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
有关以批处理方式读取变更数据馈送的详细信息,请参阅 在批处理查询中读取更改。
若要以流式方式读取变更数据馈送,请执行以下操作:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
有关变更数据馈送的流式读取的更多信息,请参阅 增量处理变更数据。
从旧版更改数据馈送迁移
若要将 Delta Lake 表从旧更改数据馈送迁移到自动更改数据馈送,请执行以下操作:
- 验证表是否符合 要求。
- 通过运行以下命令关闭旧版更改数据馈送:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
不能同时使用传统和自动两种变更数据馈送。
更改数据馈送架构
读取表的变更数据馈送时,查询会使用该表最新版本的架构。 Azure Databricks支持大多数架构更改和演变操作,但具有列映射的表有限制。 请参阅 具有列映射的表。
除了 Delta Lake 表架构中的数据列外,更改数据馈送还包含标识更改事件的类型的元数据列:
| 列名称 | 类型 | 价值观 |
|---|---|---|
_change_type |
String | 包含:insert、、update_preimageupdate_postimage、delete。preimage 是更新前的值, postimage 是更新后的值。 |
_commit_version |
Long | 包含:包含该变更的 Delta 日志或表版本。 |
_commit_timestamp |
时间戳 | 包含:创建提交时关联的时间戳。 |
如果架构中包含与这些元数据列同名的列,则不能在表上使用更改数据馈送。 在启用更改数据馈送之前,请重命名表中的列以解决此冲突。
以增量方式处理更改数据
Databricks 建议将变更数据馈送与结构化流处理结合使用,以增量方式处理表中的更改。 您必须使用 Azure Databricks 的结构化流式处理来自动跟踪表的变更数据流的版本变化。 有关 SCD 类型 1 或类型 2 表的 CDC 处理,请参阅 AUTO CDC API:使用管道简化更改数据捕获。
流首次启动时,更改数据馈送会先以 INSERT 记录的形式返回表的最新快照,然后再以更改数据的形式返回后续更改。 变更数据馈送会同时将变更数据和新数据行提交到表事务日志中。
若要将流配置为读取表的变更数据馈送,请按如下所示将选项 readChangeFeed 设置为 true:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
速率限制
Azure Databricks 在读取更改数据时支持速率限制(maxFilesPerTrigger、maxBytesPerTrigger)和 excludeRegex。 有关流式传输 Delta Lake 选项的完整列表,请参阅 Delta Lake。
(可选)可以指定起始版本,请参阅 “指定起始版本”。 对于除起始快照以外的版本,速率限制会以原子方式应用于整个提交。 当前批次要么包含整个提交,要么将该提交推迟到下一批次。
回放表历史
变更数据馈送并非旨在用作表中所有更改的永久记录。 它仅记录启用更改数据馈送后发生的更改。 您可以启动新的流式读取,以获取当前版本及其后的所有更改。
更改数据馈送中的记录是暂时性的,只能用于指定的保留时段。 事务日志会定期删除表版本及其对应的变更数据馈送版本。 删除某个版本后,无法再读取该版本的更改数据馈送。
存档永久历史记录的更改数据
如果用例要求保留对表的所有更改的永久历史记录,请使用增量逻辑将记录从更改数据馈送写入新表。
以下示例演示如何使用 trigger.AvailableNow 以批处理工作负载的方式处理可用数据,用于审计或完整变更重放:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
指定起始版本
若要从特定点读取更改,请使用时间戳或版本号指定起始版本。 批量读取需要起始版本。 (可选)可以指定结束版本来限制范围。 若要详细了解表的历史记录,请参阅 时间旅行。
配置使用更改数据馈送的结构化流式处理工作负荷时,指定起始版本可能会影响处理性能:
- 新的数据处理管道通常会受益于默认设置,该设置在流首次启动时将表中所有现有记录记录为
INSERT操作。 - 如果目标表已包含所有记录,并且在某一时间点之前进行了适当的更改,请指定一个起始版本,以避免将源表状态作为
INSERT事件进行处理。
以下示例展示了如何在检查点损坏的情况下从流处理故障中恢复。 在本例中,假设条件如下:
- 在表创建时,已经在源表上启用了变更数据捕获。
- 目标下游表已处理截至版本 75(含)的所有更改。
- 源表的版本历史记录可用于版本 70 及更高版本。
将写入流定义为现有目标表时,必须指定新的检查点位置:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
重要
如果指定起始版本且该版本在表历史记录中不可用,则流无法从新检查点启动。 由于托管表会自动清理历史版本,因此最终会删除所有指定的起始版本。
请参阅 重播表历史记录。
在批处理查询中读取更改
可以使用批处理查询语法读取从特定版本开始的所有更改,或读取指定范围内的更改,如下所示:
- 请将版本指定为整数,将时间戳指定为格式为
yyyy-MM-dd[ HH:mm:ss[.SSS]]的字符串。 - 开始和结束版本是包容性的。 若要从起始版本读取到最新版本,请仅指定起始版本。
- 如果指定的版本早于启用变更数据馈送的时间,则会引发错误。
若要对起始和结束版本选项使用批处理读取,请执行以下操作:
SQL
若要从版本 0 读取到 10,请执行以下操作:
SELECT * FROM table_changes('tableName', 0, 10)
若要在两个时间戳版本之间读取,请执行以下操作:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
若要从起始版本读取到最新版本,请执行以下操作:
SELECT * FROM table_changes('tableName', 0)
若要读取名称中具有特殊字符的表的更改,请执行以下操作:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
请参阅 table_changes 表值函数。
Python
若要从版本 0 读取到 10,请执行以下操作:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
若要在两个时间戳之间读取,请执行以下操作:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
若要从起始版本读取到最新版本,请执行以下操作:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
若要从版本 0 读取到 10,请执行以下操作:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
若要在两个时间戳之间读取,请执行以下操作:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
若要从起始版本读取到最新版本,请执行以下操作:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
处理范围外版本
默认情况下,如果指定的版本或时间戳超过上次提交,查询将返回错误 timestampGreaterThanLatestCommit。
在 Databricks Runtime 11.3 LTS 及更高版本中,可以启用对超出范围的版本的容忍,如下所示:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
启用此配置后,查询将返回不同的结果,如下所示:
- 上次提交以外的起始版本或时间戳返回空结果。
- 最后一次提交之后的结束版本或时间戳返回从开始到最后一次提交的所有更改。
Delta Lake 的旧版变更数据馈送
旧更改数据馈送需要对单个 Delta Lake 表进行手动配置。 由于 Apache Iceberg 规范中不包含更改数据馈送,因此不支持 Apache Iceberg 表。 Databricks 建议您改用自动变更数据馈送。 请参阅 “从旧更改数据馈送迁移”。
启用旧更改数据馈送后,运行时会记录写入表中的所有数据的 更改事件 。 这包括行数据以及指示指定行是插入、删除还是更新的元数据。
旧版变更数据馈送使用与自动变更数据馈送相同的 readChangeFeed 和 table_changes() 读取 API。 请参阅增量处理更改数据和读取批处理查询中的更改。
启用旧版变更数据馈送
您必须在各个单独的表上显式启用旧版更改数据馈送。 使用下列方法之一:
新建表
在delta.enableChangeDataFeed = true命令中设置表属性CREATE TABLE。
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
注意
如果您在某一时间段内关闭了旧版变更数据馈送,然后再次将其启用,则该时间段将无法查询。 使用自动变更数据馈送查询该时间间隔内的更改。 请参阅 自动更改数据馈送。
现有表
在delta.enableChangeDataFeed = true命令中设置表属性ALTER TABLE。
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
存储注意事项
托管表有效记录数据更改,并可能使用其他功能优化存储布局。
使用旧版更改数据馈送时,您必须考虑以下存储行为:
- 存储成本可能会小幅增加,因为更改可能会记录在单独的文件中。
- 某些操作(如仅插入或完全分区删除)不会生成更改数据文件。 Azure Databricks 直接根据事务日志计算出变更数据馈送。
- 变更数据文件遵循表的保留策略。 该
VACUUM命令会删除变更数据文件,并且事务日志中的更改使用检查点保留策略。
Databricks 建议不要尝试通过直接查询变更数据文件来重建变更数据馈送。 始终使用 Delta Lake 和 Apache Iceberg API。
局限性
请考虑更改数据馈送的以下限制:
带有列映射的表
在 Delta Lake 表上启用列映射后,无需重写数据文件即可删除或重命名列。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。
但是,在发生非新增型架构变更后,更改数据馈送会受到限制。 非累加架构更改包括以下操作:
- 重命名或删除列。
- 更改列数据类型。
- 更改列的可空性,例如使用
ALTER COLUMN ... SET NOT NULL。 请参阅NOT NULL约束。
无法读取发生非追加式架构更改的事务或范围内的变更数据馈送。
为了允许在指定批量读取范围之前或之后进行非累加式架构更改,查询会使用该范围结束版本的架构,而不是最新表版本的架构。 如果版本范围跨越非累加架构更改,查询仍然失败。