Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve como criar um perfil de dados no Databricks usando o SDK do Databricks e descreve os parâmetros usados em chamadas à API. Você também pode criar e gerenciar um perfil de dados usando a API REST.
Para obter informações de referência, consulte a referência do SDK de criação de perfil de dados e a referência da API REST.
Você pode criar um perfil em qualquer tabela Delta gerenciada ou externa registrada no Catálogo do Unity. Somente um único perfil pode ser criado no metastore do Unity Catalog de cada tabela.
Observação
Para obter informações sobre a API preterida quality_monitors , consulte Criar um perfil de dados usando a quality_monitors API (preterida).
Requisitos
Para usar a versão mais recente da API, use o seguinte comando no início do notebook para instalar o cliente Python:
%pip install "databricks-sdk>=0.68.0"
Para autenticar para usar o SDK do Databricks em seu ambiente, consulte Autenticação.
Tipos de perfil
Ao criar um perfil, você seleciona um dos seguintes tipos de perfil: TimeSeries, ou InferenceLogSnapshot. Esta seção descreve brevemente cada opção. Para obter detalhes, consulte a referência do SDK de criação de perfil de dados ou a referência da API REST.
Observação
- Quando você cria pela primeira vez um perfil de série temporal ou inferência, o Databricks analisa apenas os dados dos 30 dias anteriores à sua criação. Depois que o perfil é criado, todos os novos dados são processados.
- Perfis definidos em exibições materializadas não dão suporte ao processamento incremental.
Dica
Para os perfis TimeSeries e Inference, é uma prática recomendada habilitar o feed de dados de alteração (CDF) em sua tabela. Quando o CDF está habilitado, somente os dados acrescentados recentemente são processados, em vez de reprocessar toda a tabela a cada atualização. Isso torna a execução mais eficiente e reduz os custos conforme você escala em várias tabelas de dados.
TimeSeries perfil
Um TimeSeries perfil compara distribuições de dados entre janelas de tempo. Para um TimeSeries perfil, você deve fornecer o seguinte:
- Uma coluna de timestamp (
timestamp_column). O tipo de dado da coluna de carimbo de data/hora deve serTIMESTAMP, ou um tipo que possa ser convertido em carimbos de data/hora usando ato_timestampfunção PySpark. - O conjunto de
granularitiessobre o qual calcular as métricas. As granularidades a seguir estão disponíveis:- GRANULARIDADE_DE_AGREGAÇÃO_5_MINUTOS
- GRANULARIDADE_DE_AGREGAÇÃO_30_MINUTOS
- GRANULARIDADE_AGREGAÇÃO_1_HORA
- GRANULARIDADE_AGREGAÇÃO_1_DIA
- GRANULARIDADE_AGREGAÇÃO_1_SEMANA
- AGGREGATION_GRANULARITY_2_SEMANAS
- GRANULARIDADE_AGREGAÇÃO_3_SEMANAS
- AGGREGATION_GRANULARITY_4_WEEKS
- GRANULARIDADE_DE_AGREGAÇÃO_1_MÊS
- GRANULARIDADE_DE_AGREGAÇÃO_1_ANO
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog perfil
Um InferenceLog perfil é semelhante a um TimeSeries perfil, mas também inclui métricas de qualidade do modelo.
InferenceLog os perfis usam os seguintes parâmetros:
| Parâmetro | Descrição |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION ou MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
Coluna que contém os valores previstos do modelo. |
timestamp_column |
Coluna que contém o registro de data e hora da solicitação de inferência. |
model_id_column |
Coluna que contém a ID do modelo usado para previsão. |
granularities |
Determina como particionar os dados em janelas ao longo do tempo. Consulte oTimeSeries perfil para obter valores disponíveis. |
label_column |
(Opcional) Coluna que contém a verdade básica para previsões de modelo. |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Para perfis InferenceLog, os segmentos são criados automaticamente com base nos valores distintos de model_id_col.
Snapshot perfil
Ao contrário de TimeSeries, um perfil Snapshot mostra como o conteúdo completo da tabela muda ao longo do tempo. As métricas são calculadas em todos os dados da tabela e refletem o estado da tabela sempre que o perfil é atualizado.
Observação
O tamanho máximo da tabela para um perfil de instantâneo é 4 TB. Em vez disso, para tabelas maiores, use perfis de série temporal.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
Atualizar e exibir resultados
Para ver o histórico de atualização, você deve usar o espaço de trabalho do Databricks em que a criação de perfil de dados foi habilitada.
Para atualizar tabelas de métricas, use create_refresh. Por exemplo:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
Quando você chama create_refresh de um notebook, as tabelas de métricas são criadas ou atualizadas. Esse cálculo é executado na computação sem servidor, não no cluster ao qual o notebook está anexado. Você pode continuar a executar comandos no notebook enquanto as estatísticas são atualizadas.
Para obter informações sobre as estatísticas armazenadas em tabelas de métricas, consulte Monitor de Tabelas de Métricas. As tabelas de métricas são tabelas do Catálogo do Unity. Você pode consultá-los em notebooks ou no gerenciador de consultas SQL e exibi-los no Gerenciador de Catálogos.
Para exibir o histórico de todas as atualizações associadas a um perfil, use list_refreshes.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
Para obter o status de uma execução específica que foi enfileirada, em execução ou concluída, use get_refresh.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
Exibir configurações de perfil
Você pode examinar as configurações de perfil usando a API get_monitor.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
Cronograma
Para configurar um perfil para ser executado de forma agendada, use o schedule parâmetro de create_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Consulte expressões cron para obter mais informações.
Notificações
Para configurar notificações para um perfil, use o notifications parâmetro de create_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Há suporte para no máximo 5 endereços de email por tipo de evento (por exemplo, "on_failure").
Controlar o acesso a tabelas de métricas
As tabelas de métrica e o painel criados por um perfil pertencem ao usuário que criou o perfil. Você pode usar privilégios do Catálogo do Unity para controlar o acesso a tabelas de métricas. Para compartilhar dashboards em um workspace, use o botão Compartilhar no canto superior direito do painel.
Excluir um perfil
Para excluir um perfil:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
Esse comando não exclui as tabelas de perfil e o painel criado pelo perfil. Você deve excluir esses ativos em uma etapa separada ou salvá-los em um local diferente.
Exemplos de notebooks
Os notebooks de exemplo a seguir ilustram como criar um perfil, atualizar o perfil e examinar as tabelas de métrica que ele cria.
Exemplo de bloco de anotações: perfil de série temporal
Este bloco de anotações ilustra como criar um TimeSeries perfil de tipo.
Notebook de exemplo do perfil TimeSeries
Exemplo de bloco de anotações: perfil de inferência (regressão)
Este notebook ilustra como criar um InferenceLog perfil de tipo para um problema de regressão.
Notebook de exemplo de regressão de perfil de inferência
Exemplo de bloco de anotações: perfil de inferência (classificação)
Este notebook ilustra como criar um InferenceLog perfil de tipo para um problema de classificação.
Notebook de exemplo de classificação de perfil de inferência
Exemplo de notebook: perfil de instantâneo
Este bloco de anotações ilustra como criar um Snapshot perfil de tipo.