Erstellen eines Machine Learning-Modells mit Apache Spark MLlib

In diesem Artikel erfahren Sie, wie Sie Apache Spark MLlib verwenden, um eine Machine Learning-Anwendung zu erstellen, die die Predictive Analysis für ein Azure offenes Dataset behandelt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird eine Klassifizierung mittels logistischer Regression verwendet.

In diesem Lernprogramm werden die folgenden Schritte behandelt:

  • Einrichten von Notizbuch und Importen
  • NYC-Taxidaten laden und Stichproben daraus entnehmen
  • Vorbereiten und Entwickeln von Features
  • Kategoriale Merkmale codieren
  • Trainieren des Logistischen Regressionsmodells
  • Auswerten und Visualisieren von Ergebnissen

Die SparkML und MLlib Spark-Kernbibliotheken bieten viele Dienstprogramme, die für Machine Learning Aufgaben nützlich sind. Diese Dienstprogramme eignen sich für:

  • Klassifizierung
  • Clusterbildung
  • Testen von Hypothesen und Berechnen von Beispielstatistiken
  • Regression
  • Singulärwertzerlegung (Singular Value Decomposition, SVD) und Hauptkomponentenanalyse (Principal Component Analysis, PCA)
  • Themenmodellierung

Voraussetzungen

Grundlegendes zu Klassifizierung und logistischer Regression

Bei der Klassifizierung, einer beliebten Aufgabe des Machine Learning, werden die Eingabedaten in Kategorien sortiert. Ein Klassifizierungsalgorithmus gibt an, wie den bereitgestellten Eingabedaten Bezeichnungen zugewiesen werden. Beispielsweise könnte ein Machine Learning-Algorithmus Aktieninformationen als Eingabe akzeptieren und die Aktie in zwei Kategorien unterteilen: Aktien, die Sie verkaufen sollten, und Aktien, die Sie behalten sollten.

Der Logistische Regressionsalgorithmus ist für die Klassifizierung nützlich. Die API für die logistische Regression von Spark ist nützlich für eine binäre Klassifizierung der Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.

Die logistische Regression erzeugt eine Logistikfunktion , die die Wahrscheinlichkeit vorhersagt, dass ein Eingabevektor zu einer Gruppe oder zur anderen gehört.

Beispiel für eine Vorhersageanalyse von NYC-Taxidaten

Die Daten sind über die Ressource Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu Taxifahrten von Yellow Cabs, einschließlich Informationen zu den Start- und Endzeiten, den Start- und Zielorten, den Fahrtkosten und anderer Attribute.

In diesem Lernprogramm wird Apache Spark verwendet, um Analysen zu den NYC-Taxitrip-Tippdaten durchzuführen und ein Modell zu entwickeln, um vorherzusagen, ob eine bestimmte Reise einen Tipp enthält.

Erstellen eines Apache Spark-Machine Learning-Modells

  1. Erstellen Sie ein PySpark-Notebook. Weitere Informationen finden Sie unter Erstellen eines Notizbuchs.

    Nachdem Sie das Notizbuch erstellt haben, fügen Sie es an ein Seehaus an, indem Sie im linken Bereich "Seehaus hinzufügen" auswählen.

  2. Importieren Sie die erforderlichen Typen für dieses Notizbuch. Fügen Sie den folgenden Code in die erste Zelle ein, und führen Sie ihn aus.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Überprüfen: Die Zelle wird ohne ImportError abgeschlossen. Wenn ein Fehler angezeigt wird, bestätigen Sie, dass Ihr Notizbuch die PySpark-Laufzeit verwendet.

  3. Verwenden Sie MLflow , um Ihre Machine Learning-Experimente und die entsprechenden Läufe nachzuverfolgen. Wenn die automatische Protokollierung von Microsoft Fabric aktiviert ist, werden die entsprechenden Metriken und Parameter automatisch erfasst.

    import mlflow
    

    Verifizieren: Die Zelle wird ohne Fehler ausgeführt. Führen Sie die Ausführung print(mlflow.__version__) aus, um zu bestätigen, dass MLflow verfügbar ist.

Erstellen des Eingabedatenrahmens

In diesem Beispiel werden die Daten aus Azure Open Datasets Speicher in einen Apache Spark DataFrame geladen. Anschließend wenden Sie Spark-Vorgänge zum Bereinigen und Filtern des Datasets an.

  1. Fügen Sie den folgenden Code in eine neue Zelle ein, und führen Sie ihn aus, um einen Spark DataFrame zu erstellen. Dieser Schritt ruft gelbe Taxidaten aus NYC ab, gefiltert auf Mai 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Überprüfen: Führen Sie die folgende Zelle aus, um zu bestätigen, dass Die Daten erfolgreich geladen werden.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Erstellen Sie eine Stichprobe des Datensatzes, um Entwicklung und Training zu beschleunigen.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Überprüfen: Überprüfen Sie, ob die Beispielgröße verwaltbar ist.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Zeigen Sie die Daten mithilfe des integrierten display() Befehls an, um das Datenbeispiel zu untersuchen.

    display(sampled_taxi_df.limit(10))
    

    Überprüfen: Eine Tabelle mit 10 Zeilen wird angezeigt, mit Spalten wie tpepPickupDateTime, fareAmount, tipAmount und tripDistance.

Vorbereiten der Daten

