Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In dit artikel leert u hoe u Apache Spark MLlib kunt gebruiken om een machine learning-toepassing te maken waarmee voorspellende analyses op een Azure geopende gegevensset worden verwerkt. Spark biedt ingebouwde machine learning-bibliotheken. In dit voorbeeld wordt gebruikgemaakt van classificatie via logistieke regressie.
In deze zelfstudie worden de volgende stappen behandeld:
- Notebook instellen en importeren
- NYC-taxigegevens laden en bemonsteren
- Functies voorbereiden en engineeren
- Categorische functies coderen
- Model voor logistische regressie trainen
- Resultaten evalueren en visualiseren
De sparkML- en MLlib Spark-kernbibliotheken bieden veel hulpprogramma's die nuttig zijn voor machine learning-taken. Deze hulpprogramma's zijn geschikt voor:
- Classificatie
- Clusteranalyse
- Hypothesen voor het testen en berekenen van voorbeeldstatistieken
- Regressie
- SVD (Singular Value Decomposition) en PCA (Principal Component Analysis)
- Modellering van onderwerpen
Prerequisites
Haal een Microsoft Fabric-abonnement op. Of meld u aan voor een gratis proefversie van Microsoft Fabric.
Meld u aan bij Microsoft Fabric.
Schakel over naar Fabric met behulp van de ervaringsschakelaar aan de linkerkant van de startpagina.
- Maak indien nodig een Microsoft Fabric lakehouse zoals beschreven in Maak een lakehouse in Microsoft Fabric.
- Maak een nieuw notitieblok in uw werkruimte door + te selecteren en Notebook. Zie Een notitieblok maken voor meer informatie.
Classificatie en logistieke regressie begrijpen
Classificatie, een populaire machine learning-taak, omvat het sorteren van invoergegevens in categorieën. Een classificatie-algoritme bepaalt hoe labels worden toegewezen aan de opgegeven invoergegevens. Een machine learning-algoritme kan bijvoorbeeld aandelengegevens accepteren als invoer en het aandeel in twee categorieën verdelen: aandelen die u moet verkopen en aandelen die u moet behouden.
Het logistieke regressie-algoritme is handig voor classificatie. De Logistieke Regressie-API van Spark is handig voor binaire classificatie van invoergegevens in een van twee groepen. Zie Wikipedia voor meer informatie over logistieke regressie.
Logistieke regressie produceert een logistieke functie die de kans voorspelt dat een invoervector deel uitmaakt van de ene groep of de andere.
Voorbeeld van voorspellende analyse van NYC-taxigegevens
De gegevens zijn beschikbaar via de Azure Open Datasets-resource . Deze gegevensset bevat informatie over gele taxiritten, waaronder de begintijden, eindtijden, beginlocaties, eindlocaties, reiskosten en andere kenmerken.
In deze zelfstudie wordt Apache Spark gebruikt om analyses uit te voeren op de tipgegevens van taxiritken in NYC en een model te ontwikkelen om te voorspellen of een bepaalde reis een tip bevat.
Een machine learning-model in Apache Spark maken
Maak een PySpark-notebook. Zie Een notitieblok maken voor meer informatie.
Nadat u het notitieblok hebt gemaakt, koppelt u het aan een lakehouse door Lakehouse toevoegen te selecteren in het linkerdeelvenster.
Importeer de vereiste typen voor dit notebook. Plak de volgende code in de eerste cel en voer deze uit.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorControleer: De cel wordt voltooid zonder
ImportError. Als er een fout wordt weergegeven, controleert u of uw notebook gebruikmaakt van de PySpark-runtime.Gebruik MLflow om uw machine learning-experimenten en bijbehorende uitvoeringen bij te houden. Als Automatische aanmelding van Microsoft Fabric is ingeschakeld, worden de bijbehorende metrische gegevens en parameters automatisch vastgelegd.
import mlflowControleer: De cel wordt zonder fouten voltooid. Voer uit
print(mlflow.__version__)om te controleren of MLflow beschikbaar is.
Het dataframe voor invoer maken
In dit voorbeeld worden de gegevens uit Azure Open Datasets opslag in een Apache Spark DataFrame geladen. Vervolgens past u Spark-bewerkingen toe om de gegevensset op te schonen en te filteren.
Plak de volgende code in een nieuwe cel en voer deze uit om een Spark DataFrame te maken. Met deze stap worden NYC-gegevens van gele taxi's opgehaald, gefilterd voor mei 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Controleer: Voer de volgende cel uit om te bevestigen dat de gegevens zijn geladen.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsNeem een steekproef van de dataset om de ontwikkeling en training te versnellen.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Controleer: Controleer of de grootte van het voorbeeld beheerbaar is.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsBekijk de gegevens met behulp van de ingebouwde
display()opdracht om het gegevensvoorbeeld te verkennen.display(sampled_taxi_df.limit(10))Controleer of: Een tabel met tien rijen wordt weergegeven met kolommen zoals
tpepPickupDateTime,fareAmount,tipAmountentripDistance.
De gegevens voorbereiden
Gegevensvoorbereiding is een cruciale stap in het machine learning-proces. Het omvat het opschonen, transformeren en ordenen van onbewerkte gegevens om deze geschikt te maken voor analyse en modellering. Voer in deze sectie verschillende stappen voor gegevensvoorbereiding uit:
- Filter de gegevensset om uitbijters en onjuiste waarden te verwijderen.
- Verwijder kolommen die niet nodig zijn voor modeltraining.
- Nieuwe kolommen maken op basis van de onbewerkte gegevens.
- Genereer een label om te bepalen of een bepaalde taxirit een tip omvat.
Voer de volgende code uit om relevante kolommen te selecteren, afgeleide functies te berekenen en uitbijters te filteren:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Important
De functie date_format gebruikt het patroon 'HH' (24-uursnotatie, waarden 0-23) in plaats van 'hh' (12-uursnotatie, waarden 1-12). De 24-uursnotatie is vereist voor de logica voor het indelen in tijdvakken op basis van het tijdstip van de dag die hierop volgt.
Voeg vervolgens de functionaliteit voor verkeerstijdvakken toe op basis van het uur van de dag:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Controleer: Bevestig dat de verkeerstijdsintervallen correct zijn verdeeld.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Een logistiek regressiemodel maken
Met de laatste taak worden de gelabelde gegevens geconverteerd naar een indeling die logistieke regressie kan verwerken. De invoer voor een logistiek regressiealgoritme moet een structuur van label-/functievectorparen hebben, waarbij de functievector een vector is van getallen die het invoerpunt vertegenwoordigen.
Converteer de categorische kolommen trafficTimeBins en weekdayString naar gehele getallen met behulp van de OneHotEncoder methode:
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Controleer: Controleer of het gecodeerde DataFrame de verwachte nieuwe kolommen bevat.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Een logistiek regressiemodel trainen
Splits de gegevensset in een trainingsset (70%) en een testset (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Controleer: Controleer of de splitsing redelijke grootten heeft geproduceerd.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Maak de modelformule, train het logistieke regressiemodel en evalueer dit met behulp van Area Under the ROC (Receiver Operating Characteristic) Curve:
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Controleer: In de uitvoer wordt een AUC-waarde weergegeven. Een goed presterend model produceert een waarde dicht bij 1.0.
Area under ROC = 0.97 (approximately)
Note
De exacte AUC-waarde is afhankelijk van het gegevensvoorbeeld. Waarden boven 0,90 geven sterke voorspellende prestaties voor deze gegevensset aan.
Een visuele weergave van de voorspelling maken
Bouw een uiteindelijke visualisatie om de modelresultaten te interpreteren. Een ROC-curve toont de afweging tussen de sensitiviteit en de valspositieve ratio.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Controleer: Er wordt een plot weergegeven met de ROC-curve boven de rode schuine diagonale lijn. De curve moet naar de linkerbovenhoek buigen, wat wijst op goede classificatieprestaties.
De hulpbronnen opschonen
Nadat u deze zelfstudie hebt voltooid, verwijdert u het notebook en het lakehouse om werkruimtecapaciteit vrij te maken:
- Klik in uw werkruimte met de rechtermuisknop op het notitieblok en selecteer Verwijderen.
- Als u specifiek voor deze zelfstudie een lakehouse hebt gemaakt, klikt u er met de rechtermuisknop op en selecteert u Verwijderen.
Als u het getrainde model voor toekomstig gebruik wilt behouden, voegt u de volgende code toe voordat u het opschoont:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Troubleshooting
| Issue | Oorzaak | Solution |
|---|---|---|
Py4JJavaError bij het lezen van Parquet |
Netwerkverbinding met Azure blobopslag | Controleer of uw Fabric werkruimte uitgaande internettoegang heeft. Start de Spark-sessie opnieuw. |
AnalysisException: cannot resolve column |
Typefout of schema van kolomnaam komt niet overeen | Voer uit nyc_tlc_df.printSchema() om beschikbare kolommen te controleren. Het schema van de NYC-taxidataset kan van jaar tot jaar veranderen. |
| Leeg DataFrame na het filteren | Filtervoorwaarden die te beperkend zijn voor het gegevensvenster | Vergroot het datumbereik of controleer sampled_taxi_df.count() dit voordat u gaat filteren. |
IllegalArgumentException in StringIndexer |
Ongeziene labels tijdens transformatie | Toevoegen handleInvalid="skip" aan uw StringIndexer oproepen: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| Lage AUC (lager dan 0,6) | Onvoldoende gegevens of onjuiste functie-engineering | Verhoog de steekproeffractie (bijvoorbeeld 0.01 in plaats van 0.001) en controleer of trafficTimeBins categorieën in balans zijn. |
OutOfMemoryError |
Gegevensset is te groot voor de beschikbare capaciteit | Verminder de steekproeffractie of verhoog uw Fabric capaciteitslaag. |
| ROC-plot wordt niet weergegeven | Matplotlib-back-endprobleem in notebook | Voeg %matplotlib inline bovenaan het notitieblok toe. |
Gerelateerde inhoud
- AI-voorbeelden gebruiken om machine learning-modellen te bouwen: AI-voorbeelden gebruiken
- Machine learning-uitvoeringen bijhouden met experimenten: Machine learning-experimenten