Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Important
Las versiones de entorno para SDP están en beta.
Canalizaciones con una versión de environment set run Python code through Spark Connect. En esta página se describe lo que no es compatible, qué se comporta de forma diferente, cómo examinar una canalización para los patrones afectados y cómo migrar una canalización existente.
Limitaciones
Las versiones del entorno aún no son compatibles con todas las funcionalidades de canalización. Se produce un error en una ejecución de canalización con un conjunto de versiones de entorno si el código de Python de la canalización realiza alguna de las siguientes acciones:
- Muta el estado de sesión de Spark dentro de una función decorada con un decorador de canalizaciones. Algunos ejemplos son
spark.conf.set(...),spark.sql("USE CATALOG ...")ycreateOrReplaceTempView. - Usa las API de PySpark que no están disponibles en Spark Connect, incluidas
SparkContext,RDD,SQLContexty las API de Py4J. Consulte ¿Qué se admite en Spark Connect?
Si habilitar una versión de entorno en una canalización hace que se produzca un error, al deshabilitar la versión del entorno se devuelve la canalización a su estado anterior.
Cambios de comportamiento
Spark Connect tiene un pequeño número de diferencias de comportamiento del entorno de ejecución clásico de PySpark. Consulte Spark Connect frente a Spark clásico para obtener la referencia completa. El examen de compatibilidad detecta estos patrones con antelación y bloquea la habilitación hasta que se abordan, por lo que puede encontrarlos y corregirlos antes de que afecten a los datos de producción.
En una canalización, las situaciones más comunes en las que el comportamiento puede diferir son:
- Construcción de dataframe intercalado y mutación de sesión
- UDFs que hacen referencia al estado mutable Python
Construcción de dataframe intercalado y mutación de sesión
Cuando una canalización construye un dataframe, muta el estado de sesión de Spark (por ejemplo, cambia el catálogo o el esquema predeterminados, establece una configuración, reemplaza una vista temporal o vuelve a registrar una UDF) y, a continuación, usa el dataframe:
- Sin una versión del entorno, dataFrame usa el estado de sesión anterior a la mutación .
- Con una versión del entorno, dataFrame usa el estado de sesión posterior a la mutación .
Por ejemplo:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sin una versión del entorno, mytable contiene [(1, "Original Row")]. Con una versión de entorno, mytable contiene [(2, "Replaced Row")].
UDFs que hacen referencia al estado de Python mutable
Cuando una UDF hace referencia a una variable global de Python cuyo valor cambia después de definir la UDF:
- Sin una versión del entorno, la UDF usa el valor más reciente de la variable.
- Con una versión del entorno, la UDF usa el valor en el momento en que se definió la UDF.
Por ejemplo:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sin una versión del entorno, my_mv contiene [("alex_b",)]. Con una versión de entorno, my_mv contiene [("alex_a",)].
Si una canalización se basa en cualquier patrón, auditela antes de habilitar una versión del entorno.
Examen de compatibilidad
El examen de compatibilidad le ayuda a encontrar patrones de código en la canalización que generarían resultados diferentes en una versión de entorno, antes de habilitar uno. El examen es opcional. Cuando el examen está habilitado en una canalización:
- Cada ejecución de canalización emite un
BehaviorChangeInSparkConnectWARNevento en el registro de eventos de canalización por patrón detectado. - No se puede habilitar una versión del entorno en la canalización hasta que se aborden todas las advertencias de compatibilidad de la actualización correcta anterior.
Si el examen no está habilitado, no se emite ningún evento y environment_version no se bloquea la habilitación. Databricks recomienda habilitar el examen y resolver los patrones detectados antes de habilitar una versión del entorno en la canalización.
Habilitación del examen en una canalización
Puede habilitar el examen de compatibilidad agregando la configuración de canalización pipelines.environmentVersion.enableCompatibilityScan . Puede agregar la configuración a través de la interfaz de usuario del editor de canalizaciones o agregando una entrada al JSON de configuración de canalización.
A través de la interfaz de usuario:
- En el editor de canalización, haga clic en Configuración.
- Busque la sección Configuración en la configuración de canalización.
- Haga clic en
Agregar configuración.
- Escriba
pipelines.environmentVersion.enableCompatibilityScancomo clave ytruecomo valor. - Guarde la configuración de la canalización.
En el JSON de la canalización:
Agregue la siguiente entrada al configuration bloque :
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Flujo de trabajo recomendado
- Habilite el examen en la canalización.
- Desencadenamiento de una ejecución de la canalización
-
Consulte el registro de eventos de canalización para ver
BehaviorChangeInSparkConnectWARNlos eventos. Consulte Referencia de eventos de compatibilidad para obtener la lista completa de códigos de problema, patrones de ejemplo y correcciones sugeridas. - Actualice el código de canalización para quitar los patrones detectados y vuelva a ejecutar la canalización hasta que no se emitan más eventos.
- Agregue
environment_versiona la canalización mediante uno de los métodos de Habilitación de una versión de entorno en una canalización.
Si cree que una advertencia de compatibilidad es un falso positivo y desea habilitar environment_version de todos modos, quite la pipelines.environmentVersion.enableCompatibilityScan entrada de la configuración de canalización para omitir la comprobación. (No se permite establecer el valor en false ; debe quitar la entrada por completo).
La comprobación preparatoria no se ejecuta en canalizaciones que no tienen ninguna actualización anterior o en canalizaciones que ya tienen establecida una versión del entorno.
Migración de una canalización existente a versiones de entorno
Para migrar una canalización existente que aún no usa una versión del entorno, siga este flujo de trabajo de un extremo a otro. Le guía a través de la búsqueda de patrones de código que pueden comportarse de forma diferente en Spark Connect, corregirlos e implementar la versión del entorno de forma segura.
Habilite el examen de compatibilidad en la canalización. Habilite el examen en la canalización como se describe en Examen de compatibilidad. Esto es lo que hace que los patrones detectados se muestren en el registro de eventos y lo que habilita la comprobación preparatoria que protege el intento de habilitación.
Desencadene una ejecución de canalización y revise los eventos de compatibilidad. Desencadene una actualización de canalización normal. Una vez completado correctamente, consulte el registro de eventos de canalización para
BehaviorChangeInSparkConnectWARNver los eventos. Cada evento notifica un patrón detectado. Consulte Referencia de eventos de compatibilidad para obtener la lista completa de códigos de problema, patrones de ejemplo y correcciones sugeridas.Actualice el código de canalización para abordar los patrones detectados. Para cada patrón detectado, actualice el código de canalización siguiendo la corrección sugerida. Después de cada cambio, desencadene otra actualización de canalización y compruebe que los eventos correspondientes ya no aparecen. Repita esta operación hasta que el registro de eventos ya no muestre ningún evento de compatibilidad para una actualización correcta.
Habilite la versión del entorno en la canalización. Después de que la actualización correcta más reciente no tenga eventos de compatibilidad, agregue
environment_versiona la canalización mediante la interfaz de usuario, la API o la agrupación, tal como se describe en Habilitación de una versión de entorno en una canalización. La siguiente actualización se ejecuta con Spark Connect y las bibliotecas preinstaladas Python idioma y las bibliotecas preinstaladas.Si se produce un error en la actualización porque todavía existen advertencias de compatibilidad, quite ,
environment_versionvuelva al paso 2 y resuelva las advertencias restantes antes de volver a intentarlo.Compruebe la migración. Una vez completada la primera actualización con la versión del entorno, compruebe lo siguiente:
- El
create_updateevento del registro de eventos muestraenvironment_versionestablecido en el valor esperado. - La canalización genera los datos esperados y no aparecen nuevos eventos de error.
- Las tablas de nivel inferior de comprobación de acceso puntual para ver las diferencias sutiles de comportamiento descritas en Cambios de comportamiento.
- El
Reversión
Si la canalización se comporta mal después de la migración, quite de environment_version la configuración de la canalización. La siguiente actualización se ejecuta con la configuración anterior del entorno de ejecución de Python. Use la ejecución revierida para depurar y, a continuación, repita la migración del paso 2 después de identificar y corregir el problema.
Referencia de eventos de compatibilidad
Cuando el examen de compatibilidad está habilitado en una canalización, SDP emite un BehaviorChangeInSparkConnectWARN evento en el registro de eventos de canalización por patrón detectado. Cuando el examen está habilitado y la actualización correcta anterior detectó cualquier patrón, SDP también bloquea environment_version la habilitación hasta que se abordan los patrones.
Cada evento informa de un único código de problema que identifica lo que se detectó. Para buscar un código, fíjelo en la tabla Códigos de problema : cada fila se vincula a la sección de categorías que contiene un patrón de ejemplo y la corrección sugerida.
Forma de evento
BehaviorChangeInSparkConnect eventos siguen el esquema de registro de eventos de canalización estándar:
-
event_typeesbehavior_change_in_spark_connect. -
levelesWARN. -
detailscontiene elbehavior_change_in_spark_connectobjeto , que tiene un únicoissuecampo. El valor del problema es uno de los códigos que se enumeran a continuación. -
messagees una descripción legible del patrón detectado.
Códigos de problema
| Categoría | Código de problema | Description |
|---|---|---|
| Mutaciones de base de datos y catálogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
El catálogo predeterminado se cambió después de crear un dataframe. El dataframe existente puede resolver tablas mediante el nuevo catálogo predeterminado. |
| Mutaciones de base de datos y catálogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG se llamó fuera de una función decorada por un decorador de canalizaciones. El catálogo predeterminado puede cambiar inesperadamente para las operaciones posteriores. |
| Mutaciones de base de datos y catálogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
La base de datos predeterminada se cambió después de crear un dataframe. El dataframe existente puede resolver tablas mediante la nueva base de datos predeterminada. |
| Mutaciones de base de datos y catálogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE se llamó fuera de una función decorada por un decorador de canalizaciones. La base de datos predeterminada puede cambiar inesperadamente para las operaciones posteriores. |
| Ejecución diligente dentro de las funciones de flujo | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo llama a un comando de punto de control. |
| Ejecución diligente dentro de las funciones de flujo | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo crea diligentemente una vista DataFrame (createOrReplaceTempView o similar). |
| Ejecución diligente dentro de las funciones de flujo | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow crea un perfil de recursos. |
| Ejecución diligente dentro de las funciones de flujo | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow llama a spark.resources o a una API de recursos relacionada. |
| Ejecución diligente dentro de las funciones de flujo | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo realiza una operación diligente MERGE INTO en una tabla de destino. |
| Ejecución diligente dentro de las funciones de flujo | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo realiza una operación de Spark ML diligente. |
| Ejecución diligente dentro de las funciones de flujo | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow registra un origen de datos Python. |
| Ejecución diligente dentro de las funciones de flujo | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow funciona en un identificador de consulta de streaming activo. |
| Ejecución diligente dentro de las funciones de flujo | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo registra o quita un agente de escucha de consulta de streaming. |
| Ejecución diligente dentro de las funciones de flujo | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función de flujo llama spark.streams a para administrar consultas de streaming. |
| Ejecución diligente dentro de las funciones de flujo | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow realiza una operación diligente DataFrameWriterV2 . |
| Ejecución diligente dentro de las funciones de flujo | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow realiza una operación diligente DataFrame.write . |
| Ejecución diligente dentro de las funciones de flujo | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La función flow inicia una consulta de streaming (writeStream.start()). |
| Mutaciones de configuración de Spark | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() o spark.conf.unset() se llamó dentro de una función decorada por un decorador de canalizaciones. Esto no se admite con una versión de entorno. |
| Mutaciones de configuración de Spark | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() se llamó fuera de una función decorada por un decorador de canalizaciones después de crear un DataFrame. El cambio de configuración puede afectar al dataframe existente en tiempo de ejecución. |
| Mutaciones de configuración de Spark | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() se llamó fuera de una función decorada por un decorador de canalizaciones después de crear un DataFrame. El cambio de configuración puede afectar al dataframe existente en tiempo de ejecución. |
| Reemplazos de vistas temporales | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una vista temporal global se reemplazó después de crear una trama de datos que hace referencia a ella. El reemplazo puede reflejarse en el dataframe existente. |
| Reemplazos de vistas temporales | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una vista temporal se reemplazó después de crear una trama de datos que hace referencia a ella. El reemplazo puede reflejarse en el dataframe existente. |
| Mutaciones UDF y UDTF | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una UDF se volvió a registrar con el mismo nombre después de que se creara una trama de datos que hace referencia a ella. El dataframe existente puede usar la nueva definición de UDF. |
| Mutaciones UDF y UDTF | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una UDTF se volvió a registrar con el mismo nombre después de crear una trama de datos que hace referencia a ella. El dataframe existente puede usar la nueva definición UDTF. |
| Mutaciones UDF y UDTF | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Una UDF hace referencia a una variable de Python mutable global. Con una versión de entorno, la UDF usa el valor de la variable en el momento en que se definió la UDF, no en el momento de la invocación. |
| Mutaciones UDF y UDTF | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Un UDTF hace referencia a una variable de Python mutable global. Con una versión de entorno, el UDTF usa el valor de la variable en el momento en que se definió la UDTF, no en el momento de la invocación. |
Mutaciones de base de datos y catálogo
Estos problemas se emiten cuando el código de canalización muta la base de datos o el catálogo predeterminados. Con una versión del entorno, los dataframes construidos antes de la mutación pueden resolver tablas mediante la nueva base de datos o catálogo.
Patrón de ejemplo que desencadena un evento:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Sin una versión del entorno, df se events resuelve desde el marketing catálogo. Con una versión de entorno, df se events resuelve desde el sales catálogo.
Corrección sugerida: Nombres de tabla completos para que la resolución no dependa del catálogo o la base de datos predeterminados y evite cambiar el catálogo o la base de datos predeterminados entre la creación y el uso de DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Mutaciones de configuración de Spark
Estos problemas se emiten cuando el código de canalización muta la configuración de Spark de maneras que pueden cambiar el comportamiento de DataFrame en una versión del entorno.
Patrón de ejemplo que desencadena un evento:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Sin una versión del entorno, la conversión usa el valor conf en tiempo de creación de DataFrame. Con una versión del entorno, la conversión usa spark.sql.ansi.enabled=true y puede producir un error en la entrada no válida.
Corrección sugerida: Establezca todas las configuraciones de Spark necesarias en la parte superior del archivo de canalización, antes de crear cualquier dataframe. Para la configuración por consulta, use la configuración de configuration la canalización en la especificación de canalización.
Reemplazos de vistas temporales
Estos problemas se emiten cuando el código de canalización reemplaza una vista temporal después de que se creó una trama de datos que hace referencia a ella. Con una versión de entorno, el dataframe existente puede reflejar el nuevo contenido de la vista.
Patrón de ejemplo que desencadena un evento:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sin una versión del entorno, mytable contiene [(1, "Original Row")]. Con una versión de entorno, mytable contiene [(2, "Replaced Row")].
Corrección sugerida: Cree cada vista temporal una sola vez y no la reemplace. Si necesita varias vistas con datos relacionados, asigne a cada una un nombre distinto.
Mutaciones UDF y UDTF
Estos problemas se emiten cuando el código de canalización muta una UDF o UDTF de maneras que cambian el comportamiento en una versión del entorno.
Patrón de ejemplo que desencadena un evento:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sin una versión del entorno, my_mv contiene [("alex_b",)]. Con una versión de entorno, my_mv contiene [("alex_a",)].
Suggested fix: Pase los valores a la UDF como argumentos en lugar de capturarlos de Python globales, o establezca el global antes de definir la UDF y no la mute después.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Ejecución diligente dentro de las funciones de flujo
Estos problemas se emiten cuando el código de canalización realiza un comando de Spark diligente dentro de una función decorada por un decorador de canalizaciones (@table, @materialized_view, etc.). Se espera que las funciones de flujo definan y devuelvan un DataFrame; Los comandos diligentes que escriben datos, administran consultas de streaming, registran recursos o ejecutan operaciones de ML no se permiten dentro de una función de flujo con un conjunto de versiones del entorno.
Corrección sugerida: Mueva la operación diligente fuera de la función de flujo y devuelva un dataframe de la función de flujo en su lugar. Efectos secundarios, como escribir en una tabla o iniciar una consulta de streaming, pertenecen fuera de la definición de canalización; El motor de canalización controla la materialización del DataFrame devuelto por la función de flujo.
Búsqueda de eventos de compatibilidad en el registro de eventos
La consulta siguiente devuelve todos los eventos de compatibilidad de una canalización, ordenados en primer lugar:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Para contar eventos por código de emisión en las actualizaciones recientes:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Para obtener información sobre cómo consultar el registro de eventos, consulte Consulta del registro de eventos.
Consulte también
- Configuración de versiones de entorno para canalizaciones : información general de características, cómo habilitar una versión de entorno.
- Esquema de registro de eventos de canalización : esquema completo del registro de eventos de canalización.
- Registro de eventos de canalización : cómo consultar el registro de eventos de canalización.