Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Puede usar operaciones de DataFrame o funciones de valores de tabla SQL para consultar datos de estado y metadatos de Structured Streaming. Utiliza estas funciones para observar la información de estado en las consultas con estado en Structured Streaming, lo que puede ser útil para la supervisión y depuración.
Debe tener acceso de lectura a la ruta del punto de control de una consulta de transmisión para poder acceder a los datos de estado o a los metadatos. Las funciones descritas en este artículo proporcionan acceso de solo lectura a los datos de estado y a los metadatos. Solo puede usar la semántica de lectura por lotes para consultar la información de estado.
Nota:
No se puede consultar la información de estado de las canalizaciones declarativas de Spark de Lakeflow, de las tablas de streaming o de las vistas materializadas. No se puede consultar la información de estado mediante proceso sin servidor o proceso configurado con el modo de acceso estándar.
Requisitos
- Use una de las siguientes configuraciones de proceso:
- Databricks Runtime 16.3 y versiones posteriores en instancias de cálculo configuradas con el modo de acceso estándar.
- Databricks Runtime 14.3 LTS y versiones posteriores en entornos de cómputo configurados con modo de acceso dedicado o sin aislamiento.
- Acceso de lectura a la ruta del punto de control usada por la consulta de streaming.
Lectura del almacén de estado de Structured Streaming
Puede leer la información del almacén de estado de las consultas de Structured Streaming ejecutadas en cualquier Databricks Runtime compatible. Use la sintaxis siguiente:
Pitón
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Opciones y esquema de la API del lector de estado
Para obtener una lista completa de las opciones de statestore formato, consulte Almacén de estados.
Los datos de salida tienen el esquema siguiente:
| Columna | Tipo | Descripción |
|---|---|---|
key |
Estructura (tipo adicional derivado de la clave de estado) | Clave de un registro de operador con estado en el punto de control de estado. |
value |
Estructura (tipo adicional derivado del valor de estado) | El valor de un registro de operador con estado en el punto de control de estado. |
partition_id |
Entero | La partición del punto de control de estado que contiene el registro de operador con estado. |
En Databricks Runtime 16.4 LTS y versiones posteriores, cuando la opción readChangeFeed se establece en true, los datos de salida tienen el siguiente esquema:
| Columna | Tipo | Descripción |
|---|---|---|
batch_id |
Long | Identificador de lote al que pertenece el cambio de estado. |
change_type |
Cadena | Tipo de cambio aplicado por el lote: update para inserciones y actualizaciones, delete para eliminaciones. |
key |
Estructura (tipo adicional derivado de la clave de estado) | Clave de un registro de operador con estado en el punto de control de estado. |
value |
Estructura (tipo adicional derivado del valor de estado) | El valor de un registro de operador con estado en el punto de control de estado.
null para los registros donde change_type es delete. |
partition_id |
Entero | La partición del punto de control de estado que contiene el registro de operador con estado. |
Consulte read_statestore función con valores de tabla.
Leer los cambios de estado de Structured Streaming
Disponible en Databricks Runtime 16.4 LTS y versiones posteriores. Para leer cómo cambia el estado entre microlotes en lugar de ver el estado completo de un único microlote, establezca readChangeFeed en true y especifique changeStartBatchId. Opcionalmente, especifique changeEndBatchId. Para ver una lista completa de opciones, consulte Almacén de estado.
Por ejemplo, para leer los cambios de estado del lote 2 a través del lote confirmado más reciente:
Pitón
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
El esquema de salida incluye columnas batch_id y change_type adicionales. Para ver el esquema completo, consulte Opciones y esquema de la API del lector de estado.
Leer los metadatos del estado de Structured Streaming
Disponible en Databricks Runtime 14.3 LTS o superior. Puede leer la información de metadatos de estado de las consultas de Structured Streaming:
Pitón
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Los datos devueltos tienen el siguiente esquema:
| Columna | Tipo | Descripción |
|---|---|---|
operatorId |
Entero | El identificador entero del operador de streaming con estado. |
operatorName |
Cadena | Nombre del operador de streaming con estado. |
stateStoreName |
Cadena | Nombre del almacén de estado del operador. |
numPartitions |
Entero | Número de particiones del almacén de estado. |
minBatchId |
Long | ID de lote mínimo disponible para consultar el estado. |
maxBatchId |
Long | Identificador de lote máximo disponible para consultar el estado. |
Nota:
Los valores de identificador de lote proporcionados por minBatchId y maxBatchId reflejan el estado en el momento en que se escribió el punto de control. Los lotes antiguos se limpian automáticamente con la ejecución de microproceso, por lo que no se garantiza que el valor proporcionado aquí siga estando disponible.
Consulte read_state_metadata función con valores de tabla.
Ejemplo: Consultar un lado de una unión de flujos
Use la siguiente sintaxis para consultar el lado izquierdo de una unión de flujo-flujo:
Pitón
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Ejemplo: Consulta del almacén de estado para la secuencia con varios operadores con estado
En este ejemplo se usa el lector de metadatos de estado para recopilar detalles de metadatos de una consulta de streaming con varios operadores con estado y, a continuación, se usan los resultados de los metadatos como opciones para el lector de estado.
El lector de metadatos de estado toma la ruta de acceso del punto de control como la única opción, como en el ejemplo de sintaxis siguiente:
Pitón
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
En la tabla siguiente se representa una salida de ejemplo de los metadatos del almacén de estado:
| operatorId | NombreDelOperador | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | Predeterminado. | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | Predeterminado. | 200 | 0 | 13 |
Para obtener los resultados del dedupeWithinWatermark operador, consulte el lector de estado con la operatorId opción , como en el ejemplo siguiente:
Pitón
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);