Dela via


Skapa en dataprofil med hjälp av API:et

Den här sidan beskriver hur du skapar en dataprofil i Databricks med hjälp av Databricks SDK och beskriver parametrarna som används i API-anrop. Du kan också skapa och hantera en dataprofil med hjälp av REST-API:et.

Referensinformation finns i SDK-referensen för dataprofilering och REST API-referensen.

Du kan skapa en profil i alla hanterade eller externa Delta-tabeller som registrerats i Unity Catalog. Endast en enskild profil kan skapas i ett Unity Catalog-metaarkiv för valfri tabell.

Anmärkning

Information om det inaktuella quality_monitors API:et finns i Skapa en dataprofil med api:et quality_monitors (inaktuellt).

Requirements

Om du vill använda den senaste versionen av API:et använder du följande kommando i början av notebook-filen för att installera Python-klienten:

%pip install "databricks-sdk>=0.68.0"

Information om hur du autentiserar för att använda Databricks SDK i din miljö finns i Autentisering.

Profiltyper

När du skapar en profil väljer du någon av följande profiltyper: TimeSeries, InferenceLogeller Snapshot. I det här avsnittet beskrivs kort varje alternativ. Mer information finns i SDK-referensen för dataprofilering eller REST API-referensen.

Anmärkning

  • När du först skapar en tidsserie- eller slutsatsdragningsprofil analyserar Databricks endast data från de 30 dagarna innan de skapas. När profilen har skapats bearbetas alla nya data.
  • Profiler som definierats för materialiserade vyer stöder inte inkrementell bearbetning.

Tips/Råd

För TimeSeries och Inference profiler är det bästa praxis att aktivera ändringsdataflöde (CDF) i tabellen. När CDF är aktiverat bearbetas endast nyligen tillagda data i stället för att bearbeta hela tabellen igen varje uppdatering. Detta gör exekveringen effektivare och minskar kostnaderna när man skalar över många tabeller.

TimeSeries profil

En TimeSeries profil jämför datadistributioner mellan tidsfönster. För en TimeSeries profil måste du ange följande:

  • En tidsstämpelkolumn (timestamp_column). Datatypen för tidsstämpelkolumnen måste vara antingen TIMESTAMP eller en typ som kan konverteras till tidsstämplar med funktionen to_timestampPySpark.
  • Den uppsättning granularities som mätvärden ska beräknas över. Följande kornigheter är tillgängliga:
    • AGGREGERING_GRANULARITET_5_MINUTER
    • AGGREGATION_GRANULARITY_30_MINUTES
    • AGGREGERING_GRADAVDETALJERING_1_TIMME
    • AGGREGERING_GRANULARITET_1_DAG
    • AGGREGATION_GRANULARITET_1_VECKA
    • AGGREGERING_GRANULARITET_2_VECKOR
    • AGGREGATION_GRANULARITY_3_VECKOR
    • AGGREGERINGSGRANULARITET_4_VECKOR
    • AGGREGATION_GRANULARITET_1_MÅNAD
    • AGGREGATION_GRANULARITY_1_YEAR
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 time_series=TimeSeriesConfig(
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
 slicing_exprs=["type='Red'"]
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",     # object_type is always "table" for data profiling
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

InferenceLog profil

En InferenceLog profil liknar en TimeSeries profil men innehåller även modellkvalitetsmått. InferenceLog profiler använder följande parametrar:

Parameter Beskrivning
problem_type MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION eller MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
prediction_column Kolumn som innehåller modellens förutsagda värden.
timestamp_column Kolumn som innehåller tidsstämpeln för inferensbegäran.
model_id_column Kolumn som innehåller ID:t för modellen som används för förutsägelse.
granularities Avgör hur data ska partitioneras i fönster över tid. Se TimeSeries profil för tillgängliga värden.
label_column (Valfritt) Kolumn som innehåller grundsanningen för modellförutsägelser.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 inference_log=InferenceLogConfig(
    problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
    prediction_column="preds",
    model_id_column="model_ver",
    label_column="label", # optional
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

För InferenceLog-profiler skapas segment automatiskt baserat på de distinkta värdena för model_id_col.

Snapshot profil

Till skillnad från TimeSeries, profilerar en Snapshot hur det fullständiga innehållet i tabellen ändras över tid. Mått beräknas över alla data i tabellen och återspeglar tabelltillståndet varje gång profilen uppdateras.

Anmärkning

Den maximala tabellstorleken för en ögonblicksbildsprofil är 4 TB. För större tabeller använder du tidsserieprofiler i stället.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 snapshot=SnapshotConfig(),
 slicing_exprs=["type='Red'"]
)

Uppdatera och visa resultat

Om du vill se uppdateringshistoriken måste du använda databricks-arbetsytan från vilken dataprofilering har aktiverats.

Om du vill uppdatera måtttabeller använder du create_refresh. Som exempel:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
  object_type=table_object_type, object_id=table_id, refresh=Refresh(
   object_type=table_object_type,
   object_id=table_id,
 )
)

