Guida introduttiva: Creare un'applicazione Python Durable Functions

Usare Durable Functions, una funzionalità di Funzioni di Azure, per scrivere flussi di lavoro serverless con stato in Python. In questa guida introduttiva si clona ed esegue un'app di esempio che illustra due modelli di orchestrazione comuni:

  • Concatenamento delle funzioni: chiama le attività in sequenza (Tokyo → Seattle → Londra).
  • Fan-out/fan-in: chiama le attività in parallelo tra cinque città, quindi aggrega i risultati.

Alla fine, avrai entrambe le orchestrazioni in esecuzione in locale con l'emulatore Durable Task Scheduler e potrai visualizzarne lo stato nella dashboard.

  • Clonare e preparare il progetto di esempio Hello Cities.
  • Configurare l'emulatore Durable Task Scheduler e Azurite per lo sviluppo locale.
  • Eseguire l'app per le funzioni e attivare entrambe le orchestrazioni.
  • Esamina lo stato dell'orchestrazione e il relativo output nel dashboard di Durable Task Scheduler.

Prerequisiti

Configurare l'emulatore dell'utilità di pianificazione di Durable Task

L'emulatore Durable Task Scheduler fornisce un ambiente di sviluppo locale che consente di testare le orchestrazioni senza un abbonamento di Azure. L'host funzioni richiede anche Azurite per l'archiviazione locale.

Avviare entrambi i contenitori:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Suggerimento

Una volta avviato l'emulatore, è possibile accedere al dashboard di Durable Task Scheduler all'indirizzo http://localhost:8082 per monitorare le orchestrazioni.

Eseguire l'esempio di avvio rapido

  1. Vai alla directory di esempio di Hello Cities:

    cd samples/durable-functions/python/hello-cities
    
  2. Creare un ambiente virtuale e installare le dipendenze:

    python -m venv .venv
    .venv\Scripts\activate
    pip install -r requirements.txt
    
  3. Verificare che il local.settings.json file contenga la configurazione seguente:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. Avviare l'app per le funzioni:

    func start
    
  5. In un terminale separato attivare l'orchestrazione del concatenamento delle funzioni :

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    La risposta contiene gli URL di stato per l'istanza di orchestrazione. Copiare il statusQueryGetUri valore ed eseguirlo per controllare il risultato:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. Attiva l'orchestrazione fan-out/fan-in:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

Output previsto

La richiesta POST restituisce una risposta JSON con URL di stato. Per esempio:

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

Quando si interroga statusQueryGetUri e il campo runtimeStatus dell'orchestrazione è Completed, è possibile trovare i risultati del saluto nel campo output. L'orchestrazione del concatenamento restituisce:

{
  "name": "chaining_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

L'orchestrazione fan-out/fan-in restituisce:

{
  "name": "fan_out_fan_in_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Suggerimento

Se runtimeStatus mostra Running o Pending, attendere un attimo ed eseguire di nuovo una statusQueryGetUri query.

Apri il dashboard di Durable Task Scheduler all'indirizzo http://localhost:8082 per visualizzare lo stato dell'orchestrazione e la cronologia di esecuzione.

Informazioni sul codice

L'esempio usa il modello di programmazione Python v2 con elementi decorator, in cui tutte le funzioni sono definite in un singolo file (function_app.py).

Funzione dell'attività

L'attività say_hello prende il nome di una città e restituisce un saluto:

@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    """Activity function that returns a greeting for a city."""
    logging.info(f"Saying hello to {city}.")
    return f"Hello {city}!"

Funzioni dell'agente di orchestrazione

L'orchestratore di concatenazione chiama say_hello in sequenza per tre città:

@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
    """Function chaining orchestration: calls activities sequentially."""
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

L'orchestratore fan-out/fan-in pianifica le attività in parallelo:

@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
    """Fan-out/Fan-in orchestration: calls activities in parallel."""
    cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]

    # Fan-out: schedule all activities in parallel
    parallel_tasks = []
    for city in cities:
        task = context.call_activity("say_hello", city)
        parallel_tasks.append(task)

    # Fan-in: wait for all to complete
    results = yield context.task_all(parallel_tasks)
    return results

Funzioni del client

Le funzioni client attivate da HTTP avviano ogni orchestrazione. Ad esempio, l'avvio della concatenazione:

@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
    """HTTP trigger to start the function chaining orchestration."""
    instance_id = await client.start_new("chaining_orchestration")
    logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Configurazione

L'esempio usa l'emulatore di Durable Task Scheduler come backend di archiviazione. Questa operazione è configurata in host.json:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Pulire le risorse

Quando avete finito, fermate i contenitori dell'emulatore:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

Per disattivare l'ambiente virtuale Python:

deactivate

Passaggi successivi