Giới Thiệu — Tại Sao Kỹ Sư Backend Cần Quan Tâm Ngay

Sau 18 tháng phát triển và qua nhiều bản beta, MCP Protocol 1.0 đã chính thức được công bố với hệ sinh thái 200+ server implementation. Là kỹ sư đã triển khai hệ thống AI tool calling cho production tại HolySheep AI, tôi nhận ra đây là thời điểm quan trọng để cộng đồng kỹ thuật hiểu rõ protocol này hoạt động như thế nào và tại sao nó sẽ trở thành tiêu chuẩn ngành. MCP (Model Context Protocol) không chỉ là một specification mới — nó là lời giải cho bài toán mà chúng ta đã phải đối mặt suốt thời gian qua: làm sao để AI model gọi external tools một cách an toàn, nhanh chóng và có thể mở rộng. Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm thực chiến với MCP, từ kiến trúc core cho đến optimization cho production environment.

Kiến Trúc Core Của MCP Protocol 1.0

1. Transport Layer — WebSocket vs SSE

MCP hỗ trợ hai transport mechanism chính. Trong production, WebSocket là lựa chọn tối ưu cho low-latency tool calling:
# MCP Client với WebSocket Transport - Production Implementation
import asyncio
import json
from typing import Any, Optional, Dict, List
from dataclasses import dataclass, field
from enum import Enum
import aiohttp
import structlog

logger = structlog.get_logger()

class MCPTransportType(Enum):
    """Transport mechanisms that can be selected when configuring a client."""
    WEBSOCKET = "websocket"
    SSE = "sse"
    STDIO = "stdio"

@dataclass
class MCPToolDefinition:
    """Description of one tool as cached by the client after tool discovery."""
    name: str  # tool name; also used as the key of the client's tool cache
    description: str  # human-readable description ("" when the server sends none)
    input_schema: Dict[str, Any]  # taken from the "inputSchema" entry of the listing
    output_schema: Optional[Dict[str, Any]] = None  # from "outputSchema", if present

@dataclass
class MCPToolCallRequest:
    """Parameters describing a single tool invocation."""
    tool_name: str  # name of the tool to invoke
    arguments: Dict[str, Any]  # arguments passed to the tool
    request_id: str  # identifier of this request
    timeout_ms: int = 30000  # presumably milliseconds, per the field name

@dataclass
class MCPToolCallResult:
    """Outcome of a single tool call.

    ``result`` defaults to ``None`` because failed calls carry no payload;
    without the default, constructing a failure result with only
    ``request_id``, ``success`` and ``error`` raises ``TypeError``.
    """
    request_id: str  # id of the request this result belongs to
    success: bool  # True when the call produced a result, False on any failure
    result: Optional[Any] = None  # payload of a successful call
    error: Optional[str] = None  # error message of a failed call
    latency_ms: float = 0.0  # measured duration of the call in milliseconds
    tokens_used: Optional[int] = None  # token usage, when reported