När du anropar create_refresh från en notebook skapas eller uppdateras måtttabellerna. Den här beräkningen körs på en serverlös miljö, inte på det kluster som anteckningsblocket är kopplat till. Du kan fortsätta att köra kommandon i notebook-filen medan statistiken uppdateras.

Information om den statistik som lagras i måtttabeller finns i Övervaka måtttabeller Måtttabeller är Unity Catalog-tabeller. Du kan fråga dem i notebook-filer eller i SQL-frågeutforskaren och visa dem i Katalogutforskaren.

Om du vill visa historiken för alla uppdateringar som är associerade med en profil använder du list_refreshes.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

Om du vill hämta status för en specifik körning som har placerats i kö, körts eller slutförts använder du get_refresh.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
  run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
  time.sleep(30)

Visa profilinställningar

Du kan granska profilinställningarna med hjälp av API get_monitor:et .

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)

Schema

Om du vill konfigurera en profil som ska köras enligt ett schema använder du parametern schedulecreate_monitor:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 schedule=CronSchedule(
        quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
        timezone_id="PST",
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Mer information finns i cron-uttryck .

Aviseringar

Om du vill konfigurera meddelanden för en profil använder du parametern notificationscreate_monitor:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 notification_settings=NotificationSettings(
        # Notify the given email when a monitoring refresh fails or times out.
        on_failure=NotificationDestination(
            email_addresses=["your_email@domain.com"]
        )
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Högst 5 e-postadresser stöds per händelsetyp (till exempel "on_failure").

Kontrollera åtkomsten till måtttabeller

Måtttabellerna och instrumentpanelen som skapas av en profil ägs av användaren som skapade profilen. Du kan använda Behörigheter för Unity Catalog för att styra åtkomsten till måtttabeller. För att dela instrumentpaneler inom en arbetsyta, använd knappen Dela uppe till höger på instrumentpanelen.

Ta bort en profil

Så här tar du bort en profil:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)

Det här kommandot tar inte bort profiltabellerna och instrumentpanelen som skapats av profilen. Du måste ta bort dessa tillgångar i ett separat steg, eller så kan du spara dem på en annan plats.

Exempelanteckningsböcker

Följande notebook-exempel visar hur du skapar en profil, uppdaterar profilen och undersöker de måtttabeller som skapas.

Notebook-exempel: Tidsserieprofil

Denna anteckningsbok visar hur du skapar en TimeSeries typprofil.

TimeSeries-profilexempelanteckningsbok

Hämta anteckningsbok

Notebook-exempel: Slutsatsdragningsprofil (regression)

Den här notebook-filen visar hur du skapar en InferenceLog typprofil för ett regressionsproblem.

Exempel på notebook-fil för inferensprofilregression

Hämta anteckningsbok

Notebook-exempel: Slutsatsdragningsprofil (klassificering)

Den här notebook-filen visar hur du skapar en InferenceLog typprofil för ett klassificeringsproblem.

Exempelanteckningsbok för inferensprofilklassificering

Hämta anteckningsbok

Notebook-exempel: Profil för ögonblicksbild

Denna anteckningsbok visar hur du skapar en Snapshot typprofil.

Exempelanteckningsbok för ögonblicksbildprofil

Hämta anteckningsbok