Partager via


Créer un profil de données à l’aide de l’API

Cette page explique comment créer un profil de données dans Databricks à l’aide du Kit de développement logiciel (SDK) Databricks et décrit les paramètres utilisés dans les appels d’API. Vous pouvez également créer et gérer un profil de données à l’aide de l’API REST.

Pour obtenir des informations de référence, consultez la référence du SDK de profilage des données et la référence de l’API REST.

Vous pouvez créer un profil sur n’importe quelle table Delta managée ou externe inscrite dans le catalogue Unity. Dans un metastore du catalogue Unity, un seul profil peut être créé pour chaque table.

Note

Pour plus d’informations sur l’API déconseilléequality_monitors, consultez Créer un profil de données à l’aide de l’API quality_monitors (déconseillée).

Exigences

Pour utiliser la version la plus récente de l’API, utilisez la commande suivante au début de votre notebook pour installer le client Python :

%pip install "databricks-sdk>=0.68.0"

Pour vous authentifier pour utiliser le Kit de développement logiciel (SDK) Databricks dans votre environnement, consultez Authentification.

Types de profils

Lorsque vous créez un profil, vous sélectionnez l’un des types de profil suivants : TimeSeries, InferenceLogou Snapshot. Cette section décrit brièvement chaque option. Pour plus d’informations, consultez la référence du Kit de développement logiciel (SDK) de profilage des données ou la référence de l’API REST.

Note

  • Lorsque vous créez pour la première fois une série chronologique ou un profil d’inférence, Databricks analyse uniquement les données des 30 jours précédant sa création. Une fois le profil créé, toutes les nouvelles données sont traitées.
  • Les profils définis sur les vues matérialisées ne prennent pas en charge le traitement incrémentiel.

Conseil / Astuce

Pour les profils TimeSeries et Inference, il est recommandé d’activer le flux de données modifiées (CDF) sur votre table. Lorsque la CDF est activée, seules les données nouvellement ajoutées sont traitées, plutôt que de réexécérer l’intégralité de la table toutes les actualisations. Cela rend l’exécution plus efficace et réduit les coûts à mesure que vous effectuez une mise à l’échelle sur de nombreuses tables.

TimeSeries profil

Un TimeSeries profil compare les distributions de données entre les fenêtres de temps. Pour un TimeSeries profil, vous devez fournir les éléments suivants :

  • Colonne d’horodatage (timestamp_column). Le type de données de colonne timestamp doit être soit TIMESTAMP un type qui peut être converti en horodatages à l’aide de la to_timestampfonction PySpark.
  • Jeu de granularities sur lequel calculer les métriques. Les granularités suivantes sont disponibles :
    • GRANULARITÉ_AGGRÉGATION_5_MINUTES
    • GRANULARITÉ_AGRÉGATION_30_MINUTES
    • AGGREGATION_GRANULARITE_1_HEURE
    • AGRÉGATION_GRANULARITÉ_1_JOUR
    • GRANULARITÉ_D'AGRÉGATION_1_SEMAINE
    • AGRÉGATION_GRANULARITÉ_2_SEMAINES
    • AGGREGATION_GRANULARITY_3_SEMAINES
    • AGGREGATION_GRANULARITY_4_SEMAINES
    • GRANULARITÉ_D_AGRÉGATION_1_MOIS
    • GRANULARITÉ_AGGRÉGATION_1_AN
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 time_series=TimeSeriesConfig(
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
 slicing_exprs=["type='Red'"]
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",     # object_type is always "table" for data profiling
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

InferenceLog profil

Un InferenceLog profil est similaire à un TimeSeries profil, mais inclut également des métriques de qualité de modèle. InferenceLog les profils utilisent les paramètres suivants :

Paramètre Description
problem_type MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION ou MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
prediction_column Colonne contenant les valeurs prédites du modèle.
timestamp_column Colonne contenant l’horodatage de la demande d’inférence.
model_id_column Colonne contenant l’ID du modèle utilisé pour la prédiction.
granularities Détermine comment partitionner les données dans des fenêtres dans le temps. Consultez le profil pour connaîtreTimeSeries les valeurs disponibles.
label_column (Facultatif) Colonne contenant la vérité de base pour les prédictions de modèle.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 inference_log=InferenceLogConfig(
    problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
    prediction_column="preds",
    model_id_column="model_ver",
    label_column="label", # optional
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Pour les profils InferenceLog, les tranches sont créées automatiquement à partir des valeurs distinctes de model_id_col.

Snapshot profil

Contrairement à TimeSeries, un Snapshot profil montre comment le contenu complet de la table change au fil du temps. Les métriques sont calculées sur toutes les données de la table et reflètent l’état de la table à chaque actualisation du profil.

Note

La taille maximale de la table pour un profil de capture instantanée est de 4 To. Pour les tables plus volumineuses, utilisez plutôt des profils de série chronologique.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 snapshot=SnapshotConfig(),
 slicing_exprs=["type='Red'"]
)

Actualiser et afficher les résultats

Pour afficher l’historique d’actualisation, vous devez utiliser l’espace de travail Databricks à partir duquel le profilage des données a été activé.

Pour actualiser les tables de métriques, utilisez create_refresh. Par exemple:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
  object_type=table_object_type, object_id=table_id, refresh=Refresh(
   object_type=table_object_type,
   object_id=table_id,
 )
)

