Iscriviti a Google Pub/Sub

Usare il connettore predefinito per sottoscrivere Google Pub/Sub. Questo connettore ha una semantica di elaborazione una e una sola volta per le righe provenienti dal sottoscrittore.

Nota

Pub/Sub potrebbe pubblicare righe duplicate oppure le righe potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere codice per gestire righe duplicate e non ordinate.

Configurare un flusso Pub/Sub

L'esempio di codice seguente illustra come configurare un flusso strutturato letto da Pub/Sub ed eseguire l'autenticazione con chiavi private.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.

Configurare l'accesso a Pub/Sub

Le credenziali devono avere i ruoli seguenti:

Ruoli Obbligatorio o facoltativo Modalità di utilizzo del ruolo
roles/pubsub.viewer oppure roles/viewer Richiesto Controlla se la sottoscrizione esiste e ottiene la sottoscrizione.
roles/pubsub.subscriber Richiesto Recupera i dati da una sottoscrizione.
roles/pubsub.editor oppure roles/editor Facoltativo Abilita la creazione di una sottoscrizione se non esiste e consente l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso.

Databricks consiglia di usare segreti quando si usano chiavi. Per autorizzare una connessione sono necessarie le opzioni seguenti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Informazioni sullo schema Pub/Sub

Lo schema del flusso corrisponde alle righe ottenute da Pub/Sub, come descritto nella tabella seguente:

Campo TIPO
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurare le opzioni per la lettura in streaming Pub/Sub

Nella tabella seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni sono configurate con .option("<optionName>", "<optionValue>") nel lettore di flusso.

Nota

Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Si tratta di un dettaglio di implementazione interno e le opzioni funzionano in modo analogo ad altri connettori Structured Streaming, ad eccezione del fatto che le righe vengono recuperate e quindi elaborate.

Chiave Valore predefinito Descrizione
numFetchPartitions Impostare su metà del numero di esecutori presenti all'inizializzazione del flusso. Numero di attività Spark parallele che recuperano righe da una sottoscrizione.
deleteSubscriptionOnStreamStop false Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming.
maxBytesPerTrigger none Un limite flessibile per la dimensione del batch da elaborare durante ogni micro-batch attivato.
maxRecordsPerFetch 1000 Numero di righe da recuperare per ogni attività prima di elaborarle.
maxFetchPeriod 10s Intervallo di tempo per ogni attività dedicato al recupero prima di elaborare le righe. Accetta una stringa di durata, 1s ad esempio per 1 secondo o 1m per 1 minuto. Databricks consiglia di usare il valore predefinito.

Usare l'elaborazione batch incrementale con Pub/Sub

È possibile usare Trigger.AvailableNow per utilizzare le righe disponibili dalle origini Pub/Sub come batch incrementale.

Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . Le righe elaborate dal batch includono tutti i dati recuperati in precedenza e tutte le righe appena pubblicate con un timestamp minore del timestamp di inizio registrato. Per altre informazioni, vedere AvailableNow: Elaborazione batch incrementale.

Monitorare le metriche di streaming pub/sub

Le metriche di stato di Structured Streaming segnalano il numero di righe recuperate e pronte per l'elaborazione, le dimensioni delle righe recuperate e pronte per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso.

Di seguito è riportato un esempio di metriche Pub/Sub:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limiti

Pub/Sub non supporta l'esecuzione speculativa con spark.speculation.