Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
Deze functie is beschikbaar als openbare preview.
In Databricks Runtime 14.1 en hoger kunt u Structured Streaming gebruiken om gegevens van Apache Pulsar op Azure Databricks te streamen.
Structured Streaming biedt exact-once-verwerkingssemantiek voor data die uit Pulsar-bronnen wordt gelezen.
Voorbeeld van syntaxis
Hier volgt een eenvoudig voorbeeld van het gebruik van Structured Streaming om te lezen uit Pulsar:
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Als u wilt lezen uit Pulsar-onderwerpen, moet u een service.url en een van de volgende opties opgeven:
topictopicstopicsPattern
Zie Opties configureren voor Pulsar-streamingvoor een volledige lijst met opties.
Verifiëren bij Pulsar
Azure Databricks biedt ondersteuning voor truststore- en sleutelopslagverificatie voor Pulsar. Databricks raadt u aan geheimen te gebruiken om configuratiegegevens op te slaan.
De beschikbare opties voor stroomconfiguratie zijn onder andere:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
Als de stream een PulsarAdmin gebruikt, moet u de volgende opties instellen:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Example
In het volgende voorbeeld ziet u hoe u verificatieopties configureert:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar-schema
Wanneer u uit Pulsar leest, is het schema van rijen afhankelijk van de schema's van de onderwerpen van de bron.
- Voor onderwerpen met avro- of JSON-schema blijven veldnamen en veldtypen behouden in het resulterende Spark DataFrame.
- Voor topics zonder schema of met een eenvoudig gegevenstype in Pulsar wordt de payload geladen in een
valuekolom. - Als u de stream configureert om meerdere topics met verschillende schema's te lezen, stelt u
allowDifferentTopicSchemasin om de onbewerkte inhoud naar een kolom van het typevaluete laden.
Pulsar-records hebben de volgende metagegevensvelden:
| Kolom | Type |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Opties configureren voor streaminglezen uit Pulsar
Configureer alle volgende opties met .option("<optionName>", "<optionValue>") syntaxis voor leesstreams. U kunt ook verificatie configureren met behulp van .options(). Zie Verifiëren bij Pulsar.
In de volgende tabel worden de vereiste configuraties voor Pulsar beschreven. U moet slechts één van de opties topicopgeven, topics of topicsPattern.
| Optie | Standaardwaarde | Beschrijving |
|---|---|---|
service.url |
Geen | De Pulsar serviceUrl-configuratie voor de Pulsar-service. |
topic |
Geen | Een tekenreeks met de topicnaam van het te consumeren topic. |
topics |
Geen | Een door komma's gescheiden lijst met onderwerpen om te consumeren. |
topicsPattern |
Geen | Een Java regex-tekenreeks die overeenkomt met onderwerpen die moeten worden gebruikt. |
In de volgende tabel worden andere opties beschreven die worden ondersteund voor Pulsar:
| Optie | Standaardwaarde | Beschrijving |
|---|---|---|
predefinedSubscription |
Geen | De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden. |
subscriptionPrefix |
Geen | Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden. |
pollTimeoutMs |
120.000 | De time-out voor het lezen van berichten van Pulsar in milliseconden. |
waitingForNonExistedTopic |
false |
Of de connector moet wachten totdat de gewenste topics zijn aangemaakt. |
failOnDataLoss |
true |
Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid). |
allowDifferentTopicSchemas |
false |
Als meerdere onderwerpen met verschillende schema's worden gelezen, gebruikt u deze optie om automatische deserialisatie van onderwerpwaarden op basis van schema's uit te schakelen. Alleen de onbewerkte waarden worden geretourneerd wanneer dit wordt true. |
startingOffsets |
latest |
Als latestde lezer de nieuwste records leest nadat deze is gestart. Als earliest, leest de lezer vanaf het vroegst mogelijke offset. U kunt ook een JSON-tekenreeks opgeven voor een specifieke offset. |
maxBytesPerTrigger |
Geen | Een zachte limiet voor het maximum aantal bytes dat per microbatch moet worden verwerkt. Als u deze optie opgeeft, moet u ook opgeven admin.url. |
admin.url |
Geen | De Pulsar serviceHttpUrl-configuratie. Vereist wanneer maxBytesPerTrigger is opgegeven. |
U kunt ook configuraties voor Pulsar-clients, beheerders en lezers opgeven met behulp van de volgende patronen:
| Patroon | Configuratieopties |
|---|---|
pulsar.client.* |
Pulsar-clientconfiguratie |
pulsar.admin.* |
Configuratie van Pulsar-beheerder |
pulsar.reader.* |
Configuratie van Pulsar-lezer |
JSON met beginoffsets samenstellen
Als u een aangepaste bericht-id wilt gebruiken die een offset opgeeft, zoals JSON, met de startingOffsets optie, raadpleegt u het volgende voorbeeld:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()