Miljöversionskompatibilitet

Important

Miljöversioner för SDP finns i Beta.

Pipelines med en environment version ställa in kör Python kod via Spark Connect. Den här sidan beskriver vad som är inkompatibelt, vad som fungerar annorlunda, hur du söker igenom en pipeline efter berörda mönster och hur du migrerar en befintlig pipeline.

Limitations

Miljöversioner är ännu inte kompatibla med alla pipelinefunktioner. En pipelinekörning med en miljöversionsuppsättning misslyckas om pipelinens Python kod gör något av följande:

  • Muterar Spark-sessionstillståndet i en funktion som är dekorerad med en pipelinesdekoratör. Exempel är spark.conf.set(...), spark.sql("USE CATALOG ...")och createOrReplaceTempView.
  • Använder PySpark-API:er som inte är tillgängliga i Spark Connect, inklusive SparkContext, RDD, SQLContextoch eventuella Py4J-API:er. Se Vad stöds i Spark Connect.

Om det inte går att aktivera en miljöversion på en pipeline returnerar inaktivering av miljöversionen pipelinen till dess tidigare tillstånd.

Beteendeförändringar

Spark Connect har ett litet antal beteendeskillnader jämfört med den klassiska PySpark-körningen. Se Spark Connect jämfört med klassiska Spark för den fullständiga referensen. Kompatibilitetsgenomsökningen identifierar dessa mönster i förväg och blockerar aktivering tills de åtgärdas, så att du kan hitta och åtgärda dem innan de påverkar produktionsdata.

I en pipeline är de vanligaste situationerna där beteendet kan skilja sig åt:

Interleaved DataFrame konstruktion och session mutation

När en pipeline skapar en DataFrame, muterar sedan Spark-sessionstillståndet (till exempel ändrar standardkatalogen eller schemat, anger en konfiguration, ersätter en temporär vy eller omregistrerar en UDF) använder sedan DataFrame:

  • Utan en miljöversion använder DataFrame sessionstillståndet före mutationen .
  • Med en miljöversion använder DataFrame sessionstillståndet efter mutationen .

Ett exempel:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Utan en miljöversion mytable innehåller [(1, "Original Row")]. Med en miljöversion mytable innehåller [(2, "Replaced Row")].

UDF:er som refererar till föränderligt Python tillstånd

När en UDF refererar till en Python global variabel vars värde ändras efter att UDF har definierats:

  • Utan en miljöversion använder UDF det senaste värdet för variabeln.
  • Med en miljöversion använder UDF värdet vid den tidpunkt då UDF definierades.

Ett exempel:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Utan en miljöversion my_mv innehåller [("alex_b",)]. Med en miljöversion my_mv innehåller [("alex_a",)].

Om en pipeline förlitar sig på något av mönstret granskar du det innan du aktiverar en miljöversion.

Kompatibilitetsgenomsökning

Kompatibilitetsgenomsökningen hjälper dig att hitta kodmönster i pipelinen som skulle ge olika resultat under en miljöversion innan du aktiverar en. Genomsökningen är anmäld. När genomsökningen är aktiverad på en pipeline:

  • Varje pipelinekörning genererar en BehaviorChangeInSparkConnectWARN händelse i pipelinehändelseloggen per identifierat mönster.
  • Du kan inte aktivera en miljöversion på pipelinen förrän du åtgärdar alla kompatibilitetsvarningar från den tidigare lyckade uppdateringen.

Om genomsökningen inte är aktiverad genereras inga händelser och environment_version aktiveringen blockeras inte. Databricks rekommenderar att du aktiverar genomsökningen och löser eventuella identifierade mönster innan du aktiverar en miljöversion på pipelinen.

Aktivera genomsökningen på en pipeline

Du kan aktivera kompatibilitetsgenomsökningen genom att lägga till pipelinekonfigurationen pipelines.environmentVersion.enableCompatibilityScan Du kan lägga till konfiguration via pipelineredigerarens användargränssnitt eller genom att lägga till en post i pipelinekonfigurationens JSON.

Via användargränssnittet:

  1. Klicka på Inställningar i pipelineredigeraren.
  2. Leta reda på avsnittet Konfiguration i pipelineinställningar.
  3. Klicka på Plus-ikonen.Lägg till konfiguration.
  4. Ange pipelines.environmentVersion.enableCompatibilityScan som nyckel och true som värde.
  5. Spara pipelineinställningarna.

