Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Use o conector incorporado para subscrever o Google Pub/Sub. Este conector tem semântica de processamento exatamente uma vez para linhas provenientes do subscritor.
Nota
Pub/Sub pode publicar linhas duplicadas, ou linhas podem chegar ao assinante fora de ordem. Tens de escrever código para lidar com linhas duplicadas e fora de ordem.
Configurar um stream Pub/Sub
O exemplo de código seguinte mostra como configurar uma leitura de Streaming Estruturado a partir de Pub/Sub e autenticar com chaves privadas.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Para obter mais opções de configuração, consulte Configurar opções de leitura de streaming do Pub/Sub.
Configurar o acesso a Pub/Sub
As suas credenciais devem ter as seguintes funções:
| Funções | Obrigatório ou opcional | Como o papel é utilizado |
|---|---|---|
roles/pubsub.viewer ou roles/viewer |
Necessário | Verifica se existe subscrição e recebe subscrição. |
roles/pubsub.subscriber |
Necessário | Recolhe dados de uma subscrição. |
roles/pubsub.editor ou roles/editor |
Opcional | Permite a criação de uma subscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para eliminar subscrições na terminação do fluxo. |
Nota
Se atribuir roles/pubsub.viewer e roles/pubsub.subscriber ao nível do recurso, em vez de ao nível do projeto, tem de aplicar ambas as funções tanto ao tópico como à subscrição. Se não usares as funções opcionais roles/pubsub.editor ou roles/editor, atribuir as funções obrigatórias apenas no tópico não é suficiente.
O Databricks recomenda que uses segredos ao usar chaves. As seguintes opções são necessárias para autorizar uma conexão:
clientEmailclientIdprivateKeyprivateKeyId
Compreenda o esquema Pub/Sub
O esquema do fluxo corresponde às linhas obtidas a partir do Pub/Sub, conforme descrito na tabela seguinte:
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming Pub/Sub
Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Este é um detalhe interno de implementação, e as opções funcionam de forma semelhante a outros conectores de Structured Streaming, exceto que as linhas são recolhidas e depois processadas.
Para a lista completa de opções, veja Pub/Sub.
Use processamento incremental em lotes com Pub/Sub
Podes usar Trigger.AvailableNow para consumir linhas disponíveis das fontes Pub/Sub como um lote incremental.
O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. As linhas processadas pelo lote incluem todos os dados obtidos anteriormente e quaisquer linhas publicadas entretanto com uma marca temporal inferior à marca temporal de início registada. Para mais informações, vejaAvailableNow: Processamento incremental em lote.
Monitorizar as métricas de streaming do Pub/Sub
As métricas de progresso do Structured Streaming reportam o número de linhas recolhidas e prontas a processar, o tamanho das linhas recolhidas e prontas a processar, e o número de duplicados vistos desde o início do fluxo.
Segue-se um exemplo de métricas Pub/Sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Pub/Sub não suporta execução especulativa com spark.speculation.