通过


使用 API 创建数据剖析

本页介绍如何使用 Databricks SDK 在 Databricks 中创建数据配置文件,并描述 API 调用中使用的参数。 还可以使用 REST API 创建和管理数据资料概况。

有关参考信息,请参阅 数据分析 SDK 参考REST API 参考

您可以在注册于 Unity Catalog 的任何托管或外部 Delta 表上创建一个配置文件。 在 Unity Catalog 元存储中,对于任何表只能创建一个概况。

注释

有关已弃用quality_monitors的 API 的信息,请参阅使用 quality_monitors API 创建数据配置文件(已弃用)。

要求

若要使用最新版本的 API,请使用笔记本开头的以下命令安装 Python 客户端:

%pip install "databricks-sdk>=0.68.0"

若要进行身份验证以在环境中使用 Databricks SDK,请参阅 “身份验证”。

配置文件类型

创建配置文件时,请选择以下配置文件类型之一:TimeSeriesInferenceLogSnapshot。 本部分简要介绍了每个选项。 有关详细信息,请参阅 数据分析 SDK 参考REST API 参考

注释

  • 首次创建时间序列或推理配置文件时,Databricks 仅分析创建前30天的数据。 创建配置文件后,将处理所有新数据。
  • 在物化视图上定义的概要配置不支持增量处理。

小窍门

对于 TimeSeriesInference 配置文件,最佳做法是在表上启用更改数据馈送(CDF)。 启用 CDF 后,只会处理新追加的数据,而不是每次刷新时重新处理整个表。 这使得执行更加高效,并在扩展到多个表时降低成本。

TimeSeries 轮廓

TimeSeries配置文件比较跨时间窗口的数据分布。 对于TimeSeries配置文件,必须提供以下内容:

  • 时间戳列 (timestamp_column)。 时间戳列数据类型必须是TIMESTAMP或可以使用 to_timestamp转换为时间戳的类型。
  • 要计算指标的 granularities 集合。 以下粒度可用:
    • AGGREGATION_GRANULARITY_5_MINUTES
    • 聚合粒度_30分钟 (AGGREGATION_GRANULARITY_30_MINUTES)
    • 聚合粒度_1小时
    • AGGREGATION_GRANULARITY_1_DAY
    • 聚合粒度_1_周
    • 汇总粒度_2周
    • 聚合粒度_3周
    • AGGREGATION_GRANULARITY_4_WEEKS
    • 聚合粒度_1_月
    • 聚合粒度_1_年
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 time_series=TimeSeriesConfig(
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
 slicing_exprs=["type='Red'"]
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",     # object_type is always "table" for data profiling
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

InferenceLog 轮廓

配置文件 InferenceLog 类似于配置文件 TimeSeries,但也包括模型质量指标。 InferenceLog 配置文件使用以下参数:

参数 说明
problem_type MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATIONMonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
prediction_column 包含模型预测值的列。
timestamp_column 包含推理请求时间戳的列。
model_id_column 包含用于预测的模型的 ID 的列。
granularities 确定如何在时间窗口内对数据进行分区。 有关可用值,请参阅 TimeSeries 概要
label_column (可选)包含模型预测的基础真相的列。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 inference_log=InferenceLogConfig(
    problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
    prediction_column="preds",
    model_id_column="model_ver",
    label_column="label", # optional
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

对于 InferenceLog 配置文件,会根据 model_id_col 的不同值,自动创建切片。

Snapshot 轮廓

TimeSeries不同,Snapshot描述了表格的完整内容如何随时间变化。 指标针对表中的所有数据进行计算,并在每次刷新概况时反映表状态。

注释

快照概况的最大表大小为 4TB。 对于较大的表,请改用时间序列分析。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 snapshot=SnapshotConfig(),
 slicing_exprs=["type='Red'"]
)

刷新和查看结果

若要查看刷新历史记录,必须使用启用了数据分析的 Databricks 工作区。

若要刷新指标表,请使用 create_refresh。 例如:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
  object_type=table_object_type, object_id=table_id, refresh=Refresh(
   object_type=table_object_type,
   object_id=table_id,
 )
)

从笔记本调用 create_refresh 时,会创建或更新指标表。 此计算在无服务器计算上运行,而不是在笔记本附加到的群集上运行。 更新统计信息时,可以继续在笔记本中运行命令。

有关存储在指标表中的统计信息的信息,请参阅 监视指标表。指标表是 Unity 目录表。 可以在笔记本或 SQL 查询资源管理器中查询它们,并在目录资源管理器中查看它们。

若要显示与配置文件关联的所有刷新的历史记录,请使用 list_refreshes

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

若要获取已排队、正在运行或已完成的特定运行的状态,请使用 get_refresh

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
  run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
  time.sleep(30)

查看个人资料设置

可以使用 get_monitor API 查看用户资料设置。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)

时间表

若要设置配置文件以在指定时间运行,请使用 schedulecreate_monitor 参数。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 schedule=CronSchedule(
        quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
        timezone_id="PST",
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

有关详细信息,请参阅 cron 表达式

通知

若要为配置文件设置通知,请使用 notifications 参数 create_monitor

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 notification_settings=NotificationSettings(
        # Notify the given email when a monitoring refresh fails or times out.
        on_failure=NotificationDestination(
            email_addresses=["your_email@domain.com"]
        )
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

每个事件类型最多支持 5 个电子邮件地址(例如,“on_failure”)。

控制对指标表的访问

配置文件创建的指标表和仪表板由创建配置文件的用户拥有。 可以使用 Unity 目录特权来控制对指标表的访问。 若要在工作区中共享仪表板,请使用仪表板右上角的 “共享 ”按钮。

删除配置文件

删除用户配置:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)

此命令不会删除个人资料表和由个人资料创建的仪表板。 必须在单独的步骤中删除这些资产,也可以将它们保存在其他位置。

示例笔记本

以下示例笔记本演示了如何创建配置文件、刷新配置文件以及检查所创建的指标表。

笔记本示例:时序特征

此笔记本演示如何创建 TimeSeries 类型配置文件。

TimeSeries 概述示例笔记本

获取笔记本

笔记本示例:推理分析(回归)

此笔记本演示如何为回归问题创建 InferenceLog 类型配置文件。

推理配置回归示例笔记本文件

获取笔记本

笔记本示例:推理概况(分类)

此笔记本演示如何为分类问题创建 InferenceLog 类型配置文件。

推理配置文件分类示例笔记本

获取笔记本

笔记本示例:快照配置文件

此笔记本演示如何创建 Snapshot 类型配置文件。

快照概要示例笔记本

获取笔记本