I pipelinens JSON:

Lägg till följande post i configuration blocket:

"configuration": {
  "pipelines.environmentVersion.enableCompatibilityScan": "true"
}
  1. Aktivera genomsökningen på pipelinen.
  2. Utlös en pipelinekörning.
  3. Fråga händelseloggen för pipelinen efter BehaviorChangeInSparkConnectWARN händelser. Se Referens för kompatibilitetshändelser för den fullständiga listan över problemkoder, exempelmönster och föreslagna korrigeringar.
  4. Uppdatera pipelinekoden för att ta bort de identifierade mönstren och kör pipelinen igen tills inga fler händelser genereras.
  5. Lägg till environment_version i pipelinen med någon av metoderna i Aktivera en miljöversion på en pipeline.

Om du tror att en kompatibilitetsvarning är en falsk positiv identifiering och vill aktivera environment_version ändå tar du bort pipelines.environmentVersion.enableCompatibilityScan posten från pipelinekonfigurationen för att kringgå kontrollen. (Det går inte att ange värdet till false – du måste ta bort posten helt.)

Preflight-kontrollen körs inte på pipelines som inte har någon tidigare uppdatering eller på pipelines som redan har en miljöversionsuppsättning.

Migrera en befintlig pipeline till miljöversioner

Om du vill migrera en befintlig pipeline som ännu inte använder en miljöversion följer du det här arbetsflödet från slutpunkt till slutpunkt. Den vägleder dig genom att hitta kodmönster som kan bete sig annorlunda under Spark Connect, åtgärda dem och distribuera miljöversionen på ett säkert sätt.

  1. Aktivera kompatibilitetsgenomsökningen på pipelinen. Aktivera genomsökningen på pipelinen enligt beskrivningen i Kompatibilitetsgenomsökning. Det här är vad som gör att identifierade mönster visas i händelseloggen och vad som gör det möjligt att kontrollera preflight som skyddar ditt aktiveringsförsök.

  2. Utlös en pipelinekörning och granska kompatibilitetshändelser. Utlös en normal pipelineuppdatering. När den har slutförts frågar du händelseloggen för pipelinen efter BehaviorChangeInSparkConnectWARN händelser. Varje händelse rapporterar ett identifierat mönster. Se Referens för kompatibilitetshändelser för den fullständiga listan över problemkoder, exempelmönster och föreslagna korrigeringar.

  3. Uppdatera pipelinekoden för att åtgärda identifierade mönster. Uppdatera pipelinekoden efter den föreslagna korrigeringen för varje identifierat mönster. Efter varje ändring utlöser du en annan pipelineuppdatering och kontrollerar att motsvarande händelser inte längre visas. Upprepa tills händelseloggen inte längre innehåller några kompatibilitetshändelser för en lyckad uppdatering.

  4. Aktivera miljöversionen på pipelinen. När den senaste lyckade uppdateringen inte har några kompatibilitetshändelser lägger du till environment_version i pipelinen med hjälp av användargränssnittet, API:et eller paketet enligt beskrivningen i Aktivera en miljöversion på en pipeline. Nästa uppdatering körs med Spark Connect och den fästa Python språkversion och förinstallerade bibliotek.

    Om uppdateringen misslyckas eftersom det fortfarande finns kompatibilitetsvarningar släpper du environment_version, återgår till steg 2 och löser de återstående varningarna innan du försöker igen.

  5. Verifiera migreringen. När den första uppdateringen med miljöversionen har slutförts kontrollerar du:

    • Händelsen create_update i händelseloggen visar environment_version inställt på det förväntade värdet.
    • Pipelinen genererar förväntade data och inga nya felhändelser visas.
    • Kontrollera underordnade tabeller med oanvänd kapacitet för eventuella diskreta beteendeskillnader som beskrivs i Beteendeändringar.

Rollback

Om pipelinen fungerar felaktigt efter migreringen tar du bort environment_version från pipelineinställningarna. Nästa uppdatering körs med föregående Python körningskonfiguration. Använd den återställda körningen för att felsöka och upprepa sedan migreringen från steg 2 när du har identifierat och åtgärdat problemet.

Referens för kompatibilitetshändelser

När kompatibilitetsgenomsökningen är aktiverad på en pipeline genererar SDP en BehaviorChangeInSparkConnectWARN händelse i pipelinehändelseloggen enligt det identifierade mönstret. När genomsökningen är aktiverad och den tidigare lyckade uppdateringen identifierade eventuella mönster blockerar environment_version SDP även aktivering tills mönstren åtgärdas.

Varje händelse rapporterar en enda problemkod som identifierar vad som har identifierats. Om du vill leta upp en kod hittar du den i tabellen Problemkoder – varje rad länkar till kategoriavsnittet som innehåller ett exempelmönster och den föreslagna korrigeringen.

Händelseform

BehaviorChangeInSparkConnect händelser följer standardschemat för händelseloggen för pipeline:

  • event_type är behavior_change_in_spark_connect.
  • level är WARN.
  • details innehåller objektet behavior_change_in_spark_connect , som har ett enda issue fält. Problemvärdet är en av de koder som anges nedan.
  • message är en läsbar beskrivning av det identifierade mönstret.

Problemkoder

Kategori Problemkod Description
Databas- och katalogmutationer USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Standardkatalogen ändrades efter att en DataFrame skapades. Den befintliga DataFrame kan matcha tabeller med hjälp av den nya standardkatalogen.
Databas- och katalogmutationer USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE CATALOG kallades utanför en funktion dekorerad av en pipelines dekoratör. Standardkatalogen kan ändras oväntat för efterföljande åtgärder.
Databas- och katalogmutationer USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Standarddatabasen ändrades efter att en DataFrame skapades. Den befintliga DataFrame kan matcha tabeller med hjälp av den nya standarddatabasen.
Databas- och katalogmutationer USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE DATABASE kallades utanför en funktion dekorerad av en pipelines dekoratör. Standarddatabasen kan ändras oväntat för efterföljande åtgärder.
Ivrig körning i flödesfunktioner CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen anropar ett kontrollpunktskommando.
Ivrig körning i flödesfunktioner CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen skapar ivrigt en DataFrame-vy (createOrReplaceTempView eller liknande).
Ivrig körning i flödesfunktioner CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen skapar en resursprofil.
Ivrig körning i flödesfunktioner GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen anropar spark.resources eller ett relaterat resurs-API.
Ivrig körning i flödesfunktioner MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen utför en ivrig MERGE INTO på en måltabell.
Ivrig körning i flödesfunktioner ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen utför en ivrig Spark ML-åtgärd.
Ivrig körning i flödesfunktioner REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen registrerar en Python datakälla.
Ivrig körning i flödesfunktioner STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen fungerar på ett aktivt strömningsfrågehandtag.
Ivrig körning i flödesfunktioner STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen registrerar eller tar bort en strömningsfrågalyssnare.
Ivrig körning i flödesfunktioner STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen anropar spark.streams för att hantera strömmande frågor.
Ivrig körning i flödesfunktioner WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen utför en ivrig DataFrameWriterV2 åtgärd.
Ivrig körning i flödesfunktioner WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen utför en ivrig DataFrame.write åtgärd.
Ivrig körning i flödesfunktioner WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Flödesfunktionen startar en direktuppspelningsfråga (writeStream.start()).
Spark-konfigurationsmutationer CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED spark.conf.set() eller spark.conf.unset() kallades inuti en funktion dekorerad av en pipelines dekoratör. Detta stöds inte med en miljöversion.
Spark-konfigurationsmutationer SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.set() anropades utanför en funktion som dekorerats av en pipelines-dekoratör efter att en DataFrame skapades. Konfigurationsändringen kan påverka den befintliga dataramen vid körningstillfället.
Spark-konfigurationsmutationer UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.unset() anropades utanför en funktion som dekorerats av en pipelines-dekoratör efter att en DataFrame skapades. Konfigurationsändringen kan påverka den befintliga dataramen vid körningstillfället.
Temporära vyersättningar REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR En global temporär vy ersattes efter att en DataFrame som refererar till den skapades. Ersättningen kan återspeglas i den befintliga dataramen.
Temporära vyersättningar REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR En tillfällig vy ersattes efter att en DataFrame som refererar till den skapades. Ersättningen kan återspeglas i den befintliga dataramen.
UDF- och UDTF-mutationer OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR En UDF registrerades på nytt med samma namn efter att en DataFrame som refererar till den skapades. Den befintliga DataFrame kan använda den nya UDF-definitionen.
UDF- och UDTF-mutationer OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR En UDTF registrerades på nytt med samma namn efter att en DataFrame som refererar till den skapades. Den befintliga DataFrame kan använda den nya UDTF-definitionen.
UDF- och UDTF-mutationer UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR En UDF refererar till en global föränderlig Python variabel. Med en miljöversion använder UDF värdet för variabeln vid den tidpunkt då UDF definierades, inte vid anrop.
UDF- och UDTF-mutationer UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR En UDTF refererar till en global föränderlig Python variabel. Med en miljöversion använder UDTF värdet för variabeln vid den tidpunkt då UDTF definierades, inte vid anrop.

Databas- och katalogmutationer

Dessa problem genereras när pipelinekoden muterar standarddatabasen eller katalogen. Med en miljöversion kan DataFrames som skapats före mutationen lösa tabeller med hjälp av den nya databasen eller katalogen.

Exempelmönster som utlöser en händelse:

from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Utan en miljöversion df löses events från marketing katalogen. Med en miljöversion df löses events från sales katalogen.

Föreslagen korrigering: Fullständigt kvalificerade tabellnamn så att lösningen inte är beroende av standardkatalogen eller databasen, och undvik att ändra standardkatalogen eller databasen mellan skapande och användning av DataFrame.

from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Spark-konfigurationsmutationer

Dessa problem genereras när pipelinekoden muterar Spark-konfigurationen på sätt som kan ändra DataFrame-beteendet under en miljöversion.

Exempelmönster som utlöser en händelse:

from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
  return df.selectExpr("CAST(price AS INT) AS price")

Utan en miljöversion använder rollbesättningen konfigurationsvärdet vid skapandetiden för DataFrame. Med en miljöversion använder spark.sql.ansi.enabled=true casten och kan misslyckas med ogiltiga indata.

Föreslagen korrigering: Ställ in alla nödvändiga Spark-konfigurationer överst i pipelinefilen innan någon DataFrame skapas. För konfiguration per fråga använder du pipelinens configuration inställning i pipelinespecifikationen.

Temporära vyersättningar

Dessa problem genereras när pipelinekoden ersätter en tillfällig vy efter att en DataFrame-referens har skapats. Med en miljöversion kan den befintliga DataFrame återspegla det nya visningsinnehållet.

Exempelmönster som utlöser en händelse:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Utan en miljöversion mytable innehåller [(1, "Original Row")]. Med en miljöversion mytable innehåller [(2, "Replaced Row")].

Föreslagen korrigering: Skapa varje temporär vy en gång och ersätt den inte. Om du behöver flera vyer med relaterade data ger du var och en ett distinkt namn.

UDF- och UDTF-mutationer

Dessa problem genereras när pipelinekod muterar en UDF eller UDTF på sätt som ändrar beteende under en miljöversion.

Exempelmönster som utlöser en händelse:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Utan en miljöversion my_mv innehåller [("alex_b",)]. Med en miljöversion my_mv innehåller [("alex_a",)].

Suggested fix: Skicka värden till UDF som argument i stället för att samla in dem från Python globaler, eller ange det globala innan du definierar UDF och mutera det inte efteråt.

from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
  return s + suffix

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

Ivrig körning i flödesfunktioner

Dessa problem genereras när pipelinekoden utför ett ivrigt Spark-kommando i en funktion som är dekorerad av en pipelines-dekoratör (@table, @materialized_viewosv.). Flödesfunktioner förväntas definiera och returnera en DataFrame. ivriga kommandon som skriver data, hanterar strömmande frågor, registrerar resurser eller kör ML-åtgärder tillåts inte i en flödesfunktion med en miljöversionsuppsättning.

Föreslagen korrigering: Flytta den ivriga åtgärden utanför flödesfunktionen och returnera en DataFrame från flödesfunktionen i stället. Biverkningar som att skriva till en tabell eller starta en direktuppspelningsfråga hör utanför pipelinedefinitionen. pipelinemotorn hanterar materialisering av dataramen som returneras av flödesfunktionen.

Hitta kompatibilitetshändelser i händelseloggen

Följande fråga returnerar alla kompatibilitetshändelser för en pipeline, ordnade senast först:

SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

Så här räknar du händelser efter problemkod för de senaste uppdateringarna:

SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

Information om hur du kör frågor mot händelseloggen finns i Fråga efter händelseloggen.

Se även