Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página descreve como criar um perfil de dados no Databricks usando o Databricks SDK e descreve os parâmetros usados nas chamadas de API. Também pode criar e gerir um perfil de dados usando a API REST.
Para obter informações de referência, consulte a referência do SDK de criação de perfil de dados e a referência da API REST.
Você pode criar um perfil em qualquer tabela Delta gerenciada ou externa registrada no Unity Catalog. Apenas um único perfil pode ser criado em um metastore do Unity Catalog para qualquer tabela.
Observação
Para informações sobre a API obsoletaquality_monitors, veja Criar um perfil de dados usando a quality_monitors API (obsoleta).
Requisitos
Para usar a versão mais recente da API, use o seguinte comando no início do seu bloco de anotações para instalar o cliente Python:
%pip install "databricks-sdk>=0.68.0"
Para autenticar para usar o SDK do Databricks em seu ambiente, consulte Autenticação.
Tipos de perfil
Quando cria um perfil, seleciona um dos seguintes tipos de perfil: TimeSeries, InferenceLog, ou Snapshot. Esta seção descreve brevemente cada opção. Para detalhes, consulte a referência do SDK de perfilagem de dados ou a referência da API REST.
Observação
- Quando você cria pela primeira vez uma série temporal ou um perfil de inferência, o Databricks analisa apenas os dados dos 30 dias anteriores à sua criação. Depois que o perfil é criado, todos os novos dados são processados.
- Os perfis definidos em visualizações materializadas não suportam processamento incremental.
Sugestão
Para os perfis TimeSeries e Inference, é uma prática recomendada habilitar o feed de dados de alteração (CDF) na tabela. Quando o CDF está habilitado, apenas os dados recém-anexados são processados, em vez de reprocessar a tabela inteira a cada atualização. Isso torna a execução mais eficiente e reduz os custos à medida que você escala em muitas tabelas.
TimeSeries perfil
Um TimeSeries perfil compara distribuições de dados entre janelas de tempo. Para um TimeSeries perfil, você deve fornecer o seguinte:
- Uma coluna de carimbo de data/hora (
timestamp_column). O tipo de dados da coluna de carimbo de data/hora deve serTIMESTAMPou de um tipo que possa ser convertido em carimbos de data/hora utilizando a funçãoto_timestampPySpark. - O conjunto sobre o qual
granularitiescalcular métricas. As seguintes granularidades estão disponíveis:- AGGREGATION_GRANULARITY_5_MINUTOS
- AGREGACAO_GRANULARIDADE_30_MINUTOS
- GRANULARIDADE_AGREGAÇÃO_1_HORA
- AGREGAÇÃO_GRANULARIDADE_1_DIA
- GRANULARIDADE_DE_AGREGAÇÃO_1_SEMANA
- AGGREGATION_GRANULARITY_2_WEEKS
- AGREGAÇÃO_GRANULARIDADE_3_SEMANAS
- AGGREGATION_GRANULARITY_4_SEMANAS
- GRANULARIDADE_DE_AGREGAÇÃO_1_MÊS
- GRANULARIDADE_DE_AGREGAÇÃO_1_ANO
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog perfil
Um InferenceLog perfil é semelhante a um TimeSeries perfil, mas também inclui métricas de qualidade do modelo.
InferenceLog Os perfis utilizam os seguintes parâmetros:
| Parâmetro | Descrição |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION ou MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
Coluna que contém os valores previstos do modelo. |
timestamp_column |
Coluna que contém o timestamp da solicitação de inferência. |
model_id_column |
Coluna que contém a id do modelo usado para previsão. |
granularities |
Determina como particionar os dados no Windows ao longo do tempo. Consulte TimeSeries o perfil para valores disponíveis. |
label_column |
(Opcional) Coluna contendo a verdade fundamental para previsões de modelos. |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Para InferenceLog perfis, os fragmentos são criados automaticamente com base nos valores distintos de model_id_col.
Snapshot perfil
Em contraste com TimeSeries, um Snapshot perfila como o conteúdo completo da tabela muda ao longo do tempo. As métricas são calculadas sobre todos os dados da tabela e refletem o estado da tabela sempre que o perfil é atualizado.
Observação
O tamanho máximo da tabela para um perfil snapshot é de 4TB. Para tabelas maiores, use antes perfis de séries temporais.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
Atualizar e visualizar resultados
Para ver o histórico de atualizações, deve usar o espaço de trabalho Databricks a partir do qual o perfil de dados foi ativado.
Para atualizar tabelas de métricas, use create_refresh. Por exemplo:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
Quando você chama create_refresh de um bloco de anotações, as tabelas métricas são criadas ou atualizadas. Esse cálculo é executado em computação sem servidor, não no cluster ao qual o bloco de anotações está conectado. Pode continuar a executar comandos no bloco de notas enquanto as estatísticas são atualizadas.
Para obter informações sobre as estatísticas armazenadas em tabelas métricas, consulte Monitorar tabelas métricas As tabelas métricas são tabelas do Catálogo Unity. Você pode consultá-los em blocos de anotações ou no explorador de consultas SQL e exibi-los no Gerenciador de Catálogos.
Para exibir o histórico de todas as atualizações associadas a um perfil, use list_refreshes.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
Para obter o status de uma execução específica que foi enfileirada, executada ou concluída, use get_refresh.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
Ver configurações de perfil
Você pode revisar as configurações de perfil usando a API get_monitor.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
Schedule
Para configurar um perfil para ser executado de forma agendada, use o parâmetro schedule de create_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Consulte expressões cron para obter mais informações.
Notificações
Para configurar notificações para um perfil, use o notifications parâmetro de create_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
É suportado um máximo de 5 endereços de e-mail por tipo de evento (por exemplo, "on_failure").
Controlar o acesso a tabelas métricas
As tabelas métricas e o painel criados por um perfil são de propriedade do usuário que criou o perfil. Você pode usar os privilégios do Catálogo Unity para controlar o acesso a tabelas métricas. Para compartilhar painéis em um espaço de trabalho, use o botão Compartilhar no canto superior direito do painel.
Excluir um perfil
Para excluir um perfil:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
Este comando não exclui as tabelas de perfil e o painel criado pelo perfil. Você deve excluir esses ativos em uma etapa separada ou pode salvá-los em um local diferente.
Exemplos de cadernos
Os blocos de anotações de exemplo a seguir ilustram como criar um perfil, atualizá-lo e examinar as tabelas métricas que ele cria.
Exemplo de bloco de notas: Perfil de série temporal
Este caderno ilustra como criar um perfil do tipo TimeSeries.
Bloco de anotações de exemplo de perfil TimeSeries
Exemplo de bloco de notas: Perfil de inferência (regressão)
Este bloco de anotações ilustra como criar um perfil de InferenceLog tipo para um problema de regressão.
Bloco de anotações de exemplo de regressão de perfil de inferência
Exemplo de bloco de notas: Perfil de inferência (classificação)
Este bloco de anotações ilustra como criar um perfil de InferenceLog tipo para um problema de classificação.
Caderno de exemplo de classificação de perfis de inferência
Exemplo de Notebook: Perfil de Snapshot
Este caderno ilustra como criar um perfil do tipo Snapshot.