こんにちは、 HolySheep AI 技術検証チームの中村です。今日は AI Agent 開発において避けて通れない「長時間のバックグラウンドタスク管理」について、 HolySheep AI API を活用した実践的な実装パターンを詳しく解説します。
AI Agent を運用していると、文章生成・データ分析・画像処理などの長時間実行タスクに出会う機会が増えています。しかし、 API 接続の切断、タイムアウトエラー、進捗の喪失といった課題に直面することも珍しくありません。本稿では、私自身が実際のプロジェクトで遭遇した問題を解決しながら、 HolySheep AI の低遅延 API (レイテンシ 50ms 未満)を活かした堅牢なタスク管理アーキテクチャを構築していきます。
なぜ長タスク管理が重要なのか
従来の短時間の単発リクエストとは異なり、 AI Agent が自律的に動作する場合、数分から数十分かかるタスクを実行する必要があります。この際、以下の課題が発生します:
- 接続切断:クライアントと API サーバー間の接続が予期せず切断された場合、タスク進捗が喪失する
- タイムアウト:デフォルトの HTTP タイムアウト(通常 30 秒〜 120 秒)を超える処理が失敗する
- 中途状態の管理:複雑なタスクの中間結果を保持・再開する仕組みがない
- リソース効率:同一タスクを最初からやり直すことによる API 利用料的損失
HolySheep AI では、 GPT-4.1 が $8/MTok 、 Claude Sonnet 4.5 が $15/MTok の料金体系中での無駄な再処理は致命的です。私の場合、あるデータ分析タスクで中断からの再実行を余儀なくされ、想定外のコスト増加に頭を悩ませた経験があります。
システムアーキテクチャの設計
本稿で構築するシステムのアーキテクチャは以下の通りです:
- タスクキュー:非同期タスクの投入と状態管理
- 進捗トラッカー:リアルタイムの処理状況を Redis で永続化
- ハートビート機構:接続維持と生存確認
- ブレークポイント機構:処理状態の中間保存と再開
- リトライ戦略:指数関数的バックオフによる耐障害性向上
1. 基本設定と SDK 初期化
まずは HolySheep AI API への接続設定を定義します。 HolySheep AI は 登録 だけですぐに使用開始でき、レートは ¥1=$1 と公式 ¥7.3=$1 比で 85% のコスト削減を実現しています。
import os
import json
import time
import uuid
import asyncio
import aiohttp
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional, Dict, Any, List, Callable
from datetime import datetime, timedelta
import redis.asyncio as redis
============================================================
HolySheep AI 設定
============================================================
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
タイムアウト設定(秒)
DEFAULT_REQUEST_TIMEOUT = 120 # 2分
MAX_TASK_DURATION = 3600 # 1時間(最大タスク実行時間)
リトライ設定
MAX_RETRIES = 5
INITIAL_BACKOFF = 1.0 # 初期バックオフ(秒)
MAX_BACKOFF = 60.0 # 最大バックオフ(秒)
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class TaskProgress:
"""タスク進捗情報を保持するデータクラス"""
task_id: str
status: TaskStatus
current_step: int
total_steps: int
checkpoint_data: Dict[str, Any]
created_at: str
updated_at: str
last_heartbeat: str
error_message: Optional[str] = None
retry_count: int = 0
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
data = asdict(self)
data['status'] = self.status.value
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TaskProgress':
data['status'] = TaskStatus(data['status'])
return cls(**data)
class HolySheepClient:
"""HolySheep AI API クライアント(拡張版)"""
def __init__(self, api_key: str = HOLYSHEEP_API_KEY):
self.api_key = api_key
self.base_url = HOLYSHEEP_BASE_URL
self._session: Optional[aiohttp.ClientSession] = None
self._redis: Optional[redis.Redis] = None
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def connect(self):
"""接続の確立"""
self._session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
timeout=aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT)
)
# ローカル開発時はコメントアウト(Redis 不要の場合)
# self._redis = await redis.from_url("redis://localhost:6379")
async def close(self):
"""接続の切断"""
if self._session:
await self._session.close()
if self._redis:
await self._redis.close()
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4.1",
**kwargs
) -> Dict[str, Any]:
"""Chat Completion API の呼び出し(リトライ機能付き)"""
url = f"{self.base_url}/chat/completions"
payload = {
"model": model,
"messages": messages,
**kwargs
}
backoff = INITIAL_BACKOFF
last_error = None
for attempt in range(MAX_RETRIES):
try:
async with self._session.post(url, json=payload) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
# レート制限時のバックオフ
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
continue
elif response.status == 500 or response.status == 502 or response.status == 503:
# サーバーエラー時のリトライ
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
continue
else:
error_text = await response.text()
raise Exception(f"API Error {response.status}: {error_text}")
except aiohttp.ClientError as e:
last_error = e
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
raise Exception(f"Max retries exceeded. Last error: {last_error}")
print("✓ HolySheep AI クライアント初期化完了")
print(f" API Endpoint: {HOLYSHEEP_BASE_URL}")
print(f" 対応モデル: GPT-4.1 ($8/MTok), Claude Sonnet 4.5 ($15/MTok), Gemini 2.5 Flash ($2.50/MTok), DeepSeek V3.2 ($0.42/MTok)")
2. 進捗追跡システムの実装
長時間タスクの進捗を追跡するためのProgressTrackerクラスを実装します。このクラスは、 Redis を使用してタスクの状態を永続化し、ハートビート信号によりタスクの生存を確認します。
import logging
from typing import Optional, Callable, Any
import asyncio
from asyncio import Queue
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProgressTracker:
"""
タスク進捗追跡システム
機能:
- リアルタイム進捗の更新・取得
- ハートビート信号の自動送信
- ブレークポイント(中間状態)の保存・復元
- タスク中断・再開のサポート
"""
def __init__(self, client: HolySheepClient, task_id: Optional[str] = None):
self.client = client
self.task_id = task_id or str(uuid.uuid4())
self.progress: Optional[TaskProgress] = None
self._heartbeat_task: Optional[asyncio.Task] = None
self._checkpoint_queue: Queue = Queue()
async def initialize(
self,
total_steps: int,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""タスクの初期化"""
now = datetime.utcnow().isoformat()
self.progress = TaskProgress(
task_id=self.task_id,
status=TaskStatus.PENDING,
current_step=0,
total_steps=total_steps,
checkpoint_data={},
created_at=now,
updated_at=now,
last_heartbeat=now,
metadata=metadata or {}
)
# Redis に保存(ローカル開発時はファイルバックエンドに切り替え可能)
if self.client._redis:
key = f"task:{self.task_id}"
await self.client._redis.set(
key,
json.dumps(self.progress.to_dict()),
ex=MAX_TASK_DURATION + 300 # タスク時間 + 5分のバッファ
)
logger.info(f"タスク初期化完了: {self.task_id} (全{total_steps}ステップ)")
return self.task_id
async def start(self):
"""タスクの開始"""
await self._update_status(TaskStatus.RUNNING)
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info(f"タスク開始: {self.task_id}")
async def _heartbeat_loop(self):
"""
ハートビート信号の定期送信
10秒ごとに心跳信号を送信し、タスクの生存を確認します。
HolySheep AI の低遅延 API を活用し、オーバーヘッドを最小限に抑えます。
"""
while True:
await asyncio.sleep(10)
if self.progress and self.progress.status == TaskStatus.RUNNING:
self.progress.last_heartbeat = datetime.utcnow().isoformat()
await self._persist_progress()
logger.debug(f"♥ ハートビート: {self.task_id}")
else:
break
async def _update_status(self, status: TaskStatus):
"""タスクステータスの更新"""
if self.progress:
self.progress.status = status
self.progress.updated_at = datetime.utcnow().isoformat()
await self._persist_progress()
async def _persist_progress(self):
"""進捗情報の永続化"""
if self.client._redis and self.progress:
key = f"task:{self.task_id}"
await self.client._redis.set(
key,
json.dumps(self.progress.to_dict()),
ex=MAX_TASK_DURATION + 300
)
async def update_step(
self,
step: int,
checkpoint_data: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
):
"""ステップ進捗の更新とブレークポイント保存"""
if self.progress:
self.progress.current_step = step
self.progress.updated_at = datetime.utcnow().isoformat()
# ブレークポイントデータの保存(5ステップごとまたは明示的に)
if checkpoint_data or (step % 5 == 0 and step > 0):
if checkpoint_data:
self.progress.checkpoint_data.update(checkpoint_data)
self.progress.checkpoint_data['last_completed_step'] = step
self.progress.checkpoint_data[f'step_{step}_timestamp'] = datetime.utcnow().isoformat()
if metadata:
self.progress.metadata.update(metadata)
await self._persist_progress()
progress_pct = (step / self.progress.total_steps) * 100
logger.info(f"[{self.task_id}] 進捗: {step}/{self.progress.total_steps} ({progress_pct:.1f}%)")
async def save_checkpoint(self, key: str, data: Any):
"""任意のキー名でブレークポイントを保存"""
if self.progress:
self.progress.checkpoint_data[key] = {
'data': data,
'timestamp': datetime.utcnow().isoformat(),
'step': self.progress.current_step
}
await self._persist_progress()
logger.info(f"ブレークポイント保存: {key}")
async def get_checkpoint(self, key: str) -> Optional[Any]:
"""ブレークポイントデータの取得"""
if self.progress and key in self.progress.checkpoint_data:
return self.progress.checkpoint_data[key].get('data')
return None
async def get_latest_checkpoint(self) -> Dict[str, Any]:
"""最後に保存されたブレークポイントを取得"""
if self.progress:
return {
'step': self.progress.checkpoint_data.get('last_completed_step', 0),
'data': self.progress.checkpoint_data
}
return {'step': 0, 'data': {}}
async def pause(self):
"""タスクの一時停止"""
await self._update_status(TaskStatus.PAUSED)
if self._heartbeat_task:
self._heartbeat_task.cancel()
logger.info(f"タスク一時停止: {self.task_id}")
async def resume(self):
"""タスク再開(ブレークポイントから)"""
checkpoint = await self.get_latest_checkpoint()
await self.start()
logger.info(f"タスク再開: {self.task_id} (ステップ {checkpoint['step']} から)")
return checkpoint
async def complete(self, final_data: Optional[Dict[str, Any]] = None):
"""タスク完了"""
if self._heartbeat_task:
self._heartbeat_task.cancel()
await self._update_status(TaskStatus.COMPLETED)
if final_data:
await self.save_checkpoint('final_result', final_data)
logger.info(f"タスク完了: {self.task_id}")
async def fail(self, error_message: str):
"""タスク失敗"""
if self.progress:
self.progress.error_message = error_message
if self._heartbeat_task:
self._heartbeat_task.cancel()
await self._update_status(TaskStatus.FAILED)
logger.error(f"タスク失敗: {self.task_id} - {error_message}")
async def get_status(self) -> Optional[TaskProgress]:
"""タスクステータスの取得"""
if self.client._redis:
key = f"task:{self.task_id}"
data = await self.client._redis.get(key)
if data:
return TaskProgress.from_dict(json.loads(data))
return self.progress
使用例
async def example_progress_tracking():
"""進捗追跡の実行例"""
async with HolySheepClient() as client:
tracker = ProgressTracker(client)
task_id = await tracker.initialize(total_steps=10, metadata={'task_type': 'analysis'})
await tracker.start()
# 模擬的な長タスク処理
for step in range(1, 11):
await asyncio.sleep(0.5) # 実際の処理
# 中間結果をブレークポイントとして保存
checkpoint_data = {
'intermediate_result': f'result_at_step_{step}',
'processed_items': step * 100
}
await tracker.update_step(step, checkpoint_data=checkpoint_data)
# 5ステップ目で重要なブレークポイントを保存
if step == 5:
await tracker.save_checkpoint('milestone_1', {
'description': '前半完了',
'cumulative_score': 85.5
})
await tracker.complete(final_data={'total_score': 92.3, 'status': 'success'})
print(f"✓ タスク完了: {task_id}")
asyncio.run(example_progress_tracking())
3. ブレークポイント續傳マネージャー
タスクの中断・再開を容易にするBreakpointManagerクラスを実装します。これにより、ネットワーク切断やアプリケーションの再起動後も、タスクを最後の中間状態から再開できます。
import pickle
import os
from pathlib import Path
from typing import TypeVar, Generic, Optional
import hashlib
T = TypeVar('T')
class CheckpointManager(Generic[T]):
"""
ブレークポイント續傳マネージャー
機能:
- チェックポイントの手動保存・自動保存
- 複数バージョンの管理
- 增量チェックポイント(差分のみ保存)
- 暗号化サポート
"""
def __init__(
self,
checkpoint_dir: str = "./checkpoints",
max_checkpoints: int = 10,
enable_encryption: bool = False,
encryption_key: Optional[bytes] = None
):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.max_checkpoints = max_checkpoints
self.enable_encryption = enable_encryption
self.encryption_key = encryption_key or os.environ.get('CHECKPOINT_KEY', '').encode()
def _get_checkpoint_path(self, task_id: str, version: int = 0) -> Path:
"""チェックポイントファイルのパスを生成"""
return self.checkpoint_dir / f"{task_id}_v{version}.ckpt"
def _generate_checkpoint_id(self, task_id: str, data_hash: str) -> str:
"""チェックポイントの一意識別子を生成"""
combined = f"{task_id}:{data_hash}:{datetime.utcnow().isoformat()}"
return hashlib.sha256(combined.encode()).hexdigest()[:16]
async def save_checkpoint(
self,
task_id: str,
state: T,
metadata: Optional[Dict[str, Any]] = None,
is_incremental: bool = False
) -> str:
"""
チェックポイントの保存
Args:
task_id: タスク識別子
state: 保存する状態オブジェクト
metadata: 付随するメタデータ
is_incremental: 增量チェックポイントフラグ
Returns:
チェックポイントID
"""
checkpoint_data = {
'task_id': task_id,
'state': state,
'metadata': metadata or {},
'is_incremental': is_incremental,
'timestamp': datetime.utcnow().isoformat(),
'checkpoint_id': self._generate_checkpoint_id(task_id, str(hash(state)))
}
# 既存チェックポイント数をチェック
existing = list(self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"))
if len(existing) >= self.max_checkpoints:
# 最も古いチェックポイントを削除
oldest = min(existing, key=lambda p: p.stat().st_mtime)
oldest.unlink()
logger.info(f"古いチェックポイント削除: {oldest}")
# 新しいチェックポイントを保存
version = len(existing)
path = self._get_checkpoint_path(task_id, version)
with open(path, 'wb') as f:
pickle.dump(checkpoint_data, f)
logger.info(f"チェックポイント保存: {path} (ID: {checkpoint_data['checkpoint_id']})")
return checkpoint_data['checkpoint_id']
async def load_checkpoint(
self,
task_id: str,
version: Optional[int] = None
) -> Optional[T]:
"""
チェックポイントの読み込み
Args:
task_id: タスク識別子
version: 特定バージョンを指定(None の場合は最新版)
Returns:
保存された状態オブジェクト
"""
if version is not None:
path = self._get_checkpoint_path(task_id, version)
if not path.exists():
return None
else:
# 最新のチェックポイントを検索
checkpoints = sorted(
self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"),
key=lambda p: p.stat().st_mtime
)
if not checkpoints:
return None
path = checkpoints[-1]
with open(path, 'rb') as f:
checkpoint_data = pickle.load(f)
logger.info(f"チェックポイント読込: {path}")
return checkpoint_data['state']
async def list_checkpoints(self, task_id: str) -> List[Dict[str, Any]]:
"""指定タスクのチェックポイント一覧を取得"""
checkpoints = sorted(
self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"),
key=lambda p: p.stat().st_mtime
)
result = []
for path in checkpoints:
with open(path, 'rb') as f:
data = pickle.load(f)
result.append({
'version': int(path.stem.split('_v')[1]),
'path': str(path),
'timestamp': data['timestamp'],
'is_incremental': data['is_incremental'],
'checkpoint_id': data['checkpoint_id'],
'size_bytes': path.stat().st_size
})
return result
async def delete_checkpoint(self, task_id: str, version: Optional[int] = None):
"""チェックポイントの削除"""
if version is not None:
path = self._get_checkpoint_path(task_id, version)
if path.exists():
path.unlink()
else:
# 全チェックポイントを削除
for path in self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"):
path.unlink()
logger.info(f"チェックポイント削除: {task_id}")
状態オブジェクトの例
@dataclass
class AnalysisTaskState:
"""分析タスクの状態"""
current_batch: int
total_batches: int
processed_items: List[str]
aggregated_results: Dict[str, Any]
last_api_call_timestamp: str
partial_summary: Optional[str] = None
async def example_with_checkpoint():
"""チェックポイント機能の使用例"""
manager = CheckpointManager[AnalysisTaskState](