Streamen van Apache Pulsar

Belangrijk

Deze functie is beschikbaar als openbare preview.

In Databricks Runtime 14.1 en hoger kunt u Structured Streaming gebruiken om gegevens van Apache Pulsar op Azure Databricks te streamen.

Structured Streaming biedt exact-once-verwerkingssemantiek voor data die uit Pulsar-bronnen wordt gelezen.

Voorbeeld van syntaxis

Hier volgt een eenvoudig voorbeeld van het gebruik van Structured Streaming om te lezen uit Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Als u wilt lezen uit Pulsar-onderwerpen, moet u een service.url en een van de volgende opties opgeven:

  • topic
  • topics
  • topicsPattern

Zie Opties configureren voor Pulsar-streamingvoor een volledige lijst met opties.

Verifiëren bij Pulsar

Azure Databricks biedt ondersteuning voor truststore- en sleutelopslagverificatie voor Pulsar. Databricks raadt u aan geheimen te gebruiken om configuratiegegevens op te slaan.

De beschikbare opties voor stroomconfiguratie zijn onder andere:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Als de stream een PulsarAdmin gebruikt, moet u de volgende opties instellen:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

In het volgende voorbeeld ziet u hoe u verificatieopties configureert:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar-schema

Wanneer u uit Pulsar leest, is het schema van rijen afhankelijk van de schema's van de onderwerpen van de bron.

  • Voor onderwerpen met avro- of JSON-schema blijven veldnamen en veldtypen behouden in het resulterende Spark DataFrame.
  • Voor topics zonder schema of met een eenvoudig gegevenstype in Pulsar wordt de payload geladen in een value kolom.
  • Als u de stream configureert om meerdere topics met verschillende schema's te lezen, stelt u allowDifferentTopicSchemas in om de onbewerkte inhoud naar een kolom van het type value te laden.

Pulsar-records hebben de volgende metagegevensvelden:

Kolom Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Opties configureren voor streaminglezen uit Pulsar

Configureer alle volgende opties met .option("<optionName>", "<optionValue>") syntaxis voor leesstreams. U kunt ook verificatie configureren met behulp van .options(). Zie Verifiëren bij Pulsar.

In de volgende tabel worden de vereiste configuraties voor Pulsar beschreven. U moet slechts één van de opties topicopgeven, topics of topicsPattern.

Optie Standaardwaarde Beschrijving
service.url Geen De Pulsar serviceUrl-configuratie voor de Pulsar-service.
topic Geen Een tekenreeks met de topicnaam van het te consumeren topic.
topics Geen Een door komma's gescheiden lijst met onderwerpen om te consumeren.
topicsPattern Geen Een Java regex-tekenreeks die overeenkomt met onderwerpen die moeten worden gebruikt.

In de volgende tabel worden andere opties beschreven die worden ondersteund voor Pulsar:

Optie Standaardwaarde Beschrijving
predefinedSubscription Geen De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden.
subscriptionPrefix Geen Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden.
pollTimeoutMs 120.000 De time-out voor het lezen van berichten van Pulsar in milliseconden.
waitingForNonExistedTopic false Of de connector moet wachten totdat de gewenste topics zijn aangemaakt.
failOnDataLoss true Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid).
allowDifferentTopicSchemas false Als meerdere onderwerpen met verschillende schema's worden gelezen, gebruikt u deze optie om automatische deserialisatie van onderwerpwaarden op basis van schema's uit te schakelen. Alleen de onbewerkte waarden worden geretourneerd wanneer dit wordt true.
startingOffsets latest Als latestde lezer de nieuwste records leest nadat deze is gestart. Als earliest, leest de lezer vanaf het vroegst mogelijke offset. U kunt ook een JSON-tekenreeks opgeven voor een specifieke offset.
maxBytesPerTrigger Geen Een zachte limiet voor het maximum aantal bytes dat per microbatch moet worden verwerkt. Als u deze optie opgeeft, moet u ook opgeven admin.url.
admin.url Geen De Pulsar serviceHttpUrl-configuratie. Vereist wanneer maxBytesPerTrigger is opgegeven.

U kunt ook configuraties voor Pulsar-clients, beheerders en lezers opgeven met behulp van de volgende patronen:

Patroon Configuratieopties
pulsar.client.* Pulsar-clientconfiguratie
pulsar.admin.* Configuratie van Pulsar-beheerder
pulsar.reader.* Configuratie van Pulsar-lezer

JSON met beginoffsets samenstellen

Als u een aangepaste bericht-id wilt gebruiken die een offset opgeeft, zoals JSON, met de startingOffsets optie, raadpleegt u het volgende voorbeeld:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()