Giới Thiệu — Tại Sao Kỹ Sư Backend Cần Quan Tâm Ngay
Sau 18 tháng phát triển và qua nhiều bản beta,
MCP Protocol 1.0 đã chính thức được công bố với hệ sinh thái 200+ server implementation. Là kỹ sư đã triển khai hệ thống AI tool calling cho production tại HolySheep AI, tôi nhận ra đây là thời điểm quan trọng để cộng đồng kỹ thuật hiểu rõ protocol này hoạt động như thế nào và tại sao nó sẽ trở thành tiêu chuẩn ngành.
MCP (Model Context Protocol) không chỉ là một specification mới — nó là lời giải cho bài toán mà chúng ta đã đối mặt suốt nhiều năm qua:
làm sao để AI model gọi external tools một cách an toàn, nhanh chóng và có thể mở rộng. Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm thực chiến với MCP, từ kiến trúc core cho đến optimization cho production environment.
Kiến Trúc Core Của MCP Protocol 1.0
1. Transport Layer — WebSocket vs SSE
MCP hỗ trợ hai transport mechanism chính. Trong production, WebSocket là lựa chọn tối ưu cho low-latency tool calling:
# MCP Client với WebSocket Transport - Production Implementation
import asyncio
import json
from typing import Any, Optional, Dict, List
from dataclasses import dataclass, field
from enum import Enum
import aiohttp
import structlog
logger = structlog.get_logger()
class MCPTransportType(Enum):
    """Transport mechanisms supported by the MCP client.

    Only WEBSOCKET and SSE are wired up in MCPClient.connect();
    STDIO is declared but not implemented in this snippet.
    """
    WEBSOCKET = "websocket"
    SSE = "sse"
    STDIO = "stdio"
@dataclass
class MCPToolDefinition:
    """Tool metadata as advertised by the server via tools/list."""
    name: str  # unique tool identifier used in tools/call
    description: str  # human-readable summary
    input_schema: Dict[str, Any]  # JSON Schema for the tool's arguments
    output_schema: Optional[Dict[str, Any]] = None  # optional result schema
@dataclass
class MCPToolCallRequest:
    """A single tool invocation request (client-side view)."""
    tool_name: str  # name of the tool to invoke
    arguments: Dict[str, Any]  # arguments matching the tool's input schema
    request_id: str  # correlation id echoed back in the result
    timeout_ms: int = 30000  # per-call deadline in milliseconds
@dataclass
class MCPToolCallResult:
    """Outcome of one tool call, including timing metadata."""
    request_id: str  # correlation id of the originating request
    success: bool  # True when the server returned a result, not an error
    result: Optional[Any]  # server "result" payload (None on failure)
    error: Optional[str] = None  # error message when success is False
    latency_ms: float = 0.0  # round-trip time measured by the client
    tokens_used: Optional[int] = None  # taken from response meta, if present
