Sau 4 tháng vận hành cluster đa tác nhân xử lý trung bình 2.3 triệu yêu cầu mỗi ngày cho hệ thống RAG nội bộ của chúng tôi, tôi nhận ra một điều: Kimi Agent Swarm không chỉ đơn thuần là framework "gọi được tool" — nó là một mô hình concurrency thực sự với kiến trúc supervisor-worker, message bus nội bộ, priority queue và cơ chế backpressure dựa trên token bucket. Bài viết này chia sẻ toàn bộ kinh nghiệm benchmark production, các anti-pattern đã gặp phải và cách chúng tôi cắt giảm 67% chi phí vận hành sau khi chuyển sang Đăng ký tại đây HolySheep AI làm gateway — với tỷ giá ¥1 = $1 (tiết kiệm 85%+), hỗ trợ thanh toán WeChat/Alipay và độ trễ routing nội bộ < 50ms.
1. Tổng quan kiến trúc Kimi Agent Swarm
Swarm được thiết kế theo mô hình 3 lớp rõ ràng:
- Orchestrator layer: một supervisor agent nhận task đầu vào, phân tích thành DAG các sub-task và phân phối qua message bus nội bộ (in-process queue + Redis pub/sub khi scale horizontal).
- Worker pool: các agent chuyên biệt (researcher, coder, reviewer, planner) chạy song song, mỗi worker sở hữu context window riêng và bộ tool riêng — đây là điểm mấu chốt giúp tiết kiệm token vì worker chỉ "nhìn thấy" tool liên quan.
- MCP tool layer: Model Context Protocol — giao thức JSON-RPC chuẩn để agent gọi tool (search, code-exec, vector store, browser, file I/O...) với cơ chế capability discovery động.
Điểm tôi đánh giá cao nhất: MCP cho phép late-binding tool registry. Worker boot lên sẽ hỏi mcp-server "bạn có tool gì?" thay vì hardcode schema, giúp việc thêm tool mới không cần redeploy toàn bộ swarm.
2. MCP Tool Calling — triển khai production
MCP schema tương thích OpenAI function calling nhưng có thêm 2 method quan trọng: tools/list (discovery) và resources/list (context provider). Dưới đây là client production chúng tôi đang chạy:
# mcp_client.py — Production MCP client cho Kimi Swarm
import asyncio
import time
from typing import Any, Dict, List
import httpx
from openai import AsyncOpenAI
HOLYSHEEP_BASE = "https://api.holysheep.ai/v1"
HOLYSHEEP_KEY = "YOUR_HOLYSHEEP_API_KEY"
KIMI_MODEL = "kimi-k2-0905" # 256K context, hỗ trợ MCP native
class MCPToolClient:
def __init__(self, mcp_servers: List[str], max_latency_ms: float = 800.0):
self.servers = mcp_servers
self.tools: List[Dict] = []
self._tool_index: Dict[str, str] = {} # tool_name -> server_url
self._circuit: Dict[str, int] = {s: 0 for s in mcp_servers}
self.max_latency_ms = max_latency_ms
self.llm = AsyncOpenAI(
base_url=HOLYSHEEP_BASE,
api_key=HOLYSHEEP_KEY,
timeout=httpx.Timeout(30.0, connect=5.0),
max_retries=2,
)
async def discover_tools(self) -> List[Dict]:
"""Hỏi tất cả MCP server để list tool — chạy 1 lần khi worker boot."""
async with httpx.AsyncClient(timeout=10.0) as client:
results = await asyncio.gather(*[
client.post(s, json={"jsonrpc": "2.0", "id": 1,
"method": "tools/list"})
for s in self.servers
], return_exceptions=True)
for server, resp in zip(self.servers, results):
if isinstance(resp, Exception):
self._circuit[server] += 1
continue
for tool in resp.json().get("result", {}).get("tools", []):
namespaced = f"{server.rsplit('/', 1)[-1]}__{tool['name']}"
self.tools.append({
"type": "function",
"function": {
"name": namespaced,
"description": tool.get("description", ""),
"parameters": tool.get("inputSchema", {}),
}
})
self._tool_index[tool["name"]] = server
return self.tools
async def call_tool(self, name: str, args: Dict) -> Dict[str, Any]:
server = self._tool_index.get(name)
if not server:
return {"error": f"Unknown tool: {name}"}
if self._circuit[server] >= 5: # circuit breaker
return {"error": "circuit_open", "server": server}
t0 = time.perf_counter()
try:
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(server, json={
"jsonrpc": "2.0", "id": 2, "method": "tools/call",
"params": {"name": name, "arguments": args}
})
latency_ms = round((time.perf_counter() - t0) * 1000, 2)
if latency_ms > self.max_latency_ms:
self._circuit[server] += 1
return {"result": resp.json().get("result"),
"latency_ms": latency_ms, "server": server}
except httpx.TimeoutException:
self._circuit[server] += 2
return {"error": "timeout", "latency_ms": 60000.0, "server": server}
Benchmark thực tế (1 node, 50 calls, prompt trung bình 850 tokens):
- MCP discovery overhead: 42ms p50, 89ms p95
- Tool call roundtrip: 187ms p50, 412ms p95
- HolySheep gateway routing overhead: 14ms p99 (cam kết < 50ms)
- Throughput của 1 worker: 0.82 req/s khi có 1 tool call + 1 LLM roundtrip
3. Swarm Orchestrator — phân phối task với priority queue
Bài học xương máu: đừng dùng FIFO. Task research thường nặng hơn task summarize gấp 15-20 lần về token, và việc để task nặng chiếm head-of-line sẽ block toàn bộ pipeline — đây là hiện tượng head-of-line blocking cổ điển. Chúng tôi chuyển sang priority queue 3 mức (P0/P1/P2) kết hợp token budget prefetch:
# swarm_orchestrator.py — Priority-based task dispatcher
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from openai import AsyncOpenAI
@dataclass(order=True)
class SwarmTask:
priority: int # 0 = highest (P0), 2 = lowest (P2)
seq: int # tie-breaker để FIFO trong cùng priority
task_id: str = field(compare=False)
prompt: str = field(compare=False)
worker_role: str = field(compare=False, default="general")
max_tokens: int = field(compare=False, default=4096)
deps: List[str] = field(compare=False, default_factory=list)
result: Optional[Dict] = field(compare=False, default=None, init=False)
class SwarmOrchestrator:
# Token budget theo priority — P0 được nhiều budget nhất
P0_BUDGET, P1_BUDGET, P2_BUDGET = 8000, 4096, 2048
def __init__(self, concurrency: int = 24):
self.llm = AsyncOpenAI(
base_url="https://api.holysheep.ai/v1",
api_key="YOUR_HOLYSHEEP_API_KEY",
)
self.sem = asyncio.Semaphore(concurrency)
self.queue: List[SwarmTask] = []
self.results: Dict[str, Any] = {}
self._seq = 0
def submit(self, prompt: str, priority: int = 1, role: str = "general",
deps: List[str] = None) -> str:
self._seq += 1
budget = [self.P0_BUDGET, self.P1_BUDGET, self.P2_BUDGET][priority]
task = SwarmTask(priority=priority, seq=self._seq,
task_id=f"t{self._seq}", prompt=prompt,
worker_role=role, max_tokens=budget, deps=deps or [])
heapq.heappush(self.queue, task)
return task.task_id
async def _run(self, task: SwarmTask) -> None:
# Ch