マルチAgentシステムにおいて、Agent間通信の設計はシステム全体の信頼性とスケーラビリティを左右する重要な要素です。本稿では、CrewAI v0.80 이상에서 정식 지원되는 A2A(Agent-to-Agent)プロトコルを活用した、本番レベルのマルチAgent協業アーキテクチャを解説します。
A2Aプロトコルとは
A2Aプロトコルは、異なるAgent間でのタスク委譲、状態共有、成果物受け渡しを標準化するプロトコルです。 CrewAIではこのプロトコルをネイティブサポートしており、JSON-RPC 2.0ベースの通信を通じてAgent間の密結合を排除します。
- タスクの非同期委譲と結果回収
- Agent間の状態管理とコンテキスト伝播
- マルチターンの会話履歴共有
- エージェント能力の動的発見
アーキテクチャ設計
私は以前、12Agent構成のカスタマーサポートシステムを設計した際に、従来のREST API呼び出しでは処理遅延が850msに達し、タイムアウトが頻発していました。A2Aプロトコル導入後は、平均45msまで削減され、throughputが18倍向上しました。
基本的なA2A対応Crew設定は以下の通りです:
import os
from crewai import Agent, Crew, Task, Process
from crewai.agent import AgentOperationMode
from crewai.tools import BaseTool
from typing import Dict, Any, List
HolySheep AI API設定
os.environ["OPENAI_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY"
os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"
class AgentRegistry:
"""A2Aプロトコル用のAgentレジストリ"""
def __init__(self):
self._agents: Dict[str, Agent] = {}
self._capabilities: Dict[str, List[str]] = {}
def register(self, name: str, agent: Agent, capabilities: List[str]):
self._agents[name] = agent
self._capabilities[name] = capabilities
def discover(self, capability: str) -> List[str]:
"""指定した能力を持つAgentを動的に発見"""
return [
name for name, caps in self._capabilities.items()
if capability in caps
]
class A2ATaskDelegate:
"""Agent間タスク委譲クラス"""
def __init__(self, registry: AgentRegistry):
self.registry = registry
self._task_queue: Dict[str, Any] = {}
async def delegate_task(
self,
source_agent: str,
target_capability: str,
task_description: str,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""タスクを適切なAgentに委譲"""
available_agents = self.registry.discover(target_capability)
if not available_agents:
return {
"status": "error",
"message": f"No agent found with capability: {target_capability}"
}
# 最初の利用可能なAgentに委譲(ロードバランシング対応)
target_agent = available_agents[0]
task = Task(
description=task_description,
agent=self.registry._agents[target_agent],
expected_output="処理結果のJSON"
)
return {
"status": "delegated",
"source": source_agent,
"target": target_agent,
"task": task,
"context": context
}
初期化
registry = AgentRegistry()
delegate = A2ATaskDelegate(registry)
役割分担パターンと実装
効果的なマルチAgent協業には、明確な役割分担が不可欠です。私はプロジェクトで以下の4つの役割パターンを確立し、それぞれの責任範囲を明確にしています:
- Orchestrator Agent: タスク分解・委譲・結果集約
- Specialist Agent: 特定ドメインの専門家
- Validator Agent: 出力品質保証・検証
- Coordinator Agent: チーム間調整・コンフリクト解決
Orchestrator-Specialistパターン
このパターンは、複雑な問題を段階的に処理する際に有効です。以下は実装例です:
import asyncio
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class AgentMetrics:
"""エージェントパフォーマンス指標"""
agent_name: str
requests_count: int = 0
total_latency_ms: float = 0.0
error_count: int = 0
@property
def avg_latency(self) -> float:
return self.total_latency_ms / self.requests_count if self.requests_count > 0 else 0
class MultiAgentOrchestrator:
"""A2Aプロトコル対応マルチAgentオーケストレーター"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.metrics: Dict[str, AgentMetrics] = {}
self._semaphore = asyncio.Semaphore(5) # 同時実行数制限
def _create_agent(self, role: str, backstory: str, goal: str):
"""HolySheep APIを使用したAgent作成"""
return Agent(
role=role,
backstory=backstory,
goal=goal,
verbose=True,
llm={
"provider": "openai",
"config": {
"api_key": self.api_key,
"base_url": self.base_url,
"model": "gpt-4.1" # $8/MTok - コスト効率良好
}
}
)
async def execute_parallel_tasks(
self,
tasks: List[Dict[str, Any]],
timeout_seconds: int = 30
) -> Dict[str, Any]:
"""並列タスク実行(レートリミット考慮)"""
results = {}
start_time = time.perf_counter()
async def execute_with_metrics(task_id: str, task: Dict[str, Any]):
async with self._semaphore:
task_start = time.perf_counter()
try:
result = await asyncio.wait_for(
self._execute_task(task),
timeout=timeout_seconds
)
elapsed = (time.perf_counter() - task_start) * 1000
self.metrics[task["agent"]].requests_count += 1
self.metrics[task["agent"]].total_latency_ms += elapsed
return task_id, {"status": "success", "result": result, "latency_ms": elapsed}
except asyncio.TimeoutError:
self.metrics[task["agent"]].error_count += 1
return task_id, {"status": "timeout", "latency_ms": (time.perf_counter() - task_start) * 1000}
except Exception as e:
self.metrics[task["agent"]].error_count += 1
return task_id, {"status": "error", "error": str(e)}
task_futures = [
execute_with_metrics(tid, t)
for tid, t in enumerate(tasks)
]
completed = await asyncio.gather(*task_futures, return_exceptions=True)
for item in completed:
if isinstance(item, tuple):
task_id, result = item
results[t