Points de contrôle Structured Streaming

Les points de contrôle et les journaux en écriture anticipée fonctionnent ensemble pour fournir des garanties de traitement pour les charges de travail de Structured Streaming. Le point de contrôle suit les informations qui identifient la requête, notamment les informations d’état et les enregistrements traités. Lorsque vous supprimez les fichiers d’un répertoire de point de contrôle ou que vous passez à un nouvel emplacement de point de contrôle, l’exécution suivante de la requête recommence à zéro.

Un répertoire de point de contrôle contient les éléments suivants :

  • Décalages : les décalages des sources traités dans chaque micro-lot. Cela permet à la requête de reprendre à partir exactement de l’emplacement où elle s’est arrêtée sans retraiter les données.
  • Validations : enregistrement dont les micro-lots ont été validés sur le récepteur, ce qui permet une sémantique exactement une fois.
  • État : Pour les requêtes avec état (agrégations, jointures de flux, déduplication et opérateurs avec état personnalisés comme transformWithState), le point de contrôle stocke les métadonnées concernant l’opérateur avec état, le schéma d’état, et le contenu du magasin d’état tel que géré par le fournisseur.
  • Métadonnées : ID de requête unique utilisé pour identifier la requête. Les paramètres de configuration sont stockés dans le journal des décalages.

Chaque requête doit avoir un emplacement de point de contrôle distinct. Plusieurs requêtes ne doivent jamais partager le même emplacement.

Remarque

Cet article traite des points de contrôle Structured Streaming pour les requêtes de streaming. Pour plus d’informations sur l’utilisation DataFrame.checkpoint() avec les volumes Unity Catalog pour tronquer les plans d’exécution des DataFrames non diffusés en continu, consultez les points de contrôle DataFrame dans les volumes.

Activer la création de point de contrôle pour des requêtes de Structured Streaming

Vous devez spécifier l’option checkpointLocation avant d’exécuter une requête de diffusion en continu, comme dans l’exemple suivant :

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Remarque

Certains récepteurs, tels que la sortie pour display() dans les notebooks et le récepteur memory, génèrent automatiquement un emplacement de point de contrôle temporaire si vous omettez cette option. Ces emplacements de point de contrôle temporaires ne garantissent pas la tolérance de panne ou la cohérence des données, et peuvent ne pas être nettoyés correctement. Databricks conseille de toujours spécifier un emplacement de point de contrôle pour ces récepteurs.

Récupérer après des modifications dans une requête de Structured Streaming

Il existe des limitations aux modifications de requêtes de streaming autorisées entre les redémarrages à partir d’un même emplacement de point de contrôle.

Les modifications qui nécessitent généralement un nouveau point de contrôle incluent le nombre ou le type de sources d’entrée, les sujets Kafka souscrits ou les chemins Auto Loader, les types d’opérations avec état, le schéma d’état et le type de puits de sortie.

Les modifications généralement sécurisées incluent l’ajout ou la suppression de filtres, la modification des limites de débit, les intervalles de déclenchement et la mise à jour de la logique de fonction définie par l’utilisateur dans mapGroupsWithState (bien que la sémantique puisse changer).

La section suivante décrit les modifications qui ne sont pas autorisées ou l’effet de la modification n’est pas bien défini, où :

  • Autorisé signifie que vous pouvez apporter la modification spécifiée, mais que le fait que la sémantique de son effet soit bien définie dépend de la requête et de la modification.
  •  Non autorisée  signifie que vous ne deviez pas apporter la modification spécifiée, car la requête redémarrée risque d’échouer avec des erreurs imprévisibles.
  • sdf représente une trame de données ou un jeu de données en streaming générés avec sparkSession.readStream.

