Operatori definiti dall'utente in Lakeflow Designer

Importante

Questa funzionalità è in Anteprima Pubblica.

Lakeflow Designer consente di creare operatori definiti dall'utente visualizzati direttamente nell'area di disegno insieme agli operatori predefiniti. Usarli per estendere Lakeflow Designer con la propria logica di business, calcoli o integrazioni.

Esistono tre tipi di operatori definiti dall'utente:

  • python-run-function: file YAML autonomo con Python inline archiviato nell'area di lavoro. Ideale per trasformazioni a livello di dataframe e integrazioni esterne. Le autorizzazioni vengono gestite a livello di file dell'area di lavoro.
  • uc-udf: Incapsula una funzione scalare di Unity Catalog. Ideale per le trasformazioni a livello di colonna. L'accesso è regolato dalle autorizzazioni del catalogo Unity.
  • uc-udtf: esegue il wrapping di una funzione con valori di tabella di Unity Catalog. Ideale per le trasformazioni a livello di tabella, ad esempio il clustering e l'aggregazione di ML. L'accesso è regolato dalle autorizzazioni del catalogo Unity.
Feature python-run-function uc-udf uc-udtf
Esempio di caso d'uso Trasformazioni di dataframe, integrazioni api, notifiche tramite posta elettronica Calcoli a livello di colonna (BMI, tassi di interesse) Clustering ML, aggregazione tra le righe
Inserimento DataFrame Valori singoli Intera tabella, riga per riga
Risultato DataFrame Valore singolo Tabella (più righe)
Richiede la funzione Catalog di Unity No Yes Yes
Governance degli accessi Autorizzazioni per i file dell'area di lavoro Autorizzazioni del catalogo Unity (EXECUTE, USE SCHEMA) Autorizzazioni del catalogo Unity (EXECUTE, USE SCHEMA)
Lingue disponibili Solo Python SQL o Python in un wrapper per SQL SQL o Python in un wrapper SQL

Come funzionano gli operatori definiti dall'utente?

Un operatore definito dall'utente è costituito da:

  • Logica dell'operatore: codice eseguito quando viene eseguito l'operatore. Può trattarsi di una funzione inline Python run() (per python-run-function) o di una funzione del catalogo Unity (per uc-udf e uc-udtf).
  • Configurazione YAML: indica a Lakeflow Designer come presentare l'operatore nell'interfaccia utente, inclusi il nome, la descrizione, i parametri di input, i widget dell'interfaccia utente e le porte dell'operatore. Tutti i tipi di operatore usano lo user-defined-operator-v0.1.0 schema.
  • File di registrazione: voce in .user_defined_operators.yaml che consente a Lakeflow Designer di individuare l'operatore.

Logica dell'operatore

Python eseguire la logica dell'operatore definita dall'utente

Ogni python-run-function operatore deve definire una run() funzione:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: valori configurati dall'utente nell'interfaccia utente, indicizzati per nome della proprietà.
  • inputs: DataFrame di input, indicizzati per porta di input name.
  • spark: La SparkSession attiva.
  • Valori restituiti: un dizionario che associa i valori delle porte di output name ai DataFrame.

L'esempio seguente filtra le righe da un dataframe di input:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Se l'operatore richiede pacchetti pip esterni, aggiungere il environment campo a YAML:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

Logica dell'operatore UDF e UDTF

È possibile scrivere funzioni UC in SQL o Python. Le funzioni Python sono racchiuse in un'istruzione SQL CREATE FUNCTION:

Funzione SQL:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

Funzione Python (incapsulata in SQL):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

Le funzioni definite dall'utente elaborano un singolo valore alla volta e restituiscono un valore calcolato. Le UDTF elaborano le tabelle riga per riga e possono mantenere uno stato condiviso tra tutte le righe. Utilizzare uc-udf per le trasformazioni a livello di colonna e uc-udtf per operazioni come il clustering ML o l'aggregazione.

Inoltre, le UDTF richiedono la definizione di tre metodi chiave: __init__(), eval() e terminate():

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Annotazioni

Le tabelle restituite UDTF devono avere tipi fissi ed espliciti. Non è possibile fare riferimento ai tipi di colonna di input nella configurazione restituita.

Configurazione YAML

La configurazione YAML indica a Lakeflow Designer come presentare l'operatore nell'interfaccia utente. Definisce il nome, la descrizione, i parametri di input, i widget dell'interfaccia utente e le porte dell'operatore. Ogni campo di configurazione è una proprietà con un tipo, un titolo e indicazioni facoltative del widget x-ui:

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Per informazioni dettagliate sullo schema YAML, inclusi tutti i tipi di widget e le opzioni di configurazione, vedere Informazioni di riferimento sull'operatore YAML definito dall'utente.

Porti

Le porte definiscono gli input e gli output per l'operatore:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML per gli operatori della funzione run in Python

Per gli operatori python-run-function, il file YAML è autonomo e include un campo run_function con codice Python in linea:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

YAML per le funzioni del catalogo Unity

Per gli operatori basati su UC, incorporare la configurazione YAML come commento o docstring nella funzione.

