Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina descrive come creare un profilo dati in Databricks usando Databricks SDK e descrive i parametri usati nelle chiamate API. È anche possibile creare e gestire un profilo dati usando l'API REST.
Per informazioni di riferimento, vedere le informazioni di riferimento sull'SDK di profilatura dei dati e le informazioni di riferimento sull'API REST.
È possibile creare un profilo in qualsiasi tabella Delta gestita o esterna registrata nel catalogo Unity. È possibile creare un solo profilo in un metastore del catalogo Unity per qualsiasi tabella.
Annotazioni
Per informazioni sull'API deprecataquality_monitors, vedere Creare un profilo dati usando l'API quality_monitors (deprecata).
Requisiti
Per usare la versione più recente dell'API, usare il comando seguente all'inizio del notebook per installare il client Python:
%pip install "databricks-sdk>=0.68.0"
Per eseguire l'autenticazione per usare Databricks SDK nell'ambiente in uso, vedere Autenticazione.
Tipi di profilo
Quando si crea un profilo, si seleziona uno dei tipi di profilo seguenti: TimeSeries, InferenceLogo Snapshot. Questa sezione descrive brevemente ogni opzione. Per informazioni dettagliate, vedere informazioni di riferimento sull'SDK di profilatura dei dati o informazioni di riferimento sull'API REST.
Annotazioni
- Quando si crea una serie temporale o un profilo di inferenza per la prima volta, Databricks analizza solo i dati dei 30 giorni precedenti alla creazione. Dopo aver creato il profilo, vengono elaborati tutti i nuovi dati.
- I profili definiti nelle viste materializzate non supportano l'elaborazione incrementale.
Suggerimento
È consigliabile, per i profili TimeSeries e Inference, abilitare il Change Data Feed (CDF) sulla tua tabella. Quando CDF è abilitato, vengono elaborati solo i dati appena accodati anziché rielaborare l'intera tabella ogni aggiornamento. In questo modo, l'esecuzione risulta più efficiente e riduce i costi quando si scala su più tabelle.
TimeSeries profilo
Un TimeSeries profilo confronta le distribuzioni dei dati tra le finestre temporali. Per un TimeSeries profilo, è necessario specificare quanto segue:
- Una colonna timestamp (
timestamp_column). Il tipo di dati della colonna timestamp deve essereTIMESTAMPo un tipo che può essere convertito in timestamp usando lato_timestampfunzione PySpark. - Set di
granularitiessu cui calcolare le metriche. Sono disponibili le granularità seguenti:- GRANULARITÀ_AGGREGAZIONE_5_MINUTI
- AGGREGATION_GRANULARITY_30_MINUTI
- AGGREGAZIONE_GRANULARITÀ_1_ORA
- AGGREGAZIONE_GRANULARITÀ_1_GIORNO
- AGGREGATION_GRANULARITY_1_SETTIMANA
- GRANULARITÀ_AGGREGAZIONE_2_SETTIMANE
- GRANULARITÀ_AGGREGAZIONE_3_SETTIMANE
- GRANULARITÀ_AGGREGAZIONE_4_SETTIMANE
- GRANULARITÀ_AGGREGAZIONE_1_MESE
- GRANULARITÀ_AGGREGAZIONE_1_ANNO
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog profilo
Un InferenceLog profilo è simile a un TimeSeries profilo, ma include anche metriche di qualità del modello.
InferenceLog i profili usano i parametri seguenti:
| Parametro | Descrizione |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION oppure MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
Colonna contenente i valori stimati del modello. |
timestamp_column |
Colonna contenente il timestamp della richiesta di inferenza. |
model_id_column |
Colonna contenente l'ID del modello usato per la stima. |
granularities |
Determina come partizionare i dati nelle finestre nel tempo. Vedere profiloTimeSeries per i valori disponibili. |
label_column |
(Facoltativo) Colonna contenente la verità di riferimento per le predizioni del modello. |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Per i profili InferenceLog, le sezioni vengono create automaticamente in base ai valori distinti di model_id_col.
Snapshot profilo
A differenza di TimeSeries, un Snapshot descrive il modo in cui i contenuti completi della tabella cambiano nel tempo. Le metriche vengono calcolate su tutti i dati nella tabella e riflettono lo stato della tabella ogni volta che il profilo viene aggiornato.
Annotazioni
Le dimensioni massime della tabella per un profilo snapshot sono di 4 TB. Per le tabelle di dimensioni maggiori, usare invece i profili time series.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
Aggiornare e visualizzare i risultati
Per visualizzare la cronologia degli aggiornamenti, è necessario usare l'area di lavoro Databricks da cui è stata abilitata la profilatura dei dati.
Per aggiornare le tabelle delle metriche, usare create_refresh. Per esempio:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
Quando si chiama create_refresh da un notebook, le tabelle delle metriche vengono create o aggiornate. Questo calcolo viene eseguito nel calcolo serverless, non nel cluster a cui è collegato il notebook. È possibile continuare a eseguire i comandi nel notebook mentre le statistiche vengono aggiornate.
Per informazioni sulle statistiche archiviate nelle tabelle delle metriche, vedere Monitorare le tabelle delle metriche Le tabelle delle metriche sono tabelle del catalogo Unity. È possibile eseguire query nei notebook o in Esplora query SQL e visualizzare i risultati in Esplora Cataloghi.
Per visualizzare la cronologia di tutti gli aggiornamenti associati a un profilo, usare list_refreshes.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
Per ottenere lo stato di un'esecuzione specifica che è stata accodata, in esecuzione o completata, utilizzare get_refresh.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
Visualizzare le impostazioni del profilo
È possibile esaminare le impostazioni del profilo usando l'API get_monitor.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
Orario
Per configurare un profilo da eseguire in base a una pianificazione, usare il parametro schedule di create_monitor.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Per altre informazioni, vedere Espressioni cron .
Notifiche
Per configurare le notifiche per un profilo, usare il notifications parametro di create_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Un massimo di 5 indirizzi di posta elettronica è supportato per ogni tipo di evento (ad esempio, "on_failure").
Controllare l'accesso alle tabelle delle metriche
Le tabelle delle metriche e il dashboard creati da un profilo sono di proprietà dell'utente che ha creato il profilo. È possibile usare i privilegi del catalogo Unity per controllare l'accesso alle tabelle delle metriche. Per condividere i dashboard all'interno di un'area di lavoro, usare il pulsante Condividi in alto a destra del dashboard.
Eliminare un profilo
Per eliminare un profilo:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
Questo comando non elimina le tabelle del profilo e il dashboard creato dal profilo. È necessario eliminare tali asset in un passaggio separato oppure salvarli in una posizione diversa.
Notebook di esempio
I notebook di esempio seguenti illustrano come creare un profilo, aggiornare il profilo ed esaminare le tabelle delle metriche create.
Esempio di notebook: Profilo di serie temporali
Questo notebook illustra come creare un tipo di profilo TimeSeries.
Notebook di esempio di profilo TimeSeries
Esempio di notebook: profilo di inferenza (regressione)
Questo notebook illustra come creare un InferenceLog profilo di tipo per un problema di regressione.
Notebook di esempio di regressione del profilo di inferenza
Esempio di notebook: profilo di inferenza (classificazione)
Questo notebook illustra come creare un InferenceLog profilo di tipo per un problema di classificazione.
Esempio di notebook per la classificazione dei profili di inferenza
Esempio di notebook: Profilo snapshot
Questo notebook illustra come creare un tipo di profilo Snapshot.