Types de modifications dans des requêtes de Structured Streaming

  • Modifications apportées au nombre ou au type de sources d’entrée : cela n’est pas autorisé par défaut, car Structured Streaming identifie les sources par leur position dans le plan de requête. Si vous activez le nommage de la source, vous pouvez réorganiser les sources existantes et ajouter de nouvelles sources sans commencer à partir d’un nouveau point de contrôle. Consultez Modifier les sources de diffusion en continu avec l’évolution de la source.

  • Modifications apportées aux paramètres des sources d’entrée : que ce soit autorisé et si la sémantique de la modification est bien définie dépend de la source et de la requête, y compris les contrôles d’admission tels que maxFilesPerTrigger ou maxOffsetsPerTrigger. Voici quelques exemples :

    • L’ajout, la suppression et la modification des limites de taux sont autorisés :

      spark.readStream.format("kafka").option("subscribe", "article")
      

      à

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      

      Pour plus d’informations, consultez Configurer la taille du lot Structured Streaming sur Azure Databricks

    • Les modifications d’articles et de fichiers souscrits ne sont généralement pas autorisées, car leurs résultats sont imprévisibles : de spark.readStream.format("kafka").option("subscribe", "article") en spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Modifications dans l’intervalle de déclencheur : vous pouvez modifier les déclencheurs entre les lots incrémentiels et les intervalles de temps. Consultez Modifier les intervalles de déclenchement entre les exécutions.

  • Modifications du type de récepteur de sortie : les modifications de quelques combinaisons spécifiques de récepteurs sont autorisées. Cela doit être vérifié au cas par cas. Voici quelques exemples :

    • La transformation de puits de fichiers en puits Kafka est autorisée. Kafka ne verra que les nouvelles données.
    • La conversion de collecteur Kafka en collecteur de fichiers n’est pas autorisée.
    • La modification du récepteur Kafka en foreach, ou l’inverse, est autorisée.
  • Modifications des paramètres de récepteur de sortie : le récepteur et la requête déterminent si la modification est autorisée et si sa sémantique est bien définie. Voici quelques exemples :

    • Les modifications apportées au répertoire de sortie d’un récepteur de fichiers ne sont pas autorisées : de sdf.writeStream.format("parquet").option("path", "/somePath") en sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Les modifications apportées à la rubrique de sortie sont autorisées : sdf.writeStream.format("kafka").option("topic", "topic1") à sdf.writeStream.format("kafka").option("topic", "topic2")
    • Les modifications apportées au récepteur foreach défini par l’utilisateur (autrement dit, le code ForeachWriter) sont autorisées, mais leur sémantique dépend du code.
  • Modifications des opérations de type projection, filtrage et mappage  : certains cas sont autorisés. Par exemple:

    • L’ajout et la suppression de filtres sont autorisé : de sdf.selectExpr("a") en sdf.where(...).selectExpr("a").filter(...).
    • Les modifications apportées aux projections avec le même schéma de sortie sont autorisées : de sdf.selectExpr("stringColumn AS json").writeStream en sdf.select(to_json(...).as("json")).writeStream.
    • Les modifications apportées aux projections avec un schéma de sortie différent sont autorisées sous condition : une modification de sdf.selectExpr("a").writeStream en sdf.selectExpr("b").writeStream n’est autorisée que si le récepteur de sortie autorise la modification de schéma de "a" en "b".
  • Modifications des opérations avec état : certaines opérations dans des requêtes de diffusion en continu doivent gérer les données d’état de façon à mettre à jour le résultat en permanence. La diffusion en continu structurée crée automatiquement des points de contrôle pour les données d’état dans un stockage tolérant aux pannes (par exemple, DBFS, stockage Blob Azure) et les restaure après le redémarrage. Toutefois, cela suppose que le schéma des données d’état ne change pas au fil des redémarrages. Cela signifie que l’apport de modifications (ajouts, suppressions ou modifications de schéma) aux opérations avec état d’une requête de diffusion en continu n’est pas autorisés entre les redémarrages. Voici la liste des opérations avec état dont le schéma ne doit pas être modifié entre les redémarrages afin de garantir la récupération d’état :

    • Agrégation par flux : par exemple, sdf.groupBy("a").agg(...). Les modifications du nombre ou du type de clés de regroupement ou d’agrégats ne sont pas autorisées.
    • Déduplication de streaming : par exemple, sdf.dropDuplicates("a"). Les modifications du nombre ou du type de clés de regroupement ou d’agrégats ne sont pas autorisées.
    • Jointure flux-flux : par exemple, sdf1.join(sdf2, ...) (les deux entrées sont générées avec sparkSession.readStream). Les modifications des colonnes schema ou equi-joining ne sont pas autorisées. Les modifications de type de jointure (externe ou interne) ne sont pas autorisées. Les autres modifications de la condition de jointure sont mal définies.
    • Opération avec état arbitraire : par exemple, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Les modifications du schéma de l’état défini par l’utilisateur et du type de délai d’expiration ne sont pas autorisées. Toute modification au sein de la fonction de mappage d’état défini par l’utilisateur est autorisée, mais l’effet sémantique de la modification dépend de la logique définie par l’utilisateur. Si vous souhaitez vraiment prendre en charge les modifications de schéma d’état, vous pouvez encoder/décoder explicitement vos structures de données d’état complexes en octets à l’aide d’un schéma d’encodage/décodage prenant en charge la migration de schéma. Par exemple, si vous enregistrez votre état sous la forme d’octets encodés Avro, vous pouvez modifier le schéma d’état Avro entre les redémarrages de requête, car cela a pour effet de restaurer l’état binaire.

