Compatibilidade de versão do ambiente

Importante

As versões de ambiente para SDP estão em Beta.

Pipelines com uma versão environment definir Python código de execução por meio do Spark Connect. Esta página aborda o que é incompatível, o que se comporta de forma diferente, como verificar um pipeline em busca de padrões afetados e como migrar um pipeline existente.

Limitações

As versões de ambiente ainda não são compatíveis com todas as funcionalidades de pipeline. Uma execução de pipeline com um conjunto de versões de ambiente falhará se o código de Python do pipeline fizer o seguinte:

  • Modifica o estado da sessão spark dentro de uma função decorada com um decorador de pipelines. Exemplos incluem spark.conf.set(...), spark.sql("USE CATALOG ...")e createOrReplaceTempView.
  • Usa APIs do PySpark que não estão disponíveis no Spark Connect, incluindo SparkContext, RDDe SQLContextqualquer APIs Py4J. Veja o que tem suporte no Spark Connect.

Se a habilitação de uma versão de ambiente em um pipeline causar falha, desabilitar a versão do ambiente retornará o pipeline ao estado anterior.

Alterações de comportamento

O Spark Connect tem um pequeno número de diferenças de comportamento em relação ao runtime clássico do PySpark. Consulte Spark Connect vs. Spark clássico para obter a referência completa. A verificação de compatibilidade detecta esses padrões com antecedência e bloqueia a habilitação até que eles sejam resolvidos, para que você possa encontrá-los e corrigi-los antes que eles afetem os dados de produção.

Em um pipeline, as situações mais comuns em que o comportamento pode ser diferente são:

Construção de DataFrame intercalada e mutação de sessão

Quando um pipeline constrói um DataFrame, depois altera o estado da sessão do Spark (por exemplo, altera o catálogo ou esquema padrão, define uma configuração, substitui uma exibição temporária ou registra novamente um UDF) e usa o DataFrame:

  • Sem uma versão de ambiente, o DataFrame usa o estado da sessão de pré-mutação .
  • Com uma versão de ambiente, o DataFrame usa o estado da sessão pós-mutação .

Por exemplo:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].

UDFs que fazem referência ao estado de Python mutável

Quando uma UDF faz referência a uma variável global Python cujo valor é alterado após a definição da UDF:

  • Sem uma versão de ambiente, a UDF usa o valor mais recente da variável.
  • Com uma versão de ambiente, a UDF usa o valor no momento em que a UDF foi definida.

Por exemplo:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].

Se um pipeline depender de qualquer padrão, audite-o antes de habilitar uma versão do ambiente.

Verificação de compatibilidade

A verificação de compatibilidade ajuda você a encontrar padrões de código em seu pipeline que produziriam resultados diferentes em uma versão de ambiente, antes de habilitar um. A verificação é aceita. Quando a verificação está habilitada em um pipeline:

  • Cada execução de pipeline emite um BehaviorChangeInSparkConnectWARN evento no log de eventos do pipeline por padrão detectado.
  • Você não pode habilitar uma versão de ambiente no pipeline até resolver todos os avisos de compatibilidade da atualização bem-sucedida anterior.

Se a verificação não estiver habilitada, nenhum evento será emitido e environment_version a habilitação não será bloqueada. O Databricks recomenda habilitar a verificação e resolver os padrões detectados antes de habilitar uma versão de ambiente no pipeline.

Habilitar a verificação em um pipeline

Você pode habilitar a verificação de compatibilidade adicionando a configuração de pipelines.environmentVersion.enableCompatibilityScan pipeline Você pode adicionar configuração por meio da interface do usuário do editor de pipeline ou adicionando uma entrada ao JSON de configuração de pipeline.

Por meio da interface do usuário:

  1. No editor de pipeline, clique em Configurações.
  2. Localize a seção Configuração nas configurações do pipeline.
  3. Clique no ícone Plus.Adicionar configuração.
  4. Insira pipelines.environmentVersion.enableCompatibilityScan como a chave e true como o valor.
  5. Salve as configurações de pipeline.

No JSON do pipeline:

Adicione a seguinte entrada ao configuration bloco:

