Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Les versions d’environnement pour SDP sont en version bêta.
Les pipelines avec une version environment définissent l’exécution Python code via Spark Connect. Cette page décrit ce qui est incompatible, ce qui se comporte différemment, comment analyser un pipeline pour détecter les modèles affectés et comment migrer un pipeline existant.
Limitations
Les versions d’environnement ne sont pas encore compatibles avec toutes les fonctionnalités de pipeline. Une exécution de pipeline avec un jeu de versions d'environnement échoue si le code Python du pipeline effectue l'une des opérations suivantes :
- Mute l’état de session Spark à l’intérieur d’une fonction décorée avec un décorateur de pipelines. Exemples : ,
spark.conf.set(...)spark.sql("USE CATALOG ...")etcreateOrReplaceTempView. - Utilise des API PySpark qui ne sont pas disponibles dans Spark Connect, y compris
SparkContext,RDD,SQLContextet toutes les API Py4J. Voir ce qui est pris en charge dans Spark Connect.
Si vous activez une version d’environnement sur un pipeline, la désactivation de la version de l’environnement retourne le pipeline à son état précédent.
Changements de comportement
Spark Connect présente un petit nombre de différences de comportement par rapport au runtime PySpark classique. Consultez Spark Connect et Spark Classic pour obtenir la référence complète. L’analyse de compatibilité détecte ces modèles à l’avance et bloque l’activation jusqu’à ce qu’ils soient traités. Vous pouvez donc les trouver et les corriger avant qu’ils n’affectent les données de production.
Dans un pipeline, les situations les plus courantes où le comportement peut différer sont les suivants :
- Mutation de la construction et de la session entrelacées du DataFrame
- UDF qui font référence à l’état Python mutable
Mutation de la construction et de la session entrelacées du DataFrame
Lorsqu’un pipeline construit un DataFrame, mute l’état de session Spark (par exemple, modifie le catalogue ou le schéma par défaut, définit une configuration, remplace une vue temporaire ou réinscrit une fonction UDF), puis utilise le DataFrame :
- Sans version d’environnement, le DataFrame utilise l’état de session de pré-mutation .
- Avec une version d’environnement, le DataFrame utilise l’état de session post-mutation .
Par exemple:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sans version d’environnement, mytable contient [(1, "Original Row")]. Avec une version d’environnement, mytable contient [(2, "Replaced Row")].
UDF qui référencent l’état Python mutable
Lorsqu’une fonction UDF fait référence à une variable globale Python dont la valeur change après la définition de la fonction UDF :
- Sans version d’environnement, l’UDF utilise la dernière valeur de la variable.
- Avec une version d’environnement, l’UDF utilise la valeur au moment où l’UDF a été définie.
Par exemple:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sans version d’environnement, my_mv contient [("alex_b",)]. Avec une version d’environnement, my_mv contient [("alex_a",)].
Si un pipeline s’appuie sur l’un ou l’autre modèle, auditez-le avant d’activer une version d’environnement.
Analyse de compatibilité
L’analyse de compatibilité vous permet de trouver des modèles de code dans votre pipeline qui produisent des résultats différents sous une version d’environnement, avant d’en activer un. L’analyse est activée. Lorsque l’analyse est activée sur un pipeline :
- Chaque exécution de pipeline émet un
BehaviorChangeInSparkConnectWARNévénement dans le journal des événements du pipeline par modèle détecté. - Vous ne pouvez pas activer une version d’environnement sur le pipeline tant que vous n’avez pas résolu tous les avertissements de compatibilité de la mise à jour réussie précédente.
Si l’analyse n’est pas activée, aucun événement n’est émis et environment_version l’activation n’est pas bloquée. Databricks recommande d’activer l’analyse et de résoudre les modèles détectés avant d’activer une version d’environnement sur le pipeline.
Activer l’analyse sur un pipeline
Vous pouvez activer l’analyse de compatibilité en ajoutant la configuration du pipelines.environmentVersion.enableCompatibilityScan pipeline, vous pouvez ajouter la configuration via l’interface utilisateur de l’éditeur de pipeline ou en ajoutant une entrée au json de configuration de pipeline.
Via l’interface utilisateur :
- Dans l’éditeur de pipeline, cliquez sur Paramètres.
- Recherchez la section Configuration dans les paramètres du pipeline.
- Cliquez sur
Ajoutez une configuration.
- Entrez
pipelines.environmentVersion.enableCompatibilityScanla clé ettruela valeur. - Enregistrez les paramètres du pipeline.
Dans le json du pipeline :
Ajoutez l’entrée configuration suivante au bloc :
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Flux de travail recommandé
- Activez l’analyse sur le pipeline.
- Démarrer l'exécution d'un pipeline.
-
Interrogez le journal des événements du pipeline pour les
BehaviorChangeInSparkConnectWARNévénements. Consultez la référence des événements de compatibilité pour obtenir la liste complète des codes de problème, des exemples de modèles et des correctifs suggérés. - Mettez à jour le code du pipeline pour supprimer les modèles détectés et réexécutez le pipeline jusqu’à ce qu’aucun autre événement ne soit émis.
- Ajoutez
environment_versionau pipeline à l’aide de l’une des méthodes dans Activer une version d’environnement sur un pipeline.
Si vous pensez qu’un avertissement de compatibilité est un faux positif et que vous souhaitez l’activer environment_version de toute façon, supprimez l’entrée pipelines.environmentVersion.enableCompatibilityScan de la configuration du pipeline pour contourner la vérification. (La définition de la valeur à false n’est pas autorisée : vous devez supprimer entièrement l’entrée.)
La vérification préliminaire ne s’exécute pas sur les pipelines qui n’ont aucune mise à jour précédente ou sur les pipelines qui ont déjà un jeu de versions d’environnement.
Migrer un pipeline existant vers des versions d’environnement
Pour migrer un pipeline existant qui n’utilise pas encore de version d’environnement, suivez ce workflow de bout en bout. Il vous guide tout au long de la recherche de modèles de code qui peuvent se comporter différemment sous Spark Connect, les corriger et déployer la version de l’environnement en toute sécurité.
Activez l’analyse de compatibilité sur le pipeline. Activez l’analyse sur le pipeline, comme décrit dans l’analyse de compatibilité. C’est ce qui provoque la surface des modèles détectés dans le journal des événements et ce qui permet la vérification préliminaire qui protège votre tentative d’activation.
Déclenchez une exécution de pipeline et passez en revue les événements de compatibilité. Déclenchez une mise à jour normale du pipeline. Une fois l’opération terminée, interrogez le journal des événements du pipeline pour les
BehaviorChangeInSparkConnectWARNévénements. Chaque événement signale un modèle détecté. Consultez la référence des événements de compatibilité pour obtenir la liste complète des codes de problème, des exemples de modèles et des correctifs suggérés.Mettez à jour votre code de pipeline pour résoudre les modèles détectés. Pour chaque modèle détecté, mettez à jour votre code de pipeline en suivant le correctif suggéré. Après chaque modification, déclenchez une autre mise à jour du pipeline et vérifiez que les événements correspondants n’apparaissent plus. Répétez jusqu’à ce que le journal des événements n’affiche plus d’événements de compatibilité pour une mise à jour réussie.
Activez la version de l’environnement sur le pipeline. Une fois la mise à jour réussie la plus récente, aucun événement de compatibilité n’a été ajouté
environment_versionau pipeline à l’aide de l’interface utilisateur, de l’API ou de l’offre groupée, comme décrit dans Activer une version d’environnement sur un pipeline. La mise à jour suivante s’exécute avec Spark Connect et la version de langage épinglée Python et les bibliothèques préinstallées.Si la mise à jour échoue, car les avertissements de compatibilité existent toujours, supprimez l’étape
environment_version2 et résolvez les avertissements restants avant de réessayer.Vérifiez la migration. Une fois la première mise à jour terminée avec la version de l’environnement, vérifiez :
- L’événement
create_updatedans le journal des événements indiqueenvironment_versionla valeur attendue. - Le pipeline produit les données attendues et aucun nouvel événement d’erreur n’apparaît.
- Vérifiez les tables en aval pour connaître les différences de comportement subtiles décrites dans les modifications de comportement.
- L’événement
Restauration
Si le pipeline se comporte mal après la migration, supprimez les environment_version paramètres du pipeline. La prochaine mise à jour s’exécute avec la configuration du runtime Python précédente. Utilisez l’exécution restaurée pour déboguer, puis répétez la migration à l’étape 2 après avoir identifié et résolu le problème.
Informations de référence sur les événements de compatibilité
Lorsque l’analyse de compatibilité est activée sur un pipeline, SDP émet un BehaviorChangeInSparkConnectWARN événement dans le journal des événements du pipeline par modèle détecté. Lorsque l’analyse est activée et que la mise à jour réussie précédente a détecté tous les modèles, SDP bloque environment_version également l’activation jusqu’à ce que les modèles soient traités.
Chaque événement signale un code de problème unique qui identifie ce qui a été détecté. Pour rechercher un code, recherchez-le dans la table Codes de problème : chaque ligne est liée à la section catégorie qui contient un exemple de modèle et le correctif suggéré.
Forme d’événement
BehaviorChangeInSparkConnect les événements suivent le schéma standard du journal des événements du pipeline :
-
event_typea la valeurbehavior_change_in_spark_connect. -
levela la valeurWARN. -
detailscontient l’objetbehavior_change_in_spark_connect, qui a un seulissuechamp. La valeur du problème est l’un des codes répertoriés ci-dessous. -
messageest une description lisible par l’homme du modèle détecté.
Codes de problème
| Category | Code de problème | Description |
|---|---|---|
| Mutations de base de données et de catalogue | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Le catalogue par défaut a été modifié après la création d’un DataFrame. Le DataFrame existant peut résoudre les tables à l’aide du nouveau catalogue par défaut. |
| Mutations de base de données et de catalogue | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG a été appelé en dehors d’une fonction décorée par un décorateur de pipelines. Le catalogue par défaut peut changer de manière inattendue pour les opérations suivantes. |
| Mutations de base de données et de catalogue | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
La base de données par défaut a été modifiée après la création d’un DataFrame. Le DataFrame existant peut résoudre les tables à l’aide de la nouvelle base de données par défaut. |
| Mutations de base de données et de catalogue | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE a été appelé en dehors d’une fonction décorée par un décorateur de pipelines. La base de données par défaut peut changer de manière inattendue pour les opérations suivantes. |
| Exécution impatiente dans les fonctions de flux | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux appelle une commande de point de contrôle. |
| Exécution impatiente dans les fonctions de flux | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux crée avec impatience une vue DataFrame (createOrReplaceTempView ou similaire). |
| Exécution impatiente dans les fonctions de flux | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux crée un profil de ressource. |
| Exécution impatiente dans les fonctions de flux | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux appelle spark.resources ou une API de ressource associée. |
| Exécution impatiente dans les fonctions de flux | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux exécute une table cible avec impatience MERGE INTO . |
| Exécution impatiente dans les fonctions de flux | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux effectue une opération Spark ML impatiente. |
| Exécution impatiente dans les fonctions de flux | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux inscrit une source de données Python. |
| Exécution impatiente dans les fonctions de flux | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux fonctionne sur un handle de requête de streaming actif. |
| Exécution impatiente dans les fonctions de flux | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux inscrit ou supprime un écouteur de requête de diffusion en continu. |
| Exécution impatiente dans les fonctions de flux | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Les appels spark.streams de fonction de flux pour gérer les requêtes de diffusion en continu. |
| Exécution impatiente dans les fonctions de flux | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux effectue une opération impatiente DataFrameWriterV2 . |
| Exécution impatiente dans les fonctions de flux | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux effectue une opération impatiente DataFrame.write . |
| Exécution impatiente dans les fonctions de flux | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La fonction de flux démarre une requête de streaming (writeStream.start()). |
| Mutations de configuration Spark | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() ou spark.conf.unset() a été appelé à l’intérieur d’une fonction décorée par un décorateur de pipelines. Cela n’est pas pris en charge avec une version d’environnement. |
| Mutations de configuration Spark | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() a été appelé en dehors d’une fonction décorée par un décorateur de pipelines après la création d’un DataFrame. La modification de la configuration peut affecter le DataFrame existant au moment de l’exécution. |
| Mutations de configuration Spark | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() a été appelé en dehors d’une fonction décorée par un décorateur de pipelines après la création d’un DataFrame. La modification de la configuration peut affecter le DataFrame existant au moment de l’exécution. |
| Remplacements de vues temporaires | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Une vue temporaire globale a été remplacée après la création d’un DataFrame. Le remplacement peut être reflété dans le DataFrame existant. |
| Remplacements de vues temporaires | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Une vue temporaire a été remplacée après la création d’un DataFrame référencé. Le remplacement peut être reflété dans le DataFrame existant. |
| Mutations UDF et UDTF | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Une fonction UDF a été réinscrite avec le même nom après qu’un DataFrame référençant celui-ci a été créé. Le DataFrame existant peut utiliser la nouvelle définition UDF. |
| Mutations UDF et UDTF | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Un UDTF a été réinscrit avec le même nom après qu’un DataFrame référençant celui-ci a été créé. Le DataFrame existant peut utiliser la nouvelle définition UDTF. |
| Mutations UDF et UDTF | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Une fonction UDF fait référence à une variable Python mutable globale. Avec une version d’environnement, l’UDF utilise la valeur de la variable au moment où l’UDF a été définie, pas au moment de l’appel. |
| Mutations UDF et UDTF | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Un UDTF fait référence à une variable Python mutable globale. Avec une version d’environnement, l’UDTF utilise la valeur de la variable au moment où l’UDTF a été défini, pas au moment de l’appel. |
Mutations de base de données et de catalogue
Ces problèmes sont émis lorsque le code de pipeline mute la base de données ou le catalogue par défaut. Avec une version d’environnement, les DataFrames construits avant la mutation peuvent résoudre les tables à l’aide de la nouvelle base de données ou catalogue.
Exemple de modèle qui déclenche un événement :
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Sans version d’environnement, df résout events à partir du marketing catalogue. Avec une version d’environnement, df résout events à partir du sales catalogue.
Correctif suggéré : Qualifiez entièrement les noms de tables afin que la résolution ne dépende pas du catalogue ou de la base de données par défaut, et évitez de modifier le catalogue ou la base de données par défaut entre la création et l’utilisation de DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Mutations de configuration Spark
Ces problèmes sont émis lorsque le code de pipeline mute la configuration Spark de manière à modifier le comportement du DataFrame sous une version d’environnement.
Exemple de modèle qui déclenche un événement :
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Sans version d’environnement, le cast utilise la valeur conf au moment de la création du DataFrame. Avec une version d’environnement, le cast utilise spark.sql.ansi.enabled=true et peut échouer sur une entrée non valide.
Correctif suggéré : Définissez toutes les configurations Spark requises en haut du fichier de pipeline, avant la création d’un DataFrame. Pour la configuration par requête, utilisez le paramètre du configuration pipeline dans la spécification du pipeline.
Remplacements de vues temporaires
Ces problèmes sont émis lorsque le code du pipeline remplace une vue temporaire après la création d’un DataFrame. Avec une version d’environnement, le DataFrame existant peut refléter le nouveau contenu de l’affichage.
Exemple de modèle qui déclenche un événement :
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sans version d’environnement, mytable contient [(1, "Original Row")]. Avec une version d’environnement, mytable contient [(2, "Replaced Row")].
Correctif suggéré : Créez chaque affichage temporaire une seule fois et ne le remplacez pas. Si vous avez besoin de plusieurs vues avec des données associées, attribuez un nom distinct.
Mutations UDF et UDTF
Ces problèmes sont émis lorsque le code de pipeline mute une fonction UDF ou UDTF de manière à modifier le comportement dans une version d’environnement.
Exemple de modèle qui déclenche un événement :
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sans version d’environnement, my_mv contient [("alex_b",)]. Avec une version d’environnement, my_mv contient [("alex_a",)].
Suggested fix : Passez des valeurs dans la fonction UDF en tant qu’arguments au lieu de les capturer à partir de Python globals, ou définissez le global avant de définir la fonction UDF et ne la mutez pas ultérieurement.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Exécution impatiente dans les fonctions de flux
Ces problèmes sont émis lorsque le code du pipeline exécute une commande Spark impatiente à l’intérieur d’une fonction décorée par un décorateur de pipelines (@table, @materialized_viewetc.). Les fonctions de flux sont censées définir et retourner un DataFrame ; Les commandes qui écrivent des données, gèrent les requêtes de diffusion en continu, inscrivent des ressources ou exécutent des opérations ML ne sont pas autorisées à l’intérieur d’une fonction de flux avec un jeu de versions d’environnement.
Correctif suggéré : Déplacez l’opération impatiente en dehors de la fonction de flux et retournez un DataFrame à partir de la fonction de flux à la place. Les effets secondaires tels que l’écriture dans une table ou le démarrage d’une requête de diffusion en continu appartiennent en dehors de la définition du pipeline ; le moteur de pipeline gère la matérialisation du DataFrame retourné par la fonction de flux.
Rechercher des événements de compatibilité dans le journal des événements
La requête suivante retourne tous les événements de compatibilité d’un pipeline, classés d’abord les plus récents :
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Pour compter les événements par code de problème dans les mises à jour récentes :
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Pour savoir comment interroger le journal des événements, consultez Interroger le journal des événements.
Voir également :
- Configurer des versions d’environnement pour les pipelines : vue d’ensemble des fonctionnalités, comment activer une version d’environnement.
- Schéma du journal des événements de pipeline : schéma complet du journal des événements de pipeline.
- Journal des événements de pipeline : comment interroger le journal des événements du pipeline.