모니터 패턴은 조건이 충족될 때까지 외부 시스템을 폴링하는 워크플로의 되풀이 프로세스입니다(예: 완료될 때까지 작업 상태 확인 또는 하늘이 맑을 때까지 날씨 데이터 관찰). 고정 일정 타이머 트리거와 달리 모니터는 반복 간에 대기하고(겹치지 않도록) 동적 간격을 지원하며 조건이 충족되거나 시간 제한이 만료되면 자체를 종료할 수 있습니다.
이 문서에서는 지속성 오케스트레이션을 사용하여 모니터 패턴을 구현하는 방법을 설명합니다.
Tip
이 문서에서는 전체 구현을 보여줍니다. 지속성 오케스트레이션 사용 사례에 대한 개념적 개요는 지속성 작업이란?을 참조하세요.
Durable Functions 예제에는 날씨 모니터링 시나리오(C#/JavaScript) 및 GitHub 문제 모니터링 시나리오(Python)가 포함됩니다.
메모
Azure Functions에 대한 Node.js 프로그래밍 모델의 버전 4는 일반적으로 사용할 수 있습니다. v4 모델은 JavaScript 및 TypeScript 개발자에게 보다 유연하고 직관적인 환경을 제공하도록 설계되었습니다. v3과 v4의 차이점에 대한 자세한 내용은 마이그레이션 가이드를 참조하세요.
다음 코드 조각에서 JAVAScript(PM4)는 새로운 환경인 프로그래밍 모델 v4를 나타냅니다.
지속성 작업 SDK 예제는 .NET, JavaScript, Python 및 Java 사용하여 구성 가능한 폴링 간격으로 작업 상태 모니터링을 보여 줍니다.
사전 요구 사항
Durable Functions 대한 PowerShell 샘플은 이 시나리오에서 아직 사용할 수 없습니다.
Durable Functions 대한 Java 샘플은 이 시나리오에서 아직 사용할 수 없습니다. 지속성 작업 SDK 피벗을 참조하세요.
- .NET 8.0 SDK 이상
- Azure 지속성 작업 스케줄러 또는 로컬 에뮬레이터에 대한 액세스
- Node.js 22 이상
- Azure 지속성 작업 스케줄러 또는 로컬 에뮬레이터에 대한 액세스
- Python 3.9 이상
- Azure 지속성 작업 스케줄러 또는 로컬 에뮬레이터에 대한 액세스
이 샘플은 .NET, JavaScript, Java 및 Python 대해 표시됩니다.
- Java 11 이상
- Azure 지속성 작업 스케줄러 또는 로컬 에뮬레이터에 대한 액세스
모니터 시나리오 개요
이 샘플은 특정 위치의 현재 기상 조건을 모니터링하고 하늘이 맑으면 SMS로 사용자에게 알려줍니다. 타이머로 트리거되는 일반 함수를 사용하여 날씨를 확인하고 알림을 보낼 수 있습니다. 그러나 이 방법의 한 가지 문제는 수명 관리입니다. 알림을 하나만 보내야 하는 경우에는 맑은 날씨가 감지된 후 모니터를 비활성화해야 합니다.
이 샘플은 특정 위치의 현재 기상 조건을 모니터링하고 하늘이 맑으면 SMS로 사용자에게 알려줍니다. 타이머로 트리거되는 일반 함수를 사용하여 날씨를 확인하고 알림을 보낼 수 있습니다. 그러나 이 방법의 한 가지 문제는 수명 관리입니다. 알림을 하나만 보내야 하는 경우에는 맑은 날씨가 감지된 후 모니터를 비활성화해야 합니다.
이 샘플은 GitHub 리포지토리의 문제 수를 모니터링하고 열려 있는 문제가 3개 이상인 경우 사용자에게 경고합니다. 일반 타이머 트리거 함수를 사용하여 정기적으로 열린 문제 수를 요청할 수 있습니다. 그러나 이 방법의 한 가지 문제는 수명 관리입니다. 경고를 하나만 전송해야 하는 경우 3개 이상의 문제가 검색된 후 모니터에서 자체 경고를 사용하지 않도록 설정해야 합니다.
Durable Functions 대한 PowerShell 샘플은 이 시나리오에서 아직 사용할 수 없습니다.
Durable Functions 대한 Java 샘플은 이 시나리오에서 아직 사용할 수 없습니다. 지속성 작업 SDK 피벗을 참조하세요.
모니터링 패턴은 다른 이점과 함께 자체 실행을 종료 할 수 있습니다.
- 모니터는 일정이 아닌 간격으로 실행됩니다. 타이머 트리거는 1시간마다 실행됩니다 . 모니터는 작업 사이에 1시간 대기합니다 . 모니터의 작업은 지정되지 않는 한 겹치지 않습니다. 이는 장기 실행되는 작업에 중요합니다.
- 모니터에 동적 간격을 설정할 수 있으며, 대기 시간은 조건에 따라 변경될 수 있습니다.
- 모니터는 조건이 충족되면 종료되거나 다른 프로세스에 의해 종료될 수 있습니다.
- 모니터는 매개 변수를 받을 수 있습니다. 이 샘플에서는 요청된 위치, 전화 번호 또는 리포지토리에 동일한 모니터링 프로세스를 적용하는 방법을 보여 있습니다.
- 모니터는 확장성이 있습니다. 각 모니터는 오케스트레이션 인스턴스이기 때문에 새 함수를 만들거나 코드를 더 정의하지 않고도 다수의 모니터를 만들 수 있습니다.
- 모니터는 보다 큰 워크플로에 쉽게 통합됩니다. 모니터는 더 복잡한 오케스트레이션 함수 또는 하위 오케스트레이션의 한 섹션일 수 있습니다.
이 샘플은 장기 실행 작업의 상태를 모니터링하고 작업이 완료되거나 시간이 초과될 때 최종 결과를 반환합니다. 일반 폴링 루프를 사용하여 작업 상태를 확인할 수 있지만 이 접근 방식에는 수명 관리 및 안정성에 대한 제한 사항이 있습니다.
모니터링 패턴은 다음과 같은 이점을 제공합니다.
-
지속형 폴링: 오케스트레이션은 프로세스 재시작에도 살아남으며 오류 발생 후에도 모니터링을 계속할 수 있습니다.
-
구성 가능한 간격: 상태 검사 간의 대기 시간을 동적으로 조정할 수 있습니다.
-
시간 제한 지원: 조건이 충족되거나 시간 제한이 만료되면 모니터가 종료됩니다.
-
상태 표시 여부: 클라이언트는 오케스트레이션의 사용자 지정 상태를 쿼리하여 현재 모니터링 진행률을 확인할 수 있습니다.
-
확장성: 여러 모니터가 동시에 실행될 수 있으며 각 모니터는 서로 다른 작업을 추적할 수 있습니다.
Configuration
Twilio 통합 구성
이 샘플에서는 Twilio 서비스를 사용하여 SMS 메시지를 휴대폰으로 보냅니다. Azure Functions 이미 Twilio 바인딩 통해 Twilio를 지원하고 있으며 샘플은 해당 기능을 사용합니다.
가장 먼저 필요한 것은 Twilio 계정입니다.
https://www.twilio.com/try-twilio에서 무료로 만들 수 있습니다. 계정이 있으면 다음 세 가지 앱 설정을 함수 앱에 추가합니다.
| 앱 설정 이름 |
값 설명 |
|
TwilioAccountSid |
Twilio 계정의 SID |
|
TwilioAuthToken |
Twilio 계정의 인증 토큰 |
|
TwilioPhoneNumber |
Twilio 계정과 연결되는 전화 번호이며, SMS 메시지를 보내는 데 사용됩니다. |
날씨 API 구성
C#/JavaScript 샘플은 날씨 API를 호출하여 현재 상태를 확인합니다. 고유한 날씨 API 키를 제공하고 그에 따라 샘플 코드를 업데이트해야 합니다. 샘플 코드는 앱 설정을 참조합니다 WeatherUndergroundApiKey . 이를 선택한 날씨 공급자의 키로 바꿉니다.
| 앱 설정 이름 |
값 설명 |
|
WeatherUndergroundApiKey |
날씨 API 키(필요에 따라 공급자의 키 이름으로 대체). |
Twilio 통합 구성
이 샘플에서는 Twilio 서비스를 사용하여 SMS 메시지를 휴대폰으로 보냅니다. Azure Functions 이미 Twilio 바인딩 통해 Twilio를 지원하고 있으며 샘플은 해당 기능을 사용합니다.
가장 먼저 필요한 것은 Twilio 계정입니다.
https://www.twilio.com/try-twilio에서 무료로 만들 수 있습니다. 계정이 있으면 다음 세 가지 앱 설정을 함수 앱에 추가합니다.
| 앱 설정 이름 |
값 설명 |
|
TwilioAccountSid |
Twilio 계정의 SID |
|
TwilioAuthToken |
Twilio 계정의 인증 토큰 |
|
TwilioPhoneNumber |
Twilio 계정과 연결되는 전화 번호이며, SMS 메시지를 보내는 데 사용됩니다. |
날씨 API 구성
C#/JavaScript 샘플은 날씨 API를 호출하여 현재 상태를 확인합니다. 고유한 날씨 API 키를 제공하고 그에 따라 샘플 코드를 업데이트해야 합니다. 샘플 코드는 앱 설정을 참조합니다 WeatherUndergroundApiKey . 이를 선택한 날씨 공급자의 키로 바꿉니다.
| 앱 설정 이름 |
값 설명 |
|
WeatherUndergroundApiKey |
날씨 API 키(필요에 따라 공급자의 키 이름으로 대체). |
Twilio 통합 구성
이 샘플에서는 Twilio 서비스를 사용하여 SMS 메시지를 휴대폰으로 보냅니다. Azure Functions 이미 Twilio 바인딩 통해 Twilio를 지원하고 있으며 샘플은 해당 기능을 사용합니다.
가장 먼저 필요한 것은 Twilio 계정입니다.
https://www.twilio.com/try-twilio에서 무료로 만들 수 있습니다. 계정이 있으면 다음 세 가지 앱 설정을 함수 앱에 추가합니다.
| 앱 설정 이름 |
값 설명 |
|
TwilioAccountSid |
Twilio 계정의 SID |
|
TwilioAuthToken |
Twilio 계정의 인증 토큰 |
|
TwilioPhoneNumber |
Twilio 계정과 연결되는 전화 번호이며, SMS 메시지를 보내는 데 사용됩니다. |
Durable Functions 대한 PowerShell 샘플은 이 시나리오에서 아직 사용할 수 없습니다.
Durable Functions 대한 Java 샘플은 이 시나리오에서 아직 사용할 수 없습니다. 지속성 작업 SDK 피벗을 참조하세요.
오케스트레이터
[FunctionName("E3_Monitor")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext monitorContext, ILogger log)
{
MonitorRequest input = monitorContext.GetInput<MonitorRequest>();
if (!monitorContext.IsReplaying) { log.LogInformation($"Received monitor request. Location: {input?.Location}. Phone: {input?.Phone}."); }
VerifyRequest(input);
DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(6);
if (!monitorContext.IsReplaying) { log.LogInformation($"Instantiating monitor for {input.Location}. Expires: {endTime}."); }
while (monitorContext.CurrentUtcDateTime < endTime)
{
// Check the weather
if (!monitorContext.IsReplaying) { log.LogInformation($"Checking current weather conditions for {input.Location} at {monitorContext.CurrentUtcDateTime}."); }
bool isClear = await monitorContext.CallActivityAsync<bool>("E3_GetIsClear", input.Location);
if (isClear)
{
// It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
if (!monitorContext.IsReplaying) { log.LogInformation($"Detected clear weather for {input.Location}. Notifying {input.Phone}."); }
await monitorContext.CallActivityAsync("E3_SendGoodWeatherAlert", input.Phone);
break;
}
else
{
// Wait for the next checkpoint
var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddMinutes(30);
if (!monitorContext.IsReplaying) { log.LogInformation($"Next check for {input.Location} at {nextCheckpoint}."); }
await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
}
}
log.LogInformation($"Monitor expiring.");
}
[Deterministic]
private static void VerifyRequest(MonitorRequest request)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request), "An input object is required.");
}
if (request.Location == null)
{
throw new ArgumentNullException(nameof(request.Location), "A location input is required.");
}
if (string.IsNullOrEmpty(request.Phone))
{
throw new ArgumentNullException(nameof(request.Phone), "A phone number input is required.");
}
}
오케스트레이터 함수에는 모니터링할 위치와 해당 위치에서 날씨가 명확해질 때 메시지를 보내는 전화 번호가 필요합니다. 이 데이터는 MonitorRequest 개체로 강력한 형식화되어 오케스트레이터 함수에 전달됩니다.
E3_Monitor 함수는 오케스트레이터 함수에 표준 function.json 사용합니다.
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
함수를 구현하는 코드는 다음과 같습니다.
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function* (context) {
const input = context.df.getInput();
context.log(
"Received monitor request. location: " +
(input ? input.location : undefined) +
". phone: " +
(input ? input.phone : undefined) +
"."
);
verifyRequest(input);
const endTime = moment.utc(context.df.currentUtcDateTime).add(6, "h");
context.log(
"Instantiating monitor for " +
input.location.city +
", " +
input.location.state +
". Expires: " +
endTime +
"."
);
while (moment.utc(context.df.currentUtcDateTime).isBefore(endTime)) {
// Check the weather
context.log(
"Checking current weather conditions for " +
input.location.city +
", " +
input.location.state +
" at " +
context.df.currentUtcDateTime +
"."
);
const isClear = yield context.df.callActivity("E3_GetIsClear", input.location);
if (isClear) {
// It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
context.log(
"Detected clear weather for " +
input.location.city +
", " +
input.location.state +
". Notifying " +
input.phone +
"."
);
yield context.df.callActivity("E3_SendGoodWeatherAlert", input.phone);
break;
} else {
// Wait for the next checkpoint
const nextCheckpoint = moment.utc(context.df.currentUtcDateTime).add(30, "s");
context.log(
"Next check for " +
input.location.city +
", " +
input.location.state +
" at " +
nextCheckpoint.toString()
);
yield context.df.createTimer(nextCheckpoint.toDate()); // accomodate cancellation tokens
}
}
context.log("Monitor expiring.");
});
function verifyRequest(request) {
if (!request) {
throw new Error("An input object is required.");
}
if (!request.location) {
throw new Error("A location input is required.");
}
if (!request.phone) {
throw new Error("A phone number input is required.");
}
}
E3_Monitor 함수는 오케스트레이터 함수에 표준 function.json 사용합니다.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
다음은 이 함수를 구현하는 코드입니다.
import azure.durable_functions as df
from datetime import timedelta
from typing import Dict
def orchestrator_function(context: df.DurableOrchestrationContext):
monitoring_request: Dict[str, str] = context.get_input()
repo_url: str = monitoring_request["repo"]
phone: str = monitoring_request["phone"]
# Expiration of the repo monitoring
expiry_time = context.current_utc_datetime + timedelta(minutes=5)
while context.current_utc_datetime < expiry_time:
# Count the number of issues in the repo (the GitHub API caps at 30 issues per page)
too_many_issues = yield context.call_activity("E3_TooManyOpenIssues", repo_url)
# If we detect too many issues, we text the provided phone number
if too_many_issues:
# Extract URLs of GitHub issues, and return them
yield context.call_activity("E3_SendAlert", phone)
break
else:
# Reporting the number of statuses found
status = f"The repository does not have too many issues, for now ..."
context.set_custom_status(status)
# Schedule a new "wake up" signal
next_check = context.current_utc_datetime + timedelta(minutes=1)
yield context.create_timer(next_check)
return "Monitor completed!"
main = df.Orchestrator.create(orchestrator_function)
이 샘플에서는 사용할 수 없습니다. Java 범위는 지속 가능한 작업 SDK 탭을 참조하십시오.
@FunctionName("E3_Monitor")
public void monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
MonitorRequest input = ctx.getInput(MonitorRequest.class);
Instant expirationTime = ctx.getCurrentInstant().plus(Duration.ofHours(6));
int pollingInterval = input.getPollingIntervalSeconds();
while (ctx.getCurrentInstant().isBefore(expirationTime)) {
// Check current conditions
boolean isClear = ctx.callActivity(
"E3_GetIsClear", input.getLocation(), boolean.class).await();
if (isClear) {
// Condition met - send alert and exit
ctx.callActivity("E3_SendGoodWeatherAlert", input.getPhone()).await();
break;
}
// Wait for the next polling interval
Instant nextCheck = ctx.getCurrentInstant().plus(
Duration.ofSeconds(pollingInterval));
ctx.createTimer(nextCheck).await();
}
}
이 오케스트레이터 함수는 다음 작업을 수행합니다.
- 모니터링할 위치와 SMS 알림을 보낼 전화 번호(또는 Python 예제의 경우 레포)로 구성된 MonitorRequest를 가져옵니다.
- 모니터의 만료 시간을 결정합니다. 간결함을 위해 이 샘플에서는 하드 코드된 값을 사용합니다.
- 상태 검사 작업을 호출하여 조건이 충족되는지 여부를 확인합니다.
- 조건이 충족되면 경고 활동을 호출하여 알림을 보냅니다.
- 다음 폴링 간격에 오케스트레이션을 재개하기 위해 내구 타이머를 생성합니다. 간결함을 위해 이 샘플에서는 하드 코드된 값을 사용합니다.
- 현재 UTC 시간이 모니터의 만료 시간을 지나거나 경고가 전송될 때까지 계속 실행됩니다.
오케스트레이터 함수를 여러 번 호출하여 여러 오케스트레이터 함수 인스턴스를 동시에 실행할 수 있습니다. 모니터링할 위치와 경고를 보낼 전화 번호를 지정할 수 있습니다. 오케스트레이터 함수는 타이머를 기다리는 동안 실행되지 않으므로 요금이 청구되지 않습니다.
오케스트레이터는 정기적으로 작업의 상태를 확인하고 작업이 완료되거나 시간이 초과되면 반환합니다.
using Microsoft.DurableTask;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(MonitoringJobOrchestration))]
public class MonitoringJobOrchestration : TaskOrchestrator<JobMonitorInput, JobMonitorResult>
{
public override async Task<JobMonitorResult> RunAsync(
TaskOrchestrationContext context, JobMonitorInput input)
{
var jobId = input.JobId;
var pollingInterval = TimeSpan.FromSeconds(input.PollingIntervalSeconds);
var expirationTime = context.CurrentUtcDateTime.AddSeconds(input.TimeoutSeconds);
// Initialize monitoring state
int checkCount = 0;
while (context.CurrentUtcDateTime < expirationTime)
{
// Check current job status
var jobStatus = await context.CallActivityAsync<JobStatus>(
nameof(CheckJobStatusActivity),
new CheckJobInput { JobId = jobId, CheckCount = checkCount });
checkCount = jobStatus.CheckCount;
// Make job status available via custom status
context.SetCustomStatus(jobStatus);
if (jobStatus.Status == "Completed")
{
return new JobMonitorResult
{
JobId = jobId,
FinalStatus = "Completed",
ChecksPerformed = checkCount
};
}
// Calculate next check time
var nextCheck = context.CurrentUtcDateTime.Add(pollingInterval);
if (nextCheck > expirationTime)
{
nextCheck = expirationTime;
}
// Wait until next polling interval
await context.CreateTimer(nextCheck, default);
}
// Timeout reached
return new JobMonitorResult
{
JobId = jobId,
FinalStatus = "Timeout",
ChecksPerformed = checkCount
};
}
}
import {
OrchestrationContext,
TOrchestrator,
} from "@microsoft/durabletask-js";
const monitorOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
input: { jobId: string; pollingIntervalSeconds: number; timeoutSeconds: number }
): any {
const { jobId, pollingIntervalSeconds, timeoutSeconds } = input;
const expirationTime = new Date(
ctx.currentUtcDateTime.getTime() + timeoutSeconds * 1000
);
let checkCount = 0;
while (ctx.currentUtcDateTime < expirationTime) {
// Check current job status
const jobStatus: any = yield ctx.callActivity(checkJobStatus, {
jobId,
checkCount,
});
checkCount = jobStatus.checkCount;
// Make job status available via custom status
ctx.setCustomStatus(jobStatus);
if (jobStatus.status === "Completed") {
return {
jobId,
finalStatus: "Completed",
checksPerformed: checkCount,
};
}
// Wait for next polling interval
yield ctx.createTimer(pollingIntervalSeconds);
}
// Timeout reached
return {
jobId,
finalStatus: "Timeout",
checksPerformed: checkCount,
};
};
import datetime
from durabletask import task
def monitoring_job_orchestrator(ctx: task.OrchestrationContext, job_data: dict) -> dict:
"""
Orchestrator that demonstrates the monitoring pattern.
Periodically checks the status of a job until it completes or times out.
"""
job_id = job_data.get("job_id")
polling_interval = job_data.get("polling_interval_seconds", 5)
timeout = job_data.get("timeout_seconds", 30)
# Record the start time
start_time = ctx.current_utc_datetime
expiration_time = start_time + datetime.timedelta(seconds=timeout)
# Initialize monitoring state
job_status = {
"job_id": job_id,
"status": "Unknown",
"check_count": 0
}
# Loop until the job completes or times out
while True:
# Check current job status
check_input = {"job_id": job_id, "check_count": job_status.get("check_count", 0)}
job_status = yield ctx.call_activity("check_job_status", input=check_input)
# Make the job status available via custom status
ctx.set_custom_status(job_status)
if job_status["status"] == "Completed":
break
# Check if we've hit the timeout
current_time = ctx.current_utc_datetime
if current_time >= expiration_time:
job_status["status"] = "Timeout"
break
# Calculate next check time
next_check_time = current_time + datetime.timedelta(seconds=polling_interval)
if next_check_time > expiration_time:
next_check_time = expiration_time
# Wait until next polling interval
yield ctx.create_timer(next_check_time)
# Return the final status
return {
"job_id": job_id,
"final_status": job_status["status"],
"checks_performed": job_status["check_count"]
}
이 샘플은 .NET, JavaScript, Java 및 Python 대해 표시됩니다.
import com.microsoft.durabletask.*;
import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerWorkerExtensions;
import java.time.Duration;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "MonitoringJobOrchestrator"; }
@Override
public TaskOrchestration create() {
return ctx -> {
JobData jobData = ctx.getInput(JobData.class);
int pollingCount = 0;
// Set initial status
ctx.setCustomStatus(new JobStatus("Starting monitoring..."));
while (true) {
// Update status
ctx.setCustomStatus(new JobStatus(
"Polling job status (attempt " + (++pollingCount) + ")"));
// Wait for polling interval
ctx.createTimer(Duration.ofSeconds(jobData.pollingIntervalSeconds)).await();
// Check if job is complete (simulated after 3 attempts)
if (pollingCount >= 3) {
ctx.setCustomStatus(new JobStatus("Job completed successfully"));
ctx.complete(new JobResult(
"COMPLETED",
"Job completed after " + pollingCount + " attempts"));
break;
}
}
};
}
})
.build();
이 오케스트레이터는 다음 작업을 수행합니다.
- 작업 ID, 폴링 간격 및 시간 제한을 입력 매개 변수로 사용합니다.
- 시작 시간을 기록하고 만료 시간을 계산합니다.
- 작업 상태를 확인하는 폴링 루프에 진입합니다.
- 클라이언트가 진행률을 모니터링할 수 있도록 사용자 지정 상태를 업데이트합니다.
- 작업이 완료되면 최종 결과를 반환합니다.
- 시간 제한에 도달하면 시간 제한 상태를 반환합니다.
- 리소스를 사용하지 않고 폴링 시도 간에 대기하는 데
CreateTimer가 사용됩니다.
활동
다른 샘플과 마찬가지로 도우미 작업 함수는 트리거 바인딩을 사용하는 activityTrigger 일반 함수입니다.
상태 검사 활동
E3_GetIsClear 함수는 Weather Underground API를 사용하여 현재 기상 조건을 가져오고 하늘이 맑은지 여부를 결정합니다.
[FunctionName("E3_GetIsClear")]
public static async Task<bool> GetIsClear([ActivityTrigger] Location location)
{
var currentConditions = await WeatherUnderground.GetCurrentConditionsAsync(location);
return currentConditions.Equals(WeatherCondition.Clear);
}
function.json은 다음과 같이 정의됩니다.
{
"bindings": [
{
"name": "location",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
그리고 구현은 다음과 같습니다.
const request = require("request");
const clearWeatherConditions = [
"Overcast",
"Clear",
"Partly Cloudy",
"Mostly Cloudy",
"Scattered Clouds",
];
module.exports = function (context, location) {
getCurrentConditions(location)
.then(function (data) {
const isClear = clearWeatherConditions.includes(data.weather);
context.done(null, isClear);
})
.catch(function (err) {
context.log(`E3_GetIsClear encountered an error: ${err}`);
context.done(err);
});
};
function getCurrentConditions(location) {
return new Promise(function (resolve, reject) {
const options = {
url: `https://api.wunderground.com/api/${process.env["WeatherUndergroundApiKey"]}/conditions/q/${location.state}/${location.city}.json`,
method: "GET",
json: true,
};
request(options, function (err, res, body) {
if (err) {
reject(err);
}
if (body.error) {
reject(body.error);
}
if (body.response.error) {
reject(body.response.error);
}
resolve(body.current_observation);
});
});
}
E3_TooManyOpenIssues 함수는 리포지토리에서 현재 열려 있는 문제 목록을 가져온 후, 이 중 "너무 많은" 문제가 있는지 확인합니다. 여기서 "너무 많은" 문제란 샘플에 따라 3개를 초과할 경우를 의미합니다.
function.json은 다음과 같이 정의됩니다.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "repoID",
"type": "activityTrigger",
"direction": "in"
}
]
}
그리고 여기에 구현이 있습니다.
import requests
import json
def main(repoID: str) -> str:
# We use the GitHub API to count the number of open issues in the repo provided
# Note that the GitHub API only displays at most 30 issues per response, so
# the maximum number this activity will return is 30. That's enough for demo'ing purposes.
[user, repo] = repoID.split("/")
url = f"https://api.github.com/repos/{user}/{repo}/issues?state=open"
res = requests.get(url)
if res.status_code != 200:
error_message = f"Could not find repo {user} under {repo}! API endpoint hit was: {url}"
raise Exception(error_message)
issues = json.loads(res.text)
too_many_issues: bool = len(issues) >= 3
return too_many_issues
이 샘플에서는 사용할 수 없습니다. Java 적용 범위를 보려면 Durable Task SDK 피벗을 참조하세요.
E3_GetIsClear 함수는 날씨 API를 사용하여 위치에 대한 현재 기상 조건을 확인합니다.
@FunctionName("E3_GetIsClear")
public boolean getIsClear(
@DurableActivityTrigger(name = "location") Location location) {
// Call weather API to check current conditions
String conditions = getWeatherConditions(location);
return conditions.equals("Clear");
}
경고 활동
E3_SendGoodWeatherAlert 함수는 Twilio 바인딩을 사용하여 최종 사용자에게 산책하기에 좋은 시간임을 알리는 SMS 메시지를 보냅니다.
[FunctionName("E3_SendGoodWeatherAlert")]
public static void SendGoodWeatherAlert(
[ActivityTrigger] string phoneNumber,
ILogger log,
[TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioPhoneNumber%")]
out CreateMessageOptions message)
{
message = new CreateMessageOptions(new PhoneNumber(phoneNumber));
message.Body = $"The weather's clear outside! Go take a walk!";
}
internal class WeatherUnderground
{
private static readonly HttpClient httpClient = new HttpClient();
private static IReadOnlyDictionary<string, WeatherCondition> weatherMapping = new Dictionary<string, WeatherCondition>()
{
{ "Clear", WeatherCondition.Clear },
{ "Overcast", WeatherCondition.Clear },
{ "Cloudy", WeatherCondition.Clear },
{ "Clouds", WeatherCondition.Clear },
{ "Drizzle", WeatherCondition.Precipitation },
{ "Hail", WeatherCondition.Precipitation },
{ "Ice", WeatherCondition.Precipitation },
{ "Mist", WeatherCondition.Precipitation },
{ "Precipitation", WeatherCondition.Precipitation },
{ "Rain", WeatherCondition.Precipitation },
{ "Showers", WeatherCondition.Precipitation },
{ "Snow", WeatherCondition.Precipitation },
{ "Spray", WeatherCondition.Precipitation },
{ "Squall", WeatherCondition.Precipitation },
{ "Thunderstorm", WeatherCondition.Precipitation },
};
internal static async Task<WeatherCondition> GetCurrentConditionsAsync(Location location)
{
var apiKey = Environment.GetEnvironmentVariable("WeatherUndergroundApiKey");
if (string.IsNullOrEmpty(apiKey))
{
throw new InvalidOperationException("The WeatherUndergroundApiKey environment variable was not set.");
}
var callString = string.Format("http://api.wunderground.com/api/{0}/conditions/q/{1}/{2}.json", apiKey, location.State, location.City);
var response = await httpClient.GetAsync(callString);
var conditions = await response.Content.ReadAsAsync<JObject>();
JToken currentObservation;
if (!conditions.TryGetValue("current_observation", out currentObservation))
{
JToken error = conditions.SelectToken("response.error");
if (error != null)
{
throw new InvalidOperationException($"API returned an error: {error}.");
}
else
{
throw new ArgumentException("Could not find weather for this location. Try being more specific.");
}
}
return MapToWeatherCondition((string)(currentObservation as JObject).GetValue("weather"));
}
private static WeatherCondition MapToWeatherCondition(string weather)
{
foreach (var pair in weatherMapping)
{
if (weather.Contains(pair.Key))
{
return pair.Value;
}
}
return WeatherCondition.Other;
}
}
메모
샘플 코드를 실행하려면 Microsoft.Azure.WebJobs.Extensions.Twilio Nuget 패키지를 설치해야 합니다.
function.json 간단합니다.
{
"bindings": [
{
"name": "phoneNumber",
"type": "activityTrigger",
"direction": "in"
},
{
"type": "twilioSms",
"name": "message",
"from": "%TwilioPhoneNumber%",
"accountSidSetting": "TwilioAccountSid",
"authTokenSetting": "TwilioAuthToken",
"direction": "out"
}
],
"disabled": false
}
SMS 메시지를 보내는 코드는 다음과 같습니다.
module.exports = function (context, phoneNumber) {
context.bindings.message = {
body: `The weather's clear outside! Go take a walk!`,
to: phoneNumber,
};
context.done();
};
E3_SendAlert 함수는 Twilio 바인딩을 사용하여 최종 사용자에게 해결을 기다리는 3개 이상의 미해결 문제가 있음을 알리는 SMS 메시지를 보냅니다.
function.json 간단합니다.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "repoID",
"type": "activityTrigger",
"direction": "in"
}
]
}
SMS 메시지를 보내는 코드는 다음과 같습니다.
import json
import random
random.seed(10)
def main(phoneNumber: str, message):
payload = {
"body": f"Hey! You may want to check on your repo, there are too many open issues",
"to": phoneNumber
}
message.set(json.dumps(payload))
return "Message sent!"
이 샘플에서는 사용할 수 없습니다. Java 관련 내용은 Durable Task SDK 섹션을 참조하세요.
E3_SendGoodWeatherAlert 함수는 사용자에게 SMS 알림을 보냅니다.
@FunctionName("E3_SendGoodWeatherAlert")
public void sendGoodWeatherAlert(
@DurableActivityTrigger(name = "phoneNumber") String phoneNumber) {
// Send an SMS alert using your preferred messaging service
sendSms(phoneNumber, "The weather is clear outside! Enjoy your day.");
}
활동은 작업의 현재 상태를 확인합니다. 실제 애플리케이션에서는 외부 API 또는 서비스를 호출합니다.
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(CheckJobStatusActivity))]
public class CheckJobStatusActivity : TaskActivity<CheckJobInput, JobStatus>
{
private readonly ILogger<CheckJobStatusActivity> _logger;
public CheckJobStatusActivity(ILogger<CheckJobStatusActivity> logger)
{
_logger = logger;
}
public override Task<JobStatus> RunAsync(TaskActivityContext context, CheckJobInput input)
{
_logger.LogInformation("Checking status for job: {JobId} (check #{CheckCount})",
input.JobId, input.CheckCount + 1);
// Simulate job status - completes after 3 checks
var status = input.CheckCount >= 3 ? "Completed" : "Running";
return Task.FromResult(new JobStatus
{
JobId = input.JobId,
Status = status,
CheckCount = input.CheckCount + 1,
LastCheckTime = DateTime.UtcNow
});
}
}
// Data classes
public class JobMonitorInput
{
public string JobId { get; set; }
public int PollingIntervalSeconds { get; set; } = 5;
public int TimeoutSeconds { get; set; } = 30;
}
public class CheckJobInput
{
public string JobId { get; set; }
public int CheckCount { get; set; }
}
public class JobStatus
{
public string JobId { get; set; }
public string Status { get; set; }
public int CheckCount { get; set; }
public DateTime LastCheckTime { get; set; }
}
public class JobMonitorResult
{
public string JobId { get; set; }
public string FinalStatus { get; set; }
public int ChecksPerformed { get; set; }
}
import { ActivityContext } from "@microsoft/durabletask-js";
const checkJobStatus = async (
_ctx: ActivityContext,
input: { jobId: string; checkCount: number }
): Promise<any> => {
console.log(
`Checking status for job: ${input.jobId} (check #${input.checkCount + 1})`
);
// Simulate job status — completes after 3 checks
const status = input.checkCount >= 3 ? "Completed" : "Running";
return {
jobId: input.jobId,
status,
checkCount: input.checkCount + 1,
lastCheckTime: new Date().toISOString(),
};
};
import datetime
from durabletask import task
def check_job_status(ctx: task.ActivityContext, job_data: dict) -> dict:
"""
Activity that checks the status of a long-running job.
In a real application, this would call an external API or service.
"""
job_id = job_data.get("job_id", "unknown")
check_count = job_data.get("check_count", 0)
# Simulate job status - completes after 3 checks
if check_count >= 3:
status = "Completed"
else:
status = "Running"
return {
"job_id": job_id,
"status": status,
"check_count": check_count + 1,
"last_check_time": datetime.datetime.now().isoformat()
}
이 샘플은 .NET, JavaScript, Java 및 Python 대해 표시됩니다.
Java 샘플에서 상태 검사는 오케스트레이터에서 직접 시뮬레이션됩니다. 실제 애플리케이션에서는 별도의 작업을 만듭니다.
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CheckJobStatus"; }
@Override
public TaskActivity create() {
return ctx -> {
JobCheckInput input = ctx.getInput(JobCheckInput.class);
// Simulate checking job status
// In a real app, this would call an external API
String status = input.checkCount >= 3 ? "Completed" : "Running";
return new JobStatus(status, input.checkCount + 1);
};
}
})
모니터링 샘플 실행
샘플에 포함된 HTTP 트리거 함수를 사용하여 다음 HTTP POST 요청을 전송함으로써 오케스트레이션을 시작할 수 있습니다.
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
샘플에 포함된 HTTP 트리거 함수를 사용하여 다음 HTTP POST 요청을 전송함으로써 오케스트레이션을 시작할 수 있습니다.
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
GitHub 계정이 필요합니다. 문제를 열 수 있는 임시 공용 리포지토리를 만듭니다.
샘플에 포함된 HTTP 트리거 함수를 사용하여 다음 HTTP POST 요청을 전송함으로써 오케스트레이션을 시작할 수 있습니다.
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "repo": "<your GitHub handle>/<a new GitHub repo under your user>", "phone": "+1425XXXXXXX" }
예를 들어 GitHub 사용자 이름이 foo이고 리포지토리가 bar 경우 "repo" 값은 "foo/bar" 합니다.
이 샘플에서는 사용할 수 없습니다. Java 적용 범위는 Durable Task SDKs 피벗 뷰를 참고하십시오.
샘플에 포함된 HTTP 트리거 함수를 사용하여 다음 HTTP POST 요청을 전송하여 오케스트레이션을 시작할 수 있습니다.
POST https://{host}/api/StartMonitor
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
HTTP 트리거 함수는 오케스트레이션을 예약합니다.
@FunctionName("StartMonitor")
public HttpResponseMessage startMonitor(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<Optional<String>> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("E3_Monitor", req.getBody().get());
context.getLogger().info("Started monitor orchestration with ID = " + instanceId);
return durableContext.createCheckStatusResponse(req, instanceId);
}
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Location: https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={SystemKey}
RetryAfter: 10
{"id": "f6893f25acf64df2ab53a35c09d52635", "statusQueryGetUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "sendEventPostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/raiseEvent/{eventName}?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "terminatePostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason={text}&taskHub=SampleHubVS&connection=Storage&code={systemKey}"}
E3_Monitor 인스턴스는 현재 조건을 시작하고 쿼리합니다. 조건이 충족되면 작업 함수를 호출하여 경고를 보냅니다. 그렇지 않으면 타이머를 설정합니다. 타이머가 만료되면 오케스트레이션이 다시 시작됩니다.
Azure Functions 포털에서 함수 로그를 확인하여 오케스트레이션의 활동을 볼 수 있습니다.
오케스트레이션은 제한 시간이 도달하거나 조건이 감지되면 완료됩니다. 다른 함수 내에서 API를 terminate 사용하거나 이전 202 응답에서 참조된 terminatePostUri HTTP POST 웹후크를 호출할 수도 있습니다. 웹후크를 사용하려면 {text}를 조기 종료 사유로 교체하세요. HTTP POST URL은 대략 다음과 같이 표시됩니다.
POST https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason=Because&taskHub=SampleHubVS&connection=Storage&code={systemKey}
샘플을 실행하려면 다음이 필요합니다.
지속성 작업 스케줄러 에뮬레이터를 시작 합니다(로컬 개발용).
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
작업자를 시작하여 오케스트레이터 및 활동을 등록합니다.
클라이언트를 실행 하여 모니터링 오케스트레이션을 예약합니다.
using System;
using System.Threading.Tasks;
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();
// Schedule the monitoring orchestration
var input = new JobMonitorInput
{
JobId = "job-" + Guid.NewGuid().ToString(),
PollingIntervalSeconds = 5,
TimeoutSeconds = 30
};
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(MonitoringJobOrchestration), input);
Console.WriteLine($"Started monitoring orchestration: {instanceId}");
// Wait for completion while checking status
while (true)
{
var state = await client.GetInstanceMetadataAsync(instanceId, getInputsAndOutputs: true);
if (state.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
state.RuntimeStatus == OrchestrationRuntimeStatus.Failed)
{
Console.WriteLine($"Monitoring completed: {state.ReadOutputAs<JobMonitorResult>().FinalStatus}");
break;
}
Console.WriteLine($"Current status: {state.ReadCustomStatusAs<JobStatus>()?.Status}");
await Task.Delay(2000);
}
import {
DurableTaskAzureManagedClientBuilder,
DurableTaskAzureManagedWorkerBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const worker = new DurableTaskAzureManagedWorkerBuilder()
.connectionString(connectionString)
.addOrchestrator(monitorOrchestrator)
.addActivity(checkJobStatus)
.build();
await worker.start();
// Schedule the monitoring orchestration
const input = {
jobId: `job-${Date.now()}`,
pollingIntervalSeconds: 5,
timeoutSeconds: 30,
};
const instanceId = await client.scheduleNewOrchestration(
monitorOrchestrator,
input
);
console.log(`Started monitoring orchestration: ${instanceId}`);
// Wait for completion
const result = await client.waitForOrchestrationCompletion(
instanceId,
true,
60
);
console.log(`Final result: ${result?.serializedOutput}`);
await worker.stop();
await client.stop();
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
import time
client = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub,
token_credential=credential
)
# Schedule the monitoring orchestration
job_data = {
"job_id": "job-123",
"polling_interval_seconds": 5,
"timeout_seconds": 30
}
instance_id = client.schedule_new_orchestration(
monitoring_job_orchestrator,
input=job_data
)
print(f"Started monitoring orchestration: {instance_id}")
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Final result: {result.serialized_output}")
이 샘플은 .NET, JavaScript, Java 및 Python 대해 표시됩니다.
import java.time.Duration;
import java.util.UUID;
DurableTaskClient client = DurableTaskSchedulerClientExtensions
.createClientBuilder(connectionString).build();
// Schedule the monitoring orchestration
JobData jobData = new JobData(
"job-" + UUID.randomUUID().toString(),
5, // polling interval seconds
30 // timeout seconds
);
String instanceId = client.scheduleNewOrchestrationInstance(
"MonitoringJobOrchestrator",
new NewOrchestrationInstanceOptions().setInput(jobData));
System.out.println("Started monitoring orchestration: " + instanceId);
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(
instanceId, Duration.ofSeconds(60), true);
System.out.println("Final result: " + result.readOutputAs(JobResult.class).status);
다음 단계
이 샘플에서는 Durable Functions 사용하여 외부 타이머 조건부 논리를 사용하여 외부 원본의 상태를 모니터링하는 방법을 보여 줍니다. 다음 샘플에서는 외부 이벤트 및 지속성 타이머를 사용하여 사용자 상호 작용을 처리하는 방법을 보여 줍니다.
이 샘플에서는 지속성 작업 SDK를 사용하여 지속성 타이머 및 상태 추적을 사용하여 모니터링 패턴을 구현하는 방법을 보여 줍니다. 다른 패턴 및 기능에 대해 자세히 알아봅니다.