AI를 사용하여 에이전트 애플리케이션을 빌드하는 방법에는 두 가지 일반적인 방법이 있습니다.
-
결정적 워크플로 - 코드에서 제어 흐름을 정의합니다. 표준 프로그래밍 구문을 사용하여 단계, 분기, 병렬 처리 및 오류 처리 시퀀스를 작성합니다. LLM은 각 단계 내에서 작업을 수행하지만 전체 흐름을 제어하지는 않습니다.
-
에이전트 지향 워크플로(에이전트 루프) - LLM은 제어 흐름을 구동합니다. 에이전트는 호출할 도구, 순서 및 작업이 완료된 시기를 결정합니다. 도구와 지침을 제공하지만 에이전트는 런타임에 실행 경로를 결정합니다.
두 방법 모두 지속성 실행 의 이점을 활용하며 지속성 작업 프로그래밍 모델을 사용하여 구현할 수 있습니다. 이 문서에서는 C#, Python, JavaScript/TypeScript 및 Java 즉시 사용할 수 있는 코드 샘플을 사용하여 각 패턴을 빌드하는 방법을 보여 줍니다.
이 페이지에서 다음을 수행합니다.
팁 (조언)
이러한 패턴은 Anthropic의 효과적인 에이전트 빌드에 설명된 에이전트 워크플로 디자인과 일치합니다. 지속성 작업 프로그래밍 모델은 이러한 패턴에 자연스럽게 매핑됩니다. 오케스트레이션 은 워크플로 제어 흐름을 정의하고 자동으로 검사점이 지정되며, 활동 은 LLM 호출, 도구 호출 및 API 요청과 같은 비결정적 작업을 래핑합니다.
방법 선택
다음 표에서는 각 방법을 사용할 시기를 결정하는 데 도움이 됩니다.
| 결정적 워크플로를 사용하는 경우... |
에이전트 루프를 사용하는 경우... |
| 단계 시퀀스는 미리 알려져 있습니다. |
작업이 개방형이며 단계를 예측할 수 없습니다. |
| 에이전트 동작에 대한 명시적 가드레일이 필요합니다. |
LLM이 사용할 도구와 시기를 스스로 결정하도록 하고 싶습니다. |
| 규정 준수 또는 감사에는 검토 가능한 제어 흐름이 필요합니다. |
에이전트는 중간 결과에 따라 접근 방식을 조정해야 합니다. |
| 단일 워크플로에서 여러 AI 프레임워크를 결합하려고 합니다. |
도구 호출 기능을 사용하여 대화형 에이전트를 빌드하고 있습니다. |
두 방법 모두 지속성 실행을 통해 자동 검사점, 재시도 정책, 분산 크기 조정 및 휴먼 인 더 루프 지원을 제공합니다.
결정적 워크플로 패턴
결정적 워크플로에서 코드는 실행 경로를 제어합니다. LLM은 워크플로 내의 단계로 호출되지만 다음에 수행되는 작업을 결정하지는 않습니다. 지속성 작업 프로그래밍 모델은 다음 접근 방식에 자연스럽게 매핑됩니다.
-
오케스트레이션 은 워크플로 제어 흐름(시퀀스, 분기, 병렬 처리, 오류 처리)을 정의하고 자동으로 검사점이 지정됩니다.
-
활동은 LLM 호출, 도구 호출 및 API 요청과 같은 비결정적 작업을 래핑합니다. 활동은 사용 가능한 모든 컴퓨팅 인스턴스에서 실행할 수 있습니다.
다음 예제에서는 Azure Container Apps, Kubernetes, 가상 머신 또는 로컬을 포함한 모든 호스트 컴퓨팅에서 실행되는 이식 가능한 Durable Task SDK 사용합니다.
프롬프트 연결 패턴
프롬프트 연결은 가장 간단한 에이전트 패턴입니다. 복잡한 작업을 일련의 순차적 LLM 상호 작용으로 나누면 각 단계의 출력이 다음 단계의 입력으로 공급됩니다. 각 작업 호출은 자동으로 체크포인트가 지정되므로 파이프라인 도중에 충돌이 발생하더라도 처음부터 다시 시작하지 않아도 되며, 비용이 많이 드는 LLM 토큰을 재사용할 필요가 없습니다. 마지막으로 완료된 단계부터 실행이 다시 시작됩니다.
단계 사이에 프로그래밍 방식 유효성 검사 게이트를 삽입할 수도 있습니다. 예를 들어 개요를 생성한 후에는 길이 또는 토픽 제약 조건을 충족하는지 확인한 후 초안 작성 단계에 전달할 수 있습니다.
이 패턴은 지속성 작업 프로그래밍 모델의 함수 연결 패턴에 직접 매핑됩니다.
사용 시기: 콘텐츠 생성 파이프라인, 다단계 문서 처리, 순차적 데이터 보강, 중간 유효성 검사 게이트가 필요한 워크플로.
메모
다음 모든 예제에서 오케스트레이션 상태는 각 await/yield 문에서 자동으로 검사점이 지정됩니다. 호스트 프로세스가 충돌하거나 VM이 재활용되면 오케스트레이션은 다시 시작하는 대신 마지막으로 완료된 단계에서 다시 시작됩니다.
[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var topic = context.GetInput<string>();
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
@app.orchestration_trigger(context_name="context")
def prompt_chaining_orchestration(context: df.DurableOrchestrationContext):
topic = context.get_input()
# Step 1: Generate research outline
outline = yield context.call_activity("generate_outline_agent", topic)
# Step 2: Write first draft from outline
draft = yield context.call_activity("write_draft_agent", outline)
# Step 3: Refine and polish the draft
final_content = yield context.call_activity("refine_draft_agent", draft)
return final_content
const df = require("durable-functions");
df.app.orchestration("promptChainingOrchestration", function* (context) {
const topic = context.df.getInput();
// Step 1: Generate research outline
const outline = yield context.df.callActivity("generateOutlineAgent", topic);
// Step 2: Write first draft from outline
const draft = yield context.df.callActivity("writeDraftAgent", outline);
// Step 3: Refine and polish the draft
const finalContent = yield context.df.callActivity("refineDraftAgent", draft);
return finalContent;
});
@FunctionName("PromptChainingOrchestration")
public String promptChainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
return finalContent;
}
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, string topic)
{
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
}
def prompt_chaining_orchestration(ctx: task.OrchestrationContext, topic: str) -> str:
# Step 1: Generate research outline
outline = yield ctx.call_activity(generate_outline_agent, input=topic)
# Step 2: Write first draft from outline
draft = yield ctx.call_activity(write_draft_agent, input=outline)
# Step 3: Refine and polish the draft
final_content = yield ctx.call_activity(refine_draft_agent, input=draft)
return final_content
const promptChainingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, topic: string): any {
// Step 1: Generate research outline
const outline: string = yield ctx.callActivity(generateOutlineAgent, topic);
// Step 2: Write first draft from outline
const draft: string = yield ctx.callActivity(writeDraftAgent, outline);
// Step 3: Refine and polish the draft
const finalContent: string = yield ctx.callActivity(refineDraftAgent, draft);
return finalContent;
};
ctx -> {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
ctx.complete(finalContent);
}
Routing
라우팅은 분류 단계를 사용하여 요청을 처리해야 하는 다운스트림 에이전트 또는 모델을 결정합니다. 오케스트레이션은 먼저 분류자 작업을 호출한 다음 결과에 따라 적절한 처리기로 분기합니다. 이 방법을 사용하면 각 처리기의 프롬프트, 모델 및 도구 집합을 독립적으로 조정할 수 있습니다. 예를 들어 더 가벼운 모델에 일반적인 질문을 보내면서 결제 API에 액세스할 수 있는 전문 에이전트에 청구 질문을 보낼 수 있습니다.
사용 시기: 고객 지원 심사, 특수 에이전트에 대한 의도 분류, 작업 복잡성에 따른 동적 모델 선택.
[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<SupportRequest>();
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
"general" => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
@app.orchestration_trigger(context_name="context")
def routing_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Classify the request type
category = yield context.call_activity("classify_request_agent", request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield context.call_activity("billing_agent", request))
elif category == "technical":
return (yield context.call_activity("technical_support_agent", request))
else:
return (yield context.call_activity("general_inquiry_agent", request))
const df = require("durable-functions");
df.app.orchestration("routingOrchestration", function* (context) {
const request = context.df.getInput();
// Classify the request type
const category = yield context.df.callActivity("classifyRequestAgent", request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield context.df.callActivity("billingAgent", request);
case "technical":
return yield context.df.callActivity("technicalSupportAgent", request);
default:
return yield context.df.callActivity("generalInquiryAgent", request);
}
});
@FunctionName("RoutingOrchestration")
public String routingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
return switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, SupportRequest request)
{
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
}
def routing_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Classify the request type
category = yield ctx.call_activity(classify_request_agent, input=request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield ctx.call_activity(billing_agent, input=request))
elif category == "technical":
return (yield ctx.call_activity(technical_support_agent, input=request))
else:
return (yield ctx.call_activity(general_inquiry_agent, input=request))
const routingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: SupportRequest): any {
// Classify the request type
const category: string = yield ctx.callActivity(classifyRequestAgent, request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield ctx.callActivity(billingAgent, request);
case "technical":
return yield ctx.callActivity(technicalSupportAgent, request);
default:
return yield ctx.callActivity(generalInquiryAgent, request);
}
};
ctx -> {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
String result = switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
ctx.complete(result);
}
병렬화
여러 독립 하위 작업이 있는 경우 병렬 작업 호출로 디스패치하고 계속하기 전에 모든 결과를 기다릴 수 있습니다. 지속성 작업 스케줄러는 사용 가능한 모든 컴퓨팅 인스턴스에 이러한 작업을 자동으로 분산하므로 작업자를 더 많이 추가하면 총 벽시계 시간이 직접 줄어듭니다.
일반적인 변형은 다중 모델 투표입니다. 동일한 프롬프트를 여러 모델(또는 온도가 다른 동일한 모델)에 병렬로 보낸 다음 응답에서 집계하거나 선택합니다. 각 병렬 분기는 독립적으로 검사점이므로 한 분기의 일시적인 오류는 다른 분기에 영향을 주지 않습니다.
이 패턴은 Durable Task의 팬아웃/팬인 패턴에 직접적으로 매핑됩니다.
사용 시기: 문서 일괄 분석, 병렬 도구 호출, 다중 모델 평가, 여러 검토자를 사용한 콘텐츠 조정
[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
@app.orchestration_trigger(context_name="context")
def parallel_research_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
context.call_activity("research_subtopic_agent", subtopic)
)
research_results = yield context.task_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": research_results
})
return summary
const df = require("durable-functions");
df.app.orchestration("parallelResearchOrchestration", function* (context) {
const request = context.df.getInput();
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
context.df.callActivity("researchSubtopicAgent", subtopic)
);
const researchResults = yield context.df.Task.all(tasks);
// Aggregate: synthesize all research into a single summary
const summary = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: researchResults,
});
return summary;
});
@FunctionName("ParallelResearchOrchestration")
public String parallelResearchOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
}
def parallel_research_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
ctx.call_activity(research_subtopic_agent, input=subtopic)
)
research_results = yield task.when_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": research_results
})
return summary
const parallelResearchOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext,
request: { topic: string; subtopics: string[] }): any {
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
ctx.callActivity(researchSubtopicAgent, subtopic)
);
const researchResults: string[] = yield whenAll(tasks);
// Aggregate: synthesize all research into a single summary
const summary: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: researchResults,
});
return summary;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
ctx.complete(summary);
}
오케스트레이터-작업자 패턴
이 패턴에서 중앙 오케스트레이터는 먼저 활동을 통해 LLM을 호출하여 작업을 계획합니다. LLM의 출력에 따라 오케스트레이터는 필요한 하위 작업을 결정합니다. 그런 다음 오케스트레이터는 이러한 하위 작업을 특수 작업자 오케스트레이션에 디스패치합니다. 병렬화의 주요 차이점은 하위 작업 집합이 디자인 타임에 고정되지 않는다는 것입니다. 오케스트레이터는 런타임에 동적으로 결정합니다.
이 패턴은 서브 오케스트레이션을 사용하며, 이는 독립적으로 체크포인트가 생성되는 하위 워크플로입니다. 각 작업자 오케스트레이션 자체에는 여러 단계, 재시도 및 중첩 병렬 처리가 포함될 수 있습니다.
사용 시기: 심층 연구 파이프라인, 여러 파일을 수정하는 코딩 에이전트 워크플로, 각 에이전트에 고유한 역할이 있는 다중 에이전트 공동 작업
[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
@app.orchestration_trigger(context_name="context")
def orchestrator_workers_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Central orchestrator: determine what research is needed
subtasks = yield context.call_activity("plan_research_agent", request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
context.call_sub_orchestrator("research_worker_orchestration", subtask)
)
results = yield context.task_all(worker_tasks)
# Synthesize results
final_report = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": results
})
return final_report
const df = require("durable-functions");
df.app.orchestration("orchestratorWorkersOrchestration", function* (context) {
const request = context.df.getInput();
// Central orchestrator: determine what research is needed
const subtasks = yield context.df.callActivity("planResearchAgent", request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
context.df.callSubOrchestrator("researchWorkerOrchestration", subtask)
);
const results = yield context.df.Task.all(workerTasks);
// Synthesize results
const finalReport = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: results,
});
return finalReport;
});
@FunctionName("OrchestratorWorkersOrchestration")
public String orchestratorWorkersOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
}
def orchestrator_workers_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Central orchestrator: determine what research is needed
subtasks = yield ctx.call_activity(plan_research_agent, input=request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
ctx.call_sub_orchestrator(research_worker_orchestration, input=subtask)
)
results = yield task.when_all(worker_tasks)
# Synthesize results
final_report = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": results
})
return final_report
const orchestratorWorkersOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ResearchRequest): any {
// Central orchestrator: determine what research is needed
const subtasks: string[] = yield ctx.callActivity(planResearchAgent, request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
ctx.callSubOrchestrator(researchWorkerOrchestration, subtask)
);
const results: string[] = yield whenAll(workerTasks);
// Synthesize results
const finalReport: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: results,
});
return finalReport;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
ctx.complete(finalReport);
}
평가자-최적화자 패턴
계산기 최적화 프로그램 패턴은 구체화 루프에서 계산기 에이전트와 생성기 에이전트를 쌍으로 연결합니다. 생성기는 출력을 생성하고, 평가자는 품질 기준에 대해 점수를 매기고 피드백을 제공하며, 출력이 통과하거나 최대 반복 횟수에 도달할 때까지 루프가 반복됩니다. 각 루프 반복은 검사점이므로 세 번의 구체화에 성공한 후 충돌이 발생해도 진행률이 손실되지 않습니다.
이 패턴은 품질이 프로그래밍 방식으로 측정될 수 있는 경우에 특히 유용합니다. 예를 들어 생성된 코드가 컴파일되는지 또는 번역에서 명명된 엔터티를 유지하는지 확인합니다.
사용 시기: 자동화된 검토, 문학 번역, 반복 콘텐츠 구체화, 여러 라운드의 분석이 필요한 복잡한 검색 작업을 사용하여 코드를 생성합니다.
[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ContentRequest>();
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
@app.orchestration_trigger(context_name="context")
def evaluator_optimizer_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield context.call_activity("generate_content_agent", {
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield context.call_activity("evaluate_content_agent", content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const df = require("durable-functions");
df.app.orchestration("evaluatorOptimizerOrchestration", function* (context) {
const request = context.df.getInput();
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield context.df.callActivity("generateContentAgent", {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield context.df.callActivity("evaluateContentAgent", content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
});
@FunctionName("EvaluatorOptimizerOrchestration")
public String evaluatorOptimizerOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
return content;
}
feedback = evaluation.getFeedback();
}
return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ContentRequest request)
{
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
}
def evaluator_optimizer_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield ctx.call_activity(generate_content_agent, input={
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield ctx.call_activity(evaluate_content_agent, input=content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const evaluatorOptimizerOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ContentRequest): any {
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield ctx.callActivity(generateContentAgent, {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield ctx.callActivity(evaluateContentAgent, content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
};
ctx -> {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
ctx.complete(content);
return;
}
feedback = evaluation.getFeedback();
}
ctx.complete(content); // Return best effort after max iterations
}
에이전트 루프
일반적인 AI 에이전트 구현에서는 작업이 완료되거나 중지 조건에 도달할 때까지 LLM이 루프에서 호출되어 도구를 호출하고 결정을 내립니다. 결정적 워크플로와 달리 실행 경로는 미리 정의되지 않습니다. 에이전트는 이전 단계의 결과에 따라 각 단계에서 수행할 작업을 결정합니다.
에이전트 루프는 단계의 수 또는 순서를 예측할 수 없는 작업에 적합합니다. 일반적인 예로는 개방형 코딩 에이전트, 자율 연구 및 도구 호출 기능이 있는 대화형 봇이 있습니다.
지속성 작업 프로그래밍 모델을 사용하여 에이전트 루프를 구현하는 두 가지 권장 방법이 있습니다.
| 접근법 |
설명 |
사용 시기 |
|
오케스트레이션 기반 |
에이전트 루프는 지속적인 오케스트레이션입니다. 도구 호출은 활동입니다. 사용자 입력은 외부 이벤트를 사용합니다. |
세분화된 제어, 도구별 재시도 정책, 분산 도구 실행, IDE 디버깅. |
|
엔터티 기반 |
각 에이전트 인스턴스는 지속성 엔터티입니다. 에이전트 프레임워크는 루프를 관리합니다. 엔터티는 세션 지속성을 제공합니다. |
에이전트 루프가 이미 있고 최소한의 변경으로 내구성을 추가하려는 프레임워크(예: Microsoft Agent Framework)를 사용하고 있습니다. |
오케스트레이션 기반 에이전트 루프
오케스트레이션 기반 에이전트 루프는 여러 Durable Task 기능을 결합합니다: 영구 오케스트레이션(continue-as-new)를 통한 메모리 제한 유지, 병렬 도구 실행을 위한 팬 아웃/팬 인, 그리고 휴먼 인 더 루프 상호작용을 위한 외부 이벤트. 루프의 각 반복:
- 현재 대화 컨텍스트를 활동 또는 상태 저장 엔터티를 통해 LLM에 보냅니다.
- 도구 호출을 포함할 수 있는 LLM의 응답을 받습니다.
- 모든 도구 호출을 활동으로 실행합니다(사용 가능한 컴퓨팅에 분산됨).
- 필요에 따라 외부 이벤트를 사용하여 사용자 입력을 기다립니다.
- 업데이트된 상태로 루프를 계속하거나 에이전트가 완료되었음을 알릴 때 완료됩니다.
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get state from input (supports continue-as-new)
var state = context.GetInput<AgentState>() ?? new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them in parallel
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!; // Orchestration will restart with updated state
}
}
return "Max iterations reached.";
}
@app.orchestration_trigger(context_name="context")
def agent_loop_orchestration(context: df.DurableOrchestrationContext):
# Get state from input (supports continue-as-new)
state = context.get_input() or {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield context.call_activity("call_llm_agent", state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
context.call_activity("execute_tool", tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield context.task_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield context.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
context.continue_as_new(state)
return
return "Max iterations reached."
const df = require("durable-functions");
df.app.orchestration("agentLoopOrchestration", function* (context) {
// Get state from input (supports continue-as-new)
const state = context.df.getInput() || { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield context.df.callActivity("callLlmAgent", state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc) =>
context.df.callActivity("executeTool", tc)
);
const toolResults = yield context.df.Task.all(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput = yield context.df.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
context.df.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
});
@FunctionName("AgentLoopOrchestration")
public String agentLoopOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get state from input (supports continue-as-new)
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
return llmResponse.getFinalAnswer();
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return null;
}
}
return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, AgentState? state)
{
state ??= new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!;
}
}
return "Max iterations reached.";
}
}
def agent_loop_orchestration(ctx: task.OrchestrationContext, state: dict | None) -> str:
if state is None:
state = {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield ctx.call_activity(call_llm_agent, input=state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
ctx.call_activity(execute_tool, input=tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield task.when_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield ctx.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
ctx.continue_as_new(state)
return
return "Max iterations reached."
const agentLoopOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, state: AgentState | null): any {
if (!state) state = { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield ctx.callActivity(callLlmAgent, state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc: any) =>
ctx.callActivity(executeTool, tc)
);
const toolResults = yield whenAll(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput: string = yield ctx.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
ctx.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
};
ctx -> {
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
ctx.complete(llmResponse.getFinalAnswer());
return;
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return;
}
}
ctx.complete("Max iterations reached.");
}
엔터티 기반 에이전트 루프
자체 에이전트 루프를 이미 구현하는 에이전트 프레임워크를 사용하는 경우 루프 논리를 다시 작성하지 않고 내구성을 추가하기 위해 지속성 엔터티 로 래핑할 수 있습니다. 각 엔터티 인스턴스는 단일 에이전트 세션을 나타냅니다. 엔터티는 메시지를 수신하고 에이전트 프레임워크에 내부적으로 위임하며 상호 작용 간에 대화 상태를 유지합니다.
이 방법의 주요 이점은 단순성입니다. 기본 프레임워크를 사용하여 에이전트를 작성하고 에이전트의 제어 흐름을 다시 디자인하는 대신 호스팅 문제로 내구성을 추가합니다. 엔터티는 내구성 래퍼 역할을 하며 세션 지속 및 복구를 자동으로 처리합니다.
다음 예제에서는 기존 에이전트 SDK를 지속성 엔터티로 래핑하는 방법을 보여 줍니다. 엔터티는 클라이언트가 message 사용자 입력을 보내기 위해 호출하는 작업을 노출합니다. 내부적으로 엔터티는 자체 도구 호출 루프를 관리하는 에이전트 프레임워크에 위임됩니다.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
// Azure Functions entry point for the entity
[Function(nameof(ChatAgentEntity))]
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
{
return dispatcher.DispatchAsync<ChatAgentEntity>();
}
}
# Define the entity that wraps an existing agent SDK
@app.entity_trigger(context_name="context")
def chat_agent_entity(context):
# Load persisted conversation state
state = context.get_state(lambda: {"messages": []})
if context.operation_name == "message":
user_message = context.get_input()
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
context.set_state(state)
context.set_result(response)
const df = require("durable-functions");
// Define the entity that wraps an existing agent SDK
const chatAgentEntity = async function (context) {
// Load persisted conversation state
let state = context.df.getState(() => ({ messages: [] }));
switch (context.df.operationName) {
case "message":
const userMessage = context.df.getInput();
// Add the user message to the conversation history
state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(state.messages);
// Persist the response in the entity state
state.messages.push({ role: "assistant", content: response });
context.df.setState(state);
context.df.return(response);
break;
}
};
df.app.entity("ChatAgent", chatAgentEntity);
메모
Java 지속성 엔터티에는 durabletask-azure-functions 및 durabletask-client 패키지 버전 1.9.0 이상이 필요합니다.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
// Register the entity with Azure Functions
@FunctionName("ChatAgent")
public String chatAgentEntity(
@DurableEntityTrigger(name = "req") String req) {
return EntityRunner.loadAndRun(req, ChatAgentEntity::new);
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
}
from durabletask.entities.durable_entity import DurableEntity
# Define the entity that wraps an existing agent SDK
class ChatAgentEntity(DurableEntity):
"""Durable entity wrapping an agent SDK."""
def message(self, user_message: str) -> str:
# Load persisted conversation state
state = self.get_state(default={"messages": []})
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
self.set_state(state)
return response
import { TaskEntity } from "@microsoft/durabletask-js";
// Define the entity that wraps an existing agent SDK
class ChatAgentEntity extends TaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
async message(userMessage: string): Promise<string> {
// Add the user message to the conversation history
this.state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(this.state.messages);
// Persist the response in the entity state
this.state.messages.push({ role: "assistant", content: response });
return response;
}
initializeState(): ChatAgentState {
return { messages: [] };
}
}
메모
Java 지속성 엔터티에는 durabletask-client 패키지 버전 1.9.0 이상이 필요합니다.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
Microsoft Agent Framework의 Durable Task 확장은 이 방법을 사용합니다. Microsoft Agent Framework 에이전트를 지속성 있는 엔터티로 래핑하여, 지속적인 세션, 자동 체크포인트, 그리고 기본 제공 API 엔드포인트를 간단한 설정 한 줄로 제공합니다.
다음 단계