Abonneren op Google Pub/Sub

Gebruik de ingebouwde connector om u te abonneren op Google Pub/Sub. Deze connector heeft exact-once-verwerkingssemantiek voor rijen afkomstig van de abonnee.

Notitie

Pub/Sub levert mogelijk dubbele rijen, of rijen kunnen in een andere volgorde bij de abonnee aankomen. U moet code schrijven om dubbele en verouderde rijen af te handelen.

Een Pub/Substream configureren

In het volgende codevoorbeeld ziet u hoe u een Structured Streaming-leesbewerking configureert vanuit Pub/Sub en verifieert met persoonlijke sleutels.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Voor meer configuratieopties, zie Opties configureren voor Pub/Sub-streamingleesbewerkingen.

Toegang tot Pub/Sub configureren

Uw inloggegevens moeten de volgende rollen hebben:

Rollen Vereist of optioneel Hoe rol wordt gebruikt
roles/pubsub.viewer of roles/viewer Vereist Controleert of het abonnement bestaat en het abonnement ophaalt.
roles/pubsub.subscriber Vereist Hiermee worden gegevens opgehaald uit een abonnement.
roles/pubsub.editor of roles/editor Optioneel Hiermee kunt u een abonnement aanmaken als er nog geen bestaat en kunt u het deleteSubscriptionOnStreamStop gebruiken om abonnementen te verwijderen bij het beƫindigen van een stream.

Databricks raadt u aan geheimen te gebruiken bij het gebruik van sleutels. De volgende opties zijn vereist om een verbinding te autoriseren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Inzicht in het pub-/subschema

Het schema voor de stream komt overeen met de rijen die worden opgehaald uit Pub/Sub, zoals beschreven in de volgende tabel:

Veld Typologie
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Opties configureren voor Pub/Sub-streaminglezen

In de volgende tabel worden de opties beschreven die worden ondersteund voor Pub/Sub. Alle opties zijn geconfigureerd met .option("<optionName>", "<optionValue>") op uw streamlezer.

Notitie

Bepaalde Pub/Sub-configuratieopties gebruiken het concept van ophalen in plaats van microbatches. Dit is een intern implementatiedetail en de opties werken vergelijkbaar met andere Structured Streaming-connectors, behalve dat rijen worden opgehaald en vervolgens verwerkt.

Key Standaardwaarde Beschrijving
numFetchPartitions Ingesteld op de helft van het aantal uitvoerders dat aanwezig is bij de initialisatie van de stream. Het aantal parallelle Spark-taken waarmee rijen uit een abonnement worden opgehaald.
deleteSubscriptionOnStreamStop false Als true wordt voldaan, wordt het abonnement dat aan de stream wordt gekoppeld verwijderd wanneer de streamingjob eindigt.
maxBytesPerTrigger none Een zachte limiet voor de batch-grootte die moet worden verwerkt tijdens elke getriggerde microbatch.
maxRecordsPerFetch 1000 Het aantal rijen dat per taak moet worden opgehaald voordat rijen worden verwerkt.
maxFetchPeriod 10s De tijdsduur die elke taak besteedt aan het ophalen voordat rijen worden verwerkt. Accepteert een duurtekenreeks, bijvoorbeeld 1s voor 1 seconde of 1m voor 1 minuut. Databricks raadt aan de standaardwaarde te gebruiken.

Incrementele batchverwerking gebruiken met Pub/Sub

U kunt Trigger.AvailableNow gebruiken om beschikbare rijen uit de Pub/Sub-bronnen te verwerken als een incrementele batch.

Azure Databricks registreert de tijdstempel wanneer u begint met lezen met de Trigger.AvailableNow instelling. Rijen die door de batch worden verwerkt, bevatten alle eerder opgehaalde gegevens en nieuwe gepubliceerde rijen met een tijdstempel die kleiner is dan de vastgelegde begintijdstempel. Zie AvailableNowvoor meer informatie: Incrementele batchverwerking.

Pub/Sub-streamingmetrieken bewaken

Metrische gegevens over de voortgang van gestructureerd streamen rapporteren het aantal rijen dat is opgehaald en gereed is voor verwerking, de grootte van de opgehaalde en gereed voor verwerking van rijen en het aantal duplicaten dat is gezien sinds het begin van de stream.

Hier volgt een voorbeeld van metrische gegevens van Pub/Sub:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Beperkingen

Pub/Sub biedt geen ondersteuning voor speculatieve uitvoering met spark.speculation.