Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez utiliser des opérations DataFrame ou des fonctions SQL de valeur de table pour interroger les données et les métadonnées d’état de Structured Streaming. Utilisez ces fonctions pour observer les informations d’état des requêtes stateful dans Structured Streaming, ce qui est utile pour la surveillance et le débogage.
Vous devez disposer d’un accès en lecture au chemin de point de contrôle d’une requête de diffusion en continu pour interroger les métadonnées ou les données d’état. Les fonctions décrites dans cet article fournissent un accès en lecture seule aux métadonnées et aux données d’état. Vous pouvez uniquement utiliser la sémantique de lecture par lots pour interroger les informations d’état.
Remarque
Vous ne pouvez pas interroger les informations sur l'état des pipelines déclaratifs Lakeflow Spark, des tables de streaming ou des vues matérialisées. Vous ne pouvez pas interroger les informations d'état en utilisant l'informatique sans serveur ou avec des paramètres calculés selon le mode d'accès standard.
Spécifications
- Utilisez l’une des configurations de calcul suivantes :
- Databricks Runtime 16.3 et versions ultérieures sur le calcul configuré avec le mode d’accès standard.
- Databricks Runtime 14.3 LTS et au-delà sur une infrastructure informatique configurée avec un mode d’accès dédié ou sans isolation.
- Accès en lecture au chemin de point de contrôle utilisé par la requête de diffusion en continu.
Lire le magasin d’état Structured Streaming
Vous pouvez lire les informations de magasin d’état pour les requêtes Structured Streaming exécutées dans n’importe quel Databricks Runtime pris en charge. Utilisez la syntaxe suivante :
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Options et schéma de l’API lecteur d’état
Pour obtenir la liste complète des options de format, consultez statestore.
Les données de sortie ont le schéma suivant :
| Colonne | Catégorie | Descriptif |
|---|---|---|
key |
Struct (autre type dérivé de la clé d’état) | Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
value |
Struct (autre type dérivé de la valeur d’état) | Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
partition_id |
Entier | Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état. |
Dans Databricks Runtime 16.4 LTS et versions ultérieures, lorsque l’option readChangeFeed est définie sur true, les données de sortie présentent le schéma suivant :
| Colonne | Catégorie | Descriptif |
|---|---|---|
batch_id |
Long | ID de lot auquel appartient le changement d’état. |
change_type |
Chaîne | Type de modification appliqué par le lot : update pour les insertions et mises à jour, delete pour les suppressions. |
key |
Struct (autre type dérivé de la clé d’état) | Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
value |
Struct (autre type dérivé de la valeur d’état) | Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
null pour les enregistrements où change_type est delete. |
partition_id |
Entier | Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état. |
Consultez read_statestoreTVF.
Lire les modifications de l’état de la diffusion en continu structurée
Disponible sur Databricks Runtime 16.4 LTS et versions ultérieures. Pour lire comment l’état change entre les microbatches au lieu d’afficher l’état complet à un seul microbatch, défini readChangeFeed sur true et spécifié changeStartBatchId. Si vous le souhaitez, spécifiez changeEndBatchId. Pour obtenir la liste complète des options, consultez magasin d’états.
Par exemple, pour lire les modifications d’état du lot 2 via le dernier lot validé :
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Le schéma de sortie inclut des colonnes supplémentaires batch_idet change_type. Pour obtenir le schéma complet, consultez les options et le schéma de l’API lecteur d’état.
Lire les métadonnées d’état Structured Streaming
Disponible sur Databricks Runtime 14.3 LTS ou version ultérieure. Vous pouvez lire les métadonnées d’état pour les requêtes Structured Streaming :
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Les données retournées ont le schéma suivant :
| Colonne | Catégorie | Descriptif |
|---|---|---|
operatorId |
Entier | ID entier de l’opérateur de diffusion en continu avec état. |
operatorName |
Chaîne | Nom de l’opérateur de diffusion en continu avec état. |
stateStoreName |
Chaîne | Nom de l'entrepôt d'état de l'opérateur. |
numPartitions |
Entier | Nombre de partitions du magasin d’état. |
minBatchId |
Long | ID de lot minimal disponible pour l’état d’interrogation. |
maxBatchId |
Long | L’ID de lot maximal disponible pour interroger l'état. |
Remarque
Les valeurs d’ID de lot fournies par minBatchId et maxBatchId reflètent l’état au moment où le point de contrôle a été écrit. Les anciens lots sont nettoyés automatiquement avec l’exécution de micro-lots. Par conséquent, la valeur fournie ici n’est pas garantie d’être toujours disponible.
Consultez read_state_metadataTVF.
Exemple : interroger un côté d’une jointure de flux de flux
Utilisez la syntaxe suivante pour interroger le côté gauche d’une jointure de flux de flux :
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemple : magasin d’états de requête pour le flux avec plusieurs opérateurs avec état
Cet exemple utilise le lecteur de métadonnées d’état pour collecter les détails des métadonnées d’une requête de diffusion en continu avec plusieurs opérateurs avec état, puis utilise les résultats des métadonnées comme options pour le lecteur d’état.
Le lecteur de métadonnées d’état prend le chemin de point de contrôle comme seule option, comme dans l’exemple de syntaxe suivant :
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Le tableau suivant représente un exemple de sortie des métadonnées du magasin d’états :
| operatorId | nom de l'opérateur | stateStoreName | nombreDePartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | par défaut | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | par défaut | 200 | 0 | 13 |
Pour obtenir les résultats de l’opérateur dedupeWithinWatermark , interrogez le lecteur d’état avec l’option operatorId , comme dans l’exemple suivant :
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);