こんにちは、 HolySheep AI 技術検証チームの中村です。今日は AI Agent 開発において避けて通れない「長時間のバックグラウンドタスク管理」について、 HolySheep AI API を活用した実践的な実装パターンを詳しく解説します。

AI Agent を運用していると、文章生成・データ分析・画像処理などの長時間実行タスクに出会う機会が増えています。しかし、 API 接続の切断、タイムアウトエラー、進捗の喪失といった課題に直面することも珍しくありません。本稿では、私自身が実際のプロジェクトで遭遇した問題を解決しながら、 HolySheep AI の低遅延 API (レイテンシ 50ms 未満)を活かした堅牢なタスク管理アーキテクチャを構築していきます。

なぜ長タスク管理が重要なのか

従来の短時間の単発リクエストとは異なり、 AI Agent が自律的に動作する場合、数分から数十分かかるタスクを実行する必要があります。この際、以下の課題が発生します:

HolySheep AI では、 GPT-4.1 が $8/MTok 、 Claude Sonnet 4.5 が $15/MTok の料金体系中での無駄な再処理は致命的です。私の場合、あるデータ分析タスクで中断からの再実行を余儀なくされ、想定外のコスト増加に頭を悩ませた経験があります。

システムアーキテクチャの設計

本稿で構築するシステムのアーキテクチャは以下の通りです:

1. 基本設定と SDK 初期化

まずは HolySheep AI API への接続設定を定義します。 HolySheep AI は 登録 だけですぐに使用開始でき、レートは ¥1=$1 と公式 ¥7.3=$1 比で 85% のコスト削減を実現しています。

import os
import json
import time
import uuid
import asyncio
import aiohttp
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional, Dict, Any, List, Callable
from datetime import datetime, timedelta
import redis.asyncio as redis

============================================================

HolySheep AI 設定

============================================================

HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1" HOLYSHEEP_API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")

タイムアウト設定(秒)

DEFAULT_REQUEST_TIMEOUT = 120 # 2分 MAX_TASK_DURATION = 3600 # 1時間(最大タスク実行時間)

リトライ設定

MAX_RETRIES = 5 INITIAL_BACKOFF = 1.0 # 初期バックオフ(秒) MAX_BACKOFF = 60.0 # 最大バックオフ(秒) class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" @dataclass class TaskProgress: """タスク進捗情報を保持するデータクラス""" task_id: str status: TaskStatus current_step: int total_steps: int checkpoint_data: Dict[str, Any] created_at: str updated_at: str last_heartbeat: str error_message: Optional[str] = None retry_count: int = 0 metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: data = asdict(self) data['status'] = self.status.value return data @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'TaskProgress': data['status'] = TaskStatus(data['status']) return cls(**data) class HolySheepClient: """HolySheep AI API クライアント(拡張版)""" def __init__(self, api_key: str = HOLYSHEEP_API_KEY): self.api_key = api_key self.base_url = HOLYSHEEP_BASE_URL self._session: Optional[aiohttp.ClientSession] = None self._redis: Optional[redis.Redis] = None async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def connect(self): """接続の確立""" self._session = aiohttp.ClientSession( headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }, timeout=aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT) ) # ローカル開発時はコメントアウト(Redis 不要の場合) # self._redis = await redis.from_url("redis://localhost:6379") async def close(self): """接続の切断""" if self._session: await self._session.close() if self._redis: await self._redis.close() async def chat_completion( self, messages: List[Dict[str, str]], model: str = "gpt-4.1", **kwargs ) -> Dict[str, Any]: """Chat Completion API の呼び出し(リトライ機能付き)""" url = f"{self.base_url}/chat/completions" payload = { "model": model, "messages": messages, **kwargs } backoff = INITIAL_BACKOFF last_error = None for attempt in range(MAX_RETRIES): try: async with self._session.post(url, json=payload) as response: if response.status == 200: return await response.json() elif response.status == 429: # レート制限時のバックオフ await asyncio.sleep(backoff) backoff = min(backoff * 2, MAX_BACKOFF) continue elif response.status == 500 or response.status == 502 or response.status == 503: # サーバーエラー時のリトライ await asyncio.sleep(backoff) backoff = min(backoff * 2, MAX_BACKOFF) continue else: error_text = await response.text() raise Exception(f"API Error {response.status}: {error_text}") except aiohttp.ClientError as e: last_error = e await asyncio.sleep(backoff) backoff = min(backoff * 2, MAX_BACKOFF) raise Exception(f"Max retries exceeded. Last error: {last_error}") print("✓ HolySheep AI クライアント初期化完了") print(f" API Endpoint: {HOLYSHEEP_BASE_URL}") print(f" 対応モデル: GPT-4.1 ($8/MTok), Claude Sonnet 4.5 ($15/MTok), Gemini 2.5 Flash ($2.50/MTok), DeepSeek V3.2 ($0.42/MTok)")

