从 Apache Pulsar 流式传输数据

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 14.1 及更高版本中,可以使用结构化流式传输功能从 Azure Databricks 上的 Apache Pulsar 流式传输数据。

结构化流处理为从 Pulsar 源读取的数据提供精确一次处理语义。

语法示例

下面是使用结构化流式传输从 Pulsar 进行读取的基本示例:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

要从 Pulsar 主题中读取数据,必须提供 service.url 以及以下选项之一:

  • topic
  • topics
  • topicsPattern

有关选项的完整列表,请参阅配置 Pulsar 流式传输读取的选项

向 Pulsar 进行身份验证

Azure Databricks 支持向 Pulsar 进行信任存储和密钥存储身份验证。 Databricks 建议使用机密来存储配置详细信息。

可用的流配置选项包括:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

如果该流使用PulsarAdmin,则必须设置以下选项:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

下面的示例演示了如何配置验证选项:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar 架构

从 Pulsar 读取时,行的架构取决于源主题的架构。

  • 对于具有 Avro 或 JSON 架构的主题,字段名称和字段类型将保留在生成的 Spark 数据帧中。
  • 对于没有架构或具有 Pulsar 中简单数据类型的主题,有效负载将加载到 value 列。
  • 如果将流配置为读取具有不同架构的多个主题,请设置 allowDifferentTopicSchemas 以将原始内容加载到 value 列。

Pulsar 记录具有以下元数据字段:

类型
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

配置 Pulsar 流式读取选项

对于读取流,请使用 .option("<optionName>", "<optionValue>") 语法配置以下所有选项。 您还可以使用 .options() 配置身份验证。 请参阅向 Pulsar 进行身份验证

下表描述了 Pulsar 的所需配置。 必须仅指定 topictopicstopicsPattern 选项中的一个。

选项 默认值 说明
service.url Pulsar 服务的 Pulsar serviceUrl 配置。
topic 要使用的主题的主题名称字符串。
topics 要消费的主题列表,以逗号分隔。
topicsPattern 与要使用的主题相匹配的 Java 正则表达式字符串。

下表描述了 Pulsar 支持的其他选项:

选项 默认值 说明
predefinedSubscription 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。
subscriptionPrefix 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。
pollTimeoutMs 120000 从 Pulsar 读取消息的超时时间(以毫秒为单位)。
waitingForNonExistedTopic false 连接器是否应在创建所需主题之前等待。
failOnDataLoss true 控制数据丢失(例如,删除主题,或者由于保留策略而删除消息)时查询是否失败。
allowDifferentTopicSchemas false 如果读取了具有不同架构的多个主题,请使用此选项关闭基于架构的自动主题值反序列化。 仅当此项为 true 时,才会返回原始值。
startingOffsets latest 如果为 latest,读取器在开始运行后会读取最新记录。 如果为 earliest,读取器将从最早的偏移量开始读取。 还可以为特定偏移量指定 JSON 字符串。
maxBytesPerTrigger 每个微批次要处理的最大字节数的软限制。 如果指定此选项,还必须指定 admin.url
admin.url Pulsar 的 serviceHttpUrl 配置。 指定时 maxBytesPerTrigger 是必需的。

还可以使用以下模式指定任何 Pulsar 客户端、管理员和读取器配置:

模式 配置选项
pulsar.client.* Pulsar 客户端配置
pulsar.admin.* Pulsar 管理员配置
pulsar.reader.* Pulsar 读取器配置

构造起始偏移量 JSON

若要使用以 JSON 格式指定偏移量的自定义消息 ID 并结合 startingOffsets 选项,请参阅以下示例:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()