Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare Durable Functions, una funzionalità di Funzioni di Azure, per scrivere flussi di lavoro serverless con stato in Python. In questa guida introduttiva si clona ed esegue un'app di esempio che illustra due modelli di orchestrazione comuni:
- Concatenamento delle funzioni: chiama le attività in sequenza (Tokyo → Seattle → Londra).
- Fan-out/fan-in: chiama le attività in parallelo tra cinque città, quindi aggrega i risultati.
Alla fine, avrai entrambe le orchestrazioni in esecuzione in locale con l'emulatore Durable Task Scheduler e potrai visualizzarne lo stato nella dashboard.
- Clonare e preparare il progetto di esempio Hello Cities.
- Configurare l'emulatore Durable Task Scheduler e Azurite per lo sviluppo locale.
- Eseguire l'app per le funzioni e attivare entrambe le orchestrazioni.
- Esamina lo stato dell'orchestrazione e il relativo output nel dashboard di Durable Task Scheduler.
Prerequisiti
- Python 3.9+ installato.
- Funzioni di Azure Core Tools v4 o versione successiva.
- Docker per l'esecuzione dell'emulatore e di Azurite.
- Clonare il repository GitHub Durable Task Scheduler per usare l'esempio rapido.
Configurare l'emulatore dell'utilità di pianificazione di Durable Task
L'emulatore Durable Task Scheduler fornisce un ambiente di sviluppo locale che consente di testare le orchestrazioni senza un abbonamento di Azure. L'host funzioni richiede anche Azurite per l'archiviazione locale.
Avviare entrambi i contenitori:
docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
mcr.microsoft.com/dts/dts-emulator:latest
docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
Suggerimento
Una volta avviato l'emulatore, è possibile accedere al dashboard di Durable Task Scheduler all'indirizzo http://localhost:8082 per monitorare le orchestrazioni.
Eseguire l'esempio di avvio rapido
Vai alla directory di esempio di Hello Cities:
cd samples/durable-functions/python/hello-citiesCreare un ambiente virtuale e installare le dipendenze:
python -m venv .venv .venv\Scripts\activate pip install -r requirements.txtVerificare che il
local.settings.jsonfile contenga la configurazione seguente:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "python", "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" } }Avviare l'app per le funzioni:
func startIn un terminale separato attivare l'orchestrazione del concatenamento delle funzioni :
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining $responseLa risposta contiene gli URL di stato per l'istanza di orchestrazione. Copiare il
statusQueryGetUrivalore ed eseguirlo per controllare il risultato:Invoke-RestMethod -Uri $response.statusQueryGetUriAttiva l'orchestrazione fan-out/fan-in:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn Invoke-RestMethod -Uri $response.statusQueryGetUri
Output previsto
La richiesta POST restituisce una risposta JSON con URL di stato. Per esempio:
{
"id": "<instanceId>",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
"sendEventPostUri": "...",
"terminatePostUri": "...",
"purgeHistoryDeleteUri": "..."
}
Quando si interroga statusQueryGetUri e il campo runtimeStatus dell'orchestrazione è Completed, è possibile trovare i risultati del saluto nel campo output. L'orchestrazione del concatenamento restituisce:
{
"name": "chaining_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}
L'orchestrazione fan-out/fan-in restituisce:
{
"name": "fan_out_fan_in_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}
Suggerimento
Se runtimeStatus mostra Running o Pending, attendere un attimo ed eseguire di nuovo una statusQueryGetUri query.
Apri il dashboard di Durable Task Scheduler all'indirizzo http://localhost:8082 per visualizzare lo stato dell'orchestrazione e la cronologia di esecuzione.
Informazioni sul codice
L'esempio usa il modello di programmazione Python v2 con elementi decorator, in cui tutte le funzioni sono definite in un singolo file (function_app.py).
Funzione dell'attività
L'attività say_hello prende il nome di una città e restituisce un saluto:
@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
"""Activity function that returns a greeting for a city."""
logging.info(f"Saying hello to {city}.")
return f"Hello {city}!"
Funzioni dell'agente di orchestrazione
L'orchestratore di concatenazione chiama say_hello in sequenza per tre città:
@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
"""Function chaining orchestration: calls activities sequentially."""
result1 = yield context.call_activity("say_hello", "Tokyo")
result2 = yield context.call_activity("say_hello", "Seattle")
result3 = yield context.call_activity("say_hello", "London")
return [result1, result2, result3]
L'orchestratore fan-out/fan-in pianifica le attività in parallelo:
@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
"""Fan-out/Fan-in orchestration: calls activities in parallel."""
cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]
# Fan-out: schedule all activities in parallel
parallel_tasks = []
for city in cities:
task = context.call_activity("say_hello", city)
parallel_tasks.append(task)
# Fan-in: wait for all to complete
results = yield context.task_all(parallel_tasks)
return results
Funzioni del client
Le funzioni client attivate da HTTP avviano ogni orchestrazione. Ad esempio, l'avvio della concatenazione:
@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
"""HTTP trigger to start the function chaining orchestration."""
instance_id = await client.start_new("chaining_orchestration")
logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
Configurazione
L'esempio usa l'emulatore di Durable Task Scheduler come backend di archiviazione. Questa operazione è configurata in host.json:
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.Core": "Warning"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
Pulire le risorse
Quando avete finito, fermate i contenitori dell'emulatore:
docker stop dtsemulator azurite && docker rm dtsemulator azurite
Per disattivare l'ambiente virtuale Python:
deactivate
Passaggi successivi
- Informazioni sui modelli di app common Durable Functions.
- Informazioni sui provider di archiviazione Durable Functions.