Apache Spark 结构化流式处理增量地处理数据。 触发器间隔控制结构化流式处理检查新数据的频率。 可以为准实时处理、计划的数据库刷新或批处理一天或一周的所有新数据配置触发器间隔。
由于 什么是自动加载程序? 使用结构化流式处理来加载数据,因此了解触发器的工作原理使你能够灵活地控制成本,同时以所需的频率引入数据。
重要
Azure Databricks建议设置一个触发器模式,用于平衡用例的延迟和成本。 否则,您可能会看到来自云服务提供商的意外存储费用。 有关详细信息,请参阅 控制云存储成本 。
触发器模式概述
下表总结了结构化流式处理中可用的触发器模式:
| 触发模式 | 语法示例(Python) | 最适用于 |
|---|---|---|
| 未指定 (默认值) | 不适用 | 具有 3-5 秒延迟的常规用途流式处理。 等效于使用 0 毫秒间隔的 processingTime 触发器。 只要新数据到达,流处理就持续运行。 |
| 处理时间 | .trigger(processingTime='10 seconds') |
平衡成本和性能。 通过防止系统过于频繁地检查数据来减少开销。 |
| 现已推出 | .trigger(availableNow=True) |
计划的增量批处理。 在触发流式处理作业时处理尽可能多的数据。 |
| 实时模式 | .trigger(realTime='5 minutes') |
超低延迟的操作负载需要毫秒级处理,例如欺诈检测或实时个性化。 公共预览版。 “5 分钟”表示微批处理的长度。 用 5 分钟时间将每个批次的开销(例如查询编译)降到最低。 |
| 连续 | .trigger(continuous='1 second') |
不支持。 这是 Spark OSS 中包含的实验性功能。 请改用实时模式。 |
:::note 无服务器计算
在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()。
对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。
请参阅 流式处理限制。
:::
processingTime:基于时间的触发间隔
结构化流式处理将基于时间的触发器间隔称为“固定间隔微批处理”。 使用 processingTime 关键字,将持续时间指定为字符串,例如 .trigger(processingTime='10 seconds')。
此间隔的配置确定系统执行检查的频率,以查看新数据是否已到达。 配置处理时间以平衡延迟要求和数据到达源的速率。
AvailableNow:增量批处理
重要
在 Databricks Runtime 11.3 LTS 及更高版本中, Trigger.Once 已弃用。 使用 Trigger.AvailableNow 处理所有增量批处理工作负荷。
触发器 AvailableNow 选项会将所有可用记录作为增量批进行处理,并且可以使用选项(例如 maxBytesPerTrigger)来配置批大小。 尺寸选项因数据源而异。
支持的数据源
Azure Databricks支持使用 Trigger.AvailableNow 从许多结构化流源进行增量批处理。 下表包含每个数据源所需的最低受支持 Databricks Runtime 版本:
| 源 | Databricks Runtime最低版本 |
|---|---|
| 文件源(JSON、Parquet 等) | 9.1 LTS |
| Delta Lake | 10.4 LTS |
| 自动加载器 | 10.4 LTS |
| Apache Kafka | 10.4 LTS |
| 动动力 | 13.1 |
realTime:超低延迟的工作负载
结构化流式处理实时模式在尾部实现 1 秒以下的端到端延迟,在常见情况下约为 300 毫秒。 有关如何有效配置和使用实时模式的更多详细信息,请参阅 结构化流式处理中的实时模式。
Apache Spark 有一个称为 连续处理的附加触发器间隔。 自 Spark 2.3 以来,此模式已归类为实验模式。 Azure Databricks不支持或推荐此模式。 对于低延迟用例,请使用实时模式。
注意
此页上的连续处理模式与 Lakeflow Spark 声明性管道中的连续处理无关。
控制云存储成本
默认情况下,如果未设置触发模式,结构化流处理会将触发模式设置为 processingTime,将间隔设置为 0,每隔几毫秒检查一次新数据。 这可以每天生成大量云存储 API 调用,并导致云提供商产生意外费用。
Azure Databricks建议配置适合延迟和成本要求的触发器模式。 有关配置基于时间的触发器间隔的信息,请参阅 processingTime 。
更改运行之间的触发器间隔
你可以在使用同一检查点时更改运行之间的触发间隔。
更改间隔时的行为
如果结构化流式处理查询在微批处理当前正在处理时停止,则在应用新的触发器间隔之前,该微批处理必须完成。 更改触发器间隔后,您可能会观察到微批处理可能暂时仍按以前指定的配置运行。 下面描述了转换后的预期行为:
-
从基于时间的时间间隔到
AvailableNow: 在所有可用记录处理之前,微批处理可能会作为增量批处理进行处理。 -
从
AvailableNow到基于时间的间隔: 对于在上次AvailableNow触发的作业时可用的所有记录,处理可能继续进行。
从查询失败中恢复
如果尝试使用增量批处理从查询失败中恢复,则触发器间隔更改无法解决问题。 之前未成功完成的批处理必须完成,因为结构化流式处理需要具备幂等性的小批处理。 请参阅 Apache Spark 的容错语义。
若要解决故障,请纵向扩展计算容量,例如增加工作器节点的大小。 在极少数情况下,可能需要使用新的检查点重新启动流。