장시간 AI 에이전트 태스크를 운영한 경험이 있는 개발자라면 알 것입니다. 10분 이상 소요되는 작업이 중간에 실패했을 때, 모든 과정을 처음부터 다시 시작해야 하는 좌절감. 저는 HolySheep AI 게이트웨이 환경에서 장시간 작업의 안정성을 보장하는 세 가지 핵심 메커니즘—진행 상황 추적, 타임아웃 제어, 체크포인트 재개—을 프로덕션 수준의 아키텍처로 구현하는 방법을 공유합니다.
왜 장시간 작업 관리가 중요한가
단일 LLM 호출은 수 초 내에 완료됩니다. 그러나 에이전트가 반복적으로 도구를 호출하고, 웹 검색을 수행하고, 데이터베이스를 순회하는 시나리오에서는 작업 시간이 5분에서 30분으로 늘어날 수 있습니다. 이 과정에서:
- 네트워크 단절: HTTP 연결 타임아웃 (기본 30초)
- API 속도 제한: HolySheep AI의 Rate Limit 초과
- 컨텍스트 윈도우 소진: 긴 대화 히스토리 누적
- 예측 불가능한 처리 시간: 검색 결과 수, 복잡한 계산량
가장 현실적인 비용 수치를 보겠습니다. HolySheep AI 가격표를 기준으로:
| 모델 | 입력 비용 | 출력 비용 | 적합한 용도 |
|---|---|---|---|
| DeepSeek V3.2 | $0.28/MTok | $0.42/MTok | 장시간 다단계 추론 |
| Gemini 2.5 Flash | $1.25/MTok | $5.00/MTok | 빠른 반복 작업 |
| Claude Sonnet 4 | $7.50/MTok | $15.00/MTok | 고품질 분석 작업 |
장시간 작업 실패 시 평균 500K 토큰이 낭비될 수 있으며, 이는 DeepSeek 기준 $0.21, Claude 기준 $11.25의 비용 손실로 이어집니다.
아키텍처 개요: 3계층 작업 관리 시스템
+─────────────────────────────────────────────────────────────┐
│ Task Orchestrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Progress │ │ Timeout │ │ Checkpoint │ │
│ │ Tracker │ │ Controller │ │ Manager │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ HolySheep│ │ Redis │ │ File │
│ API │ │ State │ │ Storage │
└──────────┘ └──────────┘ └──────────┘
저는 이 아키텍처를 실제 프로덕션 환경에서 6개월 이상 운영했습니다. 핵심은 세 컴포넌트가 독립적으로 동작하면서도 상태를 공유하는 것입니다.
1. 진행 상황 추적 시스템 구현
HolySheep AI API의 Streaming Response를 활용하면 실시간 진행률 추적이 가능합니다. 저는 Event-Driven 방식으로 진행 상황을 중앙 집중형 상태 저장소에 기록합니다.
import json
import time
import asyncio
from typing import Optional, Callable, Any
from dataclasses import dataclass, asdict
from enum import Enum
from openai import AsyncOpenAI
from datetime import datetime
HolySheep AI 설정
BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY"
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
CHECKPOINTED = "checkpointed"
COMPLETED = "completed"
FAILED = "failed"
TIMEOUT = "timeout"
@dataclass
class TaskProgress:
task_id: str
status: str
current_step: int
total_steps: int
last_checkpoint: dict
tokens_used: int
elapsed_seconds: float
started_at: str
updated_at: str
class ProgressTracker:
"""장시간 작업의 진행 상황을 추적하는 클래스"""
def __init__(self, task_id: str, total_steps: int):
self.task_id = task_id
self.total_steps = total_steps
self.current_step = 0
self.tokens_used = 0
self.started_at = datetime.utcnow().isoformat()
self.last_checkpoint_data = {}
self._subscribers: list[Callable] = []
@property
def progress_percent(self) -> float:
if self.total_steps == 0:
return 0.0
return (self.current_step / self.total_steps) * 100
@property
def elapsed_seconds(self) -> float:
start = datetime.fromisoformat(self.started_at)
return (datetime.utcnow() - start).total_seconds()
def update_step(self, step: int, checkpoint_data: Optional[dict] = None):
"""작업 단계 업데이트 및 체크포인트 저장"""
self.current_step = min(step, self.total_steps)
self.last_checkpoint_data = checkpoint_data or {}
# subscribers에게 변경 사항 알림
for callback in self._subscribers:
callback(self.get_progress())
def add_token_usage(self, tokens: int):
self.tokens_used += tokens
def subscribe(self, callback: Callable):
self._subscribers.append(callback)
def get_progress(self) -> TaskProgress:
return TaskProgress(
task_id=self.task_id,
status=TaskStatus.RUNNING.value,
current_step=self.current_step,
total_steps=self.total_steps,
last_checkpoint=self.last_checkpoint_data,
tokens_used=self.tokens_used,
elapsed_seconds=self.elapsed_seconds,
started_at=self.started_at,
updated_at=datetime.utcnow().isoformat()
)
def to_json(self) -> str:
return json.dumps(self.get_progress(), default=str)
class HolySheepAgent:
"""HolySheep AI를 사용한 장시간 작업 에이전트"""
def __init__(self, api_key: str = HOLYSHEEP_API_KEY):
self.client = AsyncOpenAI(
api_key=api_key,
base_url=BASE_URL
)
self.progress_tracker: Optional[ProgressTracker] = None
self._messages = []
async def execute_long_task(
self,
task_id: str,
user_prompt: str,
max_steps: int = 20,
model: str = "deepseek/deepseek-chat-v3-0324"
):
"""
장시간 작업을 실행하고 진행 상황을 추적
Args:
task_id: 고유 작업 식별자
user_prompt: 작업 지시사항
max_steps: 최대 실행 단계 수
model: 사용할 모델
"""
self.progress_tracker = ProgressTracker(task_id, max_steps)
# Webhook 콜백 시뮬레이션 (실제 환경에서는 Redis/DB에 저장)
self.progress_tracker.subscribe(self._persist_progress)
print(f"[{task_id}] 작업 시작: {user_prompt[:50]}...")
self._messages = [
{"role": "system", "content": self._create_agent_system_prompt()},
{"role": "user", "content": user_prompt}
]
for step in range(max_steps):
self.progress_tracker.update_step(step + 1)
try:
# HolySheep AI API 호출
response = await self.client.chat.completions.create(
model=model,
messages=self._messages,
temperature=0.7,
max_tokens=4096,
stream=True # Streaming으로 토큰 사용량 추적
)
# Streaming 응답 처리
full_response = ""
async for chunk in response:
if chunk.choices[0].delta.content:
full_response += chunk.choices[0].delta.content
self._messages.append({"role": "assistant", "content": full_response})
# 토큰 사용량 추정 (실제 환경에서는 usage 메타데이터 활용)
estimated_tokens = len(user_prompt.split()) * 1.3 + len(full_response.split()) * 1.3
self.progress_tracker.add_token_usage(int(estimated_tokens))
# 작업 완료 여부 판단
if self._is_task_complete(full_response):
self.progress_tracker.update_step(
step + 1,
{"final_response": full_response, "status": "success"}
)
print(f"[{task_id}] 작업 완료! 소요 시간: {self.progress_tracker.elapsed_seconds:.1f}초")
return full_response
except Exception as e:
print(f"[{task_id}] 단계 {step + 1} 오류: {e}")
# 체크포인트 데이터와 함께 실패 기록
self.progress_tracker.update_step(
step + 1,
{"error": str(e), "messages_snapshot": self._messages.copy()}
)
raise
return self._messages[-1]["content"]
def _create_agent_system_prompt(self) -> str:
return """당신은 장기 작업을 수행하는 AI 에이전트입니다.
규칙:
1. 복잡한 작업은 작은 단계로 분할하세요
2. 각 단계 완료 시 [STEP_COMPLETE] 마커를 출력하세요
3. 모든 단계 완료 시 [TASK_COMPLETE]를 출력하고 결과를 요약하세요
4. 실패 시 [STEP_FAILED: 이유]를 출력하세요"""
def _is_task_complete(self, response: str) -> bool:
return "[TASK_COMPLETE]" in response
def _persist_progress(self, progress: TaskProgress):
"""진행 상황을 영구 저장소에 기록"""
# 프로덕션 환경에서는 Redis, PostgreSQL, S3 등에 저장
print(f"[Progress] {progress.task_id}: {progress.progress_percent:.1f}% "
f"({progress.current_step}/{progress.total_steps}) "
f"Tokens: {progress.tokens_used:,}")
사용 예시
async def main():
agent = HolySheepAgent()
try:
result = await agent.execute_long_task(
task_id="research-2024-001",
user_prompt="2024년 AI 분야의 주요 발전사항을 조사하고 요약해주세요",
max_steps=5
)
print(f"최종 결과: {result[:200]}...")
# 최종 진행 상황 출력
if agent.progress_tracker:
final = agent.progress_tracker.get_progress()
print(f"\n=== 작업 요약 ===")
print(f"총 소요 시간: {final.elapsed_seconds:.1f}초")
print(f"총 토큰 사용: {final.tokens_used:,}")
except Exception as e:
print(f"작업 실패: {e}")
# 체크포인트 데이터로 재개 준비
if agent.progress_tracker:
checkpoint = agent.progress_tracker.last_checkpoint_data
print(f"체크포인트 데이터: {json.dumps(checkpoint, indent=2, default=str)}")
if __name__ == "__main__":
asyncio.run(main())
2. 타임아웃 제어 시스템
저의 프로덕션 환경에서 가장 많이 발생했던 문제는 네트워크 타임아웃과 HolySheep AI의 Rate Limit입니다. HolySheep API의 기본 타임아웃은 60초이며, 장시간 작업에서는 이 값을 적절히 관리해야 합니다.
import asyncio
import signal
from typing import Optional, Callable, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
import httpx
class TimeoutStrategy(Enum):
FIXED = "fixed" # 고정 시간 초과
ADAPTIVE = "adaptive" # 작업 크기에 따라 동적 조정
DEADLINE = "deadline" # 절대 마감 시간
@dataclass
class TimeoutConfig:
"""타임아웃 설정"""
strategy: TimeoutStrategy
base_timeout: float = 120.0 # 기본 타임아웃 (초)
max_timeout: float = 600.0 # 최대 타임아웃 (초)
min_timeout: float = 30.0 # 최소 타임아웃 (초)
scale_factor: float = 1.5 # 작업 크기별 배율
@dataclass
class ExecutionResult:
"""실행 결과"""
success: bool
result: Any = None
error: Optional[str] = None
elapsed_seconds: float = 0.0
timeout_occurred: bool = False
retry_count: int = 0
class TimeoutController:
"""적응형 타임아웃 컨트롤러"""
def __init__(self, config: Optional[TimeoutConfig] = None):
self.config = config or TimeoutConfig(
strategy=TimeoutStrategy.ADAPTIVE
)
self._execution_history: list[tuple[int, float]] = []
def calculate_timeout(
self,
estimated_input_tokens: int,
expected_steps: int = 1
) -> float:
"""입력 토큰 수와 예상 단계 수를 기반으로 타임아웃 계산"""
if self.config.strategy == TimeoutStrategy.FIXED:
return self.config.base_timeout
elif self.config.strategy == TimeoutStrategy.ADAPTIVE:
# 토큰 기반 기본 시간 계산
# 1M 토큰 처리 시 평균 30초 가정
token_based_timeout = (estimated_input_tokens / 1_000_000) * 30
# 단계 수에 따른 배율
step_factor = 1 + (expected_steps - 1) * 0.5
# 최종 타임아웃 = (토큰 기반) × (단계 배율) × (기본 배율)
calculated = token_based_timeout * step_factor * self.config.scale_factor
# 범위 제한
return max(
self.config.min_timeout,
min(calculated, self.config.max_timeout)
)
elif self.config.strategy == TimeoutStrategy.DEADLINE:
return self.config.max_timeout
return self.config.base_timeout
def record_execution(self, input_tokens: int, actual_duration: float):
"""실행 결과 기록 (미래 타임아웃 예측 개선용)"""
self._execution_history.append((input_tokens, actual_duration))
# 최근 100개만 유지
if len(self._execution_history) > 100:
self._execution_history.pop(0)
def get_average_latency(self, token_range: tuple[int, int]) -> float:
"""특정 토큰 범위의 평균 지연 시간 반환"""
relevant = [
duration for tokens, duration in self._execution_history
if token_range[0] <= tokens <= token_range[1]
]
return sum(relevant) / len(relevant) if relevant else self.config.base_timeout
class ResilientTaskExecutor:
"""재시도 메커니즘이 포함된 탄력적 작업 실행기"""
def __init__(
self,
timeout_controller: TimeoutController,
max_retries: int = 3,
backoff_factor: float = 2.0
):
self.timeout_controller = timeout_controller
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self._abort_event = asyncio.Event()
def create_timeout_task(self, coro, timeout_seconds: float):
"""지정된 시간 내에 완료되지 않으면 취소되는 태스크 생성"""
return asyncio.wait_for(coro, timeout=timeout_seconds)
async def execute_with_timeout(
self,
coro,
estimated_tokens: int,
expected_steps: int = 1,
on_progress: Optional[Callable] = None
) -> ExecutionResult:
"""
타임아웃과 재시도가 적용된 작업 실행
Args:
coro: 실행할 코루틴
estimated_tokens: 예상 입력 토큰 수
expected_steps: 예상 실행 단계 수
on_progress: 진행 상황 콜백
"""
start_time = datetime.utcnow()
timeout = self.timeout_controller.calculate_timeout(
estimated_tokens, expected_steps
)
for attempt in range(self.max_retries + 1):
try:
print(f"[Execution] 시도 {attempt + 1}/{self.max_retries + 1} "
f"(타임아웃: {timeout:.0f}초)")
result = await asyncio.wait_for(coro, timeout=timeout)
elapsed = (datetime.utcnow() - start_time).total_seconds()
self.timeout_controller.record_execution(estimated_tokens, elapsed)
return ExecutionResult(
success=True,
result=result,
elapsed_seconds=elapsed,
retry_count=attempt
)
except asyncio.TimeoutError:
elapsed = (datetime.utcnow() - start_time).total_seconds()
print(f"[Timeout] {timeout:.0f}초 초과 (경과: {elapsed:.1f}초)")
if attempt < self.max_retries:
#了指數バックオフ
wait_time = min(timeout * (self.backoff_factor ** attempt), 60)
print(f"[Retry] {wait_time:.0f}초 후 재시도...")
await asyncio.sleep(wait_time)
timeout = min(timeout * self.backoff_factor,
self.timeout_controller.config.max_timeout)
else:
return ExecutionResult(
success=False,
error=f"최대 재시도 횟수 초과 ({self.max_retries})",
elapsed_seconds=elapsed,
timeout_occurred=True,
retry_count=attempt
)
except httpx.TimeoutException as e:
print(f"[HTTP Timeout] {e}")
if "rate" in str(e).lower():
# Rate limit의 경우 더 긴 대기 시간
await asyncio.sleep(30)
timeout = self.timeout_controller.config.max_timeout
except Exception as e:
elapsed = (datetime.utcnow() - start_time).total_seconds()
return ExecutionResult(
success=False,
error=str(e),
elapsed_seconds=elapsed,
retry_count=attempt
)
return ExecutionResult(success=False, error="알 수 없는 오류")
def abort(self):
"""실행 중인 작업 취소 요청"""
self._abort_event.set()
print("[Abort] 작업 취소 요청됨")
def reset_abort(self):
"""취소 이벤트 초기화"""
self._abort_event.clear()
통합 사용 예시
async def example_task(task_data: dict) -> str:
"""예시로 10초 소요되는 작업 시뮬레이션"""
await asyncio.sleep(5)
return f"처리 완료: {task_data.get('id', 'unknown')}"
async def main_timeout_example():
# 타임아웃 컨트롤러 초기화
config = TimeoutConfig(
strategy=TimeoutStrategy.ADAPTIVE,
base_timeout=60.0,
max_timeout=300.0,
scale_factor=1.2
)
timeout_ctrl = TimeoutController(config)
executor = ResilientTaskExecutor(timeout_ctrl, max_retries=2)
# 테스트 작업 실행
task_data = {"id": "task-001", "tokens": 50000}
estimated_tokens = task_data["tokens"]
print(f"예상 토큰: {estimated_tokens:,}")
print(f"계산된 타임아웃: {timeout_ctrl.calculate_timeout(estimated_tokens, 3):.0f}초")
result = await executor.execute_with_timeout(
example_task(task_data),
estimated_tokens=estimated_tokens,
expected_steps=3
)
if result.success:
print(f"✓ 성공: {result.result}")
print(f" 소요 시간: {result.elapsed_seconds:.1f}초")
print(f" 재시도 횟수: {result.retry_count}")
else:
print(f"✗ 실패: {result.error}")
print(f" 타임아웃 발생: {result.timeout_occurred}")
if __name__ == "__main__":
asyncio.run(main_timeout_example())
3. 체크포인트 재개 시스템
저는 체크포인트 재개 시스템을 직접 구현하면서 가장 중요한 교훈을 얻었습니다: 체크포인트는 실패 가능 지점에서만 저장해야 합니다. 모든 단계마다 저장하면 I/O 오버헤드가 발생하고, 너무 드물게 저장하면 데이터 손실이 커집니다.
import json
import hashlib
import os
from typing import Optional, Any, Callable
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
import pickle
import asyncio
class CheckpointStrategy(Enum):
STEP_BASED = "step_based" # N단계마다 저장
CONDITION_BASED = "condition_based" # 조건 충족 시 저장
COMPENSATION = "compensation