Lorsque vous appelez create_refresh à partir d’un notebook, les tables de métriques sont créées ou mises à jour. Ce calcul s’exécute sur le calcul serverless, et non sur le cluster auquel le notebook est attaché. Vous pouvez continuer à exécuter des commandes dans le notebook pendant la mise à jour des statistiques.

Pour plus d’informations sur les statistiques stockées dans les tables de métriques, consultez les tables de métriques du catalogue Unity. Vous pouvez les interroger dans les notebooks ou dans l’Explorateur de requêtes SQL et les afficher dans l’Explorateur de catalogues.

Pour afficher l’historique de toutes les actualisations associées à un profil, utilisez list_refreshes.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

Pour obtenir l’état d’une exécution spécifique qui a été mise en file d’attente, en cours d’exécution ou terminée, utilisez get_refresh.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
  run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
  time.sleep(30)

Afficher les paramètres de profil

Vous pouvez consulter les paramètres de profil à l’aide de l’API get_monitor.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)

Calendrier

Pour configurer un profil à exécuter selon une planification, utilisez le paramètre schedule de create_monitor.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 schedule=CronSchedule(
        quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
        timezone_id="PST",
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Pour plus d’informations, consultez les expressions cron .

Notifications

Pour configurer des notifications pour un profil, utilisez le notifications paramètre :create_monitor

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 notification_settings=NotificationSettings(
        # Notify the given email when a monitoring refresh fails or times out.
        on_failure=NotificationDestination(
            email_addresses=["your_email@domain.com"]
        )
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Un maximum de 5 adresses e-mail est pris en charge par type d’événement (par exemple, « on_failure »).

Contrôler l’accès aux tables de métriques

Les tables de métriques et le tableau de bord créés par un profil appartiennent à l’utilisateur qui a créé le profil. Vous pouvez utiliser des privilèges de catalogue Unity pour contrôler l’accès aux tables de métriques. Pour partager des tableaux de bord dans un espace de travail, utilisez le bouton Partager en haut à droite du tableau de bord.

Supprimer un profil

Pour supprimer un profil :

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)

Cette commande ne supprime pas les tables de profil et le tableau de bord créé par le profil. Vous devez supprimer ces ressources dans une étape distincte, ou vous pouvez les enregistrer dans un autre emplacement.

Exemples de notebooks

Les exemples de notebooks suivants montrent comment créer un profil, actualiser le profil et examiner les tables de métriques qu’il crée.

Exemple de notebook : profil de série temporelle

Ce notebook montre comment créer un profil de type TimeSeries.

Exemple de notebook de profil TimeSeries

Obtenir un ordinateur portable

Exemple de bloc-notes : profil d’inférence (régression)

Ce notebook illustre comment créer un type de profil InferenceLog pour un problème de régression.

Exemple de cahier de régression pour le profilage d'inférence

Obtenir un ordinateur portable

Exemple de bloc-notes : profil d’inférence (classification)

Ce notebook montre comment créer un profil de type InferenceLog pour un problème de classification.

Exemple de cahier pour la classification des profils d'inférence

Obtenir un ordinateur portable

Exemple de notebook : profil d'instantané

Ce notebook montre comment créer un profil de type Snapshot.

Notebook d'exemple de profil instantané

Obtenir un ordinateur portable