class MCPClient:
    """
    Production-grade MCP client with connection pooling,
    automatic retry and a circuit-breaker pattern.

    Fixes over the original draft:
    - the aiohttp ClientSession is kept on the instance so the WebSocket
      survives past connect() (an ``async with`` session closed it as
      soon as the handshake coroutine returned),
    - ``uuid`` and ``time`` are imported where used (they are not
      imported at module level in this snippet),
    - every admitted call is counted in ``total_requests`` so
      get_metrics()'s success_rate is accurate even when retries fail,
    - timeouts now also feed the circuit-breaker failure counter,
    - a close() method releases the socket and session.
    """

    def __init__(
        self,
        server_url: str,
        api_key: str,
        transport: MCPTransportType = MCPTransportType.WEBSOCKET,
        max_concurrent_calls: int = 50,
        request_timeout: int = 60,
        max_retries: int = 3
    ):
        """
        Args:
            server_url: Base URL of the MCP server (``/mcp`` is appended).
            api_key: Bearer token sent during the handshake.
            transport: Transport mechanism; WEBSOCKET is the production path.
            max_concurrent_calls: Cap on in-flight tool calls.
            request_timeout: Per-request timeout in seconds.
            max_retries: Attempts per tool call before giving up.
        """
        self.server_url = server_url
        self.api_key = api_key
        self.transport = transport
        self.max_concurrent_calls = max_concurrent_calls
        self.request_timeout = request_timeout
        self.max_retries = max_retries
        # Semaphore for concurrency control.
        self._semaphore = asyncio.Semaphore(max_concurrent_calls)
        # Connection state. The session must outlive _connect_websocket(),
        # so it lives on the instance and is released in close().
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._session_id: Optional[str] = None
        self._server_capabilities: Dict[str, Any] = {}  # set during initialize
        self._tools_cache: Dict[str, MCPToolDefinition] = {}
        self._cache_ttl: int = 300  # seconds (5 minutes)
        # Metrics
        self._metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency_ms": 0.0,
            "cache_hits": 0
        }
        # Circuit breaker state
        self._failure_count = 0
        self._circuit_open = False
        self._circuit_open_time: Optional[float] = None
        self.CIRCUIT_BREAKER_THRESHOLD = 5
        self.CIRCUIT_BREAKER_RESET_TIME = 60  # seconds
        logger.info("mcp_client_initialized",
                    server_url=server_url,
                    transport=transport.value,
                    max_concurrent=max_concurrent_calls)

    async def connect(self) -> bool:
        """Establish a connection to the MCP server; True on success."""
        try:
            if self.transport == MCPTransportType.WEBSOCKET:
                return await self._connect_websocket()
            elif self.transport == MCPTransportType.SSE:
                return await self._connect_sse()
            return False  # STDIO transport is not implemented here
        except Exception as e:
            logger.error("connection_failed", error=str(e))
            return False

    async def _connect_websocket(self) -> bool:
        """WebSocket connection with proper handshake.

        BUGFIX: the session is created WITHOUT ``async with`` — the
        original exited the session context before this method returned,
        which closed the session and the WebSocket with it.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-MCP-Version": "1.0",
            "X-Client-Version": "1.0.0"
        }
        self._session = aiohttp.ClientSession()
        try:
            self._ws = await self._session.ws_connect(
                f"{self.server_url}/mcp",
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.request_timeout)
            )
            # Wait for connection acknowledgment.
            msg = await self._ws.receive()
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data.get("type") == "connection.established":
                    self._session_id = data.get("session_id")
                    await self._initialize_session()
                    logger.info("websocket_connected", session_id=self._session_id)
                    return True
            # Handshake failed: don't leak the half-open session.
            await self.close()
            return False
        except Exception:
            await self.close()
            raise

    async def _connect_sse(self) -> bool:
        """SSE transport placeholder.

        The original referenced this method without defining it, which
        would raise AttributeError at runtime; connect() catches the
        exception either way, but the stub makes the gap explicit.
        """
        raise NotImplementedError("SSE transport is not implemented yet")

    async def close(self) -> None:
        """Release the WebSocket and its underlying HTTP session."""
        if self._ws is not None:
            await self._ws.close()
            self._ws = None
        if self._session is not None:
            await self._session.close()
            self._session = None

    async def _initialize_session(self):
        """Initialize the MCP session and discover available tools."""
        init_request = {
            "jsonrpc": "2.0",
            "method": "initialize",
            "params": {
                "protocolVersion": "1.0.0",
                "capabilities": {
                    "tools": True,
                    "resources": True,
                    "prompts": True
                },
                "clientInfo": {
                    "name": "production-mcp-client",
                    "version": "1.0.0"
                }
            },
            "id": "init-1"
        }
        await self._ws.send_json(init_request)
        response = await self._ws.receive_json()
        if response.get("result"):
            self._server_capabilities = response["result"].get("capabilities", {})
            await self._discover_tools()

    async def _discover_tools(self):
        """Discover and cache the tools advertised by the server."""
        tools_request = {
            "jsonrpc": "2.0",
            "method": "tools/list",
            "params": {},
            "id": "tools-list-1"
        }
        await self._ws.send_json(tools_request)
        response = await self._ws.receive_json()
        if response.get("result") and "tools" in response["result"]:
            for tool in response["result"]["tools"]:
                self._tools_cache[tool["name"]] = MCPToolDefinition(
                    name=tool["name"],
                    description=tool.get("description", ""),
                    input_schema=tool.get("inputSchema", {}),
                    output_schema=tool.get("outputSchema")
                )
        logger.info("tools_discovered", count=len(self._tools_cache))

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        timeout_ms: Optional[int] = None
    ) -> MCPToolCallResult:
        """
        Execute a tool call with full production features:
        - Concurrency control (semaphore)
        - Circuit breaker
        - Automatic retry with exponential backoff
        - Metrics collection
        """
        import time
        import uuid  # BUGFIX: uuid was used below but never imported
        start_time = time.perf_counter()
        request_id = f"req-{uuid.uuid4().hex[:12]}"
        timeout = timeout_ms or self.request_timeout * 1000
        # Circuit breaker check: fail fast while the breaker is open.
        if self._is_circuit_open():
            return MCPToolCallResult(
                request_id=request_id,
                success=False,
                result=None,
                error="Circuit breaker is open",
                latency_ms=0
            )
        # BUGFIX: count every admitted call exactly once, whether it
        # ultimately succeeds or fails, so success_rate is meaningful.
        self._metrics["total_requests"] += 1
        async with self._semaphore:
            for attempt in range(self.max_retries):
                try:
                    result = await self._execute_tool_call(
                        tool_name, arguments, request_id, timeout
                    )
                    self._metrics["successful_requests"] += 1
                    self._failure_count = 0  # healthy again
                    return result
                except asyncio.TimeoutError:
                    logger.warning("tool_call_timeout",
                                   tool=tool_name,
                                   attempt=attempt + 1,
                                   timeout_ms=timeout)
                    # BUGFIX: timeouts now count toward the breaker too.
                    self._failure_count += 1
                except Exception as e:
                    logger.error("tool_call_error",
                                 tool=tool_name,
                                 error=str(e),
                                 attempt=attempt + 1)
                    self._failure_count += 1
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
        # All retries failed.
        self._metrics["failed_requests"] += 1
        self._record_failure()
        return MCPToolCallResult(
            request_id=request_id,
            success=False,
            result=None,
            error=f"Failed after {self.max_retries} attempts",
            latency_ms=(time.perf_counter() - start_time) * 1000
        )

    async def _execute_tool_call(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        request_id: str,
        timeout_ms: int
    ) -> MCPToolCallResult:
        """Execute the actual tool call over the WebSocket."""
        import time
        start_time = time.perf_counter()
        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            },
            "id": request_id
        }
        await self._ws.send_json(request)
        # Wait for the response, bounded by the caller's deadline.
        try:
            response = await asyncio.wait_for(
                self._ws.receive_json(),
                timeout=timeout_ms / 1000
            )
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError(f"Tool call timed out after {timeout_ms}ms")
        latency_ms = (time.perf_counter() - start_time) * 1000
        self._metrics["total_latency_ms"] += latency_ms
        if "error" in response:
            return MCPToolCallResult(
                request_id=request_id,
                success=False,
                result=None,
                error=response["error"].get("message"),
                latency_ms=latency_ms
            )
        return MCPToolCallResult(
            request_id=request_id,
            success=True,
            result=response.get("result"),
            latency_ms=latency_ms,
            tokens_used=response.get("meta", {}).get("tokens_used")
        )

    def _is_circuit_open(self) -> bool:
        """Return True while the circuit breaker is open (half-open reset
        after CIRCUIT_BREAKER_RESET_TIME seconds)."""
        if not self._circuit_open:
            return False
        if self._circuit_open_time:
            import time
            if time.time() - self._circuit_open_time > self.CIRCUIT_BREAKER_RESET_TIME:
                self._circuit_open = False
                self._failure_count = 0
                logger.info("circuit_breaker_reset")
                return False
        return True

    def _record_failure(self):
        """Trip the circuit breaker once the failure threshold is hit."""
        import time  # BUGFIX: time was used here without an import
        if self._failure_count >= self.CIRCUIT_BREAKER_THRESHOLD:
            self._circuit_open = True
            self._circuit_open_time = time.time()
            logger.warning("circuit_breaker_opened",
                           failure_count=self._failure_count)

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of client metrics, with derived averages."""
        avg_latency = (
            self._metrics["total_latency_ms"] / self._metrics["total_requests"]
            if self._metrics["total_requests"] > 0 else 0
        )
        return {
            **self._metrics,
            "avg_latency_ms": round(avg_latency, 2),
            "success_rate": (
                self._metrics["successful_requests"] / self._metrics["total_requests"]
                if self._metrics["total_requests"] > 0 else 0
            ),
            "cached_tools": len(self._tools_cache),
            "circuit_breaker_status": "open" if self._circuit_open else "closed"
        }
