Een machine learning-model bouwen met Apache Spark MLlib

In dit artikel leert u hoe u Apache Spark MLlib kunt gebruiken om een machine learning-toepassing te maken waarmee voorspellende analyses op een Azure geopende gegevensset worden verwerkt. Spark biedt ingebouwde machine learning-bibliotheken. In dit voorbeeld wordt gebruikgemaakt van classificatie via logistieke regressie.

In deze zelfstudie worden de volgende stappen behandeld:

  • Notebook instellen en importeren
  • NYC-taxigegevens laden en bemonsteren
  • Functies voorbereiden en engineeren
  • Categorische functies coderen
  • Model voor logistische regressie trainen
  • Resultaten evalueren en visualiseren

De sparkML- en MLlib Spark-kernbibliotheken bieden veel hulpprogramma's die nuttig zijn voor machine learning-taken. Deze hulpprogramma's zijn geschikt voor:

  • Classificatie
  • Clusteranalyse
  • Hypothesen voor het testen en berekenen van voorbeeldstatistieken
  • Regressie
  • SVD (Singular Value Decomposition) en PCA (Principal Component Analysis)
  • Modellering van onderwerpen

Prerequisites

Classificatie en logistieke regressie begrijpen

Classificatie, een populaire machine learning-taak, omvat het sorteren van invoergegevens in categorieën. Een classificatie-algoritme bepaalt hoe labels worden toegewezen aan de opgegeven invoergegevens. Een machine learning-algoritme kan bijvoorbeeld aandelengegevens accepteren als invoer en het aandeel in twee categorieën verdelen: aandelen die u moet verkopen en aandelen die u moet behouden.

Het logistieke regressie-algoritme is handig voor classificatie. De Logistieke Regressie-API van Spark is handig voor binaire classificatie van invoergegevens in een van twee groepen. Zie Wikipedia voor meer informatie over logistieke regressie.

Logistieke regressie produceert een logistieke functie die de kans voorspelt dat een invoervector deel uitmaakt van de ene groep of de andere.

Voorbeeld van voorspellende analyse van NYC-taxigegevens

De gegevens zijn beschikbaar via de Azure Open Datasets-resource . Deze gegevensset bevat informatie over gele taxiritten, waaronder de begintijden, eindtijden, beginlocaties, eindlocaties, reiskosten en andere kenmerken.

In deze zelfstudie wordt Apache Spark gebruikt om analyses uit te voeren op de tipgegevens van taxiritken in NYC en een model te ontwikkelen om te voorspellen of een bepaalde reis een tip bevat.

Een machine learning-model in Apache Spark maken

  1. Maak een PySpark-notebook. Zie Een notitieblok maken voor meer informatie.

    Nadat u het notitieblok hebt gemaakt, koppelt u het aan een lakehouse door Lakehouse toevoegen te selecteren in het linkerdeelvenster.

  2. Importeer de vereiste typen voor dit notebook. Plak de volgende code in de eerste cel en voer deze uit.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Controleer: De cel wordt voltooid zonder ImportError. Als er een fout wordt weergegeven, controleert u of uw notebook gebruikmaakt van de PySpark-runtime.

  3. Gebruik MLflow om uw machine learning-experimenten en bijbehorende uitvoeringen bij te houden. Als Automatische aanmelding van Microsoft Fabric is ingeschakeld, worden de bijbehorende metrische gegevens en parameters automatisch vastgelegd.

    import mlflow
    

    Controleer: De cel wordt zonder fouten voltooid. Voer uit print(mlflow.__version__) om te controleren of MLflow beschikbaar is.

Het dataframe voor invoer maken

In dit voorbeeld worden de gegevens uit Azure Open Datasets opslag in een Apache Spark DataFrame geladen. Vervolgens past u Spark-bewerkingen toe om de gegevensset op te schonen en te filteren.

  1. Plak de volgende code in een nieuwe cel en voer deze uit om een Spark DataFrame te maken. Met deze stap worden NYC-gegevens van gele taxi's opgehaald, gefilterd voor mei 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Controleer: Voer de volgende cel uit om te bevestigen dat de gegevens zijn geladen.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Neem een steekproef van de dataset om de ontwikkeling en training te versnellen.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Controleer: Controleer of de grootte van het voorbeeld beheerbaar is.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Bekijk de gegevens met behulp van de ingebouwde display() opdracht om het gegevensvoorbeeld te verkennen.

    display(sampled_taxi_df.limit(10))
    

    Controleer of: Een tabel met tien rijen wordt weergegeven met kolommen zoals tpepPickupDateTime, fareAmount, tipAmounten tripDistance.

De gegevens voorbereiden

