Créer un modèle Machine Learning avec Apache Spark MLlib

Dans cet article, vous allez apprendre à utiliser Apache Spark MLlib pour créer une application Machine Learning qui gère l’analyse prédictive sur un jeu de données ouvert Azure. Spark fournit des bibliothèques d’apprentissage automatique intégrées. Cet exemple utilise la classification par régression logistique.

Ce tutoriel décrit les étapes suivantes :

  • Configurer le notebook et les importations
  • Charger et prélever un échantillon de données sur les taxis de NYC
  • Préparer et concevoir des fonctionnalités
  • Encoder des caractéristiques catégorielles
  • Entraîner le modèle de régression logistique
  • Évaluer et visualiser les résultats

Les bibliothèques Spark de base SparkML et MLlib fournissent de nombreux utilitaires utiles pour les tâches d’apprentissage automatique. Ces utilitaires conviennent pour les tâches suivantes :

  • classification
  • Regroupement
  • Test d'hypothèse et calcul des statistiques d'échantillon
  • régression ;
  • Décomposition de valeur singulière (SVD) et analyse des composants principaux (PCA)
  • Modélisation de rubrique

Prerequisites

Comprendre la classification et la régression logistique

Une classification, tâche d’apprentissage automatique très courante, implique le tri de données d’entrée par catégories. Un algorithme de classification montre comment affecter des étiquettes aux données d’entrée fournies. Par exemple, un algorithme Machine Learning peut accepter des informations sur les actions comme entrée et diviser le stock en deux catégories : les actions que vous devez vendre et les actions que vous devez conserver.

L’algorithme de régression logistique est utile pour la classification. L’API de régression logistique Spark est utile pour la classification binaire des données d’entrée dans l’un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipedia.

La régression logistique produit une fonction logistique qui prédit la probabilité qu’un vecteur d’entrée appartient à un groupe ou à l’autre.

Exemple d’analyse prédictive des données des taxis de New York

Les données sont disponibles via la ressource Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxis jaunes, notamment les heures et les lieux de départ et d’arrivée, le coût des courses et d’autres attributs.

Ce tutoriel utilise Apache Spark pour analyser les données sur les pourboires des trajets en taxi à New York et développer un modèle permettant de prédire si un trajet donné comporte un pourboire.

Créer un modèle d’apprentissage automatique Apache Spark

  1. Créez un notebook PySpark. Pour plus d’informations, consultez Créer un bloc-notes.

    Après avoir créé le bloc-notes, attachez-le à un lakehouse en sélectionnant Ajouter lakehouse dans le volet gauche.

  2. Importez les types requis pour ce notebook. Collez le code suivant dans la première cellule et exécutez-le.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Vérifier : la cellule se termine sans ImportError. Si vous voyez une erreur, vérifiez que votre notebook utilise le runtime PySpark.

  3. Utilisez MLflow pour suivre vos expériences machine learning et les exécutions correspondantes. Si la journalisation automatique Microsoft Fabric est activée, les métriques et paramètres correspondants sont automatiquement capturés.

    import mlflow
    

    Vérifiez : la cellule s’exécute sans aucune erreur. Exécutez print(mlflow.__version__) pour confirmer que MLflow est disponible.

Construire le DataFrame d’entrée

Cet exemple charge les données depuis le stockage Azure Open Datasets dans un DataFrame Apache Spark. Ensuite, vous appliquez des opérations Spark pour nettoyer et filtrer le jeu de données.

  1. Collez le code suivant dans une nouvelle cellule et exécutez-le pour créer un DataFrame Spark. Cette étape récupère les données des taxis jaunes de NYC filtrées pour mai 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Vérifier : exécutez la cellule suivante pour confirmer que les données sont chargées correctement.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Échantillonnez le jeu de données pour accélérer le développement et l’entraînement.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Vérifiez : confirmez que la taille de l’échantillon est raisonnable.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Affichez les données à l’aide de la commande intégrée display() pour explorer l’échantillon de données.

    display(sampled_taxi_df.limit(10))
    

    Vérifier : une table avec 10 lignes s’affiche avec des colonnes telles que tpepPickupDateTime, , fareAmounttipAmount, et tripDistance.

Préparer les données

La préparation des données est une étape cruciale du processus d’apprentissage automatique. Il implique le nettoyage, la transformation et l’organisation des données brutes pour qu’elles conviennent à l’analyse et à la modélisation. Dans cette section, effectuez plusieurs étapes de préparation des données :

  • Filtrez le jeu de données pour supprimer les valeurs hors norme et les valeurs incorrectes.
  • Supprimez les colonnes qui ne sont pas nécessaires pour l’entraînement du modèle.
  • Créez de nouvelles colonnes à partir des données brutes.
  • Générez une étiquette pour déterminer si un trajet en taxi donné implique un pourboire.

