Multi-agentシステムにおいて、エージェント間のhandoff(受け渡し)機構は複雑なタスクを処理する上で極めて重要です。本稿では、CrewAIにおけるAgent間通信プロトコルの実装方法、具体的なエラーシナリオとその解決策について解説します。
私はHolySheep AIのAPIを使用して、複数のAIモデルを低コストで組み合わせたマルチエージェントシステムを構築しています。APIコストが85%節約できるため、本番環境での экспериメントもしやすいですね。
CrewAI Handoffsの基本概念
CrewAIのhandoff機能は、あるエージェントから別のエージェントへコンテキストと制御を渡すメカニズムです。主なユースケースとして以下があります:
- специализирован агентへのタスク委譲
- 단계별処理パイプラインの構築
- conditional branching(条件分岐)
実装サンプル:基本的なHandoff設定
import os
from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI
HolySheep AI のエンドポイント設定
os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"
os.environ["OPENAI_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY"
LLM設定(DeepSeek V3.2を使用 - $0.42/MTok)
llm = ChatOpenAI(
model="deepseek-chat",
temperature=0.7,
api_key=os.environ["OPENAI_API_KEY"],
base_url=os.environ["OPENAI_API_BASE"]
)
最初のエージェント:質問分析担当
researcher = Agent(
role="Research Analyst",
goal="ユーザーからの入力を分析し、必要な情報を特定する",
backstory="あなたの使命は、ユーザーのニーズを正確に理解し、"
"次にどの специалист にタスクを向けるかを決定することです。",
llm=llm,
verbose=True
)
2番目のエージェント:技術的な回答担当
technical_expert = Agent(
role="Technical Expert",
goal="技術的な質問に対して正確で詳細な回答を提供する",
backstory="深い技術知識を持ち、複雑な問題でも明確に説明できます。",
llm=llm,
verbose=True
)
3番目のエージェント:一般回答担当
general_expert = Agent(
role="General Assistant",
goal="一般的な質問に対して分かりやすく回答する",
backstory="幅広い知識を持ち、丁寧な言葉でサポートいたします。",
llm=llm,
verbose=True
)
ユーザー入力を分析して適切なエージェントにhandoff
def analyze_and_route(user_input: str):
"""入力内容に基づいて適切なエージェントを選択"""
technical_keywords = ["api", "code", "python", "programming", "error", "bug"]
if any(keyword in user_input.lower() for keyword in technical_keywords):
return technical_expert
return general_expert
print("Handoff agents configured successfully!")
実践的なHandoffプロトコル実装
from crewai import Agent, Task, Crew, Process
from typing import Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HandoffProtocol:
"""CrewAI エージェント間handoffを管理するプロトコル"""
def __init__(self, llm):
self.llm = llm
self.context_history = []
def create_handoff_task(
self,
from_agent: Agent,
to_agent: Agent,
task_description: str,
context: dict
) -> Task:
"""エージェント間のhandoffタスクを生成"""
# コンテキスト передается через task description
enhanced_description = f"""
{task_description}
前のエージェントからのコンテキスト:
- 送信元: {from_agent.role}
- 優先度: {context.get('priority', 'normal')}
- 追加情報: {context.get('notes', 'なし')}
このタスクを継続して完了してください。
"""
return Task(
description=enhanced_description,
agent=to_agent,
expected_output="完全な回答と、次のエージェントへのhandoff提案(該当する場合)"
)
def execute_sequential_handoff(
self,
agents: list[Agent],
initial_task: str,
max_turns: int = 5
):
"""順番にhandoffを実行するパイプライン"""
crew = Crew(
agents=agents,
tasks=[],
verbose=True,
process=Process.sequential
)
# 最初のタスクを追加
initial_task_obj = Task(
description=initial_task,
agent=agents[0],
expected_output="分析結果と次のエージェントへのhandoff"
)
crew.tasks.append(initial_task_obj)
logger.info(f"Executing handoff pipeline with {len(agents)} agents")
result = crew.kickoff()
return result
使用例
protocol = HandoffProtocol(llm)
print(f"Latency target: <50ms for API calls")
よくあるエラーと対処法
エラー1: ConnectionError: timeout - API応答待ちでタイムアウト
# 問題: API呼び出しがタイムアウトする
原因: ネットワーク遅延、またはAPIエンドポイント不通
解決方法:リクエストタイムアウトとリトライロジックを追加
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retry():
"""リトライ機能付きのセッションを作成"""
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
タイムアウト設定(秒)
TIMEOUT = (5, 30) # (connect_timeout, read_timeout)
def call_holysheep_api(prompt: str, model: str = "deepseek-chat"):
"""HolySheep AI API呼び出し(タイムアウト対策済み)"""
import os
session = create_session_with_retry()
response = session.post(
"https://api.holysheep.ai/v1/chat/completions",
headers={
"Authorization": f"Bearer {os.environ['HOLYSHEEP_API_KEY']}",
"Content-Type": "application/json"
},
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.7
},
timeout=TIMEOUT
)
response.raise_for_status()
return response.json()
使用例
try:
result = call_holysheep_api("こんにちは")
print(result)
except requests.exceptions.Timeout:
print("タイムアウト: リクエスト時間を延長してください")
except requests.exceptions.ConnectionError:
print("接続エラー: ネットワークまたはエンドポイントを確認してください")
エラー2: 401 Unauthorized - APIキーが無効または期限切れ
# 問題: API呼び出し時に401エラーが発生する
原因: APIキーが無効、有効期限切れ、または権限不足
解決方法:APIキーの検証と適切なエラーメッセージ
import os
from functools import wraps
def validate_api_key(func):
"""APIキーの有効性を検証するデコレーター"""
@wraps(func)
def wrapper(*args, **kwargs):
api_key = os.environ.get('HOLYSHEEP_API_KEY')
if not api_key:
raise ValueError(
"HOLYSHEEP_API_KEYが設定されていません。"
"https://www.holysheep.ai/register でAPIキーを取得してください。"
)
if api_key == "YOUR_HOLYSHEEP_API_KEY":
raise ValueError(
"APIキーがデフォルト値のままでしています。"
"有効なAPIキーに置き換えてください。"
)
return func(*args, **kwargs)
return wrapper
@validate_api_key
def initialize_crewai_agent():
"""APIキーを検証してからCrewAIエージェントを初期化"""
from crewai import Agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="deepseek-chat",
api_key=os.environ["HOLYSHEEP_API_KEY"],
base_url="https://api.holysheep.ai/v1"
)
agent = Agent(
role="Assistant",
goal="ユーザー 지원하는",
backstory="친근한 도우미입니다.",
llm=llm
)
return agent
認証エラーの詳細処理
def handle_auth_error(response):
"""401エラーの詳細な処理"""
if response.status_code == 401:
error_detail = response.json().get('error', {})
if 'invalid_api_key' in str(error_detail):
return "無効なAPIキーです。APIキーを確認してください。"
elif 'expired' in str(error_detail):
return "APIキーの有効期限が切れています。"
else:
return f"認証エラー: {error_detail}"
return None
エラー3: RateLimitError - レート制限超過
# 問題: API呼び出し時に429 Rate Limitエラーが発生する
原因: 短時間过多的リクエストを送信した
解決方法:レート制限対策の実装
import time
import asyncio
from collections import deque
from datetime import datetime, timedelta
class RateLimiter:
"""API呼び出しのレートを制限するクラス"""
def __init__(self, max_requests: int = 60, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
def wait_if_needed(self):
"""必要に応じて待機"""
now = datetime.now()
cutoff = now - timedelta(seconds=self.window_seconds)
# ウィンドウ外の古いリクエストを削除
while self.requests and self.requests[0] < cutoff:
self.requests.popleft()
# レート制限に達した場合
if len(self.requests) >= self.max_requests:
sleep_time = (self.requests[0] - cutoff).total_seconds()
print(f"Rate limit reached. Waiting {sleep_time:.1f} seconds...")
time.sleep(sleep_time)
self.requests.append(now)
def execute_with_limit(self, func, *args, **kwargs):
"""レート制限付きで関数を実行"""
self.wait_if_needed()
return func(*args, **kwargs)
非同期バージョンのレートリミッター
class AsyncRateLimiter:
def __init__(self, max_requests: int = 60, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
self.semaphore = asyncio.Semaphore(max_requests)
async def execute(self, func, *args, **kwargs):
"""非同期関数のみ対応"""
async with self.semaphore:
self._cleanup_old_requests()
if len(self.requests) >= self.max_requests:
wait_time = self._calculate_wait_time()
print(f"Waiting {wait_time:.2f}s for rate limit...")
await asyncio.sleep(wait_time)
self.requests.append(datetime.now())
return await func(*args, **kwargs)
def _cleanup_old_requests(self):
now = datetime.now()
cutoff = now - timedelta(seconds=self.window_seconds)
while self.requests and self.requests[0] < cutoff:
self.requests.popleft()
def _calculate_wait_time(self):
if not self.requests:
return 0
oldest = self.requests[0]
return max(0, (oldest - timedelta(seconds=self.window_seconds)).total_seconds())
使用例
limiter = RateLimiter(max_requests=30, window_seconds=60)
def call_api_with_rate_limit(prompt: str):
"""レート制限付きでAPIを呼び出す"""
def _call():
# HolySheep AI API呼び出し
return {"status": "success", "response": f"Processed: {prompt}"}
return limiter.execute_with_limit(_call)
print("Rate limiter configured successfully")
エラー4: コンテキストコンテナのオーバーフロー
# 問題: エージェント間のコンテキストが大きくなりすぎる
原因: 複数のhandoffでコンテキストが累积する
解決方法:コンテキストの蒸留と圧縮
from typing import List, Dict, Any
import json
class ContextManager:
"""エージェント間コンテキストを管理・圧縮する"""
def __init__(self, max_tokens: int = 8000):
self.max_tokens = max_tokens
self.summaries = []
def distill_context(
self,
agent_name: str,
content: str,
key_points: List[str]
) -> str:
"""コンテキストを蒸留して要約"""
distilled = {
"agent": agent_name,
"timestamp": datetime.now().isoformat(),
"key_findings": key_points[:5], # 最大5つの主要ポイント
"conclusion": content[-500:] if len(content) > 500 else content # 最後の500文字
}
self.summaries.append(distilled)
return json.dumps(distilled, ensure_ascii=False)
def get_compact_context(self) -> str:
"""全てのコンテキストを統合して返す"""
combined = "=== エージェント間の処理履歴 ===\n\n"
for summary in self.summaries[-3:]: # 最新の3つのサマリーのみ
combined += f"[{summary['agent']}]\n"
combined += f"主要ポイント: {', '.join(summary['key_findings'])}\n"
combined += f"結論: {summary['conclusion']}\n\n"
return combined
def clear_old_context(self, keep_last: int = 5):
"""古いコンテキストをクリア"""
if len(self.summaries) > keep_last:
self.summaries = self.summaries[-keep_last:]
使用例
ctx_manager = ContextManager(max_tokens=8000)
各エージェントの処理後にコンテキストを蒸留
compressed_context = ctx_manager.distill_context(
agent_name="researcher",
content="調査が完了しました。API設計にはRESTとGraphQLの両方の利点があります...",
key_points=[
"RESTはシンプルだが過度に多的API呼び出しが必要",
"GraphQLは柔軟だが学習コストが高い",
"推奨: ハイブリッドアプローチ"
]
)
print("Context distilled successfully")
print(f"Compact context: {ctx_manager.get_compact_context()}")
ベストプラクティスとパフォーマンス最適化
HolySheep AIの<50msレイテンシーを活用した最適化戦略:
- バッチ処理:複数の小さなリクエストを1つにまとめてAPI呼び出し回数を削減
- Streaming対応:長い回答を逐次表示し、ユーザー体感速度を向上
- モデル選択:DeepSeek V3.2($0.42/MTok)でコスト効率を最大化
# コスト最適化:モデル選択のベストプラクティス
MODELS_CONFIG = {
"fast": {
"model": "deepseek-chat", # $0.42/MTok - 最も安い
"use_for": ["简单な質問", "要約", "分类"]
},
"balanced": {
"model": "gemini-2.0-flash", # $2.50/MTok
"use_for": ["一般的な質問", "文章作成"]
},
"high_quality": {
"model": "gpt-4.1", # $8/MTok - 高品質
"use_for": ["複雑な推論", "コード生成", "分析"]
}
}
def select_optimal_model(task_type: str) -> str:
"""タスクタイプに基づいて最適なモデルを選択"""
if any(keyword in task_type for keyword in ["简单", "brief", "短く"]):
return MODELS_CONFIG["fast"]["model"]
elif any(keyword in task_type for keyword in ["複雑な", "詳細", "深い"]):
return MODELS_CONFIG["high_quality"]["model"]
return MODELS_CONFIG["balanced"]["model"]
コスト試算:1000リクエストの場合
DeepSeek: 1000 × $0.42 = $420
Gemini: 1000 × $2.50 = $2,500
GPT-4.1: 1000 × $8 = $8,000
print(f"Cost comparison: DeepSeek saves up to 95% vs GPT-4.1")
まとめ
CrewAIのhandoffプロトコルは、複雑なマルチエージェントワークフローを構築する上で強力な機能です。本稿で解説したエラーハンドリングと最適化技術を適用することで、安定性とコスト効率の两者を実現できます。
特にHolySheep AIのようなコスト効率の高いAPIを活用することで、DeepSeek V3.2の$0.42/MTokという破格の料金で大規模実証実験も可能になります。WeChat PayやAlipayにも対応しているので、日本からでも簡単に актівацияできます。
👉 HolySheep AI に登録して無料クレジットを獲得