读取结构化流式处理状态信息

可以使用 DataFrame 操作或 SQL 表值函数来查询结构化流式处理状态数据和元数据。 使用这些函数可观察结构化流式处理有状态查询的状态信息,这对于监视和调试非常有用。

必须具有读取权限,访问流查询的检查点路径,才能查询状态数据和元数据。 本文中所述的函数提供对状态数据和元数据的只读访问权限。 只能使用批量读取语义来查询状态信息。

注意

无法查询 Lakeflow Spark 声明性管道、流式处理表或具体化视图的状态信息。 不能使用无服务器计算或配置了标准访问模式的计算来查询状态信息。

要求

  • 使用以下计算配置之一:
    • Databricks Runtime 16.3 及更高版本,用在配置了标准访问模式的计算上。
    • Databricks Runtime 14.3 LTS 及更高版本可在配置为专用或无隔离访问模式的计算上运行。
  • 对流式处理查询所使用的检查点路径的读取访问权限。

读取结构化流式处理状态存储器

可以读取在任何受支持的 Databricks Runtime 中执行的结构化流式处理查询的状态存储信息。 使用以下语法:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

状态读取器 API 选项和架构

有关 statestore 格式选项的完整列表,请参阅 状态存储

输出数据具有以下架构:

类型 说明
key 结构体(从状态键派生出的后续类型) 状态检查点中有状态操作记录的密钥。
value 结构体(从状态值派生出的后续类型) 状态检查点中记录有状态运算符的值。
partition_id 整数 包含有状态运算符记录的状态检查点的分区。

在 Databricks Runtime 16.4 LTS 及更高版本中,当 readChangeFeed 选项设置为 true时,输出数据具有以下架构:

类型 说明
batch_id 长整型 状态更改所属的批次 ID。
change_type 字符串 批处理应用的更改类型: update 用于插入和更新, delete 用于删除。
key 结构体(从状态键派生出的后续类型) 状态检查点中有状态操作记录的密钥。
value 结构体(从状态值派生出的后续类型) 状态检查点中记录有状态运算符的值。 null 适用于 change_typedelete 的记录。
partition_id 整数 包含有状态运算符记录的状态检查点的分区。

请参阅 read_statestore 表值函数

读取结构化流状态更改

在 Databricks Runtime 16.4 LTS 及更高版本上可用。 若要了解如何在微批处理中更改状态,而不是在单个微批处理中查看完整状态,请readChangeFeed设置为true并指定changeStartBatchId。 (可选)指定 changeEndBatchId。 有关选项的完整列表,请参阅 状态存储

例如,若要读取从批处理 2 到最新已提交批处理的状态变更:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

输出模式包括额外的 batch_id 列和 change_type 列。 有关完整架构,请参阅 状态读取器 API 选项和架构

读取结构化流式处理状态元数据

在 Databricks Runtime 14.3 LTS 或更高版本上可用。 可以读取结构化流式处理查询的状态元数据信息:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

返回的数据具有以下架构:

类型 说明
operatorId 整数 有状态流处理操作符的整数 ID。
operatorName 字符串 有状态流式处理运算符的名称。
stateStoreName 字符串 运算符的状态存储的名称。
numPartitions 整数 状态存储的分区数。
minBatchId 长整型 可用于查询状态的最小批次 ID。
maxBatchId 长整型 可用于查询状态的最大批 ID。

注意

minBatchIdmaxBatchId 提供的批处理 ID 值在写入检查点时反映状态。 系统会使用微批处理执行自动清理旧的批处理,因此不能保证此处提供的值仍可用。

请参阅 read_state_metadata 表值函数

示例:查询流间联接的一侧

使用以下语法查询流间联接的左侧:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

示例:查询具有多个有状态运算符的流的状态存储

此示例使用状态元数据读取器收集具有多个有状态运算符的流式处理查询的元数据详细信息,然后使用元数据结果作为状态读取器的选项。

状态元数据读取器将检查点路径作为唯一选项,如以下语法示例所示:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

下表表示状态存储元数据的示例输出:

operatorId 操作员名称 状态存储名称 分区数量 minBatchId maxBatchId
0 状态存储保存 默认 200 0 13
1 dedupeWithinWatermark 默认 200 0 13

若要获取运算符的结果 dedupeWithinWatermark ,请使用该选项查询状态读取器 operatorId ,如以下示例所示:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);