Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
I Databricks Runtime 14.1 och senare kan du använda Structured Streaming för att strömma data från Apache Pulsar på Azure Databricks.
Structured Streaming ger semantik för exakt en gångs bearbetning för data som läses från Pulsar-källor.
Syntaxexempel
Följande är ett grundläggande exempel på hur du använder Structured Streaming för att läsa från Pulsar:
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Om du vill läsa från Pulsar-ämnen måste du ange ett service.url och något av följande alternativ:
topictopicstopicsPattern
En fullständig lista över alternativ finns i Konfigurera alternativ för Pulsar-strömning läs.
Autentisera till Pulsar
Azure Databricks stöder autentisering mot Pulsar med truststore och keystore. Databricks rekommenderar att du använder hemligheter för att lagra konfigurationsinformation.
Bland de tillgängliga konfigurationsalternativen för dataströmmar finns följande:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
Om strömmen använder en PulsarAdminmåste du ange följande alternativ:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Example
I följande exempel visas hur du konfigurerar autentiseringsalternativ:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar-schema
När du läser från Pulsar beror schemat för rader på scheman för källans ämnen.
- För ämnen med Avro- eller JSON-schema bevaras fältnamn och fälttyper i den resulterande Spark DataFrame.
- För ämnen utan schema eller med en enkel datatyp i Pulsar läses nyttolasten in i kolumn
value. - Om du konfigurerar dataströmmen så att den läser flera ämnen med olika scheman anger du
allowDifferentTopicSchemasatt råinnehållet ska läsas in i envaluekolumn.
Pulsar-poster har följande metadatafält:
| Spalt | Typ |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Konfigurera alternativ för strömningsläsning från Pulsar
Konfigurera alla följande alternativ med .option("<optionName>", "<optionValue>") syntax för läsströmmar. Du kan också konfigurera autentisering med ..options() Se Autentisera till Pulsar.
I följande tabell beskrivs nödvändiga konfigurationer för Pulsar. Du måste bara ange ett av alternativen topic, topics eller topicsPattern.
| Alternativ | Standardvärde | Beskrivning |
|---|---|---|
service.url |
inget | Pulsar-konfigurationen serviceUrl för Pulsar-tjänsten. |
topic |
inget | En sträng med ämnesnamnet för ämnet som ska konsumeras. |
topics |
inget | En kommaavgränsad lista över ämnena som ska förbrukas. |
topicsPattern |
inget | En regexsträng i Java för att matcha ämnen som ska konsumeras. |
I följande tabell beskrivs andra alternativ som stöds för Pulsar:
| Alternativ | Standardvärde | Beskrivning |
|---|---|---|
predefinedSubscription |
inget | Det fördefinierade prenumerationsnamnet som används av anslutningen för att spåra Spark-programmets förlopp. |
subscriptionPrefix |
inget | Ett prefix som används av anslutningen för att generera en slumpmässig prenumeration för att spåra spark-programframsteg. |
pollTimeoutMs |
120000 | Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder. |
waitingForNonExistedTopic |
false |
Om anslutningen ska vänta tills de önskade topicsen har skapats. |
failOnDataLoss |
true |
Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip). |
allowDifferentTopicSchemas |
false |
Om du läser flera ämnen med olika scheman kan du använda det här alternativet för att inaktivera automatisk schemabaserad deserialisering av ämnesvärden. Endast råvärdena returneras när det här är true. |
startingOffsets |
latest |
Om latest, läser läsaren de senaste posterna efter att den har startat. Om earliest, läser läsaren från den tidigaste offset. Du kan också ange en JSON-sträng för en specifik förskjutning. |
maxBytesPerTrigger |
inget | En mjuk gräns för det maximala antalet byte som ska bearbetas per mikrobatch. Om du anger det här alternativet måste du också ange admin.url. |
admin.url |
inget | Pulsar-konfigurationen serviceHttpUrl. Krävs när maxBytesPerTrigger anges. |
Du kan också ange pulsar-klient-, administratörs- och läsarkonfigurationer med hjälp av följande mönster:
| Mönster | Konfigurationsalternativ |
|---|---|
pulsar.client.* |
Pulsar-klientkonfiguration |
pulsar.admin.* |
Pulsar-administratörskonfiguration |
pulsar.reader.* |
Pulsar-läsarkonfiguration |
Skapa JSON för startoffsetar
Om du vill använda ett anpassat meddelande-ID som anger en förskjutning, som JSON, med startingOffsets alternativet kan du se följande exempel:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()