Tutorial: operatore di invio email di Gmail

Importante

Questa funzionalità è in Anteprima Pubblica.

Questa esercitazione illustra come creare un python-run-function operatore per Lakeflow Designer che invia il contenuto di un dataframe come allegato CSV tramite Gmail. Usare questo esempio per informazioni su come creare operatori basati su YAML che eseguono effetti collaterali, ad esempio l'invio di notifiche o la scrittura in sistemi esterni. Per altre informazioni, vedere Operatori definiti dall'utente in Lakeflow Designer.

Requirements

  • Un'area di lavoro Azure Databricks con accesso per creare ambiti segreti.
  • Un account Gmail con una password dell'app Google (obbligatoria quando è abilitata l'autenticazione a più fattori).
  • CLI di Databricks installata sulla macchina di sviluppo locale.

Passaggio 1: Configurare i segreti

Archiviare le credenziali gmail in un ambito segreto Azure Databricks in modo che l'operatore possa recuperarle in fase di esecuzione.

  1. Creare un ambito segreto usando l'interfaccia della riga di comando di Azure Databricks:

    databricks secrets create-scope my_email_scope
    
  2. Memorizza la password dell'app di Gmail nell'ambito:

    databricks secrets put-secret my_email_scope gmail_app_password
    

    Viene richiesto di immettere il valore del segreto. Incollare la password dell'app Gmail e salvare.

Passaggio 2: Scrivere la run() funzione

Il python-run-function tipo di operatore richiede una run() funzione con questa firma:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: valori di configurazione forniti dall'utente nell'interfaccia utente di Lakeflow Designer.
  • inputs: DataFrame di input indicizzati per nome della porta.
  • spark: sessione Spark attiva.

La funzione deve restituire un dizionario di DataFrame di output indicizzato per nome della porta di output.

Definire e testare la funzione in una cella del notebook:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    input_df = inputs["data"]

    # Skip side effects during Designer preview
    if config.get("is_preview", False):
        return {"data": input_df}

    import smtplib
    import os
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders

    sender_email = config.get("sender_email", "")
    secret_scope = config.get("secret_scope", "")
    secret_key = config.get("secret_key", "")
    recipients_raw = config.get("recipients", "")
    subject = config.get("subject", "")
    body = config.get("body", "")

    if not sender_email:
        raise ValueError("Sender Email is required.")
    if not secret_scope or not secret_key:
        raise ValueError("Secret Scope and Secret Key are required.")
    if not recipients_raw:
        raise ValueError("At least one recipient is required.")

    recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
    if not recipients:
        raise ValueError("At least one valid recipient email is required.")

    # Retrieve password from Databricks secrets
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)
    sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Convert DataFrame to CSV
    pdf = input_df.toPandas()
    file_path = "/tmp/designer_email_attachment.csv"
    pdf.to_csv(file_path, index=False)

    # Send email to each recipient
    for recipient in recipients:
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        with open(file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename={os.path.basename(file_path)}",
            )
            msg.attach(part)

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, sender_password)
            server.send_message(msg)

    # Clean up temp file
    if os.path.exists(file_path):
        os.remove(file_path)

    return {"data": input_df}

Passaggio 3: Testare la funzione

Testare la funzione con un dataframe di esempio:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
    config={
        "is_preview": True,
        "sender_email": "you@gmail.com",
        "secret_scope": "my_email_scope",
        "secret_key": "gmail_app_password",
        "recipients": "alice@example.com",
        "subject": "Test",
        "body": "Test body"
    },
    inputs={"data": test_df},
    spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged

Note

I secret_scope valori e secret_key nella configurazione sono i nomi dell'ambito del segreto e della chiave creati nel passaggio 1, non la password effettiva. L'operatore utilizza questi nomi per recuperare la password dai segreti di Azure Databricks in fase di esecuzione.

Importante

Eseguire il test con is_preview impostato su True prima per verificare il comportamento pass-through senza inviare messaggi di posta elettronica. Quando si è pronti a testare l'email effettiva, impostare is_preview su False.

