异步进度跟踪通过启用查询来异步更新每个微批处理中的检查点进度和处理数据,从而减少结构化流式处理管道的延迟。
在查询处理期间,结构化流式处理会持久保存和管理偏移量,以测量每个微批处理中的offsetLog和commitLog中的查询进度。 如果没有异步进度跟踪,偏移管理操作直接影响处理延迟,因为数据处理在完成之前无法继续。
注意
异步进度跟踪与 Trigger.once 或 Trigger.availableNow 触发器不兼容。 如果启用,包含Trigger.once或Trigger.availableNow的结构化流式处理查询将失败。
配置选项
| 选项 | 违约 | 描述 |
|---|---|---|
asyncProgressTrackingEnabled |
false |
是否启用异步进度跟踪。 |
asyncProgressTrackingCheckpointIntervalMs |
1000 |
写入偏移量和完成提交之间的间隔时间(以毫秒为单位)。 |
启用异步进度跟踪
若要启用异步进度跟踪,请设置为asyncProgressTrackingEnabledtrue:
Python
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
)
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
)
Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
使用检查点频率提高吞吐量
对于大部分查询,默认检查点频率为 1000 毫秒具有良好的吞吐量。 当偏移量管理操作发生的速度快于异步进度追踪的处理速度时,就会产生积压。 为了防止积压工作进一步增加,异步进度跟踪可能会阻止或减缓数据处理速度,从而可能侵蚀预期的延迟优势。
在此方案中,Databricks 建议增加检查点间隔:
Python
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
)
Scala
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
注意
故障恢复时间随着检查点间隔时间的增加而增加。 如果发生故障,管道必须重新处理自上一个成功检查点以来的所有数据。 在将此更改应用于生产之前,请考虑常规处理过程中较低的延迟与在发生故障时的恢复时间之间的权衡。
关闭异步进度跟踪
启用异步进度跟踪后,流不保证每个批处理的检查点进度。 必须检查点进度才能关闭此功能。
若要关闭,请执行以下步骤:
处理至少两个微批,并将
asyncProgressTrackingEnabled设置为true,同时将asyncProgressTrackingCheckpointIntervalMs设置为0。Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start()停止查询:
Python
query.stop()Scala
query.stop()关闭异步进度跟踪并重启查询:
Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start()
如果在未执行上述步骤的情况下关闭异步进度跟踪,可能会遇到以下错误:
java.lang.IllegalStateException: batch x doesn't exist
在驱动程序日志中,可能会看到以下错误:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Limitations
- 对于 Kafka 接收器,异步进度跟踪仅支持无状态管道。
- 异步进度跟踪不保证完全一次的端到端处理,因为批处理的偏移范围在失败时可能会更改。 某些接收器(例如 Kafka)从不提供“恰好一次”保证。