Exécutez le code suivant pour sélectionner les colonnes pertinentes, les fonctionnalités dérivées du calcul et filtrer les valeurs hors norme :

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Important

La date_format fonction utilise le modèle 'HH' (format de 24 heures, valeurs 0-23) plutôt que 'hh' (format de 12 heures, valeurs 1-12). Le format sur 24 heures est requis pour la logique de regroupement par heure de la journée ci-dessous.

Ensuite, ajoutez la fonctionnalité de tranches horaires du trafic en fonction de l’heure de la journée :

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Vérifier : confirmez que les intervalles de temps du trafic sont correctement répartis.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Créer un modèle de régression logistique

La dernière tâche convertit les données étiquetées dans un format qui peut être analysé par régression logistique. L’entrée dans un algorithme de régression logistique doit avoir une structure de paires de vecteurs étiquette/caractéristique, où le vecteur caractéristique est un vecteur de nombres qui représente le point d’entrée.

Convertissez les colonnes trafficTimeBins catégorielles et weekdayString en représentations entières à l’aide de l’approche OneHotEncoder :

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Vérifiez : vérifiez que le DataFrame encodé contient les nouvelles colonnes attendues.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Entraîner un modèle de régression logistique

Fractionnez le jeu de données en un jeu d’entraînement (70%) et un jeu de test (30%) :

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Vérifiez : confirmez que le fractionnement a produit des tailles raisonnables.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Créez la formule du modèle, entraînez le modèle de régression logistique et évaluez-le à l’aide de La zone sous la courbe ROC (caractéristique d’exploitation du récepteur) :

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Vérifiez : la sortie affiche une valeur AUC. Un modèle performant produit une valeur proche de 1,0.

Area under ROC = 0.97 (approximately)

Note

La valeur exacte de l’AUC varie en fonction de l’exemple de données. Les valeurs supérieures à 0,90 indiquent des performances prédictives fortes pour ce jeu de données.

Créer une représentation visuelle de la prédiction

Créez une visualisation finale pour interpréter les résultats du modèle. Une courbe ROC présente le compromis entre le taux positif vrai et le taux faux positif.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Vérifiez : un tracé apparaît montrant la courbe ROC au-dessus de la ligne diagonale en pointillés rouges. La courbe doit s’incliner vers l’angle supérieur gauche, indiquant des performances de classification fortes.

Graphique montrant la courbe ROC pour la régression logistique dans le modèle de pointe.

Nettoyer les ressources

Une fois ce tutoriel terminé, supprimez le notebook et lakehouse pour libérer la capacité de l’espace de travail :

  1. Dans votre espace de travail, cliquez avec le bouton droit sur le bloc-notes, puis sélectionnez Supprimer.
  2. Si vous avez créé un lakehouse spécifiquement pour ce didacticiel, cliquez dessus avec le bouton droit et sélectionnez Supprimer.

Pour conserver le modèle entraîné pour une utilisation ultérieure, ajoutez le code suivant avant l’étape de nettoyage :

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Résolution des problèmes

Problème Cause Solution
Py4JJavaError lors de la lecture de Parquet Connectivité réseau au stockage Blob Azure Vérifiez que votre espace de travail Fabric dispose d’un accès Internet sortant. Essayez de redémarrer la session Spark.
AnalysisException: cannot resolve column Faute de frappe de nom de colonne ou incompatibilité de schéma Exécutez nyc_tlc_df.printSchema() pour inspecter les colonnes disponibles. Le schéma du jeu de données de taxi nyC peut changer d’une année à l’autre.
DataFrame vide après filtrage Conditions de filtre trop restrictives pour la fenêtre de données Augmentez la plage de dates ou vérifiez sampled_taxi_df.count() avant le filtrage.
IllegalArgumentException dans StringIndexer Étiquettes invisibles pendant la transformation Ajoutez handleInvalid="skip" à vos appels StringIndexer : StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
Faible AUC (inférieur à 0,6) Données insuffisantes ou ingénierie des fonctionnalités incorrectes Augmentez la fraction d’échantillon (par exemple, 0.01 au lieu de 0.001) et vérifiez trafficTimeBins que les catégories sont équilibrées.
OutOfMemoryError Jeu de données trop volumineux pour la capacité disponible Réduisez la fraction d’échantillon ou augmentez votre niveau de capacité Fabric.
Courbe ROC ne s’affiche pas Problème de backend Matplotlib dans un notebook Ajoutez %matplotlib inline en haut du bloc-notes.