本页介绍如何为Azure Databricks上的无服务器流式处理工作负荷选择正确的配置,包括连续管道、增量引入和托管连接器。 选择正确的配置取决于流的源、形状和延迟需求。
什么算作流式工作负载
流式处理工作负载从源(例如云对象存储、消息总线或变更馈送)读取无界数据,并以增量方式将数据写入接收端。 Azure Databricks支持两种流式处理工作负荷模式:
- 连续:一种持续运行且不会停止的管道,会在新数据到达时进行处理。 延迟以秒为单位。
- 增量 (也称为触发):按计划或触发器运行的管道,处理自上次运行以来到达的所有数据,然后停止。 延迟以分钟为单位进行度量。
某些工作负载看起来像是流式管道,但严格来说并不是真正的管道。 示例包括:保持 WebSocket 连接处于打开状态以侦听事件的服务、为每个用户维持持久连接的聊天应用程序,或处理传入 HTTP 请求的 Webhook 接收器。 这些是应用程序,不是流处理管道。 有关这些工作负载适用的无服务器选项,请参阅 非流式传输管道的工作负载。
选择正确的流式处理配置
此表将用例映射到最适合它们的无服务器配置。 本页上的各节提供了有关这些建议的更多详细信息。
| 用例 | 建议配置 | 为什么 |
|---|---|---|
| 连续低延迟流式处理 ETL 或转换 | Lakeflow Spark 声明式管道(连续模式) | 连续模式专为始终启用的流而设计。 流式流水线处理可并发处理微批次,从而提高吞吐量并降低延迟。 托管状态可确保自动恢复。 |
| 从云存储进行增量引入 | 在 Lakeflow Spark 声明式管道中使用 Auto Loader(用于低延迟),或在 无服务器作业 中结合 Trigger.AvailableNow() 使用(如果可以接受较高的延迟)。 |
自动加载程序高效跟踪新文件。
Trigger.AvailableNow() 处理积压任务后即退出,这适用于计划执行或按需执行的节奏。 |
| 从 SaaS 源或数据库 CDC 进行托管引入 | Lakeflow Connect 中的标准连接器 | 具有无服务器引入管道的完全托管连接器。 受支持的数据源无需编写任何代码。 |
| 通过 Delta 表流式处理 SQL | 流式处理表 | 适用于追加型数据源的原生 SQL 增量处理,支持托管管道和刷新。 |
| 在笔记本或作业中进行的定期微批处理 |
无服务器作业Trigger.AvailableNow() |
当分钟级的数据新鲜度已足够时,更具成本效益。 无服务器计算在批处理完成时快速启动并退出。 |
持续流式传输
对于在无服务器计算上进行连续流处理,请使用处于连续模式的 Lakeflow Spark 声明式管道。 数据管道会持续运行,在记录到达时进行处理,并在发生故障后自动恢复。
若要配置连续流,请:
Tip
流管道在无服务器 Lakeflow Spark 声明性管道中默认启用。 微批次以并发方式运行,而不是按顺序依次运行,从而提高摄取密集型流的吞吐量。
基于时间的结构化流触发器(例如 Trigger.ProcessingTime(interval) 和 Trigger.Continuous(interval))在无服务器笔记本或作业中不可用。 对于始终在线模式,请在持续模式下使用 Lakeflow Spark 声明性管道。 请参阅 流式处理限制。 支持 Trigger.Once(),但已弃用——请将现有查询迁移到 Trigger.AvailableNow()。
增量和触发式流处理
对于增量流式处理,请在无服务器作业中使用 Trigger.AvailableNow() 运行 Structured Streaming。 每次运行都会处理自上次检查点之后到达的所有数据,然后退出。
若要将无服务器作业配置为使用增量流式处理,请执行以下操作:
- 按所需的节奏安排作业。 请参阅按计划运行作业。
- 在该作业中的每个流式查询上使用
Trigger.AvailableNow()。 请参阅配置结构化流式处理触发器间隔。 - 使用
maxFilesPerTrigger或maxBytesPerTrigger调整批处理大小,以使内存占用保持可预测。 请参阅 无服务器计算的最佳做法。
以下示例使用自动加载程序从云存储(source_path)读取新文件,处理运行时可用的所有数据,以及写入 Delta 表:
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.maxFilesPerTrigger", 1000)
.load(source_path)
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.toTable("catalog.schema.target_table"))
当可以接受分钟级延迟时,计划 Trigger.AvailableNow() 作业是在无服务器计算环境中成本效益最高的流式处理模式。 计算以秒为单位启动,运行批处理并关闭。
托管引入
如果源是 SaaS 应用程序或操作数据库,请使用 Lakeflow Connect,而不是编写结构化流代码。 Lakeflow Connect 为 Salesforce、Workday、SQL Server CDC 和 PostgreSQL CDC 等连接器运行无服务器引入管道。 请参阅 Lakeflow Connect 中的托管连接器。
以下情况下,此路径是正确的答案:
- 有一个适用于您的源的连接器。
- 需要托管管道,而不是自定义代码。
- 你需要开箱即用的模式演进、数据沿袭和监控。
SQL 托管的增量数据处理
对于以 SQL 为主的团队,可使用 流式表 处理原生 SQL 流式工作负载。 可以在 Lakeflow Spark 声明性管道内定义流式处理表,也可以定义为 独立的流式处理表。
对于使用 CREATE OR REFRESH STREAMING TABLE SQL 语句创建的独立流式处理表,初始数据刷新和填充会立即开始。 系统会自动为每个流式处理表创建和管理专用无服务器管道。
如果您需要具有受管刷新的批量语义查询结果,请改用物化视图。 请参阅 具体化视图。
非流式管道的工作负载
需要保留长时间连接、侦听端口或响应传入 HTTP 请求的工作负荷不是流式处理管道;它是一个应用程序。 不要在无服务器作业上运行这些工作负荷。 正确的 Databricks 选项包括:
- 需要持久连接或 HTTP 终结点的长时间运行的服务:使用 Databricks Apps 生成服务。 Databricks Apps 是用于在 Azure Databricks 上托管自定义应用程序的无服务器平台,包括 FastAPI、Flask、Streamlit、Dash、Gradio、Node.js和 Shiny 应用。 请参阅 Databricks 应用。
- 传入的 Webhook 或事件侦听器:在 Databricks Apps 上公开 HTTP 终结点,或在外部服务中终止 Webhook,并将事件写入云存储或消息总线,然后使用无服务器流式处理管道选取它们。
- 自定义令牌或凭据交换:将 服务主体 与 OAuth 配合使用,或从应用调用 Databricks REST API 。 流式处理管道不保存每个用户会话或自定义令牌状态。
如果要评估工作负荷是否适合流式处理管道,请询问:
- 工作负荷是否从无界数据源读取数据,并将数据写入目标端? 如果是,则为流式处理管道。
- 工作负荷是否需要与客户端保持连接? 如果是,则为应用程序;使用 Databricks 应用。
局限性
无服务器计算有以下流式处理约束。 它们在与合适的产品搭配使用时,都不会妨碍上述工作负载。
- 无服务器笔记本或作业不支持基于时间的结构化流触发器(
Trigger.ProcessingTime(interval)和Trigger.Continuous(interval))。 对始终开启的流,请在连续模式下使用 Lakeflow Spark 声明式管道;对于触发式运行,请使用Trigger.AvailableNow()。 请参阅 流式处理限制。 - 未显式指定触发器的流式查询会因
INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED而失败。 Apache Spark 默认使用Trigger.ProcessingTime("0 seconds"),而无服务器计算不支持该设置。 始终为每个流式查询设置Trigger.AvailableNow(),或者使用连续模式下的 Lakeflow Spark 声明性管道。 - 标准访问模式上流式传输的所有限制也适用于无服务器计算。 请参阅 流式处理限制。