使用 SynapseML 执行分类任务

本文介绍如何使用两种方法执行文本分类任务。 一种方法使用普通 pyspark方法,另一种方法使用 synapseml 库。 这两种方法的性能相同,但也凸显了与 pyspark 相比,SynapseML 如何降低代码复杂性。

该任务根据评论文本预测亚马逊上销售的书的客户评论是否良好(评级 > 3)或差。 使用不同的超参数训练 LogisticRegression 学习器,然后选择最佳模型。

先决条件

  • 创建 笔记本
  • 将笔记本附加到湖屋。 在笔记本中,选择左侧窗格上的 “添加”,以连接现有的 Lakehouse 或创建新的 Lakehouse。

注释

本文中使用的所有库(pysparksynapsemlnumpy)都预安装在 Fabric Spark 运行时中。 无需安装任何包。

加载和浏览数据

在Fabric笔记本中,Spark 会话已作为 spark 变量提供。 从公共Azure Blob 存储位置加载 Amazon 书籍评论数据集:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

验证数据集是否已正确加载:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

提取特征和处理数据

实际数据通常具有多种类型的特征,例如文本、数字和分类。 若要演示如何使用混合特征类型,请将两个数值特征添加到数据集:审阅的字数和平均字长

定义用户定义的函数(UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

使用 SynapseML UDFTransformer 应用 UDF

使用 SynapseML 中的 UDFTransformer 将 UDF 包装为与管道兼容的转换器:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

运行特征流水线

应用这两个转换器,并根据评分创建一个二元标签列:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

验证特征提取:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

使用 pyspark 进行分类

若要使用 pyspark 库选择最佳 LogisticRegression 分类器,必须显式执行以下步骤:

  1. 处理这些功能:
    • 标记文本列。
    • 使用哈希将标记化列哈希到向量中。
    • 将数值特征与向量合并。
  2. 将标签列从布尔值转换为整数类型。
  3. train 数据集上使用不同的超参数训练多个 LogisticRegression 模型。
  4. 计算每个已训练模型的 ROC 曲线下面积(AUC),并在 test 数据集上选择该指标最高的模型。
  5. validation 集上评估最佳模型。

进行特征提取并准备数据

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

验证特征化数据:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

训练和评估模型

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

验证结果:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

注释

确切的 AUC 值取决于随机拆分。 预期值介于 0.65 和 0.85 之间。

使用 SynapseML 进行分类

此方法 synapseml 通过更少的步骤实现相同的结果。 SynapseML 在内部处理特征化,从而减少需要编写的代码:

  1. TrainClassifier 估计器会在内部对数据进行特征化处理,只要 traintestvalidation 数据集中的列表示特征。
  2. 估算 FindBestModel 器通过评估具有指定指标的 test 数据集的性能,从已训练的模型池中查找最佳模型。
  3. ComputeModelStatistics 转换器同时计算已评分的数据集(在本例中为 validation 数据集)上的多个指标。
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

验证 SynapseML 结果:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

注释

pyspark 和 SynapseML 方法应生成类似的 AUC 值,因为它们使用相同数据上的相同超参数训练相同的模型类型。

比较两种方法

方面 pyspark SynapseML
特征处理 手动方式(Tokenizer 到 HashingTF 再到 VectorAssembler) 自动(由 TrainClassifier 处理)
模型选择 使用评估器的手动循环 内置 FindBestModel
指标计算 每次评估调用仅对应一个指标 具有多个指标 ComputeModelStatistics
代码行 约 30 行 约 15 行
Result 同一 AUC 同一 AUC

故障排除

问题 原因 解决方案
AnalysisException: Path does not exist 公共 Blob 存储 URL 暂时不可用 等待几分钟,然后重试。 通过运行 spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count() 来验证连通性
IllegalArgumentException: Field "features" does not exist 转换器之间的特征列名称不匹配 通过在 VectorAssembler 步骤之前运行 data.columns 来验证列名称
NameError: name 'LogisticRegression' is not defined 缺少导入语句 在单元格顶部添加from pyspark.ml.classification import LogisticRegression
ModuleNotFoundError: No module named 'synapse.ml' 笔记本未使用 Fabric Spark 运行时 验证笔记本是否使用 Fabric Runtime 1.2 或更高版本。 在功能区中选择“ 环境 ”进行检查。
低 AUC (低于 0.6) 数据拆分问题或收敛问题 使用 data.groupBy("label").count().show() 验证标签分布。 需要大致均衡的数据集。
Py4JJavaError: An error occurred while calling Java/Spark 内部错误 检查 Spark UI 以获取详细的错误日志。 通过选择 Session>停止会话 来重新启动 Spark 会话,然后重新运行所有单元格。

清理资源

如果为了本文而创建了一个新的 Lakehouse,并且不再需要它:

  1. 在工作区中,右键单击 lakehouse 名称。
  2. 选择删除
  3. 确认删除。

除非单独删除笔记本,否则笔记本将保留在工作区中。