Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Use el conector integrado para suscribirse a Google Pub/Sub. Este conector tiene una semántica de procesamiento exactamente una vez para las filas del suscriptor.
Nota:
Pub/Sub podría publicar filas duplicadas, o estas podrían llegar desordenadas al suscriptor. Debe escribir código para gestionar las filas duplicadas y fuera de orden.
Configurar una secuencia Pub/Sub
En el ejemplo de código siguiente se muestra cómo configurar una lectura de Structured Streaming desde Pub/Sub y autenticarse con claves privadas.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Para obtener más opciones de configuración, consulte Configurar opciones para la lectura de streaming de Pub/Sub.
Configurar acceso a Pub/Sub
Las credenciales deben tener asignadas las siguientes funciones:
| Funciones | Requerido u Opcional | Cómo se usa el rol |
|---|---|---|
roles/pubsub.viewer o roles/viewer |
Obligatorio | Comprueba si la suscripción existe y obtiene la suscripción. |
roles/pubsub.subscriber |
Obligatorio | Captura datos de una suscripción. |
roles/pubsub.editor o roles/editor |
Opcionales | Habilita la creación de una suscripción si no existe y permite el uso de deleteSubscriptionOnStreamStop para eliminar suscripciones en la terminación del flujo. |
Databricks recomienda usar secretos al usar claves. Se requieren las siguientes opciones para autorizar una conexión:
clientEmailclientIdprivateKeyprivateKeyId
Descripción del esquema Pub/Sub
El esquema del flujo coincide con las filas que se obtienen de Pub/Sub, como se describe en la tabla siguiente:
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opciones para la lectura de streaming de Pub/Sub
En la tabla siguiente se describen las opciones admitidas para Pub/Sub. Todas las opciones se configuran con .option("<optionName>", "<optionValue>") en el lector de secuencias.
Nota:
Algunas opciones de configuración de Pub/Sub usan el concepto de capturas en lugar de microlotes. Se trata de un detalle de implementación interno y las opciones funcionan de forma similar a otros conectores de Structured Streaming, excepto que las filas se capturan y, a continuación, se procesan.
| Clave | Valor predeterminado | Descripción |
|---|---|---|
numFetchPartitions |
Se establece en la mitad del número de ejecutores presentes en la inicialización de la transmisión. | El número de tareas paralelas de Spark que recuperan filas de una suscripción. |
deleteSubscriptionOnStreamStop |
false |
Si true, la suscripción pasada al flujo se elimina cuando finaliza el trabajo de streaming. |
maxBytesPerTrigger |
none |
Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado. |
maxRecordsPerFetch |
1000 |
Número de filas que se deben obtener por tarea antes de procesar las filas. |
maxFetchPeriod |
10s |
Tiempo que cada tarea debe emplear para recuperar datos antes de procesar las filas. Acepta una cadena de duración, por ejemplo, 1s durante 1 segundo o 1m durante 1 minuto. Databricks recomienda usar el valor predeterminado. |
Uso del procesamiento por lotes incremental con Pub/Sub
Puede utilizar Trigger.AvailableNow para consumir las filas disponibles de las fuentes de Pub/Sub en forma de lote incremental.
Azure Databricks registra la marca de tiempo cuando se inicia una lectura con el valor Trigger.AvailableNow. Las filas procesadas por el lote incluyen todos los datos capturados anteriormente y todas las filas recién publicadas con una marca de tiempo menor que la marca de tiempo de inicio registrada. Para obtener más información, vea AvailableNow: Procesamiento por lotes incremental.
Supervisión de métricas de streaming de Pub/Sub
Las métricas de progreso de Structured Streaming notifican el número de filas capturadas y listas para procesarse, el tamaño de las filas capturadas y listas para procesarse, y el número de duplicados vistos desde el inicio de la secuencia.
A continuación se muestra un ejemplo de métricas Pub/Sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitaciones
Pub/Sub no admite la ejecución especulativa con spark.speculation.