Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Importante
As versões do ambiente para os Lakeflow Spark Declarative Pipelines (SDP) estão em Beta.
Pipelines com uma versão ambiente executam Python código através de Spark Connect. Esta página aborda o que é incompatível, o que se comporta de forma diferente, como analisar um pipeline à procura de padrões afetados e como migrar um pipeline existente.
Limitações
As versões do ambiente ainda não são compatíveis com toda a funcionalidade do pipeline. Uma execução de pipeline com um conjunto de versões de ambiente falha se o código Python do pipeline fizer qualquer uma das seguintes coisas:
- Muta o estado de sessão do Spark dentro de uma função decorada com um decorador de pipelines. Exemplos incluem
spark.conf.set(...),spark.sql("USE CATALOG ..."), ecreateOrReplaceTempView. - Utiliza APIs PySpark que não estão disponíveis no Spark Connect, incluindo
SparkContext,RDD,SQLContext, e quaisquer APIs Py4J. Veja o que é suportado no Spark Connect.
Se ativar uma versão do ambiente num pipeline fizer com que este falhe, desativar a versão do ambiente devolve o pipeline ao seu estado anterior.
Alterações comportamentais
O Spark Connect apresenta um pequeno número de diferenças de comportamento em relação ao clássico runtime do PySpark. Veja Spark Connect vs. Spark clássico para a referência completa. A varredura de compatibilidade deteta estes padrões antecipadamente e bloqueia a ativação até que sejam resolvidos, para que possas encontrá-los e corrigi-los antes que afetem os dados de produção.
Num pipeline, as situações mais comuns em que o comportamento pode diferir são:
- Construção Interlaçada de DataFrame e mutação de sessão
- UDFs que fazem referência ao estado Python mutável
Construção Interlaçada de DataFrame e mutação de sessão
Quando um pipeline constrói um DataFrame e depois altera o estado da sessão do Spark (por exemplo, altera o catálogo ou esquema predefinido, define uma configuração, substitui uma vista temporária ou volta a registar um UDF), depois utiliza o DataFrame:
- Sem uma versão de ambiente, o DataFrame utiliza o estado de sessão pré-mutação .
- Com uma versão de ambiente, o DataFrame utiliza o estado de sessão pós-mutação .
Por exemplo:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
UDFs que fazem referência ao estado de Python mutável
Quando um UDF faz referência a uma variável global Python cujo valor muda após a definição do UDF:
- Sem uma versão de ambiente, a UDF utiliza o valor mais recente da variável.
- Com uma versão de ambiente, a UDF usa o valor no momento em que a UDF foi definida.
Por exemplo:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Se um pipeline depender de qualquer um dos padrões, audite-o antes de ativar uma versão do ambiente.
Varredura de compatibilidade
A análise de compatibilidade ajuda-te a encontrar padrões de código no teu pipeline que produziriam resultados diferentes numa versão do ambiente, antes de ativares um. A digitalização é opcional. Quando a varredura é ativada num pipeline:
- Cada execução de pipeline emite um
BehaviorChangeInSparkConnectWARNevento no registo de eventos do pipeline por padrão detetado. - Não pode ativar uma versão do ambiente no pipeline até resolver todos os avisos de compatibilidade da atualização anterior bem-sucedida.
Se a varredura não estiver ativada, não são emitidos eventos e environment_version a ativação não é bloqueada. A Databricks recomenda ativar a varredura e resolver quaisquer padrões detetados antes de ativar uma versão do ambiente no pipeline.
Ativar a varredura num pipeline
Podes ativar a análise de compatibilidade adicionando a configuração do pipelines.environmentVersion.enableCompatibilityScan pipeline. Pode adicionar configuração através da interface do editor do pipeline ou adicionando uma entrada ao JSON da configuração do pipeline.
Através da interface:
- No editor de pipeline, clique em Configurações.
- Encontre a secção Configuração nas definições do pipeline.
-
Adicionar configuração.
- Enter
pipelines.environmentVersion.enableCompatibilityScancomo chave etruecomo valor. - Guarda as definições do pipeline.
JSON em preparação:
Adicione a seguinte entrada ao configuration bloco:
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Fluxo de trabalho recomendado
- Ativa a varredura no pipeline.
- Acionar uma execução de pipeline.
-
Consulta o registo de eventos do pipeline para
BehaviorChangeInSparkConnectWARNeventos. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problemas, padrões de exemplo e correções sugeridas. - Atualize o código do pipeline para remover os padrões detetados e execute novamente o pipeline até que não sejam emitidos mais eventos.
- Adicione
environment_versionao pipeline usando um dos métodos em Habilitar uma versão de ambiente num pipeline.
Se acredita que um aviso de compatibilidade é um falso positivo e quiser ativar environment_version na mesma, remova a pipelines.environmentVersion.enableCompatibilityScan entrada da configuração do pipeline para contornar a verificação. (Não é permitido definir o valor para false — deve remover a entrada por completo.)
A verificação pré-voo não é executada em pipelines que não tenham atualização prévia, nem em pipelines que já tenham uma versão de ambiente definida.
Migrar um pipeline existente para versões do ambiente
Para migrar um pipeline existente que ainda não utiliza uma versão do ambiente, siga este fluxo de trabalho de ponta a ponta. Ele guia-te como encontrar padrões de código que possam comportar-se de forma diferente no Spark Connect, corrigi-los e implementar a versão do ambiente em segurança.
Ativa a análise de compatibilidade no pipeline. Ative a varredura no pipeline conforme descrito na análise de compatibilidade. É isto que faz com que padrões detetados apareçam no registo de eventos e que permite a verificação pré-voo que protege a tua tentativa de ativação.
Desencadeie uma execução de pipeline e reveja eventos de compatibilidade. Desencadeia uma atualização normal do pipeline. Depois de concluído com sucesso, consulte o registo de eventos do pipeline para eventos
BehaviorChangeInSparkConnectWARN. Cada evento reporta um padrão detetado. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problemas, padrões de exemplo e correções sugeridas.Atualize o código do seu pipeline para abordar padrões detetados. Para cada padrão detetado, atualize o código do seu pipeline seguindo a correção sugerida. Após cada alteração, acione outra atualização do pipeline e verifique se os eventos correspondentes já não aparecem. Repita até que o registo de eventos deixe de aparecer quaisquer eventos de compatibilidade para uma atualização bem-sucedida.
Ativa a versão do ambiente no pipeline. Após a atualização bem-sucedida mais recente não apresentar eventos de compatibilidade, adicione
environment_versionao pipeline usando a interface, API ou bundle conforme descrito em Habilitar uma versão de ambiente num pipeline. A atualização seguinte corre com o Spark Connect e a versão fixada da linguagem Python e as bibliotecas pré-instaladas.Se a atualização falhar porque ainda existem avisos de compatibilidade, retira o
environment_version, volta ao passo 2 e resolve os avisos restantes antes de tentar novamente.Verifica a migração. Após a conclusão da primeira atualização com a versão ambiental, verifique:
- O
create_updateevento no registo de eventos mostraenvironment_versiondefinido como o valor esperado. - O pipeline produz os dados esperados e não aparecem novos eventos de erro.
- Verificar tabelas a jusante para quaisquer diferenças subtis de comportamento descritas em Mudanças de Comportamento.
- O
Rollback
Se o pipeline se comportar mal após a migração, remova-o environment_version das definições do pipeline. A atualização seguinte corre com a configuração de runtime anterior em Python. Use a execução reversa para depurar, depois repita a migração a partir do passo 2 depois de identificar e resolver o problema.
Referência de eventos de compatibilidade
Quando a varredura de compatibilidade está ativada num pipeline, o SDP emite um BehaviorChangeInSparkConnectWARN evento no registo de eventos do pipeline por padrão detetado. Quando a varredura está ativada e a atualização anterior bem-sucedida detetou quaisquer padrões, o SDP também bloqueia environment_version a ativação até que os padrões sejam corrigidos.
Cada evento reporta um único código de emissão que identifica o que foi detetado. Para consultar um código, encontre-o na tabela de códigos de problemas — cada linha liga à secção de categoria que contém um padrão de exemplo e a correção sugerida.
Forma do evento
BehaviorChangeInSparkConnect Os eventos seguem o esquema padrão do registo de eventos do pipeline:
-
event_typeébehavior_change_in_spark_connect. -
leveléWARN. -
detailscontém obehavior_change_in_spark_connectobjeto, que tem um únicoissuecampo. O valor de emissão é um dos códigos listados abaixo. -
messageé uma descrição legível pelo homem do padrão detetado.
Códigos de emissão
| Categoria | Código de emissão | Description |
|---|---|---|
| Mutações em bases de dados e catálogos | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
O catálogo padrão era alterado após a criação de um DataFrame. O DataFrame existente pode resolver tabelas usando o novo catálogo predefinido. |
| Mutações em bases de dados e catálogos | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG foi chamado fora de uma festa decorada por um decorador de oleodutos. O catálogo padrão pode mudar inesperadamente em operações subsequentes. |
| Mutações em bases de dados e catálogos | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
A base de dados padrão foi alterada após a criação de um DataFrame. O DataFrame existente pode resolver tabelas usando a nova base de dados predefinida. |
| Mutações em bases de dados e catálogos | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE foi chamado fora de uma festa decorada por um decorador de oleodutos. A base de dados padrão pode mudar inesperadamente em operações subsequentes. |
| Execução entusiasta dentro de funções de fluxo | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama um comando de checkpoint. |
| Execução entusiasta dentro de funções de fluxo | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo cria com entusiasmo uma vista DataFrame (createOrReplaceTempView ou similar). |
| Execução entusiasta dentro de funções de fluxo | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo cria um perfil de recursos. |
| Execução entusiasta dentro de funções de fluxo | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama spark.resources ou uma API de recurso relacionada. |
| Execução entusiasta dentro de funções de fluxo | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa um eager MERGE INTO numa tabela alvo. |
| Execução entusiasta dentro de funções de fluxo | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação Spark ML com entusiasmo. |
| Execução entusiasta dentro de funções de fluxo | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo regista uma fonte de dados em Python. |
| Execução entusiasta dentro de funções de fluxo | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo opera com um handle ativo de consulta de streaming. |
| Execução entusiasta dentro de funções de fluxo | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo regista ou remove um ouvinte de consultas em streaming. |
| Execução entusiasta dentro de funções de fluxo | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama spark.streams para gerir consultas de streaming. |
| Execução entusiasta dentro de funções de fluxo | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação rápida DataFrameWriterV2 . |
| Execução entusiasta dentro de funções de fluxo | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação rápida DataFrame.write . |
| Execução entusiasta dentro de funções de fluxo | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo inicia uma consulta de streaming (writeStream.start()). |
| Mutações na configuração da centelha | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() ou spark.conf.unset() era chamado dentro de uma festa decorada por um decorador de pipelines. Isto não é suportado com uma versão ambiental. |
| Mutações na configuração da centelha | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() foi chamada fora de uma função decorada por um decorador de pipelines após a criação de um DataFrame. A alteração de configuração pode afetar o DataFrame existente em tempo de execução. |
| Mutações na configuração da centelha | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() foi chamada fora de uma função decorada por um decorador de pipelines após a criação de um DataFrame. A alteração de configuração pode afetar o DataFrame existente em tempo de execução. |
| Substituições temporárias de visualização | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Uma vista global temporária era substituída após a criação de um DataFrame que a referenciava. A substituição pode ser refletida no DataFrame existente. |
| Substituições temporárias de visualização | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Uma vista temporária foi substituída após a criação de um DataFrame que a referenciava. A substituição pode ser refletida no DataFrame existente. |
| Mutações UDF e UDTF | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Um UDF era re-registado com o mesmo nome após a criação de um DataFrame que o referenciava. O DataFrame existente pode usar a nova definição UDF. |
| Mutações UDF e UDTF | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Um UDTF foi re-registado com o mesmo nome após a criação de um DataFrame que o referenciava. O DataFrame existente pode usar a nova definição UDTF. |
| Mutações UDF e UDTF | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Um UDF faz referência a uma variável global mutável em Python. Numa versão de ambiente, a UDF utiliza o valor da variável no momento em que a UDF foi definida, não na altura da invocação. |
| Mutações UDF e UDTF | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Um UDTF faz referência a uma variável global mutável em Python. Numa versão de ambiente, a UDTF utiliza o valor da variável no momento em que a UDTF foi definida, não na altura da invocação. |
Mutações em bases de dados e catálogos
Estes problemas surgem quando o código do pipeline altera a base de dados ou catálogo predefinido. Com uma versão de ambiente, DataFrames construídos antes da mutação podem resolver tabelas usando a nova base de dados ou catálogo.
Exemplo de padrão que desencadeia um evento:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Sem uma versão de ambiente, df resolve-se events a partir do marketing catálogo. Com uma versão de ambiente, df resolve events a partir do sales catálogo.
Solução sugerida: Qualifique totalmente os nomes das tabelas para que a resolução não dependa do catálogo ou base de dados predefinida, e evite alterar o catálogo ou base de dados padrão entre a criação e utilização do DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Mutações na configuração da centelha
Estes problemas surgem quando o código do pipeline altera a configuração do Spark de formas que podem alterar o comportamento do DataFrame numa versão do ambiente.
Exemplo de padrão que desencadeia um evento:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Sem uma versão de ambiente, o cast utiliza o valor de conf na altura da criação do DataFrame. Numa versão ambiental, o cast usa spark.sql.ansi.enabled=true e pode falhar em caso de entrada inválida.
Solução sugerida: Defina todas as configurações necessárias do Spark no topo do ficheiro do pipeline, antes de qualquer DataFrame ser criado. Para configuração por consulta, use a definição do configuration pipeline na especificação do pipeline.
Substituições temporárias de visualização
Estes problemas surgem quando o código do pipeline substitui uma vista temporária após a criação de um DataFrame que a referencia. Com uma versão de ambiente, o DataFrame existente pode refletir o conteúdo da nova vista.
Exemplo de padrão que desencadeia um evento:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
Solução sugerida: Crie cada vista temporária uma única vez e não a substitua. Se precisar de múltiplas vistas com dados relacionados, dê a cada uma um nome distinto.
Mutações UDF e UDTF
Estes problemas surgem quando o código do pipeline altera um UDF ou UDTF de formas que alteram o comportamento numa versão do ambiente.
Exemplo de padrão que desencadeia um evento:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Sugestão de correção: Passar valores para a UDF como argumentos em vez de os capturar de Python globais, ou definir o global antes de definir a UDF e não a mutar depois.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Execução entusiasta dentro de funções de fluxo
Estes problemas surgem quando o código do pipeline executa um comando Spark entusiasta dentro de uma função decorada por um decorador de pipelines (@table, @materialized_view, etc.). Espera-se que as funções de fluxo definam e retornem um DataFrame; comandos eager que escrevem dados, gerem consultas em fluxo, registam recursos ou executam operações de ML não são permitidos dentro de uma função de fluxo com uma versão do ambiente definida.
Solução sugerida: Mova a operação eager para fora da função de fluxo e devolve um DataFrame da função de fluxo em vez disso. Efeitos secundários, como escrever numa tabela ou iniciar uma consulta de streaming, pertencem fora da definição do pipeline; o motor pipeline trata da materialização do DataFrame devolvido pela função de fluxo.
Encontre eventos de compatibilidade no registo de eventos
A consulta seguinte devolve todos os eventos de compatibilidade para um pipeline, ordenados primeiro:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Para contar eventos por código de emissão nas atualizações recentes:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Para saber como consultar o registo de eventos, consulte Consultar o registo de eventos.
Recursos adicionais
- Configurar versões do ambiente para pipelines — visão geral das funcionalidades, como ativar uma versão do ambiente.
- Esquema do registo de eventos do pipeline — esquema completo do registo de eventos do pipeline.
- Registo de eventos do pipeline — como consultar o registo de eventos do pipeline.