"configuration": {
  "pipelines.environmentVersion.enableCompatibilityScan": "true"
}
  1. Habilite a verificação no pipeline.
  2. Execute uma execução de pipeline.
  3. Consulte o log de eventos do pipeline para BehaviorChangeInSparkConnectWARN eventos. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problema, padrões de exemplo e correções sugeridas.
  4. Atualize o código do pipeline para remover os padrões detectados e execute o pipeline novamente até que nenhum outro evento seja emitido.
  5. Adicione environment_version ao pipeline usando um dos métodos em Habilitar uma versão de ambiente em um pipeline.

Se você acredita que um aviso de compatibilidade é um falso positivo e deseja habilitar environment_version de qualquer maneira, remova a pipelines.environmentVersion.enableCompatibilityScan entrada da configuração do pipeline para ignorar a verificação. (Definir o valor como false não é permitido – você deve remover a entrada inteiramente.)

A verificação de pré-vôo não é executada em pipelines que não têm nenhuma atualização anterior ou em pipelines que já têm uma versão de ambiente definida.

Migrar um pipeline existente para versões de ambiente

Para migrar um pipeline existente que ainda não usa uma versão de ambiente, siga este fluxo de trabalho de ponta a ponta. Ele orienta você a encontrar padrões de código que podem se comportar de forma diferente no Spark Connect, corrigi-los e distribuir a versão do ambiente com segurança.

  1. Habilite a verificação de compatibilidade no pipeline. Habilite a verificação no pipeline, conforme descrito na verificação de compatibilidade. Isso faz com que os padrões detectados sejam exibidos no log de eventos e o que permite a verificação de pré-vôo que protege sua tentativa de habilitação.

  2. Dispare uma execução de pipeline e examine os eventos de compatibilidade. Dispare uma atualização normal do pipeline. Depois que for concluído com êxito, consulte o log de eventos do pipeline em busca de BehaviorChangeInSparkConnectWARN eventos. Cada evento relata um padrão detectado. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problema, padrões de exemplo e correções sugeridas.

  3. Atualize o código do pipeline para atender aos padrões detectados. Para cada padrão detectado, atualize o código do pipeline após a correção sugerida. Após cada alteração, dispare outra atualização de pipeline e verifique se os eventos correspondentes não aparecem mais. Repita até que o log de eventos não exiba mais eventos de compatibilidade para uma atualização bem-sucedida.

  4. Habilite a versão do ambiente no pipeline. Depois que a atualização bem-sucedida mais recente não tiver eventos de compatibilidade, adicione environment_version ao pipeline usando a interface do usuário, a API ou o pacote, conforme descrito em Habilitar uma versão de ambiente em um pipeline. A próxima atualização é executada com o Spark Connect e a versão de linguagem Python fixada e bibliotecas pré-instaladas.

    Se a atualização falhar porque os avisos de compatibilidade ainda existem, solte a environment_versionetapa 2 e resolva os avisos restantes antes de tentar novamente.

  5. Verifique a migração. Após a conclusão da primeira atualização com a versão do ambiente, verifique:

    • O create_update evento no log de eventos mostra environment_version definido como o valor esperado.
    • O pipeline produz os dados esperados e nenhum novo evento de erro é exibido.
    • Verifique as tabelas downstream spot para quaisquer diferenças de comportamento sutis descritas nas alterações de comportamento.

Reversão

Se o pipeline se comportar mal após a migração, remova as environment_version configurações do pipeline. A próxima atualização é executada com a configuração de runtime Python anterior. Use a execução revertida para depurar e repita a migração da etapa 2 depois de identificar e corrigir o problema.

Referência de eventos de compatibilidade

Quando a verificação de compatibilidade é habilitada em um pipeline, o SDP emite um BehaviorChangeInSparkConnectWARN evento no log de eventos do pipeline por padrão detectado. Quando a verificação está habilitada e a atualização anterior bem-sucedida detectou quaisquer padrões, o SDP também bloqueia environment_version a habilitação até que os padrões sejam resolvidos.

