本页介绍如何使用 Databricks SDK 在 Databricks 中创建数据配置文件,并描述 API 调用中使用的参数。 还可以使用 REST API 创建和管理数据资料概况。
有关参考信息,请参阅 数据分析 SDK 参考 和 REST API 参考。
您可以在注册于 Unity Catalog 的任何托管或外部 Delta 表上创建一个配置文件。 在 Unity Catalog 元存储中,对于任何表只能创建一个概况。
注释
有关已弃用quality_monitors的 API 的信息,请参阅使用 quality_monitors API 创建数据配置文件(已弃用)。
要求
若要使用最新版本的 API,请使用笔记本开头的以下命令安装 Python 客户端:
%pip install "databricks-sdk>=0.68.0"
若要进行身份验证以在环境中使用 Databricks SDK,请参阅 “身份验证”。
配置文件类型
创建配置文件时,请选择以下配置文件类型之一:TimeSeries或InferenceLogSnapshot。 本部分简要介绍了每个选项。 有关详细信息,请参阅 数据分析 SDK 参考 或 REST API 参考。
注释
- 首次创建时间序列或推理配置文件时,Databricks 仅分析创建前30天的数据。 创建配置文件后,将处理所有新数据。
- 在物化视图上定义的概要配置不支持增量处理。
小窍门
对于 TimeSeries 和 Inference 配置文件,最佳做法是在表上启用更改数据馈送(CDF)。 启用 CDF 后,只会处理新追加的数据,而不是每次刷新时重新处理整个表。 这使得执行更加高效,并在扩展到多个表时降低成本。
TimeSeries 轮廓
TimeSeries配置文件比较跨时间窗口的数据分布。 对于TimeSeries配置文件,必须提供以下内容:
- 时间戳列 (
timestamp_column)。 时间戳列数据类型必须是TIMESTAMP或可以使用to_timestamp转换为时间戳的类型。 - 要计算指标的
granularities集合。 以下粒度可用:- AGGREGATION_GRANULARITY_5_MINUTES
- 聚合粒度_30分钟 (AGGREGATION_GRANULARITY_30_MINUTES)
- 聚合粒度_1小时
- AGGREGATION_GRANULARITY_1_DAY
- 聚合粒度_1_周
- 汇总粒度_2周
- 聚合粒度_3周
- AGGREGATION_GRANULARITY_4_WEEKS
- 聚合粒度_1_月
- 聚合粒度_1_年
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog 轮廓
配置文件 InferenceLog 类似于配置文件 TimeSeries,但也包括模型质量指标。
InferenceLog 配置文件使用以下参数:
| 参数 | 说明 |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION 或 MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
包含模型预测值的列。 |
timestamp_column |
包含推理请求时间戳的列。 |
model_id_column |
包含用于预测的模型的 ID 的列。 |
granularities |
确定如何在时间窗口内对数据进行分区。 有关可用值,请参阅 TimeSeries 概要。 |
label_column |
(可选)包含模型预测的基础真相的列。 |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
对于 InferenceLog 配置文件,会根据 model_id_col 的不同值,自动创建切片。
Snapshot 轮廓
与TimeSeries不同,Snapshot描述了表格的完整内容如何随时间变化。 指标针对表中的所有数据进行计算,并在每次刷新概况时反映表状态。
注释
快照概况的最大表大小为 4TB。 对于较大的表,请改用时间序列分析。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
刷新和查看结果
若要查看刷新历史记录,必须使用启用了数据分析的 Databricks 工作区。
若要刷新指标表,请使用 create_refresh。 例如:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
从笔记本调用 create_refresh 时,会创建或更新指标表。 此计算在无服务器计算上运行,而不是在笔记本附加到的群集上运行。 更新统计信息时,可以继续在笔记本中运行命令。
有关存储在指标表中的统计信息的信息,请参阅 监视指标表。指标表是 Unity 目录表。 可以在笔记本或 SQL 查询资源管理器中查询它们,并在目录资源管理器中查看它们。
若要显示与配置文件关联的所有刷新的历史记录,请使用 list_refreshes。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
若要获取已排队、正在运行或已完成的特定运行的状态,请使用 get_refresh。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
查看个人资料设置
可以使用 get_monitor API 查看用户资料设置。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
时间表
若要设置配置文件以在指定时间运行,请使用 schedule 的 create_monitor 参数。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
有关详细信息,请参阅 cron 表达式 。
通知
若要为配置文件设置通知,请使用 notifications 参数 create_monitor。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
每个事件类型最多支持 5 个电子邮件地址(例如,“on_failure”)。
控制对指标表的访问
配置文件创建的指标表和仪表板由创建配置文件的用户拥有。 可以使用 Unity 目录特权来控制对指标表的访问。 若要在工作区中共享仪表板,请使用仪表板右上角的 “共享 ”按钮。
删除配置文件
删除用户配置:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
此命令不会删除个人资料表和由个人资料创建的仪表板。 必须在单独的步骤中删除这些资产,也可以将它们保存在其他位置。
示例笔记本
以下示例笔记本演示了如何创建配置文件、刷新配置文件以及检查所创建的指标表。
笔记本示例:时序特征
此笔记本演示如何创建 TimeSeries 类型配置文件。
TimeSeries 概述示例笔记本
笔记本示例:推理分析(回归)
此笔记本演示如何为回归问题创建 InferenceLog 类型配置文件。
推理配置回归示例笔记本文件
笔记本示例:推理概况(分类)
此笔记本演示如何为分类问题创建 InferenceLog 类型配置文件。
推理配置文件分类示例笔记本
笔记本示例:快照配置文件
此笔记本演示如何创建 Snapshot 类型配置文件。