Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page fournit une vue d’ensemble de Checkpoints dans le système de flux de travail Microsoft Agent Framework.
Vue d’ensemble
Les points de contrôle vous permettent d’enregistrer l’état d’un flux de travail à des points spécifiques pendant son exécution et de reprendre à partir de ces points ultérieurement. Cette fonctionnalité est particulièrement utile pour les scénarios suivants :
- Flux de travail longs dans lesquels vous souhaitez éviter de perdre la progression en cas d’échecs.
- Flux de travail de longue durée dans lesquels vous souhaitez suspendre et reprendre l’exécution ultérieurement.
- Flux de travail nécessitant un enregistrement d’état périodique pour des besoins d’audit ou de conformité.
- Flux de travail qui doivent être migrés dans différents environnements ou instances.
Quand les points de contrôle sont-ils créés ?
N’oubliez pas que les flux de travail sont exécutés en supersteps, comme documenté dans les concepts de base. Les points de contrôle sont créés à la fin de chaque superstep, une fois que tous les exécuteurs de ce superstep ont terminé leur exécution. Un point de contrôle capture l’état entier du flux de travail, notamment :
- État actuel de tous les exécuteurs
- Tous les messages en attente dans le flux de travail pour le superstep suivant
- Demandes et réponses en attente
- États partagés
Capture de points de contrôle
Pour activer le checkpointing, vous devez disposer d’un CheckpointManager lors de l’exécution du flux de travail. Un point de contrôle est ensuite accessible via un SuperStepCompletedEvent, ou via la propriété Checkpoints pendant l’exécution.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Pour activer le point de contrôle, un CheckpointStorage doit être fourni lors de la création d’un flux de travail. Un point de contrôle est ensuite accessible via le stockage. Agent Framework fournit trois implémentations intégrées : choisissez celle qui correspond à vos besoins de durabilité et de déploiement :
| Fournisseur | Package | Durability | Idéal pour |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
En cours uniquement | Tests, démonstrations, flux de travail de courte durée |
FileCheckpointStorage |
agent-framework |
Disque local | Flux de travail à ordinateur unique, développement local |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Flux de travail de production, distribués et inter-processus |
Les trois implémentent le même CheckpointStorage protocole. Vous pouvez donc échanger des fournisseurs sans modifier le code du flux de travail ou de l’exécuteur.
InMemoryCheckpointStorage conserve les points de contrôle dans la mémoire du processus. Idéal pour les tests, les démonstrations et les flux de travail de courte durée où vous n’avez pas besoin de durabilité entre les redémarrages.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Reprise à partir de points de contrôle
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement lors de la même exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement sur la même instance de workflow.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Réhydratage à partir de points de contrôle
Vous pouvez également réalimenter un flux de travail à partir d’un point de contrôle dans une nouvelle instance d’exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez également réalimenter une nouvelle instance de flux de travail à partir d’un point de contrôle.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Enregistrer les états de l’exécuteur
Pour que l'état d'un exécuteur soit capturé dans un point de contrôle, l'exécuteur doit redéfinir la méthode OnCheckpointingAsync et enregistrer son état dans le contexte du workflow.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la OnCheckpointRestoredAsync méthode et charger son état à partir du contexte de flux de travail.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Pour vous assurer que l’état d’un exécuteur est capturé dans un point de contrôle, l’exécuteur doit remplacer la méthode on_checkpoint_save et retourner son état sous forme de dictionnaire.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la on_checkpoint_restore méthode et restaurer son état à partir du dictionnaire d’état fourni.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Considérations relatives à la sécurité
Important
Le stockage de points de contrôle est une frontière de confiance. Que vous utilisiez les implémentations de stockage intégrées ou une implémentation personnalisée, le serveur principal de stockage doit être traité comme une infrastructure privée approuvée. Ne chargez jamais les points de contrôle à partir de sources non approuvées ou potentiellement falsifiées.
Vérifiez que l’emplacement de stockage utilisé pour les points de contrôle est sécurisé de manière appropriée. Seuls les services autorisés et les utilisateurs doivent avoir un accès en lecture ou en écriture aux données de point de contrôle.
Sérialisation avec Pickle
Les deux FileCheckpointStorage et CosmosCheckpointStorage utilisent le module pickle de Python pour sérialiser un état natif non JSON tel que des classes de données, des datetimes et des objets personnalisés. Pour atténuer les risques d’exécution arbitraire du code pendant la désérialisation, les deux fournisseurs utilisent par défaut un dé-sérialiseur restreint. Seul un ensemble intégré de types de Python sécurisés (primitives, datetime, uuid, Decimal, collections communes, etc.) et tous les types internes agent_framework sont autorisés pendant la désérialisation. Tout autre type rencontré dans un point de contrôle entraîne l’échec de la désérialisation avec un WorkflowCheckpointException.
Pour autoriser des types supplémentaires spécifiques à l’application, transmettez-les via le allowed_checkpoint_types paramètre en utilisant le "module:qualname" format suivant :
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage accepte le même paramètre :
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Si votre modèle de menace n’autorise pas du tout la sérialisation basée sur pickle, utilisez InMemoryCheckpointStorage ou implémentez une solution personnalisée CheckpointStorage avec une autre stratégie de sérialisation.
Responsabilité de l’emplacement de stockage
FileCheckpointStorage nécessite un paramètre explicite storage_path : il n’existe aucun répertoire par défaut. Bien que l’infrastructure valide contre les attaques de traversée de chemin d’accès, la sécurisation du répertoire de stockage lui-même (autorisations de fichier, chiffrement au repos, contrôles d’accès) est la responsabilité du développeur. Seuls les processus autorisés doivent avoir un accès en lecture ou en écriture au répertoire de point de contrôle.
CosmosCheckpointStorage s’appuie sur Azure Cosmos DB pour le stockage. Utilisez l’identité managée /RBAC si possible, limitez la base de données et le conteneur au service de flux de travail et faites pivoter les clés de compte si vous utilisez l’authentification basée sur des clés. Comme pour le stockage de fichiers, seuls les principaux autorisés doivent avoir un accès en lecture ou en écriture au conteneur Cosmos DB qui contient des documents de point de contrôle.