Lire les informations d'état de Structured Streaming

Vous pouvez utiliser des opérations DataFrame ou des fonctions SQL de valeur de table pour interroger les données et les métadonnées d’état de Structured Streaming. Utilisez ces fonctions pour observer les informations d’état des requêtes stateful dans Structured Streaming, ce qui est utile pour la surveillance et le débogage.

Vous devez disposer d’un accès en lecture au chemin de point de contrôle d’une requête de diffusion en continu pour interroger les métadonnées ou les données d’état. Les fonctions décrites dans cet article fournissent un accès en lecture seule aux métadonnées et aux données d’état. Vous pouvez uniquement utiliser la sémantique de lecture par lots pour interroger les informations d’état.

Remarque

Vous ne pouvez pas interroger les informations sur l'état des pipelines déclaratifs Lakeflow Spark, des tables de streaming ou des vues matérialisées. Vous ne pouvez pas interroger les informations d'état en utilisant l'informatique sans serveur ou avec des paramètres calculés selon le mode d'accès standard.

Spécifications

  • Utilisez l’une des configurations de calcul suivantes :
    • Databricks Runtime 16.3 et versions ultérieures sur le calcul configuré avec le mode d’accès standard.
    • Databricks Runtime 14.3 LTS et au-delà sur une infrastructure informatique configurée avec un mode d’accès dédié ou sans isolation.
  • Accès en lecture au chemin de point de contrôle utilisé par la requête de diffusion en continu.

Lire le magasin d’état Structured Streaming

Vous pouvez lire les informations de magasin d’état pour les requêtes Structured Streaming exécutées dans n’importe quel Databricks Runtime pris en charge. Utilisez la syntaxe suivante :

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Options et schéma de l’API lecteur d’état

Pour obtenir la liste complète des options de format, consultez statestore.

Les données de sortie ont le schéma suivant :

Colonne Catégorie Descriptif
key Struct (autre type dérivé de la clé d’état) Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
value Struct (autre type dérivé de la valeur d’état) Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
partition_id Entier Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état.

Dans Databricks Runtime 16.4 LTS et versions ultérieures, lorsque l’option readChangeFeed est définie sur true, les données de sortie présentent le schéma suivant :

Colonne Catégorie Descriptif
batch_id Long ID de lot auquel appartient le changement d’état.
change_type Chaîne Type de modification appliqué par le lot : update pour les insertions et mises à jour, delete pour les suppressions.
key Struct (autre type dérivé de la clé d’état) Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
value Struct (autre type dérivé de la valeur d’état) Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. null pour les enregistrements où change_type est delete.
partition_id Entier Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état.

Consultez read_statestoreTVF.

Lire les modifications de l’état de la diffusion en continu structurée

Disponible sur Databricks Runtime 16.4 LTS et versions ultérieures. Pour lire comment l’état change entre les microbatches au lieu d’afficher l’état complet à un seul microbatch, défini readChangeFeed sur true et spécifié changeStartBatchId. Si vous le souhaitez, spécifiez changeEndBatchId. Pour obtenir la liste complète des options, consultez magasin d’états.

Par exemple, pour lire les modifications d’état du lot 2 via le dernier lot validé :

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Le schéma de sortie inclut des colonnes supplémentaires batch_idet change_type. Pour obtenir le schéma complet, consultez les options et le schéma de l’API lecteur d’état.

Lire les métadonnées d’état Structured Streaming

Disponible sur Databricks Runtime 14.3 LTS ou version ultérieure. Vous pouvez lire les métadonnées d’état pour les requêtes Structured Streaming :

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Les données retournées ont le schéma suivant :

Colonne Catégorie Descriptif
operatorId Entier ID entier de l’opérateur de diffusion en continu avec état.
operatorName Chaîne Nom de l’opérateur de diffusion en continu avec état.
stateStoreName Chaîne Nom de l'entrepôt d'état de l'opérateur.
numPartitions Entier Nombre de partitions du magasin d’état.
minBatchId Long ID de lot minimal disponible pour l’état d’interrogation.
maxBatchId Long L’ID de lot maximal disponible pour interroger l'état.

Remarque

Les valeurs d’ID de lot fournies par minBatchId et maxBatchId reflètent l’état au moment où le point de contrôle a été écrit. Les anciens lots sont nettoyés automatiquement avec l’exécution de micro-lots. Par conséquent, la valeur fournie ici n’est pas garantie d’être toujours disponible.

Consultez read_state_metadataTVF.

Exemple : interroger un côté d’une jointure de flux de flux

Utilisez la syntaxe suivante pour interroger le côté gauche d’une jointure de flux de flux :

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Exemple : magasin d’états de requête pour le flux avec plusieurs opérateurs avec état

Cet exemple utilise le lecteur de métadonnées d’état pour collecter les détails des métadonnées d’une requête de diffusion en continu avec plusieurs opérateurs avec état, puis utilise les résultats des métadonnées comme options pour le lecteur d’état.

Le lecteur de métadonnées d’état prend le chemin de point de contrôle comme seule option, comme dans l’exemple de syntaxe suivant :

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Le tableau suivant représente un exemple de sortie des métadonnées du magasin d’états :

operatorId nom de l'opérateur stateStoreName nombreDePartitions minBatchId maxBatchId
0 stateStoreSave par défaut 200 0 13
1 dedupeWithinWatermark par défaut 200 0 13

Pour obtenir les résultats de l’opérateur dedupeWithinWatermark , interrogez le lecteur d’état avec l’option operatorId , comme dans l’exemple suivant :

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);