Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
I denne artikel lærer du, hvordan du bruger Apache Spark MLlib til at skabe en maskinlæringsapplikation, der håndterer prædiktiv analyse på et Azure åbent datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.
I dette selvstudium beskrives disse trin:
- Opsætning af notesbog og import
- Indlæs og prøv NYC-taxadata
- Forbered og konstruer funktioner
- Kod kategoriske egenskaber
- Toglogistisk regressionsmodel
- Evaluer og visualiser resultater
Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:
- Klassificering
- Klyngedannelse
- Hypotesetest og beregning af eksempelstatistik
- Regression
- SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
- Emnemodellering
Forudsætninger
Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Prøveversion af Microsoft Fabric.
Log på Microsoft Fabric.
Skift til Fabric ved at bruge experience-switcheren nederst til venstre på din startside.
- Hvis nødvendigt, opret et Microsoft Fabric søhus som beskrevet i Skab et søhus i Microsoft Fabric.
- Opret en ny notesbog i arbejdsområdet ved at vælge + og derefter Notesbog. For mere information, se Opret en notesbog.
Forstå klassificering og logistisk regression
Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassifikationsalgoritme finder ud af, hvordan man tildeler etiketter til de leverede inputdata. For eksempel kunne en maskinlæringsalgoritme acceptere aktieinformation som input og opdele aktien i to kategorier: aktier, du bør sælge, og aktier, du bør beholde.
Den logistiske regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.
Logistisk regression producerer en logistisk funktion , der forudsiger sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.
Eksempel på forudsigende analyse af NYC-taxadata
Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.
Denne tutorial bruger Apache Spark til at analysere NYC's taxa-tur-tip data og udvikle en model til at forudsige, om en bestemt tur indeholder et tip.
Opret en Apache Spark-model til maskinel indlæring
Opret en PySpark-notesbog. For mere information, se Opret en notesbog.
Når du har oprettet notesbogen, vedhæfter du den til et søhus ved at vælge Tilføj søhus i venstre panel.
Importer de nødvendige typer til denne notesbog. Indsæt følgende kode i den første celle og kør den.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorVerificer: Cellen fuldender uden
ImportError. Hvis du ser en fejl, bekræft at din notebook bruger PySpark-runtime.Brug MLflow til at spore dine maskinlæringseksperimenter og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.
import mlflowVerificer: Cellen fuldføres uden fejl. Kør
print(mlflow.__version__)for at bekræfte, at MLflow er tilgængeligt.
Konstruer inputdatarammen
Dette eksempel indlæser data fra Azure Open Datasets storage i en Apache Spark DataFrame. Derefter anvender du Spark-operationer til at rense og filtrere datasættet.
Indsæt følgende kode i en ny celle og kør den for at oprette en Spark DataFrame. Dette trin henter NYC gule taxadata filtreret til maj 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Verificer: Kør følgende celle for at bekræfte, at data indlæses med succes.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsUdtag datasættet for at fremskynde udvikling og træning.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Verificer: Bekræft at stikprøvestørrelsen er håndterbar.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsSe dataene ved at bruge den indbyggede
display()kommando til at udforske dataprøven.display(sampled_taxi_df.limit(10))Verificer: En tabel med 10 rækker vises med kolonner som
tpepPickupDateTime,fareAmount,tipAmount, ogtripDistance.
Forbered dataene
Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det indebærer rensning, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette afsnit udføres flere dataforberedelsestrin:
- Filtrer datasættet for at fjerne outliers og forkerte værdier.
- Fjern kolonner, der ikke er nødvendige til modeltræning.
- Opret nye kolonner ud fra rådataene.
- Lav en etiket for at afgøre, om en given taxatur indebærer drikkepenge.
Kør følgende kode for at vælge relevante kolonner, beregne afledte funktioner og filtrere outliers:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Vigtigt!
Funktionen date_format bruger mønsteret 'HH' (24-timers format, værdier 0-23) i stedet for 'hh' (12-timers format, værdier 1-12). 24-timers formatet er påkrævet for den time-of-day binning-logik, der følger.
Dernæst tilføjes funktionen trafiktidsbins baseret på dagens time:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Verificer: Bekræft at trafiktidsbingerne er korrekt fordelt.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Opret en logistisk regressionsmodel
Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.
Konverter de kategoriske kolonner trafficTimeBins og weekdayString til heltalsrepræsentationer ved at bruge denne OneHotEncoder tilgang:
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Verificer: Bekræft at den kodede DataFrame har de forventede nye kolonner.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Oplær en logistisk regressionsmodel
Del datasættet op i et træningssæt (70%) og et testsæt (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Bekræft: Bekræft at splittelsen gav rimelige størrelser.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Skab modelformlen, træn den logistiske regressionsmodel, og evaluer den ved at bruge Area Under the ROC (Receiver Operating Characteristic) Curve:
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Verificer: Outputtet viser en AUC-værdi. En velfungerende model giver en værdi tæt på 1,0.
Area under ROC = 0.97 (approximately)
Bemærkning
Den præcise AUC-værdi varierer afhængigt af dataprøven. Værdier over 0,90 indikerer stærk prædiktiv præstation for dette datasæt.
Opret en visuel gengivelse af forudsigelsen
Byg en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve præsenterer afvejningen mellem sand positivrate og falsk positiv-rate.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Verificer: Et plot vises, der viser ROC-kurven over den røde stiplede diagonale linje. Kurven bør bøje mod øverste venstre hjørne, hvilket indikerer stærk præstation i klassifikationen.
Ryd op i ressourcer
Når du er færdig med denne tutorial, sletter du notesbogen og lakehouse'en for at frigøre arbejdsområde:
- I dit arbejdsområde højreklikker du på notesbogen og vælger Slet.
- Hvis du har oprettet et søhus specifikt til denne tutorial, skal du højreklikke på det og vælge Slet.
For at bevare den trænede model til fremtidig brug, tilføj følgende kode før oprydning:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Troubleshooting
| Problem | Årsag | Løsning |
|---|---|---|
Py4JJavaError Når man læser parket |
Netværksforbindelse til Azure blob storage | Tjek at dit Fabric-arbejdsområde har udgående internetadgang. Prøv at genstarte Spark-sessionen. |
AnalysisException: cannot resolve column |
Søjlenavn, stavefejl eller skema-mismatch | Løb nyc_tlc_df.printSchema() for at inspicere tilgængelige søjler. NYC's taxidatasæt-skema kan ændre sig mellem årene. |
| Tomt DataFrame efter filtrering | Filterbetingelser for restriktive for datavinduet | Forskær datointervallet eller tjek sampled_taxi_df.count() før filtrering. |
IllegalArgumentException i StringIndexer |
Usete etiketter under transformation | Tilføj handleInvalid="skip" til dine StringIndexer opkald: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| Lav AUC (under 0,6) | Utilstrækkelige data eller forkert feature engineering | Øg prøvefraktionen (for eksempel 0.01 i stedet for 0.001) og verificér trafficTimeBins , at kategorierne er afbalancerede. |
OutOfMemoryError |
Datasættet er for stort i forhold til tilgængelig kapacitet | Reducer prøvefraktionen eller øg dit Fabric-kapacitetsniveau. |
| ROC-plot vises ikke | Matplotlib backend-problem i notesbog | Tilføj %matplotlib inline det øverst i notesbogen. |
Relateret indhold
- Brug AI-eksempler til at bygge modeller til maskinel indlæring: Brug AI-eksempler
- Spor kørsler af maskinel indlæring ved hjælp af eksperimenter: Eksperimenter med maskinel indlæring