2. Server Implementation — Production Architecture
Dưới đây là server implementation hoàn chỉnh với connection pooling và resource management:
# MCP Server Implementation - Production Ready
import asyncio
import json
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import structlog
from datetime import datetime, timedelta
logger = structlog.get_logger()
class MCPErrorCode(Enum):
    """JSON-RPC / MCP error codes used by MCPServer._error_response."""
    # Standard JSON-RPC 2.0 error codes.
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
    # MCP/application-specific codes (implementation-defined range).
    TOOL_NOT_FOUND = -32001
    TOOL_EXECUTION_ERROR = -32002
    RATE_LIMITED = -32003
    RESOURCE_EXHAUSTED = -32004
@dataclass
class MCPRequest:
    """A parsed JSON-RPC 2.0 request (built by MCPServer._parse_request)."""
    jsonrpc: str  # validated to be "2.0" before construction
    method: str  # e.g. "initialize", "tools/list", "tools/call"
    params: Dict[str, Any] = field(default_factory=dict)  # method parameters
    id: Optional[Any] = None  # request correlation id, echoed in the response
@dataclass
class MCPResponse:
    """A JSON-RPC 2.0 response; exactly one of result/error is populated."""
    jsonrpc: str = "2.0"
    result: Optional[Any] = None  # success payload
    error: Optional[Dict[str, Any]] = None  # {"code", "message", "data"} on failure
    id: Optional[Any] = None  # mirrors the request id