Die Datenaufbereitung ist ein wichtiger Schritt im Machine Learning-Prozess. Es umfasst das Bereinigen, Transformieren und Organisieren von Rohdaten, um sie für Die Analyse und Modellierung geeignet zu machen. Führen Sie in diesem Abschnitt mehrere Schritte zur Datenvorbereitung aus:

  • Filtern Sie das Dataset, um Ausreißer und falsche Werte zu entfernen.
  • Entfernen Sie Spalten, die für die Modellschulung nicht erforderlich sind.
  • Erstellen Sie neue Spalten aus den Rohdaten.
  • Generieren Sie ein Etikett, um zu bestimmen, ob eine bestimmte Taxireise einen Tipp beinhaltet.

Führen Sie den folgenden Code aus, um relevante Spalten auszuwählen, abgeleitete Features zu berechnen und Ausreißer zu filtern:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Important

Die date_format Funktion verwendet das Muster 'HH' (24-Stunden-Format, Werte 0-23) und nicht 'hh' (12-Stunden-Format, Werte 1-12). Das 24-Stunden-Format ist für die darauf folgende Logik zur Einteilung nach Tageszeit erforderlich.

Fügen Sie als Nächstes die Funktion für Verkehrszeitintervalle auf Grundlage der Tagesstunde hinzu:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Überprüfen: Stellen Sie sicher, dass die Verkehrszeitintervalle korrekt verteilt sind.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Erstellen eines logistischen Regressionsmodells

Die letzte Aufgabe besteht darin, die Daten mit Label in ein Format zu konvertieren, das die logistische Regression verarbeiten kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Struktur von Paaren aus Bezeichnung und Featurevektor aufweisen, wobei der Featurevektor aus Zahlen besteht, die den Eingabepunkt darstellen.

Konvertieren Sie die kategorisierten Spalten trafficTimeBins und weekdayString in ganzzahlige Darstellungen mithilfe des OneHotEncoder Ansatzes:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Überprüfen: Stellen Sie sicher, dass der kodierte DataFrame die erwarteten neuen Spalten enthält.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Trainieren eines logistischen Regressionsmodells

Teilen Sie das Dataset in einen Schulungssatz (70%) und einen Testsatz (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Überprüfen: Stellen Sie sicher, dass die Aufteilung sinnvolle Größen ergeben hat.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Erstellen Sie die Modellformel, trainieren Sie das logistische Regressionsmodell und bewerten Sie es anhand der Fläche unter der ROC-Kurve (Receiver Operating Characteristic Curve):

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Überprüfen: Die Ausgabe zeigt einen AUC-Wert an. Ein leistungsfähiges Modell erzeugt einen Wert nahe 1,0.

Area under ROC = 0.97 (approximately)

Note

Der genaue AUC-Wert variiert je nach Datenbeispiel. Werte über 0,90 deuten auf eine starke prädiktive Leistung für dieses Dataset hin.

Erstellen einer visuellen Darstellung der Vorhersage

Erstellen Sie eine endgültige Visualisierung, um die Modellergebnisse zu interpretieren. Eine ROC-Kurve stellt den Kompromiss zwischen wahrer positiver Und falsch positiver Rate dar.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Überprüfen: Eine Zeichnung wird mit der ROC-Kurve oberhalb der roten gestrichelten diagonalen Linie angezeigt. Die Kurve sollte sich zur oberen linken Ecke hin wölben, was auf eine hohe Klassifizierungsleistung hindeutet.

Diagramm: ROC-Kurve für die logistische Regression im Trinkgeldmodell

Bereinigen von Ressourcen

Nachdem Sie dieses Lernprogramm abgeschlossen haben, löschen Sie das Notizbuch und das Seehaus, um die Arbeitsbereichskapazität freizugeben:

  1. Klicken Sie in Ihrem Arbeitsbereich mit der rechten Maustaste auf das Notizbuch, und wählen Sie "Löschen" aus.
  2. Wenn Sie speziell für dieses Lernprogramm ein Seehaus erstellt haben, klicken Sie mit der rechten Maustaste darauf, und wählen Sie "Löschen" aus.

Um das trainierte Modell für die zukünftige Verwendung beizubehalten, fügen Sie den folgenden Code vor der Bereinigung hinzu:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Troubleshooting

Thema Ursache Lösung
Py4JJavaError beim Lesen von Parkett Netzwerkkonnektivität zum Azure BLOB-Speicher Überprüfen Sie, ob Ihr Fabric Arbeitsbereich über ausgehenden Internetzugriff verfügt. Starten Sie die Spark-Sitzung neu.
AnalysisException: cannot resolve column Tippfehler im Spaltennamen oder Schemaabweichung Führen Sie die Ausführung nyc_tlc_df.printSchema() aus, um die verfügbaren Spalten zu prüfen. Das NYC-Taxi-Dataset-Schema kann sich zwischen Jahren ändern.
Leeres DataFrame nach dem Filtern Die Filterbedingungen für das Datenfenster sind zu eingeschränkt. Erhöhen Sie den Datumsbereich, oder überprüfen Sie sampled_taxi_df.count() vor dem Filtern.
IllegalArgumentException in StringIndexer Nicht erkannte Beschriftungen beim Transformieren Fügen Sie handleInvalid="skip" zu Ihren StringIndexer-Aufrufen hinzu: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
Niedrige AUC (unter 0,6) Unzureichende Daten oder falsches Feature engineering Erhöhen Sie den Stichprobenbruch (z. B 0.01 . anstelle von 0.001) und überprüfen Sie, ob trafficTimeBins Kategorien ausgeglichen sind.
OutOfMemoryError Dataset zu groß für verfügbare Kapazität Reduzieren Sie den Stichprobenanteil oder erhöhen Sie Ihre Fabric-Kapazitätsstufe.
ROC-Diagramm wird nicht angezeigt Matplotlib-Backend-Problem im Notebook Fügen Sie %matplotlib inline oben im Notebook hinzu.