Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Questa funzionalità è in Anteprima Pubblica.
Lakeflow Designer consente di creare operatori definiti dall'utente visualizzati direttamente nell'area di disegno insieme agli operatori predefiniti. Usarli per estendere Lakeflow Designer con la propria logica di business, calcoli o integrazioni.
Esistono tre tipi di operatori definiti dall'utente:
-
python-run-function: file YAML autonomo con Python inline archiviato nell'area di lavoro. Ideale per trasformazioni a livello di dataframe e integrazioni esterne. Le autorizzazioni vengono gestite a livello di file dell'area di lavoro. -
uc-udf: Incapsula una funzione scalare di Unity Catalog. Ideale per le trasformazioni a livello di colonna. L'accesso è regolato dalle autorizzazioni del catalogo Unity. -
uc-udtf: esegue il wrapping di una funzione con valori di tabella di Unity Catalog. Ideale per le trasformazioni a livello di tabella, ad esempio il clustering e l'aggregazione di ML. L'accesso è regolato dalle autorizzazioni del catalogo Unity.
| Feature | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| Esempio di caso d'uso | Trasformazioni di dataframe, integrazioni api, notifiche tramite posta elettronica | Calcoli a livello di colonna (BMI, tassi di interesse) | Clustering ML, aggregazione tra le righe |
| Inserimento | DataFrame | Valori singoli | Intera tabella, riga per riga |
| Risultato | DataFrame | Valore singolo | Tabella (più righe) |
| Richiede la funzione Catalog di Unity | No | Yes | Yes |
| Governance degli accessi | Autorizzazioni per i file dell'area di lavoro | Autorizzazioni del catalogo Unity (EXECUTE, USE SCHEMA) |
Autorizzazioni del catalogo Unity (EXECUTE, USE SCHEMA) |
| Lingue disponibili | Solo Python | SQL o Python in un wrapper per SQL | SQL o Python in un wrapper SQL |
Come funzionano gli operatori definiti dall'utente?
Un operatore definito dall'utente è costituito da:
-
Logica dell'operatore: codice eseguito quando viene eseguito l'operatore. Può trattarsi di una funzione inline Python
run()(perpython-run-function) o di una funzione del catalogo Unity (peruc-udfeuc-udtf). -
Configurazione YAML: indica a Lakeflow Designer come presentare l'operatore nell'interfaccia utente, inclusi il nome, la descrizione, i parametri di input, i widget dell'interfaccia utente e le porte dell'operatore. Tutti i tipi di operatore usano lo
user-defined-operator-v0.1.0schema. -
File di registrazione: voce in
.user_defined_operators.yamlche consente a Lakeflow Designer di individuare l'operatore.
Logica dell'operatore
Python eseguire la logica dell'operatore definita dall'utente
Ogni python-run-function operatore deve definire una run() funzione:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: valori configurati dall'utente nell'interfaccia utente, indicizzati per nome della proprietà. -
inputs: DataFrame di input, indicizzati per porta di inputname. -
spark: La SparkSession attiva. -
Valori restituiti: un dizionario che associa i valori delle porte di output
nameai DataFrame.
L'esempio seguente filtra le righe da un dataframe di input:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Se l'operatore richiede pacchetti pip esterni, aggiungere il environment campo a YAML:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
Logica dell'operatore UDF e UDTF
È possibile scrivere funzioni UC in SQL o Python. Le funzioni Python sono racchiuse in un'istruzione SQL CREATE FUNCTION:
Funzione SQL:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Funzione Python (incapsulata in SQL):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
Le funzioni definite dall'utente elaborano un singolo valore alla volta e restituiscono un valore calcolato. Le UDTF elaborano le tabelle riga per riga e possono mantenere uno stato condiviso tra tutte le righe. Utilizzare uc-udf per le trasformazioni a livello di colonna e uc-udtf per operazioni come il clustering ML o l'aggregazione.
Inoltre, le UDTF richiedono la definizione di tre metodi chiave: __init__(), eval() e terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
Annotazioni
Le tabelle restituite UDTF devono avere tipi fissi ed espliciti. Non è possibile fare riferimento ai tipi di colonna di input nella configurazione restituita.
Configurazione YAML
La configurazione YAML indica a Lakeflow Designer come presentare l'operatore nell'interfaccia utente. Definisce il nome, la descrizione, i parametri di input, i widget dell'interfaccia utente e le porte dell'operatore. Ogni campo di configurazione è una proprietà con un tipo, un titolo e indicazioni facoltative del widget x-ui:
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Per informazioni dettagliate sullo schema YAML, inclusi tutti i tipi di widget e le opzioni di configurazione, vedere Informazioni di riferimento sull'operatore YAML definito dall'utente.
Porti
Le porte definiscono gli input e gli output per l'operatore:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML per gli operatori della funzione run in Python
Per gli operatori python-run-function, il file YAML è autonomo e include un campo run_function con codice Python in linea:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
YAML per le funzioni del catalogo Unity
Per gli operatori basati su UC, incorporare la configurazione YAML come commento o docstring nella funzione.
In SQL (usare /* ... */ commento):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python (usare """ ... """ docstring):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
Registra e distribuisci il tuo operatore in Lakeflow Designer
Affinché l'operatore venga visualizzato in Lakeflow Designer, registrarlo in un .user_defined_operators.yaml file:
- Livello dell'area di lavoro: Posizionare il file nella radice dell'area di lavoro per rendere l'operatore visibile a tutti gli utenti.
-
Livello utente: Posizionare il file nella home folder dell'utente (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) per rendere visibili solo gli operatori.
La operators: sezione supporta i percorsi di file, i riferimenti alle funzioni di Unity Catalog e i modelli GLOB. È possibile combinare tipi di immissione:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Configurazioni avanzate
Modalità di anteprima
Lakeflow Designer supporta le anteprime in modalità progettazione. Per gli operatori che chiamano API esterne o scrivono in sistemi esterni, aggiungere una is_preview proprietà config in modo da ignorare gli effetti collaterali durante l'anteprima. Quando la modalità di anteprima è abilitata, gli utenti devono fare clic in modo esplicito su Esegui per eseguire l'operatore con effetti collaterali.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designer imposta automaticamente questo valore su true durante l'anteprima. Gestiscilo nella tua logica per ignorare gli effetti collaterali:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Connessioni del catalogo Unity
Per gli operatori SQL basati su UC che chiamano API esterne, usare le connessioni HTTP del catalogo Unity per archiviare in modo sicuro le credenziali:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
Usa quindi la connessione nella tua funzione SQL definita dall'utente con la funzione http_request(). Per informazioni dettagliate, vedere Connettersi a servizi HTTP esterni.
WorkspaceClient
Per gli operatori python-run-function, è possibile usare il Azure Databricks WorkspaceClient per accedere alle risorse dell'area di lavoro e alle API esterne:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Creare un operatore completo python-run-function definito dall'utente
La procedura seguente illustra come creare un python-run-function operatore da zero.
Passaggio 1: Definire la logica
Scrivi la tua run() funzione in un notebook:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Passaggio 2: Testare la funzione
Testare la funzione in modo interattivo con i dati di esempio:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
Passaggio 3: Creare la configurazione YAML
Definire i metadati dell'operatore, i campi di configurazione e le porte in un file YAML:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
Passaggio 4: Combinare la logica e YAML
Aggiungere i run_function campi e ports per creare il file YAML completo. Salvarlo nell'area di lavoro, ad esempio /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Passaggio 5: Registrare l'operatore
Aggiungi il percorso del file al file .user_defined_operators.yaml:
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Passaggio 6: Utilizzare l'operatore in Lakeflow Designer
Aprire Lakeflow Designer e verificare che l'operatore venga visualizzato nel riquadro operatore. Trascinarlo nell'area di disegno, connettere un input, configurare il nome della colonna ed eseguire un'anteprima.
Crea un operatore UC completo definito dall'utente
La procedura seguente illustra come creare un operatore basato su uc-udf UC.
Passaggio 1: Definire la logica
Scrivere e testare la logica della funzione in un notebook:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
Passaggio 2: Creare la configurazione YAML
Definire i metadati dell'operatore, i campi di configurazione e le porte:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
Passaggio 3: Combinare la logica e YAML
Creare la funzione Catalogo Unity con YAML incorporato come docstring:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
Passaggio 4: Testare la funzione
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
Passaggio 5: Registrare l'operatore
Aggiungere il riferimento alla funzione Catalogo Unity al .user_defined_operators.yaml file:
operators:
- catalog: main
schema: my_schema
functionName: double_value
Passaggio 6: Utilizzare l'operatore in Lakeflow Designer
Aprire Lakeflow Designer e verificare che l'operatore venga visualizzato nel riquadro operatore. Trascinarlo nell'area di disegno, connettere un input ed eseguire un'anteprima.
Risoluzione dei problemi
| Issue | Soluzione |
|---|---|
| L'operatore non viene visualizzato in Lakeflow Designer. | Verificare che .user_defined_operators.yaml esista ed elencare la funzione o il percorso del file. Per python-run-function operatori, verificare il percorso del file e assicurarsi che il file YAML sia accessibile. |
| La convalida dello schema non riesce. | Verificare il codice YAML rispetto allo schema ufficiale all'indirizzo https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json. |
| Autorizzazione negata. | Per gli operatori che usano UC, verificare che gli utenti abbiano EXECUTE sulla funzione e USE SCHEMA sullo schema. Per gli operatori python-run-function, verificare che gli utenti abbiano accesso in lettura al file YAML. |
python-run-function l'operatore ha esito negativo in fase di esecuzione. |
Verificare che la firma della run() funzione corrisponda a def run(config, inputs, spark). Verificare che i nomi delle porte nel codice corrispondano a YAML e che le chiavi del dizionario restituite corrispondano ai valori della porta name di output. |
| UDTF restituisce tipi errati. | I tipi restituiti UDTF devono essere espliciti. Non è possibile fare riferimento ai tipi di colonna di input. |
Permissions
| Autorizzazione | Purpose |
|---|---|
Accesso in lettura a .user_defined_operators.yaml. |
Individuare l'operatore . |
Accesso in lettura al file YAML (python-run-function soltanto). |
Caricare la definizione dell'operatore. |
| EXECUTE sulla funzione Catalogo Unity (solo operatori basati su UC). | Esegui l'operatore. |
| USE SCHEMA nello schema (solo operatori basati su UC). | Accedere allo schema in cui viene creata la funzione. |
| Altre autorizzazioni | A seconda dell'operatore, gli utenti potrebbero richiedere altre autorizzazioni. Ad esempio, USE CONNECTION in una connessione del catalogo Unity per le chiamate API HTTP. |
Passaggi successivi
Esplora i seguenti tutorial:
| Example | Tipo | Description |
|---|---|---|
| Mittente di posta elettronica Gmail | python-run-function |
Inviare dati dataframe come allegato di posta elettronica CSV tramite Gmail. |
| Calcolatore di interessi composti | uc-udf |
Calcolare i valori di investimento futuri utilizzando la formula di interesse composta. |
| Clustering K-means | uc-udtf |
Segmentare i dati in gruppi usando scikit-learn. |
| Invia messaggio Slack | uc-udf |
Inviare notifiche ai canali Slack tramite l'API. |
| Tutti i widget dell'interfaccia utente | uc-udf |
Operatore di riferimento che mostra tutti i widget dell'interfaccia utente disponibili. |
Per il riferimento completo dello schema YAML, vedere Riferimento YAML dell'operatore definito dall'utente.