class MCPClient:
    """
    MCP client that talks to a server over a WebSocket.

    What this class implements:
    - a semaphore capping the number of in-flight ``call_tool`` invocations,
    - retries (with exponential backoff after non-timeout errors),
    - a simple circuit breaker that rejects calls after repeated failures,
    - request / latency metrics.

    Only the WebSocket transport is implemented here; ``connect()`` returns
    ``False`` for any other transport. Call ``close()`` when done to release
    the underlying HTTP session.
    """

    def __init__(
        self,
        server_url: str,
        api_key: str,
        transport: MCPTransportType = MCPTransportType.WEBSOCKET,
        max_concurrent_calls: int = 50,
        request_timeout: int = 60,
        max_retries: int = 3
    ):
        """
        Store the configuration and initialise connection, metrics and
        circuit-breaker state. No network I/O happens here.

        Args:
            server_url: Base URL of the server; ``/mcp`` is appended on connect.
            api_key: Sent as a bearer token in the WebSocket handshake.
            transport: Transport to use in ``connect()``.
            max_concurrent_calls: Upper bound on concurrent ``call_tool`` calls.
            request_timeout: Timeout in seconds for connecting and, by
                default, for each tool call.
            max_retries: Number of attempts per ``call_tool`` invocation.
        """
        self.server_url = server_url
        self.api_key = api_key
        self.transport = transport
        self.max_concurrent_calls = max_concurrent_calls
        self.request_timeout = request_timeout
        self.max_retries = max_retries

        # Semaphore for concurrency control
        self._semaphore = asyncio.Semaphore(max_concurrent_calls)
        # A single socket only supports one pending receive at a time, so
        # request/response exchanges are serialised with this lock.
        self._io_lock = asyncio.Lock()

        # Connection state
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._session_id: Optional[str] = None
        self._server_capabilities: Dict[str, Any] = {}
        self._tools_cache: Dict[str, MCPToolDefinition] = {}
        self._cache_ttl: int = 300  # 5 minutes

        # Metrics
        self._metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency_ms": 0.0,
            "cache_hits": 0
        }

        # Circuit breaker state
        self._failure_count = 0
        self._circuit_open = False
        self._circuit_open_time: Optional[float] = None
        self.CIRCUIT_BREAKER_THRESHOLD = 5
        self.CIRCUIT_BREAKER_RESET_TIME = 60  # seconds

        logger.info("mcp_client_initialized",
                   server_url=server_url,
                   transport=transport.value,
                   max_concurrent=max_concurrent_calls)

    async def connect(self) -> bool:
        """
        Establish a connection to the MCP server.

        Returns:
            True when the connection and session initialisation succeeded,
            False on any failure or when the transport is not supported.
        """
        try:
            if self.transport == MCPTransportType.WEBSOCKET:
                return await self._connect_websocket()
            if self.transport == MCPTransportType.SSE:
                # This class has no SSE implementation; only use one if a
                # subclass provides it instead of failing on a missing attribute.
                connect_sse = getattr(self, "_connect_sse", None)
                if connect_sse is not None:
                    return await connect_sse()
            logger.error("connection_failed",
                        error=f"Unsupported transport: {self.transport.value}")
            return False
        except Exception as e:
            logger.error("connection_failed", error=str(e))
            return False

    async def _connect_websocket(self) -> bool:
        """
        Open the WebSocket, wait for the server acknowledgment and
        initialise the session.

        The HTTP session is kept on the instance for as long as the socket is
        in use: closing the session also closes the socket, so it must not be
        scoped to this method. It is closed here only when the handshake
        fails.

        Returns:
            True when the server acknowledged the connection, else False.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-MCP-Version": "1.0",
            "X-Client-Version": "1.0.0"
        }

        session = aiohttp.ClientSession()
        established = False

        async def open_socket():
            return await session.ws_connect(
                f"{self.server_url}/mcp",
                headers=headers
            )

        try:
            # Bound the handshake with asyncio rather than passing an HTTP
            # ClientTimeout as the WebSocket ``timeout`` argument.
            self._ws = await asyncio.wait_for(
                open_socket(),
                timeout=self.request_timeout
            )

            # Wait for connection acknowledgment
            msg = await self._ws.receive()
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data.get("type") == "connection.established":
                    self._session_id = data.get("session_id")
                    await self._initialize_session()
                    self._session = session
                    established = True
                    logger.info("websocket_connected", session_id=self._session_id)

            return established
        finally:
            if not established:
                # Do not leak the session (and socket) on a failed handshake.
                self._ws = None
                await session.close()

    async def close(self) -> None:
        """Close the WebSocket and the HTTP session, if any are open."""
        ws, self._ws = self._ws, None
        session, self._session = self._session, None
        self._session_id = None

        if ws is not None and not ws.closed:
            await ws.close()
        if session is not None:
            await session.close()

    async def _initialize_session(self):
        """Send the ``initialize`` request and, on success, discover the tools."""
        init_request = {
            "jsonrpc": "2.0",
            "method": "initialize",
            "params": {
                "protocolVersion": "1.0.0",
                "capabilities": {
                    "tools": True,
                    "resources": True,
                    "prompts": True
                },
                "clientInfo": {
                    "name": "production-mcp-client",
                    "version": "1.0.0"
                }
            },
            "id": "init-1"
        }

        await self._ws.send_json(init_request)
        response = await self._ws.receive_json()

        if response.get("result"):
            self._server_capabilities = response["result"].get("capabilities", {})
            await self._discover_tools()

    async def _discover_tools(self):
        """Request ``tools/list`` and cache the returned tool definitions by name."""
        tools_request = {
            "jsonrpc": "2.0",
            "method": "tools/list",
            "params": {},
            "id": "tools-list-1"
        }

        await self._ws.send_json(tools_request)
        response = await self._ws.receive_json()

        if response.get("result") and "tools" in response["result"]:
            for tool in response["result"]["tools"]:
                self._tools_cache[tool["name"]] = MCPToolDefinition(
                    name=tool["name"],
                    description=tool.get("description", ""),
                    input_schema=tool.get("inputSchema", {}),
                    output_schema=tool.get("outputSchema")
                )

            logger.info("tools_discovered", count=len(self._tools_cache))

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        timeout_ms: Optional[int] = None
    ) -> MCPToolCallResult:
        """
        Execute a tool call with concurrency control, circuit breaker,
        retries and metrics collection.

        Args:
            tool_name: Name of the tool to call.
            arguments: Arguments forwarded to the tool.
            timeout_ms: Per-attempt timeout in milliseconds; defaults to
                ``request_timeout`` converted to milliseconds.

        Returns:
            The result of the call. Failures (open circuit, error reply from
            the server, or all attempts exhausted) are reported through
            ``success=False`` rather than by raising.
        """
        import time
        import uuid

        start_time = time.perf_counter()
        request_id = f"req-{uuid.uuid4().hex[:12]}"
        timeout = timeout_ms or self.request_timeout * 1000

        # Circuit breaker check
        if self._is_circuit_open():
            return MCPToolCallResult(
                request_id=request_id,
                success=False,
                result=None,
                error="Circuit breaker is open",
                latency_ms=0
            )

        async with self._semaphore:
            # Count every admitted call so success_rate reflects failures too.
            self._metrics["total_requests"] += 1

            for attempt in range(self.max_retries):
                try:
                    result = await self._execute_tool_call(
                        tool_name, arguments, request_id, timeout
                    )

                except asyncio.TimeoutError:
                    logger.warning("tool_call_timeout",
                                 tool=tool_name,
                                 attempt=attempt + 1,
                                 timeout_ms=timeout)
                    # Timeouts count towards the circuit breaker as well.
                    self._failure_count += 1

                except Exception as e:
                    logger.error("tool_call_error",
                               tool=tool_name,
                               error=str(e),
                               attempt=attempt + 1)
                    self._failure_count += 1

                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff

                else:
                    # The server answered, so the transport is healthy; an
                    # error reply is still a failed request, though.
                    if result.success:
                        self._metrics["successful_requests"] += 1
                    else:
                        self._metrics["failed_requests"] += 1
                    self._failure_count = 0

                    return result

            # All retries failed
            self._metrics["failed_requests"] += 1
            self._record_failure()

            return MCPToolCallResult(
                request_id=request_id,
                success=False,
                result=None,
                error=f"Failed after {self.max_retries} attempts",
                latency_ms=(time.perf_counter() - start_time) * 1000
            )

    async def _execute_tool_call(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        request_id: str,
        timeout_ms: int
    ) -> MCPToolCallResult:
        """
        Send one ``tools/call`` request over the WebSocket and wait for its reply.

        Exchanges are serialised with a lock, and replies whose ``id`` does
        not match ``request_id`` (for example a late answer to a request that
        already timed out) are discarded.

        Raises:
            ConnectionError: When the client is not connected.
            asyncio.TimeoutError: When no matching reply arrives in time.
        """
        import time

        if self._ws is None:
            raise ConnectionError("MCP client is not connected")

        start_time = time.perf_counter()

        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            },
            "id": request_id
        }

        async def exchange() -> Dict[str, Any]:
            await self._ws.send_json(request)
            while True:
                reply = await self._ws.receive_json()
                if isinstance(reply, dict) and reply.get("id") == request_id:
                    return reply

        # Wait for the response, bounded by the timeout
        async with self._io_lock:
            try:
                response = await asyncio.wait_for(
                    exchange(),
                    timeout=timeout_ms / 1000
                )
            except asyncio.TimeoutError:
                raise asyncio.TimeoutError(
                    f"Tool call timed out after {timeout_ms}ms"
                ) from None

        latency_ms = (time.perf_counter() - start_time) * 1000
        self._metrics["total_latency_ms"] += latency_ms

        if "error" in response:
            return MCPToolCallResult(
                request_id=request_id,
                success=False,
                result=None,
                error=response["error"].get("message"),
                latency_ms=latency_ms
            )

        return MCPToolCallResult(
            request_id=request_id,
            success=True,
            result=response.get("result"),
            latency_ms=latency_ms,
            tokens_used=response.get("meta", {}).get("tokens_used")
        )

    def _is_circuit_open(self) -> bool:
        """
        Report whether the circuit breaker currently rejects calls.

        Once ``CIRCUIT_BREAKER_RESET_TIME`` seconds have passed since the
        circuit opened, it is closed again and the failure count is reset.
        """
        if not self._circuit_open:
            return False

        if self._circuit_open_time:
            import time
            if time.time() - self._circuit_open_time > self.CIRCUIT_BREAKER_RESET_TIME:
                self._circuit_open = False
                self._failure_count = 0
                logger.info("circuit_breaker_reset")
                return False

        return True

    def _record_failure(self):
        """Open the circuit once the failure count reaches the threshold."""
        import time

        if self._failure_count >= self.CIRCUIT_BREAKER_THRESHOLD:
            self._circuit_open = True
            self._circuit_open_time = time.time()
            logger.warning("circuit_breaker_opened",
                         failure_count=self._failure_count)

    def get_metrics(self) -> Dict[str, Any]:
        """
        Return the raw counters plus derived values: average latency,
        success rate, number of cached tools and circuit breaker status.
        """
        avg_latency = (
            self._metrics["total_latency_ms"] / self._metrics["total_requests"]
            if self._metrics["total_requests"] > 0 else 0
        )

        return {
            **self._metrics,
            "avg_latency_ms": round(avg_latency, 2),
            "success_rate": (
                self._metrics["successful_requests"] / self._metrics["total_requests"]
                if self._metrics["total_requests"] > 0 else 0
            ),
            "cached_tools": len(self._tools_cache),
            "circuit_breaker_status": "open" if self._circuit_open else "closed"
        }

2. Server Implementation — Production Architecture

Dưới đây là server implementation hoàn chỉnh với connection pooling và resource management:
# MCP Server Implementation - Production Ready
import asyncio
import json
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import structlog
from datetime import datetime, timedelta

logger = structlog.get_logger()

class MCPErrorCode(Enum):
    """Numeric error codes placed in the ``code`` field of error responses."""
    # These five values match the pre-defined JSON-RPC 2.0 error codes.
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
    # Codes specific to this server implementation.
    TOOL_NOT_FOUND = -32001
    TOOL_EXECUTION_ERROR = -32002
    RATE_LIMITED = -32003
    RESOURCE_EXHAUSTED = -32004

@dataclass
class MCPRequest:
    """Parsed form of an incoming JSON-RPC request."""
    jsonrpc: str  # protocol version string taken from the request
    method: str  # method name used to route the request, e.g. "tools/call"
    params: Dict[str, Any] = field(default_factory=dict)  # method parameters
    id: Optional[Any] = None  # request id, echoed back in the response

@dataclass
class MCPResponse:
    """Response returned by the server for one request."""
    jsonrpc: str = "2.0"
    result: Optional[Any] = None  # payload of a successful response
    error: Optional[Dict[str, Any]] = None  # dict with "code", "message", "data" on failure
    id: Optional[Any] = None  # id of the request being answered

class Tool(ABC):
    """Abstract interface that every tool registered with the server implements."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name under which the tool is registered and looked up."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable description returned in the tool listing."""

    @property
    @abstractmethod
    def input_schema(self) -> Dict[str, Any]:
        """Schema of the accepted arguments, returned as ``inputSchema``."""

    @abstractmethod
    async def execute(self, arguments: Dict[str, Any]) -> Any:
        """Run the tool with ``arguments`` and return its result."""

class MCPServer:
    """
    MCP server request handler providing:
    - a tool registry,
    - per-client rate limiting for tool calls,
    - request validation,
    - logging and basic metrics.

    ``handle_request`` never raises for malformed input; every problem is
    reported as an error response.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8080,
        max_connections: int = 1000,
        rate_limit_per_minute: int = 60,
        max_tool_execution_time: int = 30000  # ms
    ):
        """
        Store the configuration and create empty registries and counters.

        Args:
            host: Host value stored on the instance.
            port: Port value stored on the instance.
            max_connections: Connection limit stored on the instance.
            rate_limit_per_minute: Tool calls allowed per client within any
                60-second window.
            max_tool_execution_time: Per-call execution limit in milliseconds.
        """
        self.host = host
        self.port = port
        self.max_connections = max_connections
        self.rate_limit_per_minute = rate_limit_per_minute
        self.max_tool_execution_time = max_tool_execution_time

        # Tool registry
        self._tools: Dict[str, Tool] = {}

        # Connection management
        self._active_connections: Dict[str, datetime] = {}
        self._connection_lock = asyncio.Lock()

        # Rate limiting
        self._rate_limit_buckets: Dict[str, List[datetime]] = {}

        # Metrics
        self._server_metrics = {
            "total_requests": 0,
            "tool_calls": 0,
            "errors": 0,
            "active_connections": 0
        }

        logger.info("mcp_server_initialized",
                   host=host,
                   port=port,
                   max_connections=max_connections)

    def register_tool(self, tool: Tool):
        """Register ``tool`` under its name, replacing any tool with the same name."""
        self._tools[tool.name] = tool
        logger.info("tool_registered",
                   tool_name=tool.name,
                   description=tool.description[:50])

    async def handle_request(self, request: Dict[str, Any]) -> MCPResponse:
        """
        Validate an incoming request and route it to the matching handler.

        Args:
            request: Decoded JSON-RPC request. Anything that is not a valid
                request object yields an INVALID_REQUEST error response.

        Returns:
            The handler's response, or an error response.
        """
        self._server_metrics["total_requests"] += 1

        # The id is echoed in error responses; a non-dict request has none.
        request_id = request.get("id") if isinstance(request, dict) else None

        try:
            mcp_request = self._parse_request(request)
        except ValueError as e:
            return self._error_response(
                MCPErrorCode.INVALID_REQUEST,
                str(e),
                request_id
            )

        handlers = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_list_tools,
            "tools/call": self._handle_tool_call,
            "resources/list": self._handle_list_resources,
        }

        try:
            handler = handlers.get(mcp_request.method)
            if handler is None:
                return self._error_response(
                    MCPErrorCode.METHOD_NOT_FOUND,
                    f"Unknown method: {mcp_request.method}",
                    mcp_request.id
                )
            return await handler(mcp_request)

        except json.JSONDecodeError as e:
            return self._error_response(
                MCPErrorCode.PARSE_ERROR,
                f"Invalid JSON: {str(e)}",
                None
            )
        except Exception as e:
            logger.error("request_error", error=str(e))
            return self._error_response(
                MCPErrorCode.INTERNAL_ERROR,
                str(e),
                request_id
            )

    def _parse_request(self, request: Dict[str, Any]) -> MCPRequest:
        """
        Parse and validate an MCP request.

        A missing or null ``params`` member is treated as an empty object.

        Raises:
            ValueError: When the request is not a dict, has the wrong
                JSON-RPC version, lacks a string ``method``, or carries
                ``params`` that are not an object.
        """
        if not isinstance(request, dict):
            raise ValueError("Request must be a dictionary")

        if request.get("jsonrpc") != "2.0":
            raise ValueError("Invalid JSON-RPC version")

        if "method" not in request:
            raise ValueError("Missing method field")

        method = request["method"]
        if not isinstance(method, str):
            raise ValueError("Method must be a string")

        params = request.get("params")
        if params is None:
            params = {}
        elif not isinstance(params, dict):
            # Handlers read params by key, so positional params are rejected.
            raise ValueError("Params must be an object")

        return MCPRequest(
            jsonrpc=request["jsonrpc"],
            method=method,
            params=params,
            id=request.get("id")
        )

    async def _handle_initialize(self, request: MCPRequest) -> MCPResponse:
        """Answer ``initialize`` with the protocol version, capabilities and server info."""
        client_info = request.params.get("clientInfo", {})

        logger.info("client_initializing",
                   client_name=client_info.get("name"),
                   protocol_version=request.params.get("protocolVersion"))

        return MCPResponse(
            result={
                "protocolVersion": "1.0.0",
                "capabilities": {
                    "tools": {
                        "listChanged": True
                    },
                    "resources": {
                        "subscribe": True,
                        "listChanged": True
                    },
                    "prompts": {
                        "listChanged": True
                    }
                },
                "serverInfo": {
                    "name": "production-mcp-server",
                    "version": "1.0.0"
                }
            },
            id=request.id
        )

    async def _handle_list_tools(self, request: MCPRequest) -> MCPResponse:
        """Answer ``tools/list`` with name, description and input schema of each tool."""
        tools_list = [
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.input_schema
            }
            for tool in self._tools.values()
        ]

        return MCPResponse(
            result={"tools": tools_list},
            id=request.id
        )

    async def _handle_tool_call(self, request: MCPRequest) -> MCPResponse:
        """
        Answer ``tools/call``: validate the parameters, apply the rate
        limit, run the tool with a timeout and wrap its JSON-encoded result.
        """
        self._server_metrics["tool_calls"] += 1

        tool_name = request.params.get("name")
        arguments = request.params.get("arguments")
        if arguments is None:
            arguments = {}

        # Validate parameter shapes before using them as dict keys / tool input
        if not isinstance(tool_name, str):
            return self._error_response(
                MCPErrorCode.INVALID_PARAMS,
                "Missing or invalid tool name",
                request.id
            )
        if not isinstance(arguments, dict):
            return self._error_response(
                MCPErrorCode.INVALID_PARAMS,
                "Tool arguments must be an object",
                request.id
            )

        # Validate tool exists
        if tool_name not in self._tools:
            return self._error_response(
                MCPErrorCode.TOOL_NOT_FOUND,
                f"Tool not found: {tool_name}",
                request.id
            )

        # Rate limit check.
        # NOTE(review): the client id is taken from the request parameters,
        # so a caller can pick any id; confirm whether it should come from
        # the authenticated connection instead.
        client_id = request.params.get("_client_id", "anonymous")
        if not isinstance(client_id, str):
            client_id = str(client_id)
        if not self._check_rate_limit(client_id):
            return self._error_response(
                MCPErrorCode.RATE_LIMITED,
                "Rate limit exceeded",
                request.id
            )

        tool = self._tools[tool_name]

        try:
            # Execute with timeout
            result = await asyncio.wait_for(
                tool.execute(arguments),
                timeout=self.max_tool_execution_time / 1000
            )

            return MCPResponse(
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps(result, ensure_ascii=False)
                        }
                    ],
                    "isError": False
                },
                id=request.id
            )

        except asyncio.TimeoutError:
            return self._error_response(
                MCPErrorCode.TOOL_EXECUTION_ERROR,
                f"Tool execution timed out after {self.max_tool_execution_time}ms",
                request.id
            )
        except Exception as e:
            logger.error("tool_execution_error",
                        tool=tool_name,
                        error=str(e))
            return self._error_response(
                MCPErrorCode.TOOL_EXECUTION_ERROR,
                str(e),
                request.id
            )

    async def _handle_list_resources(self, request: MCPRequest) -> MCPResponse:
        """Answer ``resources/list``; currently always an empty list."""
        return MCPResponse(
            result={"resources": []},
            id=request.id
        )

    def _check_rate_limit(self, client_id: str) -> bool:
        """
        Record one call for ``client_id`` if it is within the rate limit.

        Returns:
            True when the call is allowed (and has been recorded), False when
            the client already made ``rate_limit_per_minute`` calls within
            the last minute.
        """
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)

        # Keep only the timestamps that are still inside the window
        recent = [
            ts for ts in self._rate_limit_buckets.get(client_id, [])
            if ts > minute_ago
        ]
        self._rate_limit_buckets[client_id] = recent

        # Check limit
        if len(recent) >= self.rate_limit_per_minute:
            return False

        recent.append(now)
        return True

    def _error_response(
        self,
        code: MCPErrorCode,
        message: str,
        request_id: Optional[Any]
    ) -> MCPResponse:
        """
        Create an error response and count it in the ``errors`` metric.

        Counting here keeps the metric consistent for every kind of error
        response, not only for unexpected exceptions.
        """
        self._server_metrics["errors"] += 1
        return MCPResponse(
            error={
                "code": code.value,
                "message": message,
                "data": None
            },
            id=request_id
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Return the counters plus the number of registered tools and rate-limit buckets."""
        return {
            **self._server_metrics,
            "registered_tools": len(self._tools),
            "rate_limit_buckets": len(self._rate_limit_buckets)
        }

# Example: Real-world tool implementation

class WebSearchTool(Tool):
    """Example web search tool; the search itself is simulated."""

    @property
    def name(self) -> str:
        return "web_search"

    @property
    def description(self) -> str:
        return "Search the web for information"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results",
                    "default": 10
                }
            },
            "required": ["query"]
        }

    async def execute(self, arguments: Dict[str, Any]) -> Any:
        """
        Return placeholder results for ``arguments["query"]``.

        At most five results are generated regardless of ``max_results``.

        Raises:
            KeyError: When ``query`` is missing from ``arguments``.
        """
        query = arguments["query"]
        max_results = arguments.get("max_results", 10)

        # Simulate web search
        await asyncio.sleep(0.1)

        return {
            "query": query,
            "results": [
                {"title": f"Result {i}", "url": f"https://example.com/{i}"}
                for i in range(min(max_results, 5))
            ],
            "total_found": max_results
        }

# Usage example

async def main():
    """Register the example tool, send one ``tools/call`` request and print the response."""
    server = MCPServer(port=8080)
    server.register_tool(WebSearchTool())

    # Test request
    request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "web_search",
            "arguments": {"query": "MCP Protocol", "max_results": 5}
        },
        "id": "test-1"
    }

    response = await server.handle_request(request)
    print(json.dumps(response.__dict__, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())

Tích Hợp HolySheep AI — Tiết Kiệm 85%+ Chi Phí

Điểm mấu chốt của bài viết này là tích hợp MCP client với HolySheep AI API — nơi bạn có thể tiết kiệm đến 85% chi phí so với các provider lớn khác. Đăng ký tại đây để nhận tín dụng miễn phí khi bắt đầu.
# Production Integration: MCP + HolySheep AI
import asyncio
import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
import aiohttp

# HolySheep AI Configuration

HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY"  # Replace with a real API key


@dataclass
class ModelConfig:
    """Model configuration with pricing info (2026)."""
    model_id: str  # identifier sent to the API
    name: str  # display name
    price_per_1m_tokens_input: float  # price per one million input tokens
    price_per_1m_tokens_output: float  # price per one million output tokens
    max_tokens: int  # token limit for the model
    supports_tools: bool = True  # whether the model supports tool calling

# Model Registry với pricing thực tế (tính theo USD)

MODEL_REGISTRY: Dict[str, ModelConfig] = { "gpt-4.1": ModelConfig( model_id="gpt-4.1", name="GPT-4.1", price_per_1m_tokens_input=8.00, # $8/MTok price_per_1m_tokens_output=24.00, max_tokens=128000, supports_tools=True ), "claude-sonnet-4.5": ModelConfig( model_id="claude-sonnet-4.5", name="Claude Sonnet 4.5", price_per_1m_tokens_input=15.00, # $15/MTok price_per_1m_tokens_output=75.00, max_tokens=200000, supports_tools=True ), "gemini-2.5-flash": ModelConfig( model_id="gemini-2.5-flash", name="Gemini 2.5 Flash", price_per_1m_tokens_input=2.50, # $2.50/MTok price_per_1m_tokens_output=10.00, max_tokens=1000000, supports_tools=True ), "deepseek-v3.2": ModelConfig( model_id="deepseek-v3.2", name="DeepSeek V3.2", price_per_1m_tokens_input=0.42, # Chỉ $0.42/MTok! price_per_1m_tokens_output=1.68, max_tokens=64000, supports_tools=True ) } class ToolCallingAgent: """ AI Agent với tool calling capability sử dụng HolySheep AI. Supports multiple models với automatic cost optimization. """ def __init__( self, api_key: str, default_model: str = "deepseek-v3.2", # Model tiết kiệm nhất enable_cost_tracking: bool = True ): self.api_key = api_key self.default_model = default_model self.enable_cost_tracking = enable_cost_tracking # Cost tracking self._total_cost = 0.0 self._total_tokens = 0 self._request_count = 0 # Model selection strategy self._model_strategy = "cost_optimized" # "cost_optimized", "quality_first", "balanced" async def chat_completion( self, messages: List[Dict[str, str]], tools: Optional[List[Dict[str, Any]]] = None, model: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 4096 ) -> Dict[str, Any]: """ Gửi request đến HolySheep AI với tool calling support. Pricing comparison (input, per 1M tokens): - GPT-4.1: $8.00 - Claude Sonnet 4.5: $15.00 - Gemini 2.5 Flash: $2.50 - DeepSeek V3.2: $0.42 (TIẾT KIỆM 95%!) 
""" model = model or self.default_model if model not in MODEL_REGISTRY: raise ValueError(f"Unknown model: {model}") model_config = MODEL_REGISTRY[model] # Build request payload payload = { "model": model_config.model_id, "messages": messages, "temperature": temperature, "max_tokens": min(max_tokens, model_config.max_tokens) } if tools: payload["tools"] = tools payload["tool_choice"] = "auto" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } async with aiohttp.ClientSession() as session: async with session.post( f"{HOLYSHEEP_BASE_URL}/chat/completions", headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=120) ) as response: if response.status != 200: error_text = await response.text() raise Exception(f"API Error: {response.status} - {error_text}") result = await response.json() # Calculate cost if self.enable_cost_tracking: usage = result.get("usage", {}) input_tokens = usage.get("prompt_tokens", 0) output_tokens = usage.get("completion_tokens", 0) input_cost = (input_tokens / 1_000_000) * model_config.price_per_1m_tokens_input output_cost = (output_tokens / 1_000_000) * model_config.price_per_1m_tokens_output total_cost = input_cost + output_cost self._total_cost += total_cost self._total_tokens += input_tokens + output_tokens self._request_count += 1 # Log cost breakdown print(f"[COST] {model_config.name}: ${total_cost:.4f} " f"(in: {input_tokens}, out: {output_tokens})") return result async def chat_with_tools( self, user_message: str, available_tools: List[Dict[str, Any]], max_turns: int = 5 ) -> Dict[str, Any]: """ Multi-turn chat với tool execution. Tự động execute tools khi model yêu cầu. 
""" messages = [ {"role": "system", "content": "Bạn là trợ lý AI có khả năng gọi tools."} ] messages.append({"role": "user", "content": user_message}) for turn in range(max_turns): # Get completion response = await self.chat_completion( messages=messages, tools=available_tools, model="deepseek-v3.2" # Dùng model rẻ nhất cho tool calls ) # Extract response choice = response["choices"][0] assistant_message = choice["message"] messages.append(assistant_message) # Check for tool calls if "tool_calls" not in assistant_message: # No more tools, return final response return { "final_response": assistant_message["content"], "total_turns": turn + 1, "messages": messages } # Execute each tool call for tool_call in assistant_message["tool_calls"]: tool_name = tool_call["function"]["name"] tool_args = json.loads(tool_call["function"]["arguments"]) tool_call_id = tool_call["id"] # Find tool implementation tool_result = await self._execute_tool(tool_name, tool_args) # Add tool result to messages messages.append({ "role": "tool", "tool_call_id": tool_call_id, "content": json.dumps(tool_result, ensure_ascii=False) }) # Max turns reached return { "final_response": "Đã đạt số lượt tối đa. 
Vui lòng thử câu hỏi ngắn hơn.", "total_turns": max_turns, "messages": messages } async def _execute_tool(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]: """Execute tool và trả về kết quả.""" # Tool execution logic here return {"status": "success", "result": f"Executed {tool_name} with args: {args}"} def get_cost_report(self) -> Dict[str, Any]: """Generate cost report.""" avg_cost_per_request = ( self._total_cost / self._request_count if self._request_count > 0 else 0 ) return { "total_cost_usd": round(self._total_cost, 4), "total_tokens": self._total_tokens, "request_count": self._request_count, "avg_cost_per_request": round(avg_cost_per_request, 6), "savings_vs_gpt4": self._calculate_savings("gpt-4.1"), "savings_vs_claude": self._calculate_savings("claude-sonnet-4.5") } def _calculate_savings(self, compare_model: str) -> Dict[str, Any]: """Calculate savings compared to other providers.""" if compare_model not in MODEL_REGISTRY: return {} compare_config = MODEL_REGISTRY[compare_model] default_config = MODEL_REGISTRY[self.default_model] cost_ratio = ( (compare_config.price_per_1m_tokens_input + compare_config.price_per_1m_tokens_output) / (default_config.price_per_1m_tokens_input + default_config.price_per_1m_tokens_output