Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Deze pagina bevat een overzicht van Checkpoints in het Microsoft Agent Framework Workflow-systeem.
Overzicht
Met controlepunten kunt u de status van een werkstroom opslaan op specifieke punten tijdens de uitvoering en later hervatten vanaf die punten. Deze functie is met name handig voor de volgende scenario's:
- Langlopende werkstromen waarbij u wilt voorkomen dat de voortgang verloren gaat in het geval van fouten.
- Langlopende werkstromen waarin u de uitvoering op een later tijdstip wilt onderbreken en hervatten.
- Werkstromen waarvoor periodieke statusbesparing is vereist voor controle- of nalevingsdoeleinden.
- Werkstromen die moeten worden gemigreerd naar verschillende omgevingen of instanties.
Wanneer worden controlepunten gemaakt?
Houd er rekening mee dat werkstromen worden uitgevoerd in supersteps, zoals beschreven in de kernconcepten. Controlepunten worden gemaakt aan het einde van elke superstep, nadat alle uitvoerders in die superstep hun uitvoering hebben voltooid. Een controlepunt legt de volledige status van de werkstroom vast, waaronder:
- De huidige status van alle uitvoerders
- Alle berichten in behandeling in de werkstroom voor de volgende superstep
- Aanvragen en antwoorden in behandeling
- Gedeelde toestanden
Controlepunten vastleggen
Als u checkpointing wilt inschakelen, moet tijdens het uitvoeren van de workflow een CheckpointManager worden opgegeven. Een controlepunt kan vervolgens worden geopend via een SuperStepCompletedEvent, of met behulp van de Checkpoints eigenschap tijdens de uitvoering van het programma.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Als u controlepunten wilt inschakelen, moet er een CheckpointStorage worden opgegeven bij het maken van een workflow. Een controlepunt kan vervolgens worden geopend via de opslag. Agent Framework verzendt drie ingebouwde implementaties: kies de implementatie die overeenkomt met uw duurzaamheid en implementatiebehoeften:
| Aanbieder | Package | Durability | Ideaal voor |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
In bewerking alleen | Tests, demo's, kortdurende werkstromen |
FileCheckpointStorage |
agent-framework |
Lokale schijf | Werkstromen met één machine, lokale ontwikkeling |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Productie-, gedistribueerde, werkstromen voor meerdere processen |
Alle drie implementeren hetzelfde CheckpointStorage protocol, zodat u providers kunt wisselen zonder werkstroom- of uitvoercode te wijzigen.
InMemoryCheckpointStorage houdt controlepunten in procesgeheugen. Het meest geschikt voor tests, demo's en kortdurende werkstromen waarvoor u geen duurzaamheid nodig hebt bij het opnieuw opstarten.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Hervatten vanaf controlepunten
U kunt een werkstroom rechtstreeks vanuit een specifiek controlepunt hervatten tijdens dezelfde uitvoering.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
U kunt een werkstroom hervatten vanaf een specifiek controlepunt op dezelfde workflowinstantie.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Reactiveren vanuit controlepunten
Of u kunt een werkstroom vanuit een controlepunt reactiveren in een nieuw run-exemplaar.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
U kunt ook een nieuwe werkstroomexemplaar herstellen vanaf een controlepunt.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Uitvoerdersstatussen opslaan
Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de methode overschrijven en de OnCheckpointingAsync status ervan opslaan in de werkstroomcontext.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de OnCheckpointRestoredAsync methode overschrijven en de status ervan laden vanuit de werkstroomcontext.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de on_checkpoint_save methode overschrijven en de status ervan als een woordenlijst retourneren.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de on_checkpoint_restore methode overschrijven en de status ervan herstellen vanuit de opgegeven statuswoordenlijst.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Beveiligingsoverwegingen
Belangrijk
Controlepuntopslag vormt een vertrouwensgrens. Of u nu gebruikmaakt van de ingebouwde opslag-implementaties of een aangepaste implementatie, de back-end van de opslag moet worden behandeld als vertrouwde, privé-infrastructuur. Laad nooit controlepunten van niet-vertrouwde of mogelijk gemanipuleerde bronnen.
Zorg ervoor dat de opslaglocatie die wordt gebruikt voor controlepunten op de juiste manier is beveiligd. Alleen geautoriseerde services en gebruikers moeten lees- of schrijftoegang hebben tot controlepuntgegevens.
Pickle-serialisatie
Zowel FileCheckpointStorage als CosmosCheckpointStorage gebruiken de module pickle van Python om de niet-JSON-systeemeigen status te serialiseren, zoals dataclasses, datum/tijd en aangepaste objecten. Om de risico's van willekeurige uitvoering van code tijdens de deserialisatie te beperken, gebruiken beide providers standaard een beperkte uitkiezer . Alleen een ingebouwde set veilige Python typen (primitieven, datetime, uuid, Decimal, algemene verzamelingen, enz.) en alle agent_framework interne typen zijn toegestaan tijdens deserialisatie. Elk ander type dat in een controlepunt wordt aangetroffen, zorgt ervoor dat deserialisatie mislukt met een WorkflowCheckpointException.
Als u aanvullende toepassingsspecifieke typen wilt toestaan, geeft u deze door via de allowed_checkpoint_types parameter met behulp van "module:qualname" de indeling:
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage accepteert dezelfde parameter:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Als uw bedreigingsmodel helemaal geen serialisatie op basis van pickle toestaat, gebruikt InMemoryCheckpointStorage of implementeert u een aangepaste CheckpointStorage met een alternatieve serialisatiestrategie.
Verantwoordelijkheid voor opslaglocatie
FileCheckpointStorage vereist een expliciete storage_path parameter. Er is geen standaardmap. Hoewel het framework valideert tegen padtraversalaanvallen, is het beveiligen van de opslagmap zelf (bestandsmachtigingen, versleuteling bij opslag, toegangsbeheer) de verantwoordelijkheid van de ontwikkelaar. Alleen geautoriseerde processen moeten lees- of schrijftoegang hebben tot de checkpointdirectory.
CosmosCheckpointStorage is afhankelijk van Azure Cosmos DB voor opslag. Gebruik waar mogelijk beheerde identiteit en rolgebaseerde toegangscontrole (RBAC), beperk de toegang van de database en container tot de workflowservice en ververs accountsleutels als u sleutelgebaseerde authenticatie gebruikt. Net als bij bestandsopslag moeten alleen geautoriseerde principals lees- of schrijftoegang hebben tot de Cosmos DB-container die checkpoint-documenten bevat.