การสร้างระบบ Multi-Agent ที่ทำงานร่วมกันอย่างมีประสิทธิภาพนั้นไม่ใช่เรื่องง่าย วิศวกรหลายคนมักประสบปัญหาเรื่องการจัดสรรงานที่ไม่สมดุล การแบ่งปันสถานะระหว่าง Agent ที่ขัดแย้งกัน และการกู้คืนจากข้อผิดพลาดที่ไม่สมบูรณ์ บทความนี้จะพาคุณเจาะลึกการออกแบบสถาปัตยกรรมที่พร้อมสำหรับ Production โดยใช้ HolySheep AI เป็น Backend หลัก
ทำความเข้าใจ Multi-Agent Architecture
ระบบ Multi-Agent ประกอบด้วยหลายส่วนหลักที่ต้องออกแบบให้สอดคล้องกัน ก่อนจะเข้าสู่การ implement มาทำความเข้าใจโครงสร้างพื้นฐานกันก่อน
หน้าที่หลักของแต่ละ Component
- Orchestrator Agent — รับผิดชอบการรับ Task ใหญ่และแตกออกเป็น Sub-tasks ย่อยๆ
- Worker Agent — รับผิดชอบการประมวลผล Sub-task ที่ได้รับมอบหมาย
- State Manager — จัดการสถานะร่วมของระบบทั้งหมด
- Error Handler — ตรวจจับและกู้คืนจากข้อผิดพลาด
- Message Queue — สื่อสารระหว่าง Agent ผ่าน Event-driven pattern
การจัดสรรงานแบบ Dynamic Load Balancing
การจัดสรรงานแบบ static ไม่สามารถรองรับ workload ที่เปลี่ยนแปลงได้ วิธีที่ดีกว่าคือการใช้ dynamic load balancing ที่พิจารณาจากความสามารถของแต่ละ Agent และปริมาณงานปัจจุบัน
Task Queue Architecture
import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from enum import Enum
import json
import time
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
class TaskPriority(Enum):
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class Task:
task_id: str
task_type: str
payload: Dict[str, Any]
priority: TaskPriority = TaskPriority.NORMAL
status: TaskStatus = TaskStatus.PENDING
assigned_agent: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
error: Optional[str] = None
class TaskAllocator:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.task_queue: List[Task] = []
self.agent_loads: Dict[str, int] = {}
self.agent_capabilities: Dict[str, List[str]] = {}
self.max_concurrent_tasks = 10
async def register_agent(self, agent_id: str, capabilities: List[str]):
self.agent_capabilities[agent_id] = capabilities
self.agent_loads[agent_id] = 0
async def add_task(self, task: Task) -> str:
self.task_queue.append(task)
self.task_queue.sort(
key=lambda t: (t.priority.value, t.created_at),
reverse=True
)
return task.task_id
async def allocate_task(self) -> Optional[Task]:
if not self.task_queue:
return None
task = self.task_queue.pop(0)
suitable_agents = [
agent_id for agent_id, caps in self.agent_capabilities.items()
if task.task_type in caps
and self.agent_loads[agent_id] < self.max_concurrent_tasks
]
if not suitable_agents:
self.task_queue.insert(0, task)
return None
best_agent = min(suitable_agents, key=lambda a: self.agent_loads[a])
task.status = TaskStatus.RUNNING
task.assigned_agent = best_agent
task.started_at = time.time()
self.agent_loads[best_agent] += 1
return task
async def complete_task(self, task_id: str, result: Any):
task = self._find_task(task_id)
if task:
self.agent_loads[task.assigned_agent] -= 1
task.status = TaskStatus.COMPLETED
task.completed_at = time.time()
def _find_task(self, task_id: str) -> Optional[Task]:
for task in self.task_queue:
if task.task_id == task_id:
return task
return None
allocator = TaskAllocator(
base_url="https://api.holysheep.ai/v1",
api_key="YOUR_HOLYSHEEP_API_KEY"
)
การจัดการ State ร่วมแบบ Event-Driven
การแบ่งปันสถานะระหว่าง Agent หลายตัวต้องมีกลไกที่รับประกันความสอดคล้องของข้อมูล เราใช้ Event Sourcing ร่วมกับ Distributed Lock เพื่อหลีกเลี่ยง race condition
import asyncio
import redis.asyncio as redis
from typing import Any, Dict, Optional, Callable
import json
import hashlib
class StateStore:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.subscribers: Dict[str, List[Callable]] = {}
async def set_state(self, key: str, value: Any, ttl: Optional[int] = None):
serialized = json.dumps(value)
async with self.redis.pipeline() as pipe:
await pipe.set(key, serialized)
if ttl:
await pipe.expire(key, ttl)
await pipe.publish(f"state:{key}", serialized)
await self._notify_subscribers(key, value)
async def get_state(self, key: str) -> Optional[Any]:
data = await self.redis.get(key)
if data:
return json.loads(data)
return None
async def atomic_update(
self,
key: str,
update_fn: Callable[[Any], Any],
max_retries: int = 5
):
lock_key = f"lock:{key}"
lock = self.redis.lock(lock_key, timeout=30)
for attempt in range(max_retries):
if await lock.acquire(blocking=True, blocking_timeout=10):
try:
current = await self.get_state(key)
new_value = update_fn(current)
await self.set_state(key, new_value)
return new_value
finally:
await lock.release()
else:
await asyncio.sleep(0.1 * (attempt + 1))
raise Exception(f"Failed to acquire lock for key: {key}")
async def subscribe(self, key: str, callback: Callable):
if key not in self.subscribers:
self.subscribers[key] = []
self.subscribers[key].append(callback)
async def _notify_subscribers(self, key: str, value: Any):
if key in self.subscribers:
for callback in self.subscribers[key]:
await callback(key, value)
class AgentState:
def __init__(self, agent_id: str, state_store: StateStore):
self.agent_id = agent_id
self.state_store = state_store
self.prefix = f"agent:{agent_id}"
async def update_status(self, status: str, metadata: Optional[Dict] = None):
state = {
"agent_id": self.agent_id,
"status": status,
"metadata": metadata or {},
"updated_at": asyncio.get_event_loop().time()
}
await self.state_store.set_state(self.prefix, state)
async def get_status(self) -> Optional[Dict]:
return await self.state_store.get_state(self.prefix)
async def update_progress(self, current: int, total: int, task_id: str):
progress = {
"current": current,
"total": total,
"percentage": (current / total) * 100 if total > 0 else 0
}
await self.state_store.atomic_update(
f"{self.prefix}:progress:{task_id
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง