Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare il connettore predefinito per sottoscrivere Google Pub/Sub. Questo connettore ha una semantica di elaborazione una e una sola volta per le righe provenienti dal sottoscrittore.
Nota
Pub/Sub potrebbe pubblicare righe duplicate oppure le righe potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere codice per gestire righe duplicate e non ordinate.
Configurare un flusso Pub/Sub
L'esempio di codice seguente illustra come configurare un flusso strutturato letto da Pub/Sub ed eseguire l'autenticazione con chiavi private.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.
Configurare l'accesso a Pub/Sub
Le credenziali devono avere i ruoli seguenti:
| Ruoli | Obbligatorio o facoltativo | Modalità di utilizzo del ruolo |
|---|---|---|
roles/pubsub.viewer oppure roles/viewer |
Richiesto | Controlla se la sottoscrizione esiste e ottiene la sottoscrizione. |
roles/pubsub.subscriber |
Richiesto | Recupera i dati da una sottoscrizione. |
roles/pubsub.editor oppure roles/editor |
Facoltativo | Abilita la creazione di una sottoscrizione se non esiste e consente l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso. |
Databricks consiglia di usare segreti quando si usano chiavi. Per autorizzare una connessione sono necessarie le opzioni seguenti:
clientEmailclientIdprivateKeyprivateKeyId
Informazioni sullo schema Pub/Sub
Lo schema del flusso corrisponde alle righe ottenute da Pub/Sub, come descritto nella tabella seguente:
| Campo | TIPO |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurare le opzioni per la lettura in streaming Pub/Sub
Nella tabella seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni sono configurate con .option("<optionName>", "<optionValue>") nel lettore di flusso.
Nota
Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Si tratta di un dettaglio di implementazione interno e le opzioni funzionano in modo analogo ad altri connettori Structured Streaming, ad eccezione del fatto che le righe vengono recuperate e quindi elaborate.
| Chiave | Valore predefinito | Descrizione |
|---|---|---|
numFetchPartitions |
Impostare su metà del numero di esecutori presenti all'inizializzazione del flusso. | Numero di attività Spark parallele che recuperano righe da una sottoscrizione. |
deleteSubscriptionOnStreamStop |
false |
Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming. |
maxBytesPerTrigger |
none |
Un limite flessibile per la dimensione del batch da elaborare durante ogni micro-batch attivato. |
maxRecordsPerFetch |
1000 |
Numero di righe da recuperare per ogni attività prima di elaborarle. |
maxFetchPeriod |
10s |
Intervallo di tempo per ogni attività dedicato al recupero prima di elaborare le righe. Accetta una stringa di durata, 1s ad esempio per 1 secondo o 1m per 1 minuto. Databricks consiglia di usare il valore predefinito. |
Usare l'elaborazione batch incrementale con Pub/Sub
È possibile usare Trigger.AvailableNow per utilizzare le righe disponibili dalle origini Pub/Sub come batch incrementale.
Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . Le righe elaborate dal batch includono tutti i dati recuperati in precedenza e tutte le righe appena pubblicate con un timestamp minore del timestamp di inizio registrato. Per altre informazioni, vedere AvailableNow: Elaborazione batch incrementale.
Monitorare le metriche di streaming pub/sub
Le metriche di stato di Structured Streaming segnalano il numero di righe recuperate e pronte per l'elaborazione, le dimensioni delle righe recuperate e pronte per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso.
Di seguito è riportato un esempio di metriche Pub/Sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limiti
Pub/Sub non supporta l'esecuzione speculativa con spark.speculation.