2. 進捗追跡システムの実装

長時間タスクの進捗を追跡するためのProgressTrackerクラスを実装します。このクラスは、 Redis を使用してタスクの状態を永続化し、ハートビート信号によりタスクの生存を確認します。

import logging
from typing import Optional, Callable, Any
import asyncio
from asyncio import Queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ProgressTracker:
    """
    タスク進捗追跡システム
    
    機能:
    - リアルタイム進捗の更新・取得
    - ハートビート信号の自動送信
    - ブレークポイント(中間状態)の保存・復元
    - タスク中断・再開のサポート
    """
    
    def __init__(self, client: HolySheepClient, task_id: Optional[str] = None):
        self.client = client
        self.task_id = task_id or str(uuid.uuid4())
        self.progress: Optional[TaskProgress] = None
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._checkpoint_queue: Queue = Queue()
        
    async def initialize(
        self,
        total_steps: int,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """タスクの初期化"""
        now = datetime.utcnow().isoformat()
        self.progress = TaskProgress(
            task_id=self.task_id,
            status=TaskStatus.PENDING,
            current_step=0,
            total_steps=total_steps,
            checkpoint_data={},
            created_at=now,
            updated_at=now,
            last_heartbeat=now,
            metadata=metadata or {}
        )
        
        # Redis に保存(ローカル開発時はファイルバックエンドに切り替え可能)
        if self.client._redis:
            key = f"task:{self.task_id}"
            await self.client._redis.set(
                key,
                json.dumps(self.progress.to_dict()),
                ex=MAX_TASK_DURATION + 300  # タスク時間 + 5分のバッファ
            )
            
        logger.info(f"タスク初期化完了: {self.task_id} (全{total_steps}ステップ)")
        return self.task_id
    
    async def start(self):
        """タスクの開始"""
        await self._update_status(TaskStatus.RUNNING)
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        logger.info(f"タスク開始: {self.task_id}")
    
    async def _heartbeat_loop(self):
        """
        ハートビート信号の定期送信
        
        10秒ごとに心跳信号を送信し、タスクの生存を確認します。
        HolySheep AI の低遅延 API を活用し、オーバーヘッドを最小限に抑えます。
        """
        while True:
            await asyncio.sleep(10)
            if self.progress and self.progress.status == TaskStatus.RUNNING:
                self.progress.last_heartbeat = datetime.utcnow().isoformat()
                await self._persist_progress()
                logger.debug(f"♥ ハートビート: {self.task_id}")
            else:
                break
                
    async def _update_status(self, status: TaskStatus):
        """タスクステータスの更新"""
        if self.progress:
            self.progress.status = status
            self.progress.updated_at = datetime.utcnow().isoformat()
            await self._persist_progress()
            
    async def _persist_progress(self):
        """進捗情報の永続化"""
        if self.client._redis and self.progress:
            key = f"task:{self.task_id}"
            await self.client._redis.set(
                key,
                json.dumps(self.progress.to_dict()),
                ex=MAX_TASK_DURATION + 300
            )
            
    async def update_step(
        self,
        step: int,
        checkpoint_data: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """ステップ進捗の更新とブレークポイント保存"""
        if self.progress:
            self.progress.current_step = step
            self.progress.updated_at = datetime.utcnow().isoformat()
            
            # ブレークポイントデータの保存(5ステップごとまたは明示的に)
            if checkpoint_data or (step % 5 == 0 and step > 0):
                if checkpoint_data:
                    self.progress.checkpoint_data.update(checkpoint_data)
                self.progress.checkpoint_data['last_completed_step'] = step
                self.progress.checkpoint_data[f'step_{step}_timestamp'] = datetime.utcnow().isoformat()
                
            if metadata:
                self.progress.metadata.update(metadata)
                
            await self._persist_progress()
            progress_pct = (step / self.progress.total_steps) * 100
            logger.info(f"[{self.task_id}] 進捗: {step}/{self.progress.total_steps} ({progress_pct:.1f}%)")
            
    async def save_checkpoint(self, key: str, data: Any):
        """任意のキー名でブレークポイントを保存"""
        if self.progress:
            self.progress.checkpoint_data[key] = {
                'data': data,
                'timestamp': datetime.utcnow().isoformat(),
                'step': self.progress.current_step
            }
            await self._persist_progress()
            logger.info(f"ブレークポイント保存: {key}")
            
    async def get_checkpoint(self, key: str) -> Optional[Any]:
        """ブレークポイントデータの取得"""
        if self.progress and key in self.progress.checkpoint_data:
            return self.progress.checkpoint_data[key].get('data')
        return None
    
    async def get_latest_checkpoint(self) -> Dict[str, Any]:
        """最後に保存されたブレークポイントを取得"""
        if self.progress:
            return {
                'step': self.progress.checkpoint_data.get('last_completed_step', 0),
                'data': self.progress.checkpoint_data
            }
        return {'step': 0, 'data': {}}
        
    async def pause(self):
        """タスクの一時停止"""
        await self._update_status(TaskStatus.PAUSED)
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        logger.info(f"タスク一時停止: {self.task_id}")
        
    async def resume(self):
        """タスク再開(ブレークポイントから)"""
        checkpoint = await self.get_latest_checkpoint()
        await self.start()
        logger.info(f"タスク再開: {self.task_id} (ステップ {checkpoint['step']} から)")
        return checkpoint
        
    async def complete(self, final_data: Optional[Dict[str, Any]] = None):
        """タスク完了"""
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        await self._update_status(TaskStatus.COMPLETED)
        if final_data:
            await self.save_checkpoint('final_result', final_data)
        logger.info(f"タスク完了: {self.task_id}")
        
    async def fail(self, error_message: str):
        """タスク失敗"""
        if self.progress:
            self.progress.error_message = error_message
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        await self._update_status(TaskStatus.FAILED)
        logger.error(f"タスク失敗: {self.task_id} - {error_message}")
        
    async def get_status(self) -> Optional[TaskProgress]:
        """タスクステータスの取得"""
        if self.client._redis:
            key = f"task:{self.task_id}"
            data = await self.client._redis.get(key)
            if data:
                return TaskProgress.from_dict(json.loads(data))
        return self.progress


使用例

async def example_progress_tracking(): """進捗追跡の実行例""" async with HolySheepClient() as client: tracker = ProgressTracker(client) task_id = await tracker.initialize(total_steps=10, metadata={'task_type': 'analysis'}) await tracker.start() # 模擬的な長タスク処理 for step in range(1, 11): await asyncio.sleep(0.5) # 実際の処理 # 中間結果をブレークポイントとして保存 checkpoint_data = { 'intermediate_result': f'result_at_step_{step}', 'processed_items': step * 100 } await tracker.update_step(step, checkpoint_data=checkpoint_data) # 5ステップ目で重要なブレークポイントを保存 if step == 5: await tracker.save_checkpoint('milestone_1', { 'description': '前半完了', 'cumulative_score': 85.5 }) await tracker.complete(final_data={'total_score': 92.3, 'status': 'success'}) print(f"✓ タスク完了: {task_id}")

asyncio.run(example_progress_tracking())

3. ブレークポイント續傳マネージャー

タスクの中断・再開を容易にするBreakpointManagerクラスを実装します。これにより、ネットワーク切断やアプリケーションの再起動後も、タスクを最後の中間状態から再開できます。

import pickle
import os
from pathlib import Path
from typing import TypeVar, Generic, Optional
import hashlib

T = TypeVar('T')


class CheckpointManager(Generic[T]):
    """
    ブレークポイント續傳マネージャー
    
    機能:
    - チェックポイントの手動保存・自動保存
    - 複数バージョンの管理
    - 增量チェックポイント(差分のみ保存)
    - 暗号化サポート
    """
    
    def __init__(
        self,
        checkpoint_dir: str = "./checkpoints",
        max_checkpoints: int = 10,
        enable_encryption: bool = False,
        encryption_key: Optional[bytes] = None
    ):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.max_checkpoints = max_checkpoints
        self.enable_encryption = enable_encryption
        self.encryption_key = encryption_key or os.environ.get('CHECKPOINT_KEY', '').encode()
        
    def _get_checkpoint_path(self, task_id: str, version: int = 0) -> Path:
        """チェックポイントファイルのパスを生成"""
        return self.checkpoint_dir / f"{task_id}_v{version}.ckpt"
    
    def _generate_checkpoint_id(self, task_id: str, data_hash: str) -> str:
        """チェックポイントの一意識別子を生成"""
        combined = f"{task_id}:{data_hash}:{datetime.utcnow().isoformat()}"
        return hashlib.sha256(combined.encode()).hexdigest()[:16]
    
    async def save_checkpoint(
        self,
        task_id: str,
        state: T,
        metadata: Optional[Dict[str, Any]] = None,
        is_incremental: bool = False
    ) -> str:
        """
        チェックポイントの保存
        
        Args:
            task_id: タスク識別子
            state: 保存する状態オブジェクト
            metadata: 付随するメタデータ
            is_incremental: 增量チェックポイントフラグ
            
        Returns:
            チェックポイントID
        """
        checkpoint_data = {
            'task_id': task_id,
            'state': state,
            'metadata': metadata or {},
            'is_incremental': is_incremental,
            'timestamp': datetime.utcnow().isoformat(),
            'checkpoint_id': self._generate_checkpoint_id(task_id, str(hash(state)))
        }
        
        # 既存チェックポイント数をチェック
        existing = list(self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"))
        if len(existing) >= self.max_checkpoints:
            # 最も古いチェックポイントを削除
            oldest = min(existing, key=lambda p: p.stat().st_mtime)
            oldest.unlink()
            logger.info(f"古いチェックポイント削除: {oldest}")
            
        # 新しいチェックポイントを保存
        version = len(existing)
        path = self._get_checkpoint_path(task_id, version)
        
        with open(path, 'wb') as f:
            pickle.dump(checkpoint_data, f)
            
        logger.info(f"チェックポイント保存: {path} (ID: {checkpoint_data['checkpoint_id']})")
        return checkpoint_data['checkpoint_id']
    
    async def load_checkpoint(
        self,
        task_id: str,
        version: Optional[int] = None
    ) -> Optional[T]:
        """
        チェックポイントの読み込み
        
        Args:
            task_id: タスク識別子
            version: 特定バージョンを指定(None の場合は最新版)
            
        Returns:
            保存された状態オブジェクト
        """
        if version is not None:
            path = self._get_checkpoint_path(task_id, version)
            if not path.exists():
                return None
        else:
            # 最新のチェックポイントを検索
            checkpoints = sorted(
                self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"),
                key=lambda p: p.stat().st_mtime
            )
            if not checkpoints:
                return None
            path = checkpoints[-1]
            
        with open(path, 'rb') as f:
            checkpoint_data = pickle.load(f)
            
        logger.info(f"チェックポイント読込: {path}")
        return checkpoint_data['state']
    
    async def list_checkpoints(self, task_id: str) -> List[Dict[str, Any]]:
        """指定タスクのチェックポイント一覧を取得"""
        checkpoints = sorted(
            self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"),
            key=lambda p: p.stat().st_mtime
        )
        
        result = []
        for path in checkpoints:
            with open(path, 'rb') as f:
                data = pickle.load(f)
            result.append({
                'version': int(path.stem.split('_v')[1]),
                'path': str(path),
                'timestamp': data['timestamp'],
                'is_incremental': data['is_incremental'],
                'checkpoint_id': data['checkpoint_id'],
                'size_bytes': path.stat().st_size
            })
        return result
    
    async def delete_checkpoint(self, task_id: str, version: Optional[int] = None):
        """チェックポイントの削除"""
        if version is not None:
            path = self._get_checkpoint_path(task_id, version)
            if path.exists():
                path.unlink()
        else:
            # 全チェックポイントを削除
            for path in self.checkpoint_dir.glob(f"{task_id}_v*.ckpt"):
                path.unlink()
        logger.info(f"チェックポイント削除: {task_id}")


状態オブジェクトの例

@dataclass class AnalysisTaskState: """分析タスクの状態""" current_batch: int total_batches: int processed_items: List[str] aggregated_results: Dict[str, Any] last_api_call_timestamp: str partial_summary: Optional[str] = None async def example_with_checkpoint(): """チェックポイント機能の使用例""" manager = CheckpointManager[AnalysisTaskState](