Important

Les opérateurs dropDuplicates() avec état et dropDuplicatesWithinWatermark() peuvent ne pas redémarrer en raison d’une vérification de compatibilité de schéma d’état lors de la modification entre les modes d’accès au calcul.

La modification entre les modes d’accès dédié et sans isolation est autorisée. La modification entre les modes d’accès standard et serverless est autorisée. N’essayez pas de changer entre d’autres combinaisons de mode d’accès.

Pour éviter cette erreur, ne modifiez pas le mode d’accès de calcul pour les requêtes de streaming qui contiennent ces opérateurs.

Modifier les sources de diffusion en continu avec l’évolution de la source

Par défaut, Structured Streaming identifie les sources par leur position dans le plan d’exécution de la requête, par exemple 0, 1, 2, etc. Toute modification du nombre ou de l’ordre des sources d’entrée rompt la compatibilité avec les points de contrôle et nécessite de recréer un point de contrôle. L’évolution de la source vous permet d’affecter des noms stables et définis par l’utilisateur à chaque source de diffusion en continu afin de pouvoir réorganiser, ajouter ou supprimer des sources d’une requête sans perdre l’état du point de contrôle.

L’évolution de la source nécessite Databricks Runtime 18.2 et versions ultérieures.

Configuration requise

Pour activer l’évolution de la source, définissez deux configurations Spark :

  • spark.sql.streaming.queryEvolution.enableSourceEvolution: Quand true, toutes les sources de diffusion en continu dans la requête doivent être explicitement nommées à l’aide de l’API .name() . La valeur par défaut est false.
  • spark.sql.streaming.offsetLog.formatVersion: doit être défini sur 2 pour utiliser le format de suivi des décalages basé sur le nom. La valeur par défaut est 1.

Définissez les deux configurations avant de définir la requête de streaming :

spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

Règles de dénomination

  • Les noms doivent contenir uniquement des caractères alphanumériques et des traits de soulignement ([a-zA-Z0-9_]+).
  • Chaque nom source doit être unique dans une requête.
  • Lorsque l’évolution de la source est activée, chaque source de diffusion en continu doit avoir un nom. Les sources non nommées provoquent une UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT erreur.

Réorganiser, ajouter et supprimer des sources

Les modifications suivantes peuvent être appliquées sans risque lors des redémarrages de la requête avec le même point de contrôle :

  • Réorganiser les sources : redémarrez la requête avec un ordre différent de sources. Chaque source reprend à partir de son dernier offset validé d’après son nom et ne modifie pas l’état du checkpoint.
  • Ajouter de nouvelles sources : redémarrez la requête avec une nouvelle source. Les nouvelles sources traitent depuis le début, et les sources existantes reprennent à partir de leurs dernières positions.
  • Supprimer des sources : redémarrez la requête sans la source. La source est définitivement supprimée du point de contrôle. Une source supprimée ne peut pas être ajoutée à nouveau avec le même nom.

Example

Utilisez .name() sur DataStreamReader avant d’appeler .load() ou .table() :

Python

orders_us = (spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

Scala

val ordersUS = spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")

val ordersEU = spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")

val allOrders = ordersUS.union(ordersEU)

Limitations

  • Le nommage source nécessite un nouveau point de contrôle. Vous ne pouvez pas activer l’évolution de la source avec un point de contrôle existant qui utilise le format de journal de décalage V1.
  • Après une mise à niveau vers le format du journal des décalages V2, vous ne pouvez pas rétrograder vers V1. Consultez configuration requise.
  • Les noms des sources sont permanents. Pour renommer une source, supprimez-la, puis ajoutez-la avec un nouveau nom. Le processus source renommé est traité depuis le début.