class Tool(ABC):
    """Base class for MCP tools.

    Concrete tools must expose a unique name, a human-readable
    description, a JSON-Schema input contract, and an async execute().
    """
    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier used as the "name" in tools/call requests."""
        pass
    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable summary shown in tools/list."""
        pass
    @property
    @abstractmethod
    def input_schema(self) -> Dict[str, Any]:
        """JSON Schema describing the accepted arguments."""
        pass
    @abstractmethod
    async def execute(self, arguments: Dict[str, Any]) -> Any:
        """Run the tool; the return value is JSON-serialized by the server."""
        pass
class MCPServer:
    """
    Production MCP server with:
    - Tool registry
    - Per-client rate limiting
    - Request validation
    - Logging & monitoring

    BUGFIX vs. the original draft: validation failures raised by
    _parse_request (ValueError) were swallowed by the generic handler
    and reported as INTERNAL_ERROR (-32603); they are now reported as
    INVALID_REQUEST (-32600). The generic handler also no longer crashes
    when the incoming request is not a dict.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8080,
        max_connections: int = 1000,
        rate_limit_per_minute: int = 60,
        max_tool_execution_time: int = 30000  # ms
    ):
        """
        Args:
            host: Bind address.
            port: TCP port.
            max_connections: Connection cap (bookkeeping only in this snippet).
            rate_limit_per_minute: Max tool calls per client per minute.
            max_tool_execution_time: Per-call execution timeout in ms.
        """
        self.host = host
        self.port = port
        self.max_connections = max_connections
        self.rate_limit_per_minute = rate_limit_per_minute
        self.max_tool_execution_time = max_tool_execution_time
        # Tool registry: tool name -> Tool instance.
        self._tools: Dict[str, Tool] = {}
        # Connection management (reserved for transport wiring).
        self._active_connections: Dict[str, datetime] = {}
        self._connection_lock = asyncio.Lock()
        # Rate limiting: client id -> timestamps of calls in the last minute.
        self._rate_limit_buckets: Dict[str, List[datetime]] = {}
        # Metrics
        self._server_metrics = {
            "total_requests": 0,
            "tool_calls": 0,
            "errors": 0,
            "active_connections": 0
        }
        logger.info("mcp_server_initialized",
                    host=host,
                    port=port,
                    max_connections=max_connections)

    def register_tool(self, tool: Tool):
        """Register a tool with the server (a same-named tool is replaced)."""
        self._tools[tool.name] = tool
        logger.info("tool_registered",
                    tool_name=tool.name,
                    description=tool.description[:50])

    async def handle_request(self, request: Dict[str, Any]) -> MCPResponse:
        """Validate, route and answer a single incoming MCP request dict."""
        self._server_metrics["total_requests"] += 1
        try:
            # Parse and validate the envelope.
            mcp_request = self._parse_request(request)
            # Route to the method-specific handler.
            if mcp_request.method == "initialize":
                return await self._handle_initialize(mcp_request)
            elif mcp_request.method == "tools/list":
                return await self._handle_list_tools(mcp_request)
            elif mcp_request.method == "tools/call":
                return await self._handle_tool_call(mcp_request)
            elif mcp_request.method == "resources/list":
                return await self._handle_list_resources(mcp_request)
            else:
                return self._error_response(
                    MCPErrorCode.METHOD_NOT_FOUND,
                    f"Unknown method: {mcp_request.method}",
                    mcp_request.id
                )
        except json.JSONDecodeError as e:
            # Defensive: only reachable if a caller ever passes raw JSON text.
            return self._error_response(
                MCPErrorCode.PARSE_ERROR,
                f"Invalid JSON: {str(e)}",
                None
            )
        except ValueError as e:
            # BUGFIX: a malformed-but-parsed request is a client error,
            # not an internal one.
            self._server_metrics["errors"] += 1
            request_id = request.get("id") if isinstance(request, dict) else None
            return self._error_response(
                MCPErrorCode.INVALID_REQUEST,
                str(e),
                request_id
            )
        except Exception as e:
            self._server_metrics["errors"] += 1
            logger.error("request_error", error=str(e))
            # BUGFIX: guard .get() — request may not be a dict here.
            request_id = request.get("id") if isinstance(request, dict) else None
            return self._error_response(
                MCPErrorCode.INTERNAL_ERROR,
                str(e),
                request_id
            )

    def _parse_request(self, request: Dict[str, Any]) -> MCPRequest:
        """Parse and validate an MCP request envelope.

        Raises:
            ValueError: if the envelope is not a valid JSON-RPC 2.0 request.
        """
        if not isinstance(request, dict):
            raise ValueError("Request must be a dictionary")
        if request.get("jsonrpc") != "2.0":
            raise ValueError("Invalid JSON-RPC version")
        if "method" not in request:
            raise ValueError("Missing method field")
        return MCPRequest(
            jsonrpc=request["jsonrpc"],
            method=request["method"],
            params=request.get("params", {}),
            id=request.get("id")
        )

    async def _handle_initialize(self, request: MCPRequest) -> MCPResponse:
        """Handle the initialization handshake; advertise capabilities."""
        client_info = request.params.get("clientInfo", {})
        logger.info("client_initializing",
                    client_name=client_info.get("name"),
                    protocol_version=request.params.get("protocolVersion"))
        return MCPResponse(
            result={
                "protocolVersion": "1.0.0",
                "capabilities": {
                    "tools": {
                        "listChanged": True
                    },
                    "resources": {
                        "subscribe": True,
                        "listChanged": True
                    },
                    "prompts": {
                        "listChanged": True
                    }
                },
                "serverInfo": {
                    "name": "production-mcp-server",
                    "version": "1.0.0"
                }
            },
            id=request.id
        )

    async def _handle_list_tools(self, request: MCPRequest) -> MCPResponse:
        """Handle tools/list: enumerate registered tools."""
        tools_list = [
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.input_schema
            }
            for tool in self._tools.values()
        ]
        return MCPResponse(
            result={"tools": tools_list},
            id=request.id
        )

    async def _handle_tool_call(self, request: MCPRequest) -> MCPResponse:
        """Handle tools/call: validate, rate-limit, then execute with timeout."""
        self._server_metrics["tool_calls"] += 1
        tool_name = request.params.get("name")
        arguments = request.params.get("arguments", {})
        # Validate tool exists.
        if tool_name not in self._tools:
            return self._error_response(
                MCPErrorCode.TOOL_NOT_FOUND,
                f"Tool not found: {tool_name}",
                request.id
            )
        # Rate limit check, keyed by a caller-supplied client id.
        client_id = request.params.get("_client_id", "anonymous")
        if not self._check_rate_limit(client_id):
            return self._error_response(
                MCPErrorCode.RATE_LIMITED,
                "Rate limit exceeded",
                request.id
            )
        tool = self._tools[tool_name]
        try:
            # Execute with a hard timeout so one tool can't wedge the server.
            result = await asyncio.wait_for(
                tool.execute(arguments),
                timeout=self.max_tool_execution_time / 1000
            )
            return MCPResponse(
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps(result, ensure_ascii=False)
                        }
                    ],
                    "isError": False
                },
                id=request.id
            )
        except asyncio.TimeoutError:
            return self._error_response(
                MCPErrorCode.TOOL_EXECUTION_ERROR,
                f"Tool execution timed out after {self.max_tool_execution_time}ms",
                request.id
            )
        except Exception as e:
            logger.error("tool_execution_error",
                         tool=tool_name,
                         error=str(e))
            return self._error_response(
                MCPErrorCode.TOOL_EXECUTION_ERROR,
                str(e),
                request.id
            )

    async def _handle_list_resources(self, request: MCPRequest) -> MCPResponse:
        """Handle resources/list. No resources are exposed in this snippet."""
        return MCPResponse(
            result={"resources": []},
            id=request.id
        )

    def _check_rate_limit(self, client_id: str) -> bool:
        """Sliding-window limiter: allow at most rate_limit_per_minute calls
        per client in any 60-second window. Records the call on success."""
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        if client_id not in self._rate_limit_buckets:
            self._rate_limit_buckets[client_id] = []
        # Drop timestamps older than the window.
        self._rate_limit_buckets[client_id] = [
            ts for ts in self._rate_limit_buckets[client_id]
            if ts > minute_ago
        ]
        # Check limit before recording this call.
        if len(self._rate_limit_buckets[client_id]) >= self.rate_limit_per_minute:
            return False
        self._rate_limit_buckets[client_id].append(now)
        return True

    def _error_response(
        self,
        code: MCPErrorCode,
        message: str,
        request_id: Optional[Any]
    ) -> MCPResponse:
        """Build a JSON-RPC error response."""
        return MCPResponse(
            error={
                "code": code.value,
                "message": message,
                "data": None
            },
            id=request_id
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of server metrics."""
        return {
            **self._server_metrics,
            "registered_tools": len(self._tools),
            "rate_limit_buckets": len(self._rate_limit_buckets)
        }
