Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gebruik de ingebouwde connector om u te abonneren op Google Pub/Sub. Deze connector heeft exact-once-verwerkingssemantiek voor rijen afkomstig van de abonnee.
Notitie
Pub/Sub levert mogelijk dubbele rijen, of rijen kunnen in een andere volgorde bij de abonnee aankomen. U moet code schrijven om dubbele en verouderde rijen af te handelen.
Een Pub/Substream configureren
In het volgende codevoorbeeld ziet u hoe u een Structured Streaming-leesbewerking configureert vanuit Pub/Sub en verifieert met persoonlijke sleutels.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Voor meer configuratieopties, zie Opties configureren voor Pub/Sub-streamingleesbewerkingen.
Toegang tot Pub/Sub configureren
Uw inloggegevens moeten de volgende rollen hebben:
| Rollen | Vereist of optioneel | Hoe rol wordt gebruikt |
|---|---|---|
roles/pubsub.viewer of roles/viewer |
Vereist | Controleert of het abonnement bestaat en het abonnement ophaalt. |
roles/pubsub.subscriber |
Vereist | Hiermee worden gegevens opgehaald uit een abonnement. |
roles/pubsub.editor of roles/editor |
Optioneel | Hiermee kunt u een abonnement aanmaken als er nog geen bestaat en kunt u het deleteSubscriptionOnStreamStop gebruiken om abonnementen te verwijderen bij het beƫindigen van een stream. |
Databricks raadt u aan geheimen te gebruiken bij het gebruik van sleutels. De volgende opties zijn vereist om een verbinding te autoriseren:
clientEmailclientIdprivateKeyprivateKeyId
Inzicht in het pub-/subschema
Het schema voor de stream komt overeen met de rijen die worden opgehaald uit Pub/Sub, zoals beschreven in de volgende tabel:
| Veld | Typologie |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Opties configureren voor Pub/Sub-streaminglezen
In de volgende tabel worden de opties beschreven die worden ondersteund voor Pub/Sub. Alle opties zijn geconfigureerd met .option("<optionName>", "<optionValue>") op uw streamlezer.
Notitie
Bepaalde Pub/Sub-configuratieopties gebruiken het concept van ophalen in plaats van microbatches. Dit is een intern implementatiedetail en de opties werken vergelijkbaar met andere Structured Streaming-connectors, behalve dat rijen worden opgehaald en vervolgens verwerkt.
| Key | Standaardwaarde | Beschrijving |
|---|---|---|
numFetchPartitions |
Ingesteld op de helft van het aantal uitvoerders dat aanwezig is bij de initialisatie van de stream. | Het aantal parallelle Spark-taken waarmee rijen uit een abonnement worden opgehaald. |
deleteSubscriptionOnStreamStop |
false |
Als true wordt voldaan, wordt het abonnement dat aan de stream wordt gekoppeld verwijderd wanneer de streamingjob eindigt. |
maxBytesPerTrigger |
none |
Een zachte limiet voor de batch-grootte die moet worden verwerkt tijdens elke getriggerde microbatch. |
maxRecordsPerFetch |
1000 |
Het aantal rijen dat per taak moet worden opgehaald voordat rijen worden verwerkt. |
maxFetchPeriod |
10s |
De tijdsduur die elke taak besteedt aan het ophalen voordat rijen worden verwerkt. Accepteert een duurtekenreeks, bijvoorbeeld 1s voor 1 seconde of 1m voor 1 minuut. Databricks raadt aan de standaardwaarde te gebruiken. |
Incrementele batchverwerking gebruiken met Pub/Sub
U kunt Trigger.AvailableNow gebruiken om beschikbare rijen uit de Pub/Sub-bronnen te verwerken als een incrementele batch.
Azure Databricks registreert de tijdstempel wanneer u begint met lezen met de Trigger.AvailableNow instelling. Rijen die door de batch worden verwerkt, bevatten alle eerder opgehaalde gegevens en nieuwe gepubliceerde rijen met een tijdstempel die kleiner is dan de vastgelegde begintijdstempel. Zie AvailableNowvoor meer informatie: Incrementele batchverwerking.
Pub/Sub-streamingmetrieken bewaken
Metrische gegevens over de voortgang van gestructureerd streamen rapporteren het aantal rijen dat is opgehaald en gereed is voor verwerking, de grootte van de opgehaalde en gereed voor verwerking van rijen en het aantal duplicaten dat is gezien sinds het begin van de stream.
Hier volgt een voorbeeld van metrische gegevens van Pub/Sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Beperkingen
Pub/Sub biedt geen ondersteuning voor speculatieve uitvoering met spark.speculation.