Cada evento relata um único código de problema que identifica o que foi detectado. Para pesquisar um código, localize-o na tabela Códigos de problema – cada linha vincula à seção de categoria que contém um padrão de exemplo e a correção sugerida.

Forma do evento

BehaviorChangeInSparkConnect os eventos seguem o esquema de log de eventos de pipeline padrão:

  • event_type é behavior_change_in_spark_connect.
  • level é WARN.
  • details contém o behavior_change_in_spark_connect objeto, que tem um único issue campo. O valor do problema é um dos códigos listados abaixo.
  • message é uma descrição legível pelo ser humano do padrão detectado.

Emitir códigos

Categoria Código de problema Description
Mutações de banco de dados e catálogo USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR O catálogo padrão foi alterado depois que um DataFrame foi criado. O DataFrame existente pode resolver tabelas usando o novo catálogo padrão.
Mutações de banco de dados e catálogo USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE CATALOG foi chamado fora de uma função decorada por um decorador de pipelines. O catálogo padrão pode ser alterado inesperadamente para operações subsequentes.
Mutações de banco de dados e catálogo USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR O banco de dados padrão foi alterado depois que um DataFrame foi criado. O DataFrame existente pode resolver tabelas usando o novo banco de dados padrão.
Mutações de banco de dados e catálogo USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE DATABASE foi chamado fora de uma função decorada por um decorador de pipelines. O banco de dados padrão pode ser alterado inesperadamente para operações subsequentes.
Execução ansiosa dentro de funções de fluxo CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo chama um comando de ponto de verificação.
Execução ansiosa dentro de funções de fluxo CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo cria ansiosamente um modo de exibição DataFrame (createOrReplaceTempView ou semelhante).
Execução ansiosa dentro de funções de fluxo CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo cria um perfil de recurso.
Execução ansiosa dentro de funções de fluxo GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo chama spark.resources ou uma API de recurso relacionada.
Execução ansiosa dentro de funções de fluxo MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo executa uma ávida MERGE INTO em uma tabela de destino.
Execução ansiosa dentro de funções de fluxo ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo executa uma operação do Spark ML ansiosa.
Execução ansiosa dentro de funções de fluxo REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo registra uma fonte de dados Python.
Execução ansiosa dentro de funções de fluxo STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo opera em um identificador de consulta de streaming ativo.
Execução ansiosa dentro de funções de fluxo STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo registra ou remove um ouvinte de consulta de streaming.
Execução ansiosa dentro de funções de fluxo STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo chama spark.streams para gerenciar consultas de streaming.
Execução ansiosa dentro de funções de fluxo WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo executa uma operação ansiosa DataFrameWriterV2 .
Execução ansiosa dentro de funções de fluxo WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo executa uma operação ansiosa DataFrame.write .
Execução ansiosa dentro de funções de fluxo WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED A função de fluxo inicia uma consulta de streaming (writeStream.start()).
Mutações de configuração do Spark CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED spark.conf.set() ou spark.conf.unset() foi chamado dentro de uma função decorada por um decorador de pipelines. Não há suporte para isso com uma versão de ambiente.
Mutações de configuração do Spark SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.set() foi chamado fora de uma função decorada por um decorador de pipelines depois que um DataFrame foi criado. A alteração da configuração pode afetar o DataFrame existente no momento da execução.
Mutações de configuração do Spark UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.unset() foi chamado fora de uma função decorada por um decorador de pipelines depois que um DataFrame foi criado. A alteração da configuração pode afetar o DataFrame existente no momento da execução.
Substituições de exibição temporária REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Uma exibição temporária global foi substituída depois que um DataFrame que fazia referência a ele foi criado. A substituição pode ser refletida no DataFrame existente.
Substituições de exibição temporária REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Uma exibição temporária foi substituída depois que um DataFrame que fazia referência a ele foi criado. A substituição pode ser refletida no DataFrame existente.
Mutações UDF e UDTF OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Uma UDF foi re-registrada com o mesmo nome depois que um DataFrame que fazia referência a ele foi criado. O DataFrame existente pode usar a nova definição de UDF.
Mutações UDF e UDTF OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Um UDTF foi registrado novamente com o mesmo nome depois que um DataFrame que fazia referência a ele foi criado. O DataFrame existente pode usar a nova definição UDTF.
Mutações UDF e UDTF UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Uma UDF faz referência a uma variável de Python mutável global. Com uma versão de ambiente, a UDF usa o valor da variável no momento em que a UDF foi definida, não no momento da invocação.
Mutações UDF e UDTF UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Um UDTF faz referência a uma variável de Python mutável global. Com uma versão de ambiente, o UDTF usa o valor da variável no momento em que o UDTF foi definido, não no momento da invocação.

