다음을 통해 공유


Microsoft 에이전트 프레임워크 워크플로 - 검사점

이 페이지에서는 Microsoft 에이전트 프레임워크 워크플로 시스템의 체크포인트 개요를 제공합니다.

개요

검사점을 사용하면 실행 중에 특정 지점에서 워크플로의 상태를 저장하고 나중에 해당 지점에서 다시 시작할 수 있습니다. 이 기능은 다음 시나리오에 특히 유용합니다.

  • 오류 발생 시 진행률 손실을 방지하려는 장기 실행 워크플로입니다.
  • 나중에 실행을 일시 중지하고 다시 시작하려는 장기 실행 워크플로입니다.
  • 감사 또는 규정 준수를 위해 주기적인 상태 저장이 필요한 워크플로입니다.
  • 여러 환경 또는 인스턴스에서 마이그레이션해야 하는 워크플로입니다.

검사점은 언제 생성되나요?

워크플로는 핵심 개념에 설명된 대로 슈퍼스텝으로 실행됩니다. 검사점은 해당 슈퍼스텝의 모든 실행기가 실행을 완료한 후 각 슈퍼스텝의 끝에 만들어집니다. 검사점은 다음을 포함하여 워크플로의 전체 상태를 캡처합니다.

  • 모든 실행기의 현재 상태입니다.
  • 다음 슈퍼스텝에 대한 워크플로의 보류 중인 모든 메시지
  • 보류 중인 요청 및 응답
  • 공유 상태

검사점 캡처

검사점을 활성화하려면 워크플로를 실행할 때 CheckpointManager를 제공해야 합니다. 그런 다음, 검사점은 SuperStepCompletedEvent 또는 실행 중 Checkpoints 속성을 통해 액세스할 수 있습니다.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

검사점을 활성화하려면 워크플로를 CheckpointStorage 만들 때 지정해야 합니다. 그런 다음 스토리지를 통해 검사점에 액세스할 수 있습니다. Agent Framework는 내구성 및 배포 요구 사항과 일치하는 구현을 선택하는 세 가지 기본 제공 구현을 제공합니다.

공급자 Package Durability 적합한 대상
InMemoryCheckpointStorage agent-framework 프로세스 내부 전용 테스트, 데모, 수명이 짧은 워크플로
FileCheckpointStorage agent-framework 로컬 디스크 단일 머신 워크플로, 로컬 개발
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB 프로덕션, 분산, 프로세스 간 워크플로

세 가지 모두 동일한 CheckpointStorage 프로토콜을 구현하므로 워크플로 또는 실행기 코드를 변경하지 않고도 공급자를 교환할 수 있습니다.

InMemoryCheckpointStorage 는 프로세스 메모리에 검사점을 유지합니다. 다시 시작 시 내구성이 필요하지 않은 테스트, 데모 및 수명이 짧은 워크플로에 가장 적합합니다.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

검사점에서 다시 시작하기

동일한 실행에서 특정 검사점에서 직접 워크플로를 다시 시작할 수 있습니다.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

동일한 워크플로 인스턴스의 특정 검사점에서 직접 워크플로를 다시 시작할 수 있습니다.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

검사점에서 리하이드라이팅

또는 검사점에서 새 실행 인스턴스로 워크플로를 리하일 수 있습니다.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

새 워크플로 인스턴스를 검사점을 통해 리하이드레이트할 수 있습니다.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

실행기 상태 저장

실행기의 상태가 검사점에서 캡처되도록 하려면 실행기가 OnCheckpointingAsync 메서드를 재정의하고 워크플로 컨텍스트에 해당 상태를 저장해야 합니다.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

또한 검사점에서 복귀할 때 상태가 올바르게 복원되도록 하기 위해, 실행기가 OnCheckpointRestoredAsync 메서드를 재정의하고 워크플로 컨텍스트에서 해당 상태를 로드해야 합니다.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

실행기의 상태가 검사점에 캡처되도록 하려면, 실행기는 on_checkpoint_save 메서드를 재정의하고 해당 상태를 사전으로 반환해야 합니다.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

또한 검사점에서 다시 시작할 때 상태가 올바르게 복원되도록 하려면, 실행기가 on_checkpoint_restore 메서드를 재정의하고 제공된 상태 딕셔너리에서 상태를 복원해야 합니다.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

보안 고려사항

중요합니다

체크포인트 스토리지는 신뢰 경계입니다. 기본 제공 스토리지 구현 또는 사용자 지정 구현을 사용하든 스토리지 백 엔드는 신뢰할 수 있는 프라이빗 인프라로 처리되어야 합니다. 신뢰할 수 없거나 잠재적으로 변조된 원본에서 검사점을 로드하지 마세요.

검사점에서 사용되는 스토리지 위치가 적절하게 보호되었는지 확인합니다. 권한 있는 서비스 및 사용자만 검사점 데이터에 대한 읽기 또는 쓰기 액세스 권한이 있어야 합니다.

피클 시리얼라이제이션

FileCheckpointStorageCosmosCheckpointStorage 모두 Python pickle 모듈을 사용하여 데이터 클래스, datetimes 및 사용자 지정 개체와 같은 비 JSON 네이티브 상태를 직렬화합니다. 역직렬화하는 동안 임의 코드 실행의 위험을 완화하기 위해 두 공급자는 기본적으로 제한적 역직렬화 프로그램을 사용합니다. 안전한 Python 형식(기본 형식, datetime, uuid, Decimal, 공통 컬렉션 등)의 기본 제공 집합과 모든 agent_framework 내부 형식은 역직렬화 중에 허용됩니다. 다른 형식이 검사점에서 발견되면, 이로 인해 WorkflowCheckpointException와 함께 역직렬화가 실패합니다.

추가 애플리케이션 관련 형식을 허용하려면 "module:qualname" 형식을 사용하여 allowed_checkpoint_types 매개 변수를 전달합니다.

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage 는 동일한 매개 변수를 허용합니다.

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

위협 모델에서 피클 기반 직렬화를 전혀 허용하지 않는 경우 InMemoryCheckpointStorage을 사용하거나, 대체 직렬화 전략을 사용하여 사용자 지정 CheckpointStorage을 구현하십시오.

스토리지 위치 책임

FileCheckpointStorage 에는 명시적 storage_path 매개 변수가 필요합니다. 기본 디렉터리가 없습니다. 프레임워크는 경로 순회 공격에 대해 유효성을 검사하지만 스토리지 디렉터리 자체(파일 권한, 미사용 암호화, 액세스 제어)를 보호하는 것은 개발자의 책임입니다. 권한 있는 프로세스만 검사점 디렉터리에 대한 읽기 또는 쓰기 권한이 있어야 합니다.

CosmosCheckpointStorage 스토리지에 대한 Azure Cosmos DB 사용합니다. 가능한 경우 관리 ID/RBAC를 사용하고, 데이터베이스 및 컨테이너의 범위를 워크플로 서비스로 지정하고, 키 기반 인증을 사용하는 경우 계정 키를 회전합니다. 파일 스토리지와 마찬가지로 권한 있는 보안 주체만 검사점 문서를 보유하는 Cosmos DB 컨테이너에 대한 읽기 또는 쓰기 액세스 권한이 있어야 합니다.

다음 단계