Byg en model til maskinel indlæring med Apache Spark MLlib

I denne artikel lærer du, hvordan du bruger Apache Spark MLlib til at skabe en maskinlæringsapplikation, der håndterer prædiktiv analyse på et Azure åbent datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.

I dette selvstudium beskrives disse trin:

  • Opsætning af notesbog og import
  • Indlæs og prøv NYC-taxadata
  • Forbered og konstruer funktioner
  • Kod kategoriske egenskaber
  • Toglogistisk regressionsmodel
  • Evaluer og visualiser resultater

Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:

  • Klassificering
  • Klyngedannelse
  • Hypotesetest og beregning af eksempelstatistik
  • Regression
  • SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
  • Emnemodellering

Forudsætninger

  • Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Prøveversion af Microsoft Fabric.

  • Log på Microsoft Fabric.

  • Skift til Fabric ved at bruge experience-switcheren nederst til venstre på din startside.

    Skærmbillede, der viser valget af Fabric i oplevelsesskifter-menuen.

Forstå klassificering og logistisk regression

Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassifikationsalgoritme finder ud af, hvordan man tildeler etiketter til de leverede inputdata. For eksempel kunne en maskinlæringsalgoritme acceptere aktieinformation som input og opdele aktien i to kategorier: aktier, du bør sælge, og aktier, du bør beholde.

Den logistiske regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.

Logistisk regression producerer en logistisk funktion , der forudsiger sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.

Eksempel på forudsigende analyse af NYC-taxadata

Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.

Denne tutorial bruger Apache Spark til at analysere NYC's taxa-tur-tip data og udvikle en model til at forudsige, om en bestemt tur indeholder et tip.

Opret en Apache Spark-model til maskinel indlæring

  1. Opret en PySpark-notesbog. For mere information, se Opret en notesbog.

    Når du har oprettet notesbogen, vedhæfter du den til et søhus ved at vælge Tilføj søhus i venstre panel.

  2. Importer de nødvendige typer til denne notesbog. Indsæt følgende kode i den første celle og kør den.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Verificer: Cellen fuldender uden ImportError. Hvis du ser en fejl, bekræft at din notebook bruger PySpark-runtime.

  3. Brug MLflow til at spore dine maskinlæringseksperimenter og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.

    import mlflow
    

    Verificer: Cellen fuldføres uden fejl. Kør print(mlflow.__version__) for at bekræfte, at MLflow er tilgængeligt.

Konstruer inputdatarammen

Dette eksempel indlæser data fra Azure Open Datasets storage i en Apache Spark DataFrame. Derefter anvender du Spark-operationer til at rense og filtrere datasættet.

  1. Indsæt følgende kode i en ny celle og kør den for at oprette en Spark DataFrame. Dette trin henter NYC gule taxadata filtreret til maj 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Verificer: Kør følgende celle for at bekræfte, at data indlæses med succes.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Udtag datasættet for at fremskynde udvikling og træning.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Verificer: Bekræft at stikprøvestørrelsen er håndterbar.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Se dataene ved at bruge den indbyggede display() kommando til at udforske dataprøven.

    display(sampled_taxi_df.limit(10))
    

    Verificer: En tabel med 10 rækker vises med kolonner som tpepPickupDateTime, fareAmount, tipAmount, og tripDistance.

Forbered dataene

Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det indebærer rensning, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette afsnit udføres flere dataforberedelsestrin:

  • Filtrer datasættet for at fjerne outliers og forkerte værdier.
  • Fjern kolonner, der ikke er nødvendige til modeltræning.
  • Opret nye kolonner ud fra rådataene.
  • Lav en etiket for at afgøre, om en given taxatur indebærer drikkepenge.

Kør følgende kode for at vælge relevante kolonner, beregne afledte funktioner og filtrere outliers:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Vigtigt!

Funktionen date_format bruger mønsteret 'HH' (24-timers format, værdier 0-23) i stedet for 'hh' (12-timers format, værdier 1-12). 24-timers formatet er påkrævet for den time-of-day binning-logik, der følger.

Dernæst tilføjes funktionen trafiktidsbins baseret på dagens time:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Verificer: Bekræft at trafiktidsbingerne er korrekt fordelt.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Opret en logistisk regressionsmodel

Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.

Konverter de kategoriske kolonner trafficTimeBins og weekdayString til heltalsrepræsentationer ved at bruge denne OneHotEncoder tilgang:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Verificer: Bekræft at den kodede DataFrame har de forventede nye kolonner.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Oplær en logistisk regressionsmodel

Del datasættet op i et træningssæt (70%) og et testsæt (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Bekræft: Bekræft at splittelsen gav rimelige størrelser.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Skab modelformlen, træn den logistiske regressionsmodel, og evaluer den ved at bruge Area Under the ROC (Receiver Operating Characteristic) Curve:

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Verificer: Outputtet viser en AUC-værdi. En velfungerende model giver en værdi tæt på 1,0.

Area under ROC = 0.97 (approximately)

Bemærkning

Den præcise AUC-værdi varierer afhængigt af dataprøven. Værdier over 0,90 indikerer stærk prædiktiv præstation for dette datasæt.

Opret en visuel gengivelse af forudsigelsen

Byg en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve præsenterer afvejningen mellem sand positivrate og falsk positiv-rate.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Verificer: Et plot vises, der viser ROC-kurven over den røde stiplede diagonale linje. Kurven bør bøje mod øverste venstre hjørne, hvilket indikerer stærk præstation i klassifikationen.

Graf, der viser ROC-kurven for logistisk regression i tipmodellen.

Ryd op i ressourcer

Når du er færdig med denne tutorial, sletter du notesbogen og lakehouse'en for at frigøre arbejdsområde:

  1. I dit arbejdsområde højreklikker du på notesbogen og vælger Slet.
  2. Hvis du har oprettet et søhus specifikt til denne tutorial, skal du højreklikke på det og vælge Slet.

For at bevare den trænede model til fremtidig brug, tilføj følgende kode før oprydning:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Troubleshooting

Problem Årsag Løsning
Py4JJavaError Når man læser parket Netværksforbindelse til Azure blob storage Tjek at dit Fabric-arbejdsområde har udgående internetadgang. Prøv at genstarte Spark-sessionen.
AnalysisException: cannot resolve column Søjlenavn, stavefejl eller skema-mismatch Løb nyc_tlc_df.printSchema() for at inspicere tilgængelige søjler. NYC's taxidatasæt-skema kan ændre sig mellem årene.
Tomt DataFrame efter filtrering Filterbetingelser for restriktive for datavinduet Forskær datointervallet eller tjek sampled_taxi_df.count() før filtrering.
IllegalArgumentException i StringIndexer Usete etiketter under transformation Tilføj handleInvalid="skip" til dine StringIndexer opkald: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
Lav AUC (under 0,6) Utilstrækkelige data eller forkert feature engineering Øg prøvefraktionen (for eksempel 0.01 i stedet for 0.001) og verificér trafficTimeBins , at kategorierne er afbalancerede.
OutOfMemoryError Datasættet er for stort i forhold til tilgængelig kapacitet Reducer prøvefraktionen eller øg dit Fabric-kapacitetsniveau.
ROC-plot vises ikke Matplotlib backend-problem i notesbog Tilføj %matplotlib inline det øverst i notesbogen.