Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page explique comment créer un profil de données dans Databricks à l’aide du Kit de développement logiciel (SDK) Databricks et décrit les paramètres utilisés dans les appels d’API. Vous pouvez également créer et gérer un profil de données à l’aide de l’API REST.
Pour obtenir des informations de référence, consultez la référence du SDK de profilage des données et la référence de l’API REST.
Vous pouvez créer un profil sur n’importe quelle table Delta managée ou externe inscrite dans le catalogue Unity. Dans un metastore du catalogue Unity, un seul profil peut être créé pour chaque table.
Note
Pour plus d’informations sur l’API déconseilléequality_monitors, consultez Créer un profil de données à l’aide de l’API quality_monitors (déconseillée).
Exigences
Pour utiliser la version la plus récente de l’API, utilisez la commande suivante au début de votre notebook pour installer le client Python :
%pip install "databricks-sdk>=0.68.0"
Pour vous authentifier pour utiliser le Kit de développement logiciel (SDK) Databricks dans votre environnement, consultez Authentification.
Types de profils
Lorsque vous créez un profil, vous sélectionnez l’un des types de profil suivants : TimeSeries, InferenceLogou Snapshot. Cette section décrit brièvement chaque option. Pour plus d’informations, consultez la référence du Kit de développement logiciel (SDK) de profilage des données ou la référence de l’API REST.
Note
- Lorsque vous créez pour la première fois une série chronologique ou un profil d’inférence, Databricks analyse uniquement les données des 30 jours précédant sa création. Une fois le profil créé, toutes les nouvelles données sont traitées.
- Les profils définis sur les vues matérialisées ne prennent pas en charge le traitement incrémentiel.
Conseil / Astuce
Pour les profils TimeSeries et Inference, il est recommandé d’activer le flux de données modifiées (CDF) sur votre table. Lorsque la CDF est activée, seules les données nouvellement ajoutées sont traitées, plutôt que de réexécérer l’intégralité de la table toutes les actualisations. Cela rend l’exécution plus efficace et réduit les coûts à mesure que vous effectuez une mise à l’échelle sur de nombreuses tables.
TimeSeries profil
Un TimeSeries profil compare les distributions de données entre les fenêtres de temps. Pour un TimeSeries profil, vous devez fournir les éléments suivants :
- Colonne d’horodatage (
timestamp_column). Le type de données de colonne timestamp doit être soitTIMESTAMPun type qui peut être converti en horodatages à l’aide de lato_timestampfonction PySpark. - Jeu de
granularitiessur lequel calculer les métriques. Les granularités suivantes sont disponibles :- GRANULARITÉ_AGGRÉGATION_5_MINUTES
- GRANULARITÉ_AGRÉGATION_30_MINUTES
- AGGREGATION_GRANULARITE_1_HEURE
- AGRÉGATION_GRANULARITÉ_1_JOUR
- GRANULARITÉ_D'AGRÉGATION_1_SEMAINE
- AGRÉGATION_GRANULARITÉ_2_SEMAINES
- AGGREGATION_GRANULARITY_3_SEMAINES
- AGGREGATION_GRANULARITY_4_SEMAINES
- GRANULARITÉ_D_AGRÉGATION_1_MOIS
- GRANULARITÉ_AGGRÉGATION_1_AN
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog profil
Un InferenceLog profil est similaire à un TimeSeries profil, mais inclut également des métriques de qualité de modèle.
InferenceLog les profils utilisent les paramètres suivants :
| Paramètre | Description |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION ou MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
Colonne contenant les valeurs prédites du modèle. |
timestamp_column |
Colonne contenant l’horodatage de la demande d’inférence. |
model_id_column |
Colonne contenant l’ID du modèle utilisé pour la prédiction. |
granularities |
Détermine comment partitionner les données dans des fenêtres dans le temps. Consultez le profil pour connaîtreTimeSeries les valeurs disponibles. |
label_column |
(Facultatif) Colonne contenant la vérité de base pour les prédictions de modèle. |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Pour les profils InferenceLog, les tranches sont créées automatiquement à partir des valeurs distinctes de model_id_col.
Snapshot profil
Contrairement à TimeSeries, un Snapshot profil montre comment le contenu complet de la table change au fil du temps. Les métriques sont calculées sur toutes les données de la table et reflètent l’état de la table à chaque actualisation du profil.
Note
La taille maximale de la table pour un profil de capture instantanée est de 4 To. Pour les tables plus volumineuses, utilisez plutôt des profils de série chronologique.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
Actualiser et afficher les résultats
Pour afficher l’historique d’actualisation, vous devez utiliser l’espace de travail Databricks à partir duquel le profilage des données a été activé.
Pour actualiser les tables de métriques, utilisez create_refresh. Par exemple:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
Lorsque vous appelez create_refresh à partir d’un notebook, les tables de métriques sont créées ou mises à jour. Ce calcul s’exécute sur le calcul serverless, et non sur le cluster auquel le notebook est attaché. Vous pouvez continuer à exécuter des commandes dans le notebook pendant la mise à jour des statistiques.
Pour plus d’informations sur les statistiques stockées dans les tables de métriques, consultez les tables de métriques du catalogue Unity. Vous pouvez les interroger dans les notebooks ou dans l’Explorateur de requêtes SQL et les afficher dans l’Explorateur de catalogues.
Pour afficher l’historique de toutes les actualisations associées à un profil, utilisez list_refreshes.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
Pour obtenir l’état d’une exécution spécifique qui a été mise en file d’attente, en cours d’exécution ou terminée, utilisez get_refresh.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
Afficher les paramètres de profil
Vous pouvez consulter les paramètres de profil à l’aide de l’API get_monitor.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
Calendrier
Pour configurer un profil à exécuter selon une planification, utilisez le paramètre schedule de create_monitor.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Pour plus d’informations, consultez les expressions cron .
Notifications
Pour configurer des notifications pour un profil, utilisez le notifications paramètre :create_monitor
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Un maximum de 5 adresses e-mail est pris en charge par type d’événement (par exemple, « on_failure »).
Contrôler l’accès aux tables de métriques
Les tables de métriques et le tableau de bord créés par un profil appartiennent à l’utilisateur qui a créé le profil. Vous pouvez utiliser des privilèges de catalogue Unity pour contrôler l’accès aux tables de métriques. Pour partager des tableaux de bord dans un espace de travail, utilisez le bouton Partager en haut à droite du tableau de bord.
Supprimer un profil
Pour supprimer un profil :
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
Cette commande ne supprime pas les tables de profil et le tableau de bord créé par le profil. Vous devez supprimer ces ressources dans une étape distincte, ou vous pouvez les enregistrer dans un autre emplacement.
Exemples de notebooks
Les exemples de notebooks suivants montrent comment créer un profil, actualiser le profil et examiner les tables de métriques qu’il crée.
Exemple de notebook : profil de série temporelle
Ce notebook montre comment créer un profil de type TimeSeries.
Exemple de notebook de profil TimeSeries
Obtenir un ordinateur portable
Exemple de bloc-notes : profil d’inférence (régression)
Ce notebook illustre comment créer un type de profil InferenceLog pour un problème de régression.
Exemple de cahier de régression pour le profilage d'inférence
Obtenir un ordinateur portable
Exemple de bloc-notes : profil d’inférence (classification)
Ce notebook montre comment créer un profil de type InferenceLog pour un problème de classification.
Exemple de cahier pour la classification des profils d'inférence
Obtenir un ordinateur portable
Exemple de notebook : profil d'instantané
Ce notebook montre comment créer un profil de type Snapshot.