指南:铸造工具 - 多变量异常检测

此方案演示如何在 Apache Spark 上使用 SynapseML 和 Foundry 工具进行多变量异常检测。 多变量异常检测涉及检测许多变量或时序之间的异常情况,同时考虑不同变量之间的所有关联和依赖关系。 此方案使用 SynapseML 和 Foundry 工具训练模型进行多变量异常检测。 然后,使用模型推断数据集中的多变量异常,该数据集包含来自三个 IoT 传感器的合成度量。

重要

从 2023 年 9 月 20 日起,无法创建新的异常检测器资源。 异常检测器服务将于 2026 年 10 月 1 日停用。

有关Azure AI Anomaly Detector的详细信息,请访问 Anomaly Detector 信息资源。

先决条件

  • Azure订阅 - 免费创建一个订阅
  • 将笔记本附加到湖屋。 在左侧,选择“添加”以添加现有湖屋或创建湖屋。

安装

从现有 Anomaly Detector 资源开始,可以探索处理各种表单数据的方法。

创建异常检测器资源

注释

自 2023 年 9 月 20 日以来,无法创建新的异常检测器资源。 仅当存在现有异常检测器资源时,以下步骤才适用。 有关不需要异常检测器服务的多变量异常检测方法,请参阅 使用隔离林的多变量异常情况检测

  • 在 Azure 门户中,选择资源组中的 Create,然后键入 Anomaly Detector。 选择“异常检测器”资源。
  • 为资源命名,理想情况下使用与资源组的其余区域相同的区域。 使用其余选项的默认选项,然后选择“查看 + 创建”,然后选择“创建”。
  • 创建异常检测器资源后,打开它,然后选择 Keys and Endpoints 左侧导航栏中的面板。 将异常检测器资源的密钥复制到 ANOMALY_API_KEY 环境变量中,或将其存储在 anomalyKey 变量中。

创建存储帐户资源

若要保存中间数据,必须创建Azure Blob Storage帐户。 在该存储帐户中,创建用于存储中间数据的容器。 记下容器名称,并将connection string复制到该容器。 稍后需要它来填充 containerName 变量和 BLOB_CONNECTION_STRING 环境变量。

输入服务密钥

首先,为服务密钥设置环境变量。 下一个单元格根据存储在Azure Key Vault中的值设置 ANOMALY_API_KEYBLOB_CONNECTION_STRING 环境变量。 如果在自己的环境中运行本教程,请确保在继续作之前设置以下环境变量:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

ANOMALY_API_KEYBLOB_CONNECTION_STRING环境变量读取,并设置containerNamelocation变量。

# An Anomaly Detector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

连接到存储帐户,使异常检测器可以在该存储帐户中保存中间结果:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

导入所有必要的模块:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.services import *

将示例数据读入 Spark 数据帧:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

现在可以创建一个 estimator 对象,用于训练模型。 指定训练数据的开始和结束时间。 此外,指定要使用的输入列以及包含时间戳的列的名称。 最后,指定要在异常检测滑动窗口中使用的数据点数,并将连接字符串设置为 Azure Blob 存储帐户。

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

estimator拟合到数据中:

model = estimator.fit(df)

训练完成后,使用模型进行推理。 下一个单元格中的代码指定要在其中检测异常的数据的开始和结束时间:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

在上一个单元格中, .show(5) 显示前五个数据帧行。 结果全部 null 是由于它们位于推理窗口之外。

若要仅显示推断数据的结果,请选择所需的列。 然后,可以按升序对数据帧中的行进行排序,并筛选结果以仅显示推理窗口范围内的行。 在这里, inferenceEndTime 匹配数据帧中的最后一行,以便可以忽略它。

最后,为了更好地绘制结果,请将 Spark 数据帧转换为 Pandas 数据帧:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

设置 contributors 列的格式,该列存储每个传感器对检测到的异常的贡献分数。 下一个单元格处理此问题,并将每个传感器的贡献分数拆分为其自己的列:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

现在,你分别在series_0series_1series_2列中拥有传感器 1、2 和 3 的贡献分数。

若要绘制结果,请运行下一个单元格。 该 minSeverity 参数指定要绘制的异常的最小严重性:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

多变量异常情况检测结果绘图的屏幕截图。

绘图以橙色、绿色和蓝色显示来自传感器(推理窗口内)的原始数据。 第一张图中的红色竖线显示检测到的异常,其严重程度大于或等于 minSeverity

第二张图显示所有检测到的异常的严重程度分数,阈值 minSeverity 以红色虚线显示。

最后,最后一张图显示每个传感器的数据对检测到的异常的贡献。 它有助于诊断和了解每个异常最有可能的原因。