Sau 4 tháng vận hành cluster đa tác nhân xử lý trung bình 2.3 triệu yêu cầu mỗi ngày cho hệ thống RAG nội bộ của chúng tôi, tôi nhận ra một điều: Kimi Agent Swarm không chỉ đơn thuần là framework "gọi được tool" — nó là một mô hình concurrency thực sự với kiến trúc supervisor-worker, message bus nội bộ, priority queue và cơ chế backpressure dựa trên token bucket. Bài viết này chia sẻ toàn bộ kinh nghiệm benchmark production, các anti-pattern đã gặp phải và cách chúng tôi cắt giảm 67% chi phí vận hành sau khi chuyển sang Đăng ký tại đây HolySheep AI làm gateway — với tỷ giá ¥1 = $1 (tiết kiệm 85%+), hỗ trợ thanh toán WeChat/Alipay và độ trễ routing nội bộ < 50ms.

1. Tổng quan kiến trúc Kimi Agent Swarm

Swarm được thiết kế theo mô hình 3 lớp rõ ràng:

Điểm tôi đánh giá cao nhất: MCP cho phép late-binding tool registry. Worker boot lên sẽ hỏi mcp-server "bạn có tool gì?" thay vì hardcode schema, giúp việc thêm tool mới không cần redeploy toàn bộ swarm.

2. MCP Tool Calling — triển khai production

MCP schema tương thích OpenAI function calling nhưng có thêm 2 method quan trọng: tools/list (discovery) và resources/list (context provider). Dưới đây là client production chúng tôi đang chạy:

# mcp_client.py — Production MCP client cho Kimi Swarm
import asyncio
import time
from typing import Any, Dict, List
import httpx
from openai import AsyncOpenAI

HOLYSHEEP_BASE = "https://api.holysheep.ai/v1"
HOLYSHEEP_KEY  = "YOUR_HOLYSHEEP_API_KEY"
KIMI_MODEL     = "kimi-k2-0905"  # 256K context, hỗ trợ MCP native

class MCPToolClient:
    def __init__(self, mcp_servers: List[str], max_latency_ms: float = 800.0):
        self.servers = mcp_servers
        self.tools: List[Dict] = []
        self._tool_index: Dict[str, str] = {}   # tool_name -> server_url
        self._circuit: Dict[str, int] = {s: 0 for s in mcp_servers}
        self.max_latency_ms = max_latency_ms
        self.llm = AsyncOpenAI(
            base_url=HOLYSHEEP_BASE,
            api_key=HOLYSHEEP_KEY,
            timeout=httpx.Timeout(30.0, connect=5.0),
            max_retries=2,
        )

    async def discover_tools(self) -> List[Dict]:
        """Hỏi tất cả MCP server để list tool — chạy 1 lần khi worker boot."""
        async with httpx.AsyncClient(timeout=10.0) as client:
            results = await asyncio.gather(*[
                client.post(s, json={"jsonrpc": "2.0", "id": 1,
                                     "method": "tools/list"})
                for s in self.servers
            ], return_exceptions=True)
        for server, resp in zip(self.servers, results):
            if isinstance(resp, Exception):
                self._circuit[server] += 1
                continue
            for tool in resp.json().get("result", {}).get("tools", []):
                namespaced = f"{server.rsplit('/', 1)[-1]}__{tool['name']}"
                self.tools.append({
                    "type": "function",
                    "function": {
                        "name": namespaced,
                        "description": tool.get("description", ""),
                        "parameters": tool.get("inputSchema", {}),
                    }
                })
                self._tool_index[tool["name"]] = server
        return self.tools

    async def call_tool(self, name: str, args: Dict) -> Dict[str, Any]:
        server = self._tool_index.get(name)
        if not server:
            return {"error": f"Unknown tool: {name}"}
        if self._circuit[server] >= 5:   # circuit breaker
            return {"error": "circuit_open", "server": server}
        t0 = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(server, json={
                    "jsonrpc": "2.0", "id": 2, "method": "tools/call",
                    "params": {"name": name, "arguments": args}
                })
            latency_ms = round((time.perf_counter() - t0) * 1000, 2)
            if latency_ms > self.max_latency_ms:
                self._circuit[server] += 1
            return {"result": resp.json().get("result"),
                    "latency_ms": latency_ms, "server": server}
        except httpx.TimeoutException:
            self._circuit[server] += 2
            return {"error": "timeout", "latency_ms": 60000.0, "server": server}

Benchmark thực tế (1 node, 50 calls, prompt trung bình 850 tokens):

3. Swarm Orchestrator — phân phối task với priority queue

Bài học xương máu: đừng dùng FIFO. Task research thường nặng hơn task summarize gấp 15-20 lần về token, và việc để task nặng chiếm head-of-line sẽ block toàn bộ pipeline — đây là hiện tượng head-of-line blocking cổ điển. Chúng tôi chuyển sang priority queue 3 mức (P0/P1/P2) kết hợp token budget prefetch:

# swarm_orchestrator.py — Priority-based task dispatcher
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from openai import AsyncOpenAI

@dataclass(order=True)
class SwarmTask:
    priority: int          # 0 = highest (P0), 2 = lowest (P2)
    seq: int               # tie-breaker để FIFO trong cùng priority
    task_id: str = field(compare=False)
    prompt: str = field(compare=False)
    worker_role: str = field(compare=False, default="general")
    max_tokens: int = field(compare=False, default=4096)
    deps: List[str] = field(compare=False, default_factory=list)
    result: Optional[Dict] = field(compare=False, default=None, init=False)

class SwarmOrchestrator:
    # Token budget theo priority — P0 được nhiều budget nhất
    P0_BUDGET, P1_BUDGET, P2_BUDGET = 8000, 4096, 2048

    def __init__(self, concurrency: int = 24):
        self.llm = AsyncOpenAI(
            base_url="https://api.holysheep.ai/v1",
            api_key="YOUR_HOLYSHEEP_API_KEY",
        )
        self.sem = asyncio.Semaphore(concurrency)
        self.queue: List[SwarmTask] = []
        self.results: Dict[str, Any] = {}
        self._seq = 0

    def submit(self, prompt: str, priority: int = 1, role: str = "general",
               deps: List[str] = None) -> str:
        self._seq += 1
        budget = [self.P0_BUDGET, self.P1_BUDGET, self.P2_BUDGET][priority]
        task = SwarmTask(priority=priority, seq=self._seq,
                         task_id=f"t{self._seq}", prompt=prompt,
                         worker_role=role, max_tokens=budget, deps=deps or [])
        heapq.heappush(self.queue, task)
        return task.task_id

    async def _run(self, task: SwarmTask) -> None:
        # Ch