In SQL (usare /* ... */ commento):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (usare """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Registra e distribuisci il tuo operatore in Lakeflow Designer

Affinché l'operatore venga visualizzato in Lakeflow Designer, registrarlo in un .user_defined_operators.yaml file:

  • Livello dell'area di lavoro: Posizionare il file nella radice dell'area di lavoro per rendere l'operatore visibile a tutti gli utenti.
  • Livello utente: Posizionare il file nella home folder dell'utente (/Workspace/Users/<user-name>/.user_defined_operators.yaml) per rendere visibili solo gli operatori.

La operators: sezione supporta i percorsi di file, i riferimenti alle funzioni di Unity Catalog e i modelli GLOB. È possibile combinare tipi di immissione:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Configurazioni avanzate

Modalità di anteprima

Lakeflow Designer supporta le anteprime in modalità progettazione. Per gli operatori che chiamano API esterne o scrivono in sistemi esterni, aggiungere una is_preview proprietà config in modo da ignorare gli effetti collaterali durante l'anteprima. Quando la modalità di anteprima è abilitata, gli utenti devono fare clic in modo esplicito su Esegui per eseguire l'operatore con effetti collaterali.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer imposta automaticamente questo valore su true durante l'anteprima. Gestiscilo nella tua logica per ignorare gli effetti collaterali:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Connessioni del catalogo Unity

Per gli operatori SQL basati su UC che chiamano API esterne, usare le connessioni HTTP del catalogo Unity per archiviare in modo sicuro le credenziali:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

Usa quindi la connessione nella tua funzione SQL definita dall'utente con la funzione http_request(). Per informazioni dettagliate, vedere Connettersi a servizi HTTP esterni.

WorkspaceClient

Per gli operatori python-run-function, è possibile usare il Azure Databricks WorkspaceClient per accedere alle risorse dell'area di lavoro e alle API esterne:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Creare un operatore completo python-run-function definito dall'utente

La procedura seguente illustra come creare un python-run-function operatore da zero.

Passaggio 1: Definire la logica

Scrivi la tua run() funzione in un notebook:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Passaggio 2: Testare la funzione

Testare la funzione in modo interattivo con i dati di esempio:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Passaggio 3: Creare la configurazione YAML

Definire i metadati dell'operatore, i campi di configurazione e le porte in un file YAML:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Passaggio 4: Combinare la logica e YAML

Aggiungere i run_function campi e ports per creare il file YAML completo. Salvarlo nell'area di lavoro, ad esempio /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Passaggio 5: Registrare l'operatore

Aggiungi il percorso del file al file .user_defined_operators.yaml:

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Passaggio 6: Utilizzare l'operatore in Lakeflow Designer

Aprire Lakeflow Designer e verificare che l'operatore venga visualizzato nel riquadro operatore. Trascinarlo nell'area di disegno, connettere un input, configurare il nome della colonna ed eseguire un'anteprima.

Crea un operatore UC completo definito dall'utente

La procedura seguente illustra come creare un operatore basato su uc-udf UC.

Passaggio 1: Definire la logica

Scrivere e testare la logica della funzione in un notebook:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Passaggio 2: Creare la configurazione YAML

Definire i metadati dell'operatore, i campi di configurazione e le porte:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Passaggio 3: Combinare la logica e YAML

Creare la funzione Catalogo Unity con YAML incorporato come docstring:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Passaggio 4: Testare la funzione

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Passaggio 5: Registrare l'operatore

Aggiungere il riferimento alla funzione Catalogo Unity al .user_defined_operators.yaml file:

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Passaggio 6: Utilizzare l'operatore in Lakeflow Designer

Aprire Lakeflow Designer e verificare che l'operatore venga visualizzato nel riquadro operatore. Trascinarlo nell'area di disegno, connettere un input ed eseguire un'anteprima.

Risoluzione dei problemi

Issue Soluzione
L'operatore non viene visualizzato in Lakeflow Designer. Verificare che .user_defined_operators.yaml esista ed elencare la funzione o il percorso del file. Per python-run-function operatori, verificare il percorso del file e assicurarsi che il file YAML sia accessibile.
La convalida dello schema non riesce. Verificare il codice YAML rispetto allo schema ufficiale all'indirizzo https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
Autorizzazione negata. Per gli operatori che usano UC, verificare che gli utenti abbiano EXECUTE sulla funzione e USE SCHEMA sullo schema. Per gli operatori python-run-function, verificare che gli utenti abbiano accesso in lettura al file YAML.
python-run-function l'operatore ha esito negativo in fase di esecuzione. Verificare che la firma della run() funzione corrisponda a def run(config, inputs, spark). Verificare che i nomi delle porte nel codice corrispondano a YAML e che le chiavi del dizionario restituite corrispondano ai valori della porta name di output.
UDTF restituisce tipi errati. I tipi restituiti UDTF devono essere espliciti. Non è possibile fare riferimento ai tipi di colonna di input.

Permissions

Autorizzazione Purpose
Accesso in lettura a .user_defined_operators.yaml. Individuare l'operatore .
Accesso in lettura al file YAML (python-run-function soltanto). Caricare la definizione dell'operatore.
EXECUTE sulla funzione Catalogo Unity (solo operatori basati su UC). Esegui l'operatore.
USE SCHEMA nello schema (solo operatori basati su UC). Accedere allo schema in cui viene creata la funzione.
Altre autorizzazioni A seconda dell'operatore, gli utenti potrebbero richiedere altre autorizzazioni. Ad esempio, USE CONNECTION in una connessione del catalogo Unity per le chiamate API HTTP.

Passaggi successivi

Esplora i seguenti tutorial:

Example Tipo Description
Mittente di posta elettronica Gmail python-run-function Inviare dati dataframe come allegato di posta elettronica CSV tramite Gmail.
Calcolatore di interessi composti uc-udf Calcolare i valori di investimento futuri utilizzando la formula di interesse composta.
Clustering K-means uc-udtf Segmentare i dati in gruppi usando scikit-learn.
Invia messaggio Slack uc-udf Inviare notifiche ai canali Slack tramite l'API.
Tutti i widget dell'interfaccia utente uc-udf Operatore di riferimento che mostra tutti i widget dell'interfaccia utente disponibili.

Per il riferimento completo dello schema YAML, vedere Riferimento YAML dell'operatore definito dall'utente.