Mutações de banco de dados e catálogo

Esses problemas são emitidos quando o código do pipeline altera o banco de dados ou catálogo padrão. Com uma versão de ambiente, dataframes construídos antes da mutação podem resolver tabelas usando o novo banco de dados ou catálogo.

Padrão de exemplo que dispara um evento:

from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Sem uma versão de ambiente, df resolve events do marketing catálogo. Com uma versão de ambiente, df é resolvida events a partir do sales catálogo.

Correção sugerida: Qualifique totalmente os nomes de tabela para que a resolução não dependa do catálogo ou banco de dados padrão e evite alterar o catálogo ou o banco de dados padrão entre a criação e o uso do DataFrame.

from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Mutações de configuração do Spark

Esses problemas são emitidos quando o código de pipeline altera a configuração do Spark de maneiras que podem alterar o comportamento do DataFrame em uma versão de ambiente.

Padrão de exemplo que dispara um evento:

from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
  return df.selectExpr("CAST(price AS INT) AS price")

Sem uma versão de ambiente, a conversão usa o valor de configuração na hora de criação do DataFrame. Com uma versão de ambiente, a conversão usa spark.sql.ansi.enabled=true e pode falhar na entrada inválida.

Correção sugerida: Defina todas as configurações necessárias do Spark na parte superior do arquivo de pipeline, antes que qualquer DataFrame seja criado. Para a configuração por consulta, use a configuração do configuration pipeline na especificação do pipeline.

Substituições de exibição temporária

Esses problemas são emitidos quando o código de pipeline substitui uma exibição temporária depois que um DataFrame que faz referência a ele foi criado. Com uma versão de ambiente, o DataFrame existente pode refletir o novo conteúdo de exibição.

Padrão de exemplo que dispara um evento:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].

Correção sugerida: Crie cada exibição temporária uma única vez e não a substitua. Se você precisar de várias exibições com dados relacionados, dê a cada um um nome distinto.

Mutações UDF e UDTF

Esses problemas são emitidos quando o código de pipeline altera uma UDF ou UDTF de maneiras que alteram o comportamento em uma versão de ambiente.

Padrão de exemplo que dispara um evento:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].

Suggested fix: Pass values into the UDF as arguments instead of capture them from Python globals, or set the global before de define the UDF and do not mutate it afterward.

from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
  return s + suffix

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

Execução ansiosa dentro de funções de fluxo

Esses problemas são emitidos quando o código de pipeline executa um comando Spark ansioso dentro de uma função decorada por um decorador de pipelines (@table, @materialized_viewetc.). Espera-se que as funções de fluxo definam e retornem um DataFrame; Comandos ansiosos que gravam dados, gerenciam consultas de streaming, registram recursos ou executam operações de ML não são permitidos dentro de uma função de fluxo com um conjunto de versões de ambiente.

Correção sugerida: Mova a operação ansiosa para fora da função de fluxo e retorne um DataFrame da função de fluxo. Efeitos colaterais, como gravar em uma tabela ou iniciar uma consulta de streaming, pertencem fora da definição do pipeline; o mecanismo de pipeline manipula a materialização do DataFrame retornado pela função de fluxo.

Localizar eventos de compatibilidade no log de eventos

A consulta a seguir retorna todos os eventos de compatibilidade de um pipeline, ordenados primeiro:

SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

Para contar eventos por código de emissão em atualizações recentes:

SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

Para saber como consultar o log de eventos, consulte Consultar o log de eventos.

Consulte também