การสร้างระบบ Multi-Agent ที่ทำงานร่วมกันอย่างมีประสิทธิภาพนั้นไม่ใช่เรื่องง่าย วิศวกรหลายคนมักประสบปัญหาเรื่องการจัดสรรงานที่ไม่สมดุล การแบ่งปันสถานะระหว่าง Agent ที่ขัดแย้งกัน และการกู้คืนจากข้อผิดพลาดที่ไม่สมบูรณ์ บทความนี้จะพาคุณเจาะลึกการออกแบบสถาปัตยกรรมที่พร้อมสำหรับ Production โดยใช้ HolySheep AI เป็น Backend หลัก

ทำความเข้าใจ Multi-Agent Architecture

ระบบ Multi-Agent ประกอบด้วยหลายส่วนหลักที่ต้องออกแบบให้สอดคล้องกัน ก่อนจะเข้าสู่การ implement มาทำความเข้าใจโครงสร้างพื้นฐานกันก่อน

หน้าที่หลักของแต่ละ Component

การจัดสรรงานแบบ Dynamic Load Balancing

การจัดสรรงานแบบ static ไม่สามารถรองรับ workload ที่เปลี่ยนแปลงได้ วิธีที่ดีกว่าคือการใช้ dynamic load balancing ที่พิจารณาจากความสามารถของแต่ละ Agent และปริมาณงานปัจจุบัน

Task Queue Architecture

import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from enum import Enum
import json
import time

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class TaskPriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Task:
    task_id: str
    task_type: str
    payload: Dict[str, Any]
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    assigned_agent: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    error: Optional[str] = None

class TaskAllocator:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.task_queue: List[Task] = []
        self.agent_loads: Dict[str, int] = {}
        self.agent_capabilities: Dict[str, List[str]] = {}
        self.max_concurrent_tasks = 10
        
    async def register_agent(self, agent_id: str, capabilities: List[str]):
        self.agent_capabilities[agent_id] = capabilities
        self.agent_loads[agent_id] = 0
        
    async def add_task(self, task: Task) -> str:
        self.task_queue.append(task)
        self.task_queue.sort(
            key=lambda t: (t.priority.value, t.created_at),
            reverse=True
        )
        return task.task_id
        
    async def allocate_task(self) -> Optional[Task]:
        if not self.task_queue:
            return None
            
        task = self.task_queue.pop(0)
        
        suitable_agents = [
            agent_id for agent_id, caps in self.agent_capabilities.items()
            if task.task_type in caps
            and self.agent_loads[agent_id] < self.max_concurrent_tasks
        ]
        
        if not suitable_agents:
            self.task_queue.insert(0, task)
            return None
            
        best_agent = min(suitable_agents, key=lambda a: self.agent_loads[a])
        
        task.status = TaskStatus.RUNNING
        task.assigned_agent = best_agent
        task.started_at = time.time()
        self.agent_loads[best_agent] += 1
        
        return task
        
    async def complete_task(self, task_id: str, result: Any):
        task = self._find_task(task_id)
        if task:
            self.agent_loads[task.assigned_agent] -= 1
            task.status = TaskStatus.COMPLETED
            task.completed_at = time.time()
            
    def _find_task(self, task_id: str) -> Optional[Task]:
        for task in self.task_queue:
            if task.task_id == task_id:
                return task
        return None

allocator = TaskAllocator(
    base_url="https://api.holysheep.ai/v1",
    api_key="YOUR_HOLYSHEEP_API_KEY"
)

การจัดการ State ร่วมแบบ Event-Driven

การแบ่งปันสถานะระหว่าง Agent หลายตัวต้องมีกลไกที่รับประกันความสอดคล้องของข้อมูล เราใช้ Event Sourcing ร่วมกับ Distributed Lock เพื่อหลีกเลี่ยง race condition

import asyncio
import redis.asyncio as redis
from typing import Any, Dict, Optional, Callable
import json
import hashlib

class StateStore:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.subscribers: Dict[str, List[Callable]] = {}
        
    async def set_state(self, key: str, value: Any, ttl: Optional[int] = None):
        serialized = json.dumps(value)
        async with self.redis.pipeline() as pipe:
            await pipe.set(key, serialized)
            if ttl:
                await pipe.expire(key, ttl)
            await pipe.publish(f"state:{key}", serialized)
        await self._notify_subscribers(key, value)
        
    async def get_state(self, key: str) -> Optional[Any]:
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None
        
    async def atomic_update(
        self, 
        key: str, 
        update_fn: Callable[[Any], Any],
        max_retries: int = 5
    ):
        lock_key = f"lock:{key}"
        lock = self.redis.lock(lock_key, timeout=30)
        
        for attempt in range(max_retries):
            if await lock.acquire(blocking=True, blocking_timeout=10):
                try:
                    current = await self.get_state(key)
                    new_value = update_fn(current)
                    await self.set_state(key, new_value)
                    return new_value
                finally:
                    await lock.release()
            else:
                await asyncio.sleep(0.1 * (attempt + 1))
        raise Exception(f"Failed to acquire lock for key: {key}")
        
    async def subscribe(self, key: str, callback: Callable):
        if key not in self.subscribers:
            self.subscribers[key] = []
        self.subscribers[key].append(callback)
        
    async def _notify_subscribers(self, key: str, value: Any):
        if key in self.subscribers:
            for callback in self.subscribers[key]:
                await callback(key, value)

class AgentState:
    def __init__(self, agent_id: str, state_store: StateStore):
        self.agent_id = agent_id
        self.state_store = state_store
        self.prefix = f"agent:{agent_id}"
        
    async def update_status(self, status: str, metadata: Optional[Dict] = None):
        state = {
            "agent_id": self.agent_id,
            "status": status,
            "metadata": metadata or {},
            "updated_at": asyncio.get_event_loop().time()
        }
        await self.state_store.set_state(self.prefix, state)
        
    async def get_status(self) -> Optional[Dict]:
        return await self.state_store.get_state(self.prefix)
        
    async def update_progress(self, current: int, total: int, task_id: str):
        progress = {
            "current": current,
            "total": total,
            "percentage": (current / total) * 100 if total > 0 else 0
        }
        await self.state_store.atomic_update(
            f"{self.prefix}:progress:{task_id