Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan innehåller en översikt över Checkpoints i Microsoft Agent Framework Workflow-systemet.
Översikt
Med kontrollpunkter kan du spara tillståndet för ett arbetsflöde vid specifika tidpunkter under körningen och återuppta från dessa punkter senare. Den här funktionen är särskilt användbar för följande scenarier:
- Långvariga arbetsflöden där du vill undvika att förlora förloppet vid fel.
- Långvariga arbetsflöden där du vill pausa och återuppta körningen vid ett senare tillfälle.
- Arbetsflöden som kräver periodisk lagring av tillstånd för gransknings- eller efterlevnadssyften.
- Arbetsflöden som måste migreras mellan olika miljöer eller instanser.
När skapas kontrollpunkter?
Kom ihåg att arbetsflöden körs i supersteg, vilket beskrivs i huvudbegreppen. Kontrollpunkter skapas i slutet av varje supersteg, efter att alla utförare i det supersteget har slutfört sin körning. En kontrollpunkt samlar in hela arbetsflödets tillstånd, inklusive:
- Det aktuella tillståndet för alla utförare
- Alla väntande meddelanden i arbetsflödet för nästa supersteg
- Väntande begäranden och svar
- Delade tillstånd
Fånga kontrollpunkter
För att aktivera kontrollpunkter måste du ange en CheckpointManager när du kör arbetsflödet. En kontrollpunkt kan sedan nås via en SuperStepCompletedEvent, eller via egenskapen under körning i Checkpoints.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
För att aktivera kontrollpunkter måste du ange en CheckpointStorage när du skapar ett arbetsflöde. En kontrollpunkt kan sedan nås via lagringen. Agent Framework har tre inbyggda implementeringar – välj den som matchar dina behov av hållbarhet och distribution:
| Leverantör | Package | Durability | Passar bäst för |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
Endast i processen | Tester, demonstrationer, kortvariga arbetsflöden |
FileCheckpointStorage |
agent-framework |
Lokal disk | Arbetsflöden med en dator, lokal utveckling |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Produktionsarbetsflöden, distribuerade och processöverskridande |
Alla tre implementerar samma CheckpointStorage protokoll så att du kan byta leverantör utan att ändra arbetsflödes- eller körkod.
InMemoryCheckpointStorage håller kontrollpunkter i processminnet. Bäst för tester, demonstrationer och kortvariga arbetsflöden där du inte behöver beständighet över omstarter.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Återuppta från kontrollpunkter
Du kan återuppta ett arbetsflöde från en specifik kontrollpunkt direkt på samma körning.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Du kan återuppta ett arbetsflöde från en specifik kontrollpunkt direkt på samma arbetsflödesinstans.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Rehydrering från kontrollpunkter
Eller så kan du återställa ett arbetsflöde från en kontrollpunkt i en ny körningsinstans.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Du kan också återställa en ny arbetsflödesinstans från en kontrollpunkt.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Spara exekutortillstånd
För att säkerställa att tillståndet för en exekverare registreras i en kontrollpunkt måste exekveraren åsidosätta OnCheckpointingAsync metoden och spara dess tillstånd i arbetsflödeskontexten.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
För att säkerställa att tillståndet återställs korrekt när du återupptar från en kontrollpunkt måste kören åsidosätta OnCheckpointRestoredAsync metoden och läsa in dess tillstånd från arbetsflödeskontexten.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
För att säkerställa att tillståndet för en exekutor registreras i en kontrollpunkt måste exekutorn åsidosätta on_checkpoint_save metoden och returnera dess tillstånd som en dictionary.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
För att säkerställa att tillståndet återställs korrekt när du återupptar från en kontrollpunkt måste kören åsidosätta on_checkpoint_restore metoden och återställa dess tillstånd från den angivna tillståndsordlistan.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Säkerhetshänsyn
Viktigt!
Kontrollpunktslagring är en tillitsgräns. Oavsett om du använder de inbyggda lagringsimplementeringarna eller en anpassad, måste lagringsserverdelen behandlas som betrodd privat infrastruktur. Läs aldrig in kontrollpunkter från ej betrodda eller potentiellt manipulerade källor.
Kontrollera att lagringsplatsen som används för kontrollpunkter är korrekt skyddad. Endast auktoriserade tjänster och användare ska ha läs- eller skrivåtkomst till kontrollpunktsdata.
Pickle-serialisering
Både FileCheckpointStorage och CosmosCheckpointStorage använder Python modulen pickle för att serialisera icke-JSON-inbyggda tillstånd, till exempel dataklasser, datetimes och anpassade objekt. För att minska riskerna för arbiträr kodkörning under deserialisering använder båda leverantörerna som standard en begränsad unpickler . Endast en inbyggd uppsättning säkra Python typer (primitiver, datetime, uuid, Decimal, vanliga samlingar osv.) och alla agent_framework interna typer tillåts under deserialisering. Alla andra typer som påträffas i en kontrollpunkt gör att deserialiseringen misslyckas med en WorkflowCheckpointException.
Om du vill tillåta ytterligare programspecifika typer skickar du dem via parametern allowed_checkpoint_types med hjälp av "module:qualname" formatet:
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage accepterar samma parameter:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Om din hotmodell inte tillåter pickle-baserad serialisering alls, använd InMemoryCheckpointStorage eller implementera en anpassad CheckpointStorage med en alternativ serialiseringsstrategi.
Ansvar för lagringsplats
FileCheckpointStorage kräver en explicit storage_path parameter – det finns ingen standardkatalog. Ramverket validerar mot sökvägsattacker, men det är utvecklarens ansvar att skydda själva lagringskatalogen (filbehörigheter, kryptering i vila och åtkomstkontroller). Endast auktoriserade processer ska ha läs- eller skrivåtkomst till kontrollpunktskatalogen.
CosmosCheckpointStorage förlitar sig på Azure Cosmos DB för lagring. Använd hanterad identitet/RBAC där det är möjligt, begränsa omfattningen av databasen och containern till arbetsflödestjänsten och rotera kontonycklar om du använder nyckelbaserad autentisering. Precis som med fillagring ska endast auktoriserade aktörer ha läs- eller skrivåtkomst till Cosmos DB-containern som innehåller kontrollpunktsdokument.