Gegevensvoorbereiding is een cruciale stap in het machine learning-proces. Het omvat het opschonen, transformeren en ordenen van onbewerkte gegevens om deze geschikt te maken voor analyse en modellering. Voer in deze sectie verschillende stappen voor gegevensvoorbereiding uit:

  • Filter de gegevensset om uitbijters en onjuiste waarden te verwijderen.
  • Verwijder kolommen die niet nodig zijn voor modeltraining.
  • Nieuwe kolommen maken op basis van de onbewerkte gegevens.
  • Genereer een label om te bepalen of een bepaalde taxirit een tip omvat.

Voer de volgende code uit om relevante kolommen te selecteren, afgeleide functies te berekenen en uitbijters te filteren:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Important

De functie date_format gebruikt het patroon 'HH' (24-uursnotatie, waarden 0-23) in plaats van 'hh' (12-uursnotatie, waarden 1-12). De 24-uursnotatie is vereist voor de logica voor het indelen in tijdvakken op basis van het tijdstip van de dag die hierop volgt.

Voeg vervolgens de functionaliteit voor verkeers­tijdvakken toe op basis van het uur van de dag:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Controleer: Bevestig dat de verkeerstijdsintervallen correct zijn verdeeld.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Een logistiek regressiemodel maken

Met de laatste taak worden de gelabelde gegevens geconverteerd naar een indeling die logistieke regressie kan verwerken. De invoer voor een logistiek regressiealgoritme moet een structuur van label-/functievectorparen hebben, waarbij de functievector een vector is van getallen die het invoerpunt vertegenwoordigen.

Converteer de categorische kolommen trafficTimeBins en weekdayString naar gehele getallen met behulp van de OneHotEncoder methode:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Controleer: Controleer of het gecodeerde DataFrame de verwachte nieuwe kolommen bevat.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Een logistiek regressiemodel trainen

Splits de gegevensset in een trainingsset (70%) en een testset (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Controleer: Controleer of de splitsing redelijke grootten heeft geproduceerd.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Maak de modelformule, train het logistieke regressiemodel en evalueer dit met behulp van Area Under the ROC (Receiver Operating Characteristic) Curve:

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Controleer: In de uitvoer wordt een AUC-waarde weergegeven. Een goed presterend model produceert een waarde dicht bij 1.0.

Area under ROC = 0.97 (approximately)

Note

De exacte AUC-waarde is afhankelijk van het gegevensvoorbeeld. Waarden boven 0,90 geven sterke voorspellende prestaties voor deze gegevensset aan.

Een visuele weergave van de voorspelling maken

Bouw een uiteindelijke visualisatie om de modelresultaten te interpreteren. Een ROC-curve toont de afweging tussen de sensitiviteit en de valspositieve ratio.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Controleer: Er wordt een plot weergegeven met de ROC-curve boven de rode schuine diagonale lijn. De curve moet naar de linkerbovenhoek buigen, wat wijst op goede classificatieprestaties.

Grafiek met de ROC-curve voor logistieke regressie in het tipmodel.

De hulpbronnen opschonen

Nadat u deze zelfstudie hebt voltooid, verwijdert u het notebook en het lakehouse om werkruimtecapaciteit vrij te maken:

  1. Klik in uw werkruimte met de rechtermuisknop op het notitieblok en selecteer Verwijderen.
  2. Als u specifiek voor deze zelfstudie een lakehouse hebt gemaakt, klikt u er met de rechtermuisknop op en selecteert u Verwijderen.

Als u het getrainde model voor toekomstig gebruik wilt behouden, voegt u de volgende code toe voordat u het opschoont:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Troubleshooting

Issue Oorzaak Solution
Py4JJavaError bij het lezen van Parquet Netwerkverbinding met Azure blobopslag Controleer of uw Fabric werkruimte uitgaande internettoegang heeft. Start de Spark-sessie opnieuw.
AnalysisException: cannot resolve column Typefout of schema van kolomnaam komt niet overeen Voer uit nyc_tlc_df.printSchema() om beschikbare kolommen te controleren. Het schema van de NYC-taxidataset kan van jaar tot jaar veranderen.
Leeg DataFrame na het filteren Filtervoorwaarden die te beperkend zijn voor het gegevensvenster Vergroot het datumbereik of controleer sampled_taxi_df.count() dit voordat u gaat filteren.
IllegalArgumentException in StringIndexer Ongeziene labels tijdens transformatie Toevoegen handleInvalid="skip" aan uw StringIndexer oproepen: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
Lage AUC (lager dan 0,6) Onvoldoende gegevens of onjuiste functie-engineering Verhoog de steekproeffractie (bijvoorbeeld 0.01 in plaats van 0.001) en controleer of trafficTimeBins categorieën in balans zijn.
OutOfMemoryError Gegevensset is te groot voor de beschikbare capaciteit Verminder de steekproeffractie of verhoog uw Fabric capaciteitslaag.
ROC-plot wordt niet weergegeven Matplotlib-back-endprobleem in notebook Voeg %matplotlib inline bovenaan het notitieblok toe.