# Example: Real-world tool implementation
class WebSearchTool(Tool):
    """Simulated web-search tool exposed over MCP.

    Declares a JSON-Schema contract (required ``query``, optional
    ``max_results``) and returns a canned result list after a short
    artificial delay.
    """

    @property
    def name(self) -> str:
        return "web_search"

    @property
    def description(self) -> str:
        return "Search the web for information"

    @property
    def input_schema(self) -> Dict[str, Any]:
        # Built from named pieces for readability; identical schema.
        query_field = {
            "type": "string",
            "description": "Search query"
        }
        limit_field = {
            "type": "integer",
            "description": "Maximum number of results",
            "default": 10
        }
        return {
            "type": "object",
            "properties": {
                "query": query_field,
                "max_results": limit_field
            },
            "required": ["query"]
        }

    async def execute(self, arguments: Dict[str, Any]) -> Any:
        search_query = arguments["query"]
        limit = arguments.get("max_results", 10)
        await asyncio.sleep(0.1)  # simulate network latency
        hits = []
        for idx in range(min(limit, 5)):
            hits.append({"title": f"Result {idx}", "url": f"https://example.com/{idx}"})
        return {
            "query": search_query,
            "results": hits,
            "total_found": limit
        }
# Usage example
async def main():
    """Demo driver: register one tool and dispatch a sample tools/call."""
    demo_server = MCPServer(port=8080)
    demo_server.register_tool(WebSearchTool())
    # A hand-built JSON-RPC request, exactly as a client would send it.
    sample_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "web_search",
            "arguments": {"query": "MCP Protocol", "max_results": 5}
        },
        "id": "test-1"
    }
    reply = await demo_server.handle_request(sample_request)
    print(json.dumps(reply.__dict__, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    asyncio.run(main())
Tích Hợp HolySheep AI — Tiết Kiệm 85%+ Chi Phí
Điểm mấu chốt của bài viết này là tích hợp MCP client với HolySheep AI API — nơi bạn có thể tiết kiệm đến 85% chi phí so với các provider lớn khác.
Đăng ký tại đây để nhận tín dụng miễn phí khi bắt đầu.
# Production Integration: MCP + HolySheep AI
import asyncio
import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
import aiohttp
# HolySheep AI Configuration
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Thay thế bằng API key thực tế
@dataclass
class ModelConfig:
    """Model configuration with pricing info (2026)."""
    model_id: str  # identifier sent as "model" in the API payload
    name: str  # human-readable display name
    price_per_1m_tokens_input: float  # USD per 1M input (prompt) tokens
    price_per_1m_tokens_output: float  # USD per 1M output (completion) tokens
    max_tokens: int  # token cap enforced in chat_completion
    supports_tools: bool = True  # whether the model accepts a tools payload
# Model Registry với pricing thực tế (tính theo USD)
# Registry of selectable models, keyed by model id; prices in USD per
# 1M tokens (as claimed by the article, 2026 figures).
MODEL_REGISTRY: Dict[str, ModelConfig] = {
    "gpt-4.1": ModelConfig(
        model_id="gpt-4.1",
        name="GPT-4.1",
        price_per_1m_tokens_input=8.00,  # $8/MTok
        price_per_1m_tokens_output=24.00,
        max_tokens=128000,
        supports_tools=True
    ),
    "claude-sonnet-4.5": ModelConfig(
        model_id="claude-sonnet-4.5",
        name="Claude Sonnet 4.5",
        price_per_1m_tokens_input=15.00,  # $15/MTok
        price_per_1m_tokens_output=75.00,
        max_tokens=200000,
        supports_tools=True
    ),
    "gemini-2.5-flash": ModelConfig(
        model_id="gemini-2.5-flash",
        name="Gemini 2.5 Flash",
        price_per_1m_tokens_input=2.50,  # $2.50/MTok
        price_per_1m_tokens_output=10.00,
        max_tokens=1000000,
        supports_tools=True
    ),
    "deepseek-v3.2": ModelConfig(
        model_id="deepseek-v3.2",
        name="DeepSeek V3.2",
        price_per_1m_tokens_input=0.42,  # only $0.42/MTok!
        price_per_1m_tokens_output=1.68,
        max_tokens=64000,
        supports_tools=True
    )
}
class ToolCallingAgent:
"""
AI Agent với tool calling capability sử dụng HolySheep AI.
Supports multiple models với automatic cost optimization.
"""
    def __init__(
        self,
        api_key: str,
        default_model: str = "deepseek-v3.2",  # cheapest model in the registry
        enable_cost_tracking: bool = True
    ):
        """
        Args:
            api_key: HolySheep AI bearer token.
            default_model: MODEL_REGISTRY key used when a call does not
                specify a model explicitly.
            enable_cost_tracking: When True, accumulate per-request cost
                and token counts for get_cost_report().
        """
        self.api_key = api_key
        self.default_model = default_model
        self.enable_cost_tracking = enable_cost_tracking
        # Cost tracking accumulators (read by get_cost_report).
        self._total_cost = 0.0
        self._total_tokens = 0
        self._request_count = 0
        # Model selection strategy — declared but not yet consulted by
        # any visible method; presumably for future routing logic.
        self._model_strategy = "cost_optimized"  # "cost_optimized", "quality_first", "balanced"
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict[str, Any]]] = None,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096
    ) -> Dict[str, Any]:
        """
        Send a request to HolySheep AI with tool calling support.

        Args:
            messages: OpenAI-style chat messages.
            tools: Optional tool definitions; when present, tool_choice
                is set to "auto".
            model: MODEL_REGISTRY key; falls back to self.default_model.
            temperature: Sampling temperature.
            max_tokens: Requested completion cap (clamped to the model's
                max_tokens from the registry).

        Returns:
            The raw JSON response from the API.

        Raises:
            ValueError: if ``model`` is not in MODEL_REGISTRY.
            Exception: on any non-200 HTTP status.

        Pricing comparison (input, per 1M tokens):
        - GPT-4.1: $8.00
        - Claude Sonnet 4.5: $15.00
        - Gemini 2.5 Flash: $2.50
        - DeepSeek V3.2: $0.42 (95% SAVINGS!)
        """
        model = model or self.default_model
        if model not in MODEL_REGISTRY:
            raise ValueError(f"Unknown model: {model}")
        model_config = MODEL_REGISTRY[model]
        # Build request payload.
        payload = {
            "model": model_config.model_id,
            "messages": messages,
            "temperature": temperature,
            # Clamp to the model's advertised limit.
            "max_tokens": min(max_tokens, model_config.max_tokens)
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{HOLYSHEEP_BASE_URL}/chat/completions",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=120)
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API Error: {response.status} - {error_text}")
                result = await response.json()
        # Calculate cost from the usage block, if tracking is enabled.
        if self.enable_cost_tracking:
            usage = result.get("usage", {})
            input_tokens = usage.get("prompt_tokens", 0)
            output_tokens = usage.get("completion_tokens", 0)
            input_cost = (input_tokens / 1_000_000) * model_config.price_per_1m_tokens_input
            output_cost = (output_tokens / 1_000_000) * model_config.price_per_1m_tokens_output
            total_cost = input_cost + output_cost
            self._total_cost += total_cost
            self._total_tokens += input_tokens + output_tokens
            self._request_count += 1
            # Log cost breakdown.
            print(f"[COST] {model_config.name}: ${total_cost:.4f} "
                  f"(in: {input_tokens}, out: {output_tokens})")
        return result
async def chat_with_tools(
self,
user_message: str,
available_tools: List[Dict[str, Any]],
max_turns: int = 5
) -> Dict[str, Any]:
"""
Multi-turn chat với tool execution.
Tự động execute tools khi model yêu cầu.
"""
messages = [
{"role": "system", "content": "Bạn là trợ lý AI có khả năng gọi tools."}
]
messages.append({"role": "user", "content": user_message})
for turn in range(max_turns):
# Get completion
response = await self.chat_completion(
messages=messages,
tools=available_tools,
model="deepseek-v3.2" # Dùng model rẻ nhất cho tool calls
)
# Extract response
choice = response["choices"][0]
assistant_message = choice["message"]
messages.append(assistant_message)
# Check for tool calls
if "tool_calls" not in assistant_message:
# No more tools, return final response
return {
"final_response": assistant_message["content"],
"total_turns": turn + 1,
"messages": messages
}
# Execute each tool call
for tool_call in assistant_message["tool_calls"]:
tool_name = tool_call["function"]["name"]
tool_args = json.loads(tool_call["function"]["arguments"])
tool_call_id = tool_call["id"]
# Find tool implementation
tool_result = await self._execute_tool(tool_name, tool_args)
# Add tool result to messages
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": json.dumps(tool_result, ensure_ascii=False)
})
# Max turns reached
return {
"final_response": "Đã đạt số lượt tối đa. Vui lòng thử câu hỏi ngắn hơn.",
"total_turns": max_turns,
"messages": messages
}
async def _execute_tool(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute tool và trả về kết quả."""
# Tool execution logic here
return {"status": "success", "result": f"Executed {tool_name} with args: {args}"}
def get_cost_report(self) -> Dict[str, Any]:
"""Generate cost report."""
avg_cost_per_request = (
self._total_cost / self._request_count
if self._request_count > 0 else 0
)
return {
"total_cost_usd": round(self._total_cost, 4),
"total_tokens": self._total_tokens,
"request_count": self._request_count,
"avg_cost_per_request": round(avg_cost_per_request, 6),
"savings_vs_gpt4": self._calculate_savings("gpt-4.1"),
"savings_vs_claude": self._calculate_savings("claude-sonnet-4.5")
}
def _calculate_savings(self, compare_model: str) -> Dict[str, Any]:
"""Calculate savings compared to other providers."""
if compare_model not in MODEL_REGISTRY:
return {}
compare_config = MODEL_REGISTRY[compare_model]
default_config = MODEL_REGISTRY[self.default_model]
cost_ratio = (
(compare_config.price_per_1m_tokens_input + compare_config.price_per_1m_tokens_output) /
(default_config.price_per_1m_tokens_input + default_config.price_per_1m_tokens_output
Tài nguyên liên quan
Bài viết liên quan