Strömma från Apache Pulsar

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

I Databricks Runtime 14.1 och senare kan du använda Structured Streaming för att strömma data från Apache Pulsar på Azure Databricks.

Structured Streaming ger semantik för exakt en gångs bearbetning för data som läses från Pulsar-källor.

Syntaxexempel

Följande är ett grundläggande exempel på hur du använder Structured Streaming för att läsa från Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Om du vill läsa från Pulsar-ämnen måste du ange ett service.url och något av följande alternativ:

  • topic
  • topics
  • topicsPattern

En fullständig lista över alternativ finns i Konfigurera alternativ för Pulsar-strömning läs.

Autentisera till Pulsar

Azure Databricks stöder autentisering mot Pulsar med truststore och keystore. Databricks rekommenderar att du använder hemligheter för att lagra konfigurationsinformation.

Bland de tillgängliga konfigurationsalternativen för dataströmmar finns följande:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Om strömmen använder en PulsarAdminmåste du ange följande alternativ:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

I följande exempel visas hur du konfigurerar autentiseringsalternativ:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar-schema

När du läser från Pulsar beror schemat för rader på scheman för källans ämnen.

  • För ämnen med Avro- eller JSON-schema bevaras fältnamn och fälttyper i den resulterande Spark DataFrame.
  • För ämnen utan schema eller med en enkel datatyp i Pulsar läses nyttolasten in i kolumn value.
  • Om du konfigurerar dataströmmen så att den läser flera ämnen med olika scheman anger du allowDifferentTopicSchemas att råinnehållet ska läsas in i en value kolumn.

Pulsar-poster har följande metadatafält:

Spalt Typ
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Konfigurera alternativ för strömningsläsning från Pulsar

Konfigurera alla följande alternativ med .option("<optionName>", "<optionValue>") syntax för läsströmmar. Du kan också konfigurera autentisering med ..options() Se Autentisera till Pulsar.

I följande tabell beskrivs nödvändiga konfigurationer för Pulsar. Du måste bara ange ett av alternativen topic, topics eller topicsPattern.

Alternativ Standardvärde Beskrivning
service.url inget Pulsar-konfigurationen serviceUrl för Pulsar-tjänsten.
topic inget En sträng med ämnesnamnet för ämnet som ska konsumeras.
topics inget En kommaavgränsad lista över ämnena som ska förbrukas.
topicsPattern inget En regexsträng i Java för att matcha ämnen som ska konsumeras.

I följande tabell beskrivs andra alternativ som stöds för Pulsar:

Alternativ Standardvärde Beskrivning
predefinedSubscription inget Det fördefinierade prenumerationsnamnet som används av anslutningen för att spåra Spark-programmets förlopp.
subscriptionPrefix inget Ett prefix som används av anslutningen för att generera en slumpmässig prenumeration för att spåra spark-programframsteg.
pollTimeoutMs 120000 Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder.
waitingForNonExistedTopic false Om anslutningen ska vänta tills de önskade topicsen har skapats.
failOnDataLoss true Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip).
allowDifferentTopicSchemas false Om du läser flera ämnen med olika scheman kan du använda det här alternativet för att inaktivera automatisk schemabaserad deserialisering av ämnesvärden. Endast råvärdena returneras när det här är true.
startingOffsets latest Om latest, läser läsaren de senaste posterna efter att den har startat. Om earliest, läser läsaren från den tidigaste offset. Du kan också ange en JSON-sträng för en specifik förskjutning.
maxBytesPerTrigger inget En mjuk gräns för det maximala antalet byte som ska bearbetas per mikrobatch. Om du anger det här alternativet måste du också ange admin.url.
admin.url inget Pulsar-konfigurationen serviceHttpUrl. Krävs när maxBytesPerTrigger anges.

Du kan också ange pulsar-klient-, administratörs- och läsarkonfigurationer med hjälp av följande mönster:

Mönster Konfigurationsalternativ
pulsar.client.* Pulsar-klientkonfiguration
pulsar.admin.* Pulsar-administratörskonfiguration
pulsar.reader.* Pulsar-läsarkonfiguration

Skapa JSON för startoffsetar

Om du vill använda ett anpassat meddelande-ID som anger en förskjutning, som JSON, med startingOffsets alternativet kan du se följande exempel:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()