Important
此功能目前以公共预览版提供。
按需状态重新分区允许您在不丢失检查点状态的情况下,更改有状态的 Structured Streaming 查询的分区数量。
如果不进行按需状态重新分区,可以在创建检查点期间设置随机分区数。 如果更改 spark.sql.shuffle.partitions,具有现有检查点的查询将忽略新值。 应用新的分区计数要求使用新的检查点重启查询。
按需状态重新分区具有以下优点:
- 无需重建检查点即可通过调整分区数量来优化查询。
- 纵向扩展或缩减查询以匹配工作负荷更改。
Requirements
- Databricks Runtime 18 及更高版本。
- 查询必须使用 RocksDB 状态存储提供程序。 在 DBR 17.3 或更高版本上,RocksDB 是默认的状态存储提供程序。 请参阅 在 Azure Databricks 上配置 RocksDB 状态存储。
更改分区数
使用 Spark 配置 spark.sql.streaming.stateStore.partitions 并重启查询,以更改混洗分区和流式状态分区的数量:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
对于有状态查询, spark.sql.streaming.stateStore.partitions 优先于 spark.sql.shuffle.partitions. 查询重启并完成最后一个计划的微分块后,查询将运行重新分区操作,将状态数据重新分发到新的分区数。 重新分区操作完成后,查询将恢复处理。
监控重新分区状态
下一个微批处理完成后, StreamingQueryProgress 事件包括重新分区操作的持续时间。 在事件的 durationMs 指标中, controlBatch.REPARTITION 以毫秒为单位显示持续时间值。 状态规模更大可能会增加重新分区所需的时间。 请参阅 在 Azure Databricks 上监视结构化流查询。
结构化流式处理示例
以下示例将查询从 200(默认值)缩减为 100 个随机分区。 停止查询,设置新的分区计数,然后重启:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
Lakeflow Spark 声明性管道示例
在 Lakeflow Spark 声明式管道中,使用 @dp.table 或 @dp.append_flow 装饰器上的 spark_conf 参数设置 spark.sql.streaming.stateStore.partitions。
在流上设置分区:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
source_path = "/databricks-datasets/iot-stream/data-device/"
dp.create_streaming_table("target_table")
@dp.append_flow(
target="target_table",
name="my_flow_1",
spark_conf={"spark.sql.streaming.stateStore.partitions": "100"}
)
def my_flow_1():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(source_path)
.withColumn("timestamp", F.to_timestamp("timestamp"))
.withWatermark("timestamp", "10 minutes")
.groupBy(F.window("timestamp", "5 minutes"), "id")
.count())
在默认流的表级别设置分区:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
source_path = "/databricks-datasets/iot-stream/data-device/"
@dp.table(
name="table_1",
spark_conf={"spark.sql.streaming.stateStore.partitions": "100"}
)
def table_1():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(source_path)
.withColumn("timestamp", F.to_timestamp("timestamp"))
.withWatermark("timestamp", "10 minutes")
.groupBy(F.window("timestamp", "5 minutes"), "id")
.count())