使用内置连接器订阅 Google Pub/Sub。 此连接器为来自订阅者的记录提供精确一次的处理语义。
注意
Pub/Sub 可能会发布重复的记录,或者记录可能会无序到达订阅者。 编写代码来处理重复和无序记录。
配置 Pub/Sub 流
下面的代码示例演示了用于配置从 Pub/Sub 读取的结构化流式处理的基本语法。
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
有关更多配置选项,请参阅“配置 Pub/Sub 流式处理读取”选项。
配置对 Pub/Sub 的访问权限
配置的凭据必须具有以下角色。
| 角色 | 必需或可选 | 角色的使用方式 |
|---|---|---|
roles/pubsub.viewer 或 roles/viewer |
必填 | 检查订阅是否存在,然后获取订阅。 |
roles/pubsub.subscriber |
必填 | 从订阅中提取数据。 |
roles/pubsub.editor 或 roles/editor |
可选 | 如果订阅不存在,则启用创建订阅的功能,并在流终止时使用deleteSubscriptionOnStreamStop删除订阅。 |
Databricks 建议在提供授权选项时使用机密。 授权连接需要以下选项:
clientEmailclientIdprivateKeyprivateKeyId
了解 Pub/Sub 架构
流的架构与从 Pub/Sub 提取的记录匹配,如下表所述。
| 字段 | 类型 |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
配置 Pub/Sub 流式读取选项
下表描述了 Pub/Sub 支持的其他选项。 所有选项都在使用 .option("<optionName>", "<optionValue>") 语法进行结构化流式传输读取的过程中配置。
注意
某些 Pub/Sub 配置选项使用提取概念,而不是微批处理。 这反映了内部实现的详细信息,并且选项的工作方式类似于其他结构化流连接器中的推论,只是记录是被提取然后处理的。
| 选项 | 默认值 | 说明 |
|---|---|---|
numFetchPartitions |
设置为流初始化时存在的执行程序数量的一半。 | 从订阅中提取记录的并行 Spark 任务数。 |
deleteSubscriptionOnStreamStop |
false |
如果true,流式处理作业结束时传递到流的订阅将被删除。 |
maxBytesPerTrigger |
none |
每个触发的微批处理期间待处理批量的软限制。 |
maxRecordsPerFetch |
1000 |
在处理记录之前,每个任务要获取的记录数。 |
maxFetchPeriod |
10s |
每个任务在处理记录之前的获取时间长度。 接受持续时间字符串,例如 1s 1 秒或 1m 1 分钟。 Databricks 建议使用默认值。 |
将增量批处理与 Pub/Sub 配合使用
可以使用 Trigger.AvailableNow 从 Pub/Sub 源中消费可用记录,作为增量批处理的一部分。
Azure Databricks 在 Trigger.AvailableNow 设置中记录你开始读取的时间戳。 批次处理的记录包括之前获取的所有数据,以及时间戳小于记录流开始时间戳的任何新发布记录。 有关详细信息,请参阅 AvailableNow:增量批处理。
监视发布/子流式处理指标
结构化流式处理进度指标报告了已提取且准备处理的记录数、记录大小,以及自流开始以来观察到的重复记录数。 以下是此类指标的示例:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
限制
Pub/Sub 不支持投机执行(spark.speculation)。