Passaggio 4: Compilare la definizione YAML

Creare un file denominato gmail_email_sender.yaml con il contenuto seguente:

schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false
    sender_email:
      type: string
      title: Sender Email
      default: ''
      examples:
        - 'you@gmail.com'
      x-ui:
        widget: input
    secret_scope:
      type: string
      title: Secret Scope
      default: ''
      examples:
        - 'my_email_scope'
      x-ui:
        widget: input
    secret_key:
      type: string
      title: Secret Key
      default: ''
      examples:
        - 'gmail_app_password'
      x-ui:
        widget: input
    recipients:
      type: string
      title: Recipients
      default: ''
      examples:
        - 'alice@example.com, bob@example.com'
      x-ui:
        widget: textarea
        rows: 2
    subject:
      type: string
      title: Subject
      default: ''
      examples:
        - 'Designer Output Data'
      x-ui:
        widget: input
    body:
      type: string
      title: Email Body
      default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
      x-ui:
        widget: textarea
        rows: 6
  required:
    - sender_email
    - secret_scope
    - secret_key
    - recipients
    - subject
  additionalProperties: false

ports:
  input:
    - name: data
      title: Input Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: data
      title: Output Data
      mime: application/vnd.databricks.dataframe

run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        input_df = inputs["data"]

        if config.get("is_preview", False):
            return {"data": input_df}

        import smtplib
        import os
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.base import MIMEBase
        from email import encoders

        sender_email = config.get("sender_email", "")
        secret_scope = config.get("secret_scope", "")
        secret_key = config.get("secret_key", "")
        recipients_raw = config.get("recipients", "")
        subject = config.get("subject", "")
        body = config.get("body", "")

        if not sender_email:
            raise ValueError("Sender Email is required.")
        if not secret_scope or not secret_key:
            raise ValueError("Secret Scope and Secret Key are required.")
        if not recipients_raw:
            raise ValueError("At least one recipient is required.")

        recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
        if not recipients:
            raise ValueError("At least one valid recipient email is required.")

        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

        pdf = input_df.toPandas()
        file_path = "/tmp/designer_email_attachment.csv"
        pdf.to_csv(file_path, index=False)

        for recipient in recipients:
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with open(file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f"attachment; filename={os.path.basename(file_path)}",
                )
                msg.attach(part)

            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)

        if os.path.exists(file_path):
            os.remove(file_path)

        return {"data": input_df}

Passaggio 5: Salvare e registrare l'operatore

  1. Salvare il file YAML nell'area di lavoro Azure Databricks. Per esempio:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
    
  2. Aggiungi l'operatore nel file .user_defined_operators.yaml:

    operators:
      - /Workspace/Users/<user-name>/gmail_email_sender.yaml
    

Per altre informazioni sulle opzioni di registrazione, vedere Rendere individuabile l'operatore.

Permissions

Gli utenti che eseguono un flusso di lavoro contenente questo operatore devono READ accedere all'ambito del segreto oppure possono fornire il proprio ambito segreto e i valori di chiave nella configurazione dell'operatore. Gli utenti devono anche accedere in lettura al file YAML nell'area di lavoro.

Per concedere l'accesso all'ambito segreto:

databricks secrets put-acl my_email_scope <user-or-group> READ

Utilizzo dell'operatore in Lakeflow Designer

Dopo la registrazione, l'operatore viene visualizzato in Lakeflow Designer con una porta di input per l'origine dati e i campi di configurazione per l'indirizzo di posta elettronica del mittente, l'ambito segreto, la chiave privata, i destinatari, l'oggetto e il corpo.

Quando il flusso di lavoro viene eseguito, l'operatore converte il dataframe di input in CSV, lo allega a un messaggio di posta elettronica e lo invia a ogni destinatario. Il DataFrame passa alla porta di output senza modifiche, così puoi concatenare ulteriori operatori a valle. Durante l'anteprima del flusso di lavoro, non viene inviato alcun messaggio di posta elettronica.