Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Important
Miljöversioner för SDP finns i Beta.
Pipelines med en environment version ställa in kör Python kod via Spark Connect. Den här sidan beskriver vad som är inkompatibelt, vad som fungerar annorlunda, hur du söker igenom en pipeline efter berörda mönster och hur du migrerar en befintlig pipeline.
Limitations
Miljöversioner är ännu inte kompatibla med alla pipelinefunktioner. En pipelinekörning med en miljöversionsuppsättning misslyckas om pipelinens Python kod gör något av följande:
- Muterar Spark-sessionstillståndet i en funktion som är dekorerad med en pipelinesdekoratör. Exempel är
spark.conf.set(...),spark.sql("USE CATALOG ...")ochcreateOrReplaceTempView. - Använder PySpark-API:er som inte är tillgängliga i Spark Connect, inklusive
SparkContext,RDD,SQLContextoch eventuella Py4J-API:er. Se Vad stöds i Spark Connect.
Om det inte går att aktivera en miljöversion på en pipeline returnerar inaktivering av miljöversionen pipelinen till dess tidigare tillstånd.
Beteendeförändringar
Spark Connect har ett litet antal beteendeskillnader jämfört med den klassiska PySpark-körningen. Se Spark Connect jämfört med klassiska Spark för den fullständiga referensen. Kompatibilitetsgenomsökningen identifierar dessa mönster i förväg och blockerar aktivering tills de åtgärdas, så att du kan hitta och åtgärda dem innan de påverkar produktionsdata.
I en pipeline är de vanligaste situationerna där beteendet kan skilja sig åt:
- Interleaved DataFrame konstruktion och session mutation
- UDF:er som refererar till föränderligt Python tillstånd
Interleaved DataFrame konstruktion och session mutation
När en pipeline skapar en DataFrame, muterar sedan Spark-sessionstillståndet (till exempel ändrar standardkatalogen eller schemat, anger en konfiguration, ersätter en temporär vy eller omregistrerar en UDF) använder sedan DataFrame:
- Utan en miljöversion använder DataFrame sessionstillståndet före mutationen .
- Med en miljöversion använder DataFrame sessionstillståndet efter mutationen .
Ett exempel:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Utan en miljöversion mytable innehåller [(1, "Original Row")]. Med en miljöversion mytable innehåller [(2, "Replaced Row")].
UDF:er som refererar till föränderligt Python tillstånd
När en UDF refererar till en Python global variabel vars värde ändras efter att UDF har definierats:
- Utan en miljöversion använder UDF det senaste värdet för variabeln.
- Med en miljöversion använder UDF värdet vid den tidpunkt då UDF definierades.
Ett exempel:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Utan en miljöversion my_mv innehåller [("alex_b",)]. Med en miljöversion my_mv innehåller [("alex_a",)].
Om en pipeline förlitar sig på något av mönstret granskar du det innan du aktiverar en miljöversion.
Kompatibilitetsgenomsökning
Kompatibilitetsgenomsökningen hjälper dig att hitta kodmönster i pipelinen som skulle ge olika resultat under en miljöversion innan du aktiverar en. Genomsökningen är anmäld. När genomsökningen är aktiverad på en pipeline:
- Varje pipelinekörning genererar en
BehaviorChangeInSparkConnectWARNhändelse i pipelinehändelseloggen per identifierat mönster. - Du kan inte aktivera en miljöversion på pipelinen förrän du åtgärdar alla kompatibilitetsvarningar från den tidigare lyckade uppdateringen.
Om genomsökningen inte är aktiverad genereras inga händelser och environment_version aktiveringen blockeras inte. Databricks rekommenderar att du aktiverar genomsökningen och löser eventuella identifierade mönster innan du aktiverar en miljöversion på pipelinen.
Aktivera genomsökningen på en pipeline
Du kan aktivera kompatibilitetsgenomsökningen genom att lägga till pipelinekonfigurationen pipelines.environmentVersion.enableCompatibilityScan Du kan lägga till konfiguration via pipelineredigerarens användargränssnitt eller genom att lägga till en post i pipelinekonfigurationens JSON.
Via användargränssnittet:
- Klicka på Inställningar i pipelineredigeraren.
- Leta reda på avsnittet Konfiguration i pipelineinställningar.
- Klicka på
Lägg till konfiguration.
- Ange
pipelines.environmentVersion.enableCompatibilityScansom nyckel ochtruesom värde. - Spara pipelineinställningarna.
I pipelinens JSON:
Lägg till följande post i configuration blocket:
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Rekommenderat arbetsflöde
- Aktivera genomsökningen på pipelinen.
- Utlös en pipelinekörning.
-
Fråga händelseloggen för pipelinen efter
BehaviorChangeInSparkConnectWARNhändelser. Se Referens för kompatibilitetshändelser för den fullständiga listan över problemkoder, exempelmönster och föreslagna korrigeringar. - Uppdatera pipelinekoden för att ta bort de identifierade mönstren och kör pipelinen igen tills inga fler händelser genereras.
- Lägg till
environment_versioni pipelinen med någon av metoderna i Aktivera en miljöversion på en pipeline.
Om du tror att en kompatibilitetsvarning är en falsk positiv identifiering och vill aktivera environment_version ändå tar du bort pipelines.environmentVersion.enableCompatibilityScan posten från pipelinekonfigurationen för att kringgå kontrollen. (Det går inte att ange värdet till false – du måste ta bort posten helt.)
Preflight-kontrollen körs inte på pipelines som inte har någon tidigare uppdatering eller på pipelines som redan har en miljöversionsuppsättning.
Migrera en befintlig pipeline till miljöversioner
Om du vill migrera en befintlig pipeline som ännu inte använder en miljöversion följer du det här arbetsflödet från slutpunkt till slutpunkt. Den vägleder dig genom att hitta kodmönster som kan bete sig annorlunda under Spark Connect, åtgärda dem och distribuera miljöversionen på ett säkert sätt.
Aktivera kompatibilitetsgenomsökningen på pipelinen. Aktivera genomsökningen på pipelinen enligt beskrivningen i Kompatibilitetsgenomsökning. Det här är vad som gör att identifierade mönster visas i händelseloggen och vad som gör det möjligt att kontrollera preflight som skyddar ditt aktiveringsförsök.
Utlös en pipelinekörning och granska kompatibilitetshändelser. Utlös en normal pipelineuppdatering. När den har slutförts frågar du händelseloggen för pipelinen efter
BehaviorChangeInSparkConnectWARNhändelser. Varje händelse rapporterar ett identifierat mönster. Se Referens för kompatibilitetshändelser för den fullständiga listan över problemkoder, exempelmönster och föreslagna korrigeringar.Uppdatera pipelinekoden för att åtgärda identifierade mönster. Uppdatera pipelinekoden efter den föreslagna korrigeringen för varje identifierat mönster. Efter varje ändring utlöser du en annan pipelineuppdatering och kontrollerar att motsvarande händelser inte längre visas. Upprepa tills händelseloggen inte längre innehåller några kompatibilitetshändelser för en lyckad uppdatering.
Aktivera miljöversionen på pipelinen. När den senaste lyckade uppdateringen inte har några kompatibilitetshändelser lägger du till
environment_versioni pipelinen med hjälp av användargränssnittet, API:et eller paketet enligt beskrivningen i Aktivera en miljöversion på en pipeline. Nästa uppdatering körs med Spark Connect och den fästa Python språkversion och förinstallerade bibliotek.Om uppdateringen misslyckas eftersom det fortfarande finns kompatibilitetsvarningar släpper du
environment_version, återgår till steg 2 och löser de återstående varningarna innan du försöker igen.Verifiera migreringen. När den första uppdateringen med miljöversionen har slutförts kontrollerar du:
- Händelsen
create_updatei händelseloggen visarenvironment_versioninställt på det förväntade värdet. - Pipelinen genererar förväntade data och inga nya felhändelser visas.
- Kontrollera underordnade tabeller med oanvänd kapacitet för eventuella diskreta beteendeskillnader som beskrivs i Beteendeändringar.
- Händelsen
Rollback
Om pipelinen fungerar felaktigt efter migreringen tar du bort environment_version från pipelineinställningarna. Nästa uppdatering körs med föregående Python körningskonfiguration. Använd den återställda körningen för att felsöka och upprepa sedan migreringen från steg 2 när du har identifierat och åtgärdat problemet.
Referens för kompatibilitetshändelser
När kompatibilitetsgenomsökningen är aktiverad på en pipeline genererar SDP en BehaviorChangeInSparkConnectWARN händelse i pipelinehändelseloggen enligt det identifierade mönstret. När genomsökningen är aktiverad och den tidigare lyckade uppdateringen identifierade eventuella mönster blockerar environment_version SDP även aktivering tills mönstren åtgärdas.
Varje händelse rapporterar en enda problemkod som identifierar vad som har identifierats. Om du vill leta upp en kod hittar du den i tabellen Problemkoder – varje rad länkar till kategoriavsnittet som innehåller ett exempelmönster och den föreslagna korrigeringen.
Händelseform
BehaviorChangeInSparkConnect händelser följer standardschemat för händelseloggen för pipeline:
-
event_typeärbehavior_change_in_spark_connect. -
levelärWARN. -
detailsinnehåller objektetbehavior_change_in_spark_connect, som har ett endaissuefält. Problemvärdet är en av de koder som anges nedan. -
messageär en läsbar beskrivning av det identifierade mönstret.
Problemkoder
| Kategori | Problemkod | Description |
|---|---|---|
| Databas- och katalogmutationer | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Standardkatalogen ändrades efter att en DataFrame skapades. Den befintliga DataFrame kan matcha tabeller med hjälp av den nya standardkatalogen. |
| Databas- och katalogmutationer | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG kallades utanför en funktion dekorerad av en pipelines dekoratör. Standardkatalogen kan ändras oväntat för efterföljande åtgärder. |
| Databas- och katalogmutationer | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Standarddatabasen ändrades efter att en DataFrame skapades. Den befintliga DataFrame kan matcha tabeller med hjälp av den nya standarddatabasen. |
| Databas- och katalogmutationer | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE kallades utanför en funktion dekorerad av en pipelines dekoratör. Standarddatabasen kan ändras oväntat för efterföljande åtgärder. |
| Ivrig körning i flödesfunktioner | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen anropar ett kontrollpunktskommando. |
| Ivrig körning i flödesfunktioner | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen skapar ivrigt en DataFrame-vy (createOrReplaceTempView eller liknande). |
| Ivrig körning i flödesfunktioner | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen skapar en resursprofil. |
| Ivrig körning i flödesfunktioner | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen anropar spark.resources eller ett relaterat resurs-API. |
| Ivrig körning i flödesfunktioner | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen utför en ivrig MERGE INTO på en måltabell. |
| Ivrig körning i flödesfunktioner | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen utför en ivrig Spark ML-åtgärd. |
| Ivrig körning i flödesfunktioner | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen registrerar en Python datakälla. |
| Ivrig körning i flödesfunktioner | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen fungerar på ett aktivt strömningsfrågehandtag. |
| Ivrig körning i flödesfunktioner | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen registrerar eller tar bort en strömningsfrågalyssnare. |
| Ivrig körning i flödesfunktioner | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen anropar spark.streams för att hantera strömmande frågor. |
| Ivrig körning i flödesfunktioner | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen utför en ivrig DataFrameWriterV2 åtgärd. |
| Ivrig körning i flödesfunktioner | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen utför en ivrig DataFrame.write åtgärd. |
| Ivrig körning i flödesfunktioner | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Flödesfunktionen startar en direktuppspelningsfråga (writeStream.start()). |
| Spark-konfigurationsmutationer | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() eller spark.conf.unset() kallades inuti en funktion dekorerad av en pipelines dekoratör. Detta stöds inte med en miljöversion. |
| Spark-konfigurationsmutationer | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() anropades utanför en funktion som dekorerats av en pipelines-dekoratör efter att en DataFrame skapades. Konfigurationsändringen kan påverka den befintliga dataramen vid körningstillfället. |
| Spark-konfigurationsmutationer | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() anropades utanför en funktion som dekorerats av en pipelines-dekoratör efter att en DataFrame skapades. Konfigurationsändringen kan påverka den befintliga dataramen vid körningstillfället. |
| Temporära vyersättningar | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
En global temporär vy ersattes efter att en DataFrame som refererar till den skapades. Ersättningen kan återspeglas i den befintliga dataramen. |
| Temporära vyersättningar | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
En tillfällig vy ersattes efter att en DataFrame som refererar till den skapades. Ersättningen kan återspeglas i den befintliga dataramen. |
| UDF- och UDTF-mutationer | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
En UDF registrerades på nytt med samma namn efter att en DataFrame som refererar till den skapades. Den befintliga DataFrame kan använda den nya UDF-definitionen. |
| UDF- och UDTF-mutationer | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
En UDTF registrerades på nytt med samma namn efter att en DataFrame som refererar till den skapades. Den befintliga DataFrame kan använda den nya UDTF-definitionen. |
| UDF- och UDTF-mutationer | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
En UDF refererar till en global föränderlig Python variabel. Med en miljöversion använder UDF värdet för variabeln vid den tidpunkt då UDF definierades, inte vid anrop. |
| UDF- och UDTF-mutationer | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
En UDTF refererar till en global föränderlig Python variabel. Med en miljöversion använder UDTF värdet för variabeln vid den tidpunkt då UDTF definierades, inte vid anrop. |
Databas- och katalogmutationer
Dessa problem genereras när pipelinekoden muterar standarddatabasen eller katalogen. Med en miljöversion kan DataFrames som skapats före mutationen lösa tabeller med hjälp av den nya databasen eller katalogen.
Exempelmönster som utlöser en händelse:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Utan en miljöversion df löses events från marketing katalogen. Med en miljöversion df löses events från sales katalogen.
Föreslagen korrigering: Fullständigt kvalificerade tabellnamn så att lösningen inte är beroende av standardkatalogen eller databasen, och undvik att ändra standardkatalogen eller databasen mellan skapande och användning av DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Spark-konfigurationsmutationer
Dessa problem genereras när pipelinekoden muterar Spark-konfigurationen på sätt som kan ändra DataFrame-beteendet under en miljöversion.
Exempelmönster som utlöser en händelse:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Utan en miljöversion använder rollbesättningen konfigurationsvärdet vid skapandetiden för DataFrame. Med en miljöversion använder spark.sql.ansi.enabled=true casten och kan misslyckas med ogiltiga indata.
Föreslagen korrigering: Ställ in alla nödvändiga Spark-konfigurationer överst i pipelinefilen innan någon DataFrame skapas. För konfiguration per fråga använder du pipelinens configuration inställning i pipelinespecifikationen.
Temporära vyersättningar
Dessa problem genereras när pipelinekoden ersätter en tillfällig vy efter att en DataFrame-referens har skapats. Med en miljöversion kan den befintliga DataFrame återspegla det nya visningsinnehållet.
Exempelmönster som utlöser en händelse:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Utan en miljöversion mytable innehåller [(1, "Original Row")]. Med en miljöversion mytable innehåller [(2, "Replaced Row")].
Föreslagen korrigering: Skapa varje temporär vy en gång och ersätt den inte. Om du behöver flera vyer med relaterade data ger du var och en ett distinkt namn.
UDF- och UDTF-mutationer
Dessa problem genereras när pipelinekod muterar en UDF eller UDTF på sätt som ändrar beteende under en miljöversion.
Exempelmönster som utlöser en händelse:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Utan en miljöversion my_mv innehåller [("alex_b",)]. Med en miljöversion my_mv innehåller [("alex_a",)].
Suggested fix: Skicka värden till UDF som argument i stället för att samla in dem från Python globaler, eller ange det globala innan du definierar UDF och mutera det inte efteråt.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Ivrig körning i flödesfunktioner
Dessa problem genereras när pipelinekoden utför ett ivrigt Spark-kommando i en funktion som är dekorerad av en pipelines-dekoratör (@table, @materialized_viewosv.). Flödesfunktioner förväntas definiera och returnera en DataFrame. ivriga kommandon som skriver data, hanterar strömmande frågor, registrerar resurser eller kör ML-åtgärder tillåts inte i en flödesfunktion med en miljöversionsuppsättning.
Föreslagen korrigering: Flytta den ivriga åtgärden utanför flödesfunktionen och returnera en DataFrame från flödesfunktionen i stället. Biverkningar som att skriva till en tabell eller starta en direktuppspelningsfråga hör utanför pipelinedefinitionen. pipelinemotorn hanterar materialisering av dataramen som returneras av flödesfunktionen.
Hitta kompatibilitetshändelser i händelseloggen
Följande fråga returnerar alla kompatibilitetshändelser för en pipeline, ordnade senast först:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Så här räknar du händelser efter problemkod för de senaste uppdateringarna:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Information om hur du kör frågor mot händelseloggen finns i Fråga efter händelseloggen.
Se även
- Konfigurera miljöversioner för pipelines – funktionsöversikt , hur du aktiverar en miljöversion.
- Schema för pipelinehändelselogg – fullständigt schema för pipelinehändelseloggar.
- Händelselogg för pipeline – så här kör du frågor mot händelseloggen för pipelinen.