Việc quản lý conversation context là một trong những thách thức lớn nhất khi triển khai GPT-4 vào môi trường production. Một startup AI ở Hà Nội đã phải đối mặt với bài toán này khi hệ thống chatbot chăm sóc khách hàng tự động của họ bắt đầu phình to với hơn 50,000 cuộc hội thoại đồng thời mỗi ngày. Bài viết này sẽ hướng dẫn chi tiết cách xây dựng hệ thống context management tối ưu, từ kiến trúc cơ bản đến các best practice đã được kiểm chứng thực tế.
Bối Cảnh Thực Tế: Startup AI ở Hà Nội
Một startup AI ở Hà Nội chuyên cung cấp giải pháp chatbot tự động cho các sàn thương mại điện tử đã gặp phải vấn đề nghiêm trọng với hệ thống conversation state ban đầu. Với kiến trúc đơn giản lưu toàn bộ message history trong một mảng, họ nhanh chóng nhận ra rằng:
- Hóa đơn hàng tháng tăng vọt lên $4,200 do token usage không kiểm soát được
- Độ trễ trung bình lên đến 420ms khi conversation dài
- Memory leak nghiêm trọng khiến server phải restart mỗi 6 giờ
- Rate limiting thường xuyên do exceed context window
Sau khi tìm hiểu và chuyển sang HolySheep AI với chi phí chỉ từ $8/MTok cho GPT-4.1, đội ngũ đã tiết kiệm được 85% chi phí và đạt độ trễ 180ms sau 30 ngày triển khai.
Kiến Trúc Context Management Cơ Bản
Trước khi đi vào chi tiết, chúng ta cần hiểu rõ cách GPT-4 API xử lý context. Mỗi request gửi lên đều cần bao gồm toàn bộ conversation history để model có thể hiểu ngữ cảnh. Điều này có nghĩa là chi phí tính theo số token sẽ tăng tuyến tính với độ dài conversation.
1. Cấu Trúc Message Database
-- PostgreSQL schema cho conversation management
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
metadata JSONB DEFAULT '{}',
is_active BOOLEAN DEFAULT TRUE
);
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID REFERENCES conversations(id),
role VARCHAR(20) NOT NULL CHECK (role IN ('system', 'user', 'assistant')),
content TEXT NOT NULL,
token_count INTEGER,
created_at TIMESTAMP DEFAULT NOW(),
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_messages_conversation ON messages(conversation_id, created_at);
CREATE INDEX idx_conversations_user ON conversations(user_id, is_active);
-- Trigger để tự động cập nhật updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER conversations_updated_at
BEFORE UPDATE ON conversations
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
2. Session Manager Class
import hashlib
import tiktoken
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum
class ContextStrategy(Enum):
FULL = "full" # Giữ toàn bộ context
SLIDING_WINDOW = "sliding" # Giữ N messages gần nhất
SUMMARY = "summary" # Tóm tắt context cũ
SEMANTIC = "semantic" # Chọn lọc theo semantic similarity
@dataclass
class Message:
role: str
content: str
token_count: int = 0
created_at: datetime = field(default_factory=datetime.now)
def __post_init__(self):
if self.token_count == 0:
self.token_count = self._estimate_tokens(self.content)
@staticmethod
def _estimate_tokens(text: str) -> int:
# Ước lượng: 1 token ≈ 4 ký tự cho tiếng Anh
# Với tiếng Việt, tỷ lệ này cao hơn
return len(text) // 4 + len(text.split())
@dataclass
class ConversationContext:
messages: List[Message] = field(default_factory=list)
system_prompt: str = ""
max_tokens: int = 128000 # GPT-4-32k context window
reserved_tokens: int = 2000 # Token cho response
def get_available_context_tokens(self) -> int:
current_tokens = sum(m.token_count for m in self.messages)
system_tokens = len(self.system_prompt) // 4
return self.max_tokens - current_tokens - system_tokens - self.reserved_tokens
def add_message(self, role: str, content: str) -> Message:
msg = Message(role=role, content=content)
self.messages.append(msg)
return msg
class GPTSessionManager:
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
model: str = "gpt-4.1",
context_strategy: ContextStrategy = ContextStrategy.SLIDING_WINDOW,
max_messages: int = 20,
encoding_model: str = "cl100k_base"
):
self.api_key = api_key
self.base_url = base_url
self.model = model
self.context_strategy = context_strategy
self.max_messages = max_messages
self.encoding = tiktoken.get_encoding(encoding_model)
# Cache cho context optimization
self._context_cache: Dict[str, List[Message]] = {}
self._cache_ttl = 300 # 5 phút
def build_context(
self,
conversation_id: str,
messages: List[Message],
force_rebuild: bool = False
) -> List[Dict[str, str]]:
"""Xây dựng context cho API request"""
# Kiểm tra cache
cache_key = f"{conversation_id}:{len(messages)}"
if not force_rebuild and cache_key in self._context_cache:
return self._context_cache[cache_key]
# Áp dụng context strategy
optimized_messages = self._optimize_context(messages)
# Định dạng output
context = []
if self.system_prompt:
context.append({"role": "system", "content": self.system_prompt})
context.extend([
{"role": m.role, "content": m.content}
for m in optimized_messages
])
# Cache kết quả
self._context_cache[cache_key] = context
return context
def _optimize_context(self, messages: List[Message]) -> List[Message]:
"""Tối ưu context dựa trên strategy"""
if self.context_strategy == ContextStrategy.FULL:
return messages
elif self.context_strategy == ContextStrategy.SLIDING_WINDOW:
# Giữ N messages gần nhất
return messages[-self.max_messages:]
elif self.context_strategy == ContextStrategy.SUMMARY:
# Tóm tắt messages cũ, giữ messages gần đây
if len(messages) <= self.max_messages:
return messages
recent = messages[-self.max_messages//2:]
older = messages[:-self.max_messages//2]
summary = self._summarize_messages(older)
return [Message(role="assistant", content=f"[Tóm tắt]: {summary}")] + recent
elif self.context_strategy == ContextStrategy.SEMANTIC:
# Chọn lọc messages có liên quan
return self._semantic_filter(messages)
return messages
def _summarize_messages(self, messages: List[Message]) -> str:
"""Tạo tóm tắt cho các messages cũ"""
# Sử dụng model nhẹ để tóm tắt
summary_prompt = f"""Tóm tắt ngắn gọn cuộc trò chuyện sau,
chỉ giữ lại thông tin quan trọng:
{messages}"""
# Gọi API tóm tắt (sử dụng model rẻ hơn)
# Chi phí: DeepSeek V3.2 chỉ $0.42/MTok
return "Tóm tắt conversation..."
def _semantic_filter(self, messages: List[Message]) -> List[Message]:
"""Lọc messages dựa trên semantic similarity"""
# Sử dụng embeddings để chọn messages liên quan nhất
return messages[-self.max_messages:] # Fallback
Khởi tạo session manager
session_manager = GPTSessionManager(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1",
model="gpt-4.1",
context_strategy=ContextStrategy.SLIDING_WINDOW,
max_messages=20
)
3. Async API Integration
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
import json
from datetime import datetime
import redis.asyncio as redis
class AsyncGPTClient:
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
model: str = "gpt-4.1",
max_retries: int = 3,
timeout: int = 60
):
self.api_key = api_key
self.base_url = base_url.rstrip('/')
self.model = model
self.max_retries = max_retries
self.timeout = timeout
# Redis cache cho conversation context
self._redis: Optional[redis.Redis] = None
async def init_redis(self, redis_url: str = "redis://localhost:6379"):
"""Khởi tạo Redis connection pool"""
self._redis = await redis.from_url(
redis_url,
encoding="utf-8",
decode_responses=True
)
async def chat(
self,
messages: List[Dict[str, str]],
conversation_id: str,
temperature: float = 0.7,
max_tokens: int = 2048,
**kwargs
) -> Dict[str, Any]:
"""Gửi request lên GPT-4 API với retry logic"""
url = f"{self.base_url}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
# Retry logic với exponential backoff
for attempt in range(self.max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as response:
if response.status == 200:
result = await response.json()
# Cache context vào Redis
await self._cache_context(
conversation_id,
messages,
result
)
return {
"content": result["choices"][0]["message"]["content"],
"usage": result.get("usage", {}),
"model": result.get("model"),
"latency_ms": response.headers.get("x-response-time"),
"cached": False
}
elif response.status == 429:
# Rate limit - wait and retry
retry_after = int(response.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
continue
elif response.status == 400:
# Context length exceeded
error = await response.json()
if "context_length" in str(error):
return {
"error": "context_exceeded",
"message": "Cần tối ưu context"
}
raise Exception(f"Bad request: {error}")
else:
error_text = await response.text()
raise Exception(f"API error {response.status}: {error_text}")
except aiohttp.ClientError as e:
if attempt == self.max_retries - 1:
raise
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
raise Exception("Max retries exceeded")
async def _cache_context(
self,
conversation_id: str,
messages: List[Dict[str, str]],
response: Dict
):
"""Cache conversation context vào Redis"""
if not self._redis:
return
cache_key = f"ctx:{conversation_id}"
cache_data = {
"messages": messages,
"last_response": response["choices"][0]["message"],
"updated_at": datetime.now().isoformat()
}
# TTL: 24 giờ cho active conversation
await self._redis.setex(
cache_key,
86400,
json.dumps(cache_data)
)
async def get_cached_context(self, conversation_id: str) -> Optional[Dict]:
"""Lấy context từ cache"""
if not self._redis:
return None
cache_key = f"ctx:{conversation_id}"
data = await self._redis.get(cache_key)
if data:
return json.loads(data)
return None
Sử dụng với asyncio
async def main():
client = AsyncGPTClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1",
model="gpt-4.1"
)
await client.init_redis()
messages = [
{"role": "system", "content": "Bạn là trợ lý AI hữu ích."},
{"role": "user", "content": "Xin chào, tôi cần hỗ trợ về API"}
]
result = await client.chat(
messages=messages,
conversation_id="conv_123",
temperature=0.7
)
print(f"Response: {result['content']}")
print(f"Token usage: {result['usage']}")
if __name__ == "__main__":
asyncio.run(main())
Các Chiến Lược Context Tối Ưu
1. Sliding Window - Phù Hợp Cho Chat Thông Thường
Chiến lược đơn giản nhất: luôn giữ N messages gần nhất trong context. Đây là cách tiếp cận hiệu quả về chi phí, phù hợp cho hầu hết các ứng dụng chatbot thông thường với độ trễ ước tính 150-200ms khi sử dụng HolySheep AI.
2. Semantic Chunking - Cho Nội Dung Dài
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List, Tuple
class SemanticChunker:
"""Chia context thành các chunks có ngữ nghĩa liên quan"""
def __init__(self, similarity_threshold: float = 0.7):
self.similarity_threshold = similarity_threshold
self.vectorizer = TfidfVectorizer()
def chunk_messages(
self,
messages: List[Message],
target_chunk_size: int = 10
) -> List[List[Message]]:
"""Chia messages thành chunks có semantic similarity cao"""
if len(messages) <= target_chunk_size:
return [messages]
# Tính TF-IDF vectors
contents = [m.content for m in messages]
try:
tfidf_matrix = self.vectorizer.fit_transform(contents)
except ValueError:
# Không đủ vocabulary
return [messages[-target_chunk_size:]]
# Tính similarity matrix
similarity = tfidf_matrix @ tfidf_matrix.T
similarity_dense = (similarity.toarray() + 1) / 2
# Greedy clustering
chunks = []
current_chunk = [0]
for i in range(1, len(messages)):
avg_similarity = np.mean([
similarity_dense[i, j] for j in current_chunk
])
if avg_similarity >= self.similarity_threshold:
current_chunk.append(i)
if len(current_chunk) >= target_chunk_size:
chunks.append([messages[j] for j in current_chunk])
current_chunk = []
else:
if current_chunk:
chunks.append([messages[j] for j in current_chunk])
current_chunk = [i]
if current_chunk:
chunks.append([messages[j] for j in current_chunk])
return chunks
def build_prompt_from_chunks(
self,
chunks: List[List[Message]],
max_chunks: int = 3
) -> str:
"""Xây dựng prompt từ các chunks được chọn"""
# Chọn chunks gần nhất với query
recent_messages = chunks[-1] if chunks else []
prompt_parts = []
# Thêm tóm tắt chunks cũ
if len(chunks) > 1:
summary_parts = []
for chunk in chunks[:-max_chunks]:
combined = "\n".join(m.content for m in chunk)
summary_parts.append(combined[:200] + "...")
prompt_parts.append("=== Lịch sử trước đó ===\n" + "\n---\n".join(summary_parts))
# Thêm chunks gần nhất
for chunk in chunks[-max_chunks:]:
for msg in chunk:
role_prefix = "Người dùng" if msg.role == "user" else "Trợ lý"
prompt_parts.append(f"{role_prefix}: {msg.content}")
return "\n\n".join(prompt_parts)
3. Token Budget Management
from dataclasses import dataclass
@dataclass
class TokenBudget:
"""Quản lý budget token cho từng conversation"""
max_total: int = 128000 # GPT-4-32k
max_response: int = 4096 # Reserve cho response
max_history: int = 120000 # Phần còn lại cho history
system_tokens: int = 0
history_tokens: int = 0
def allocate(self, system_prompt: str, history: List[Message]) -> bool:
"""Kiểm tra và phân bổ token budget"""
self.system_tokens = len(system_prompt) // 4
self.history_tokens = sum(m.token_count for m in history)
total_used = self.system_tokens + self.history_tokens
available = self.max_total - self.max_response
if total_used > available:
return False
return True
def get_truncation_needed(self, history: List[Message]) -> int:
"""Tính số messages cần loại bỏ"""
current = sum(m.token_count for m in history)
available = self.max_total - self.max_response - self.system_tokens
if current <= available:
return 0
excess = current - available
tokens_to_remove = 0
for i, msg in enumerate(history):
tokens_to_remove += msg.token_count
if tokens_to_remove >= excess:
return i + 1
return len(history)
def optimize_history(
self,
history: List[Message],
keep_recent: int = 10
) -> List[Message]:
"""Tối ưu history với các chiến lược khác nhau"""
if self.allocate("", history):
return history
# Chiến lược 1: Giữ messages gần nhất
if len(history) > keep_recent:
return history[-keep_recent:]
# Chiến lược 2: Giữ alternate messages
alternate = history[::2]
if self.allocate("", alternate):
return alternate
# Chiến lược 3: Loại bỏ dần cho đến khi fit
truncation = self.get_truncation_needed(history)
return history[truncation:]
class CostTracker:
"""Theo dõi chi phí theo thời gian thực"""
PRICING = {
"gpt-4.1": {"input": 8.0, "output": 8.0}, # $/MTok
"gpt-4.1-mini": {"input": 2.0, "output": 8.0},
"gpt-4o": {"input": 5.0, "output": 15.0},
"deepseek-v3.2": {"input": 0.42, "output": 2.80}
}
def __init__(self):
self.daily_costs: Dict[str, float] = {}
self.conversation_costs: Dict[str, float] = {}
def record_usage(
self,
conversation_id: str,
model: str,
input_tokens: int,
output_tokens: int
):
"""Ghi nhận usage và tính chi phí"""
pricing = self.PRICING.get(model, self.PRICING["gpt-4.1"])
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
total = input_cost + output_cost
today = datetime.now().strftime("%Y-%m-%d")
self.daily_costs[today] = self.daily_costs.get(today, 0) + total
self.conversation_costs[conversation_id] = \
self.conversation_costs.get(conversation_id, 0) + total
return {
"input_cost": round(input_cost, 6),
"output_cost": round(output_cost, 6),
"total": round(total, 6)
}
def get_daily_report(self, days: int = 30) -> Dict:
"""Báo cáo chi phí theo ngày"""
reports = []
for i in range(days):
date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
reports.append({
"date": date,
"cost": self.daily_costs.get(date, 0)
})
return reports
Ví dụ sử dụng CostTracker
tracker = CostTracker()
Sau mỗi API call
cost_info = tracker.record_usage(
conversation_id="conv_123",
model="gpt-4.1",
input_tokens=1500,
output_tokens=500
)
print(f"Chi phí cho request này: ${cost_info['total']}")
Các Bước Triển Khai Thực Tế
Bước 1: Migration Từ OpenAI Sang HolySheep
# Trước đây (OpenAI)
import openai
openai.api_key = "sk-..."
openai.api_base = "https://api.openai.com/v1"
Bây giờ (HolySheep)
import openai
openai.api_key = "YOUR_HOLYSHEEP_API_KEY"
openai.api_base = "https://api.holysheep.ai/v1" # Thay đổi base URL
Code gọi API giữ nguyên
response = openai.ChatCompletion.create(
model="gpt-4.1",
messages=[
{"role": "system", "content": "Bạn là trợ lý AI"},
{"role": "user", "content": "Xin chào"}
],
temperature=0.7,
max_tokens=2048
)
Bước 2: Canary Deployment
import random
import logging
from typing import Callable, Dict, Any
logger = logging.getLogger(__name__)
class CanaryRouter:
"""Định tuyến traffic với chiến lược canary deployment"""
def __init__(self):
self.providers = {
"openai": {
"weight": 0.0, # Bắt đầu với 0% để test
"api_key": "sk-openai-...",
"base_url": "https://api.openai.com/v1",
"models": ["gpt-4", "gpt-4-turbo"]
},
"holysheep": {
"weight": 100, # 100% traffic sang HolySheep
"api_key": "YOUR_HOLYSHEEP_API_KEY",
"base_url": "https://api.holysheep.ai/v1",
"models": ["gpt-4.1", "gpt-4.1-mini", "deepseek-v3.2"]
}
}
self.metrics = {
"openai": {"requests": 0, "errors": 0, "latencies": []},
"holysheep": {"requests": 0, "errors": 0, "latencies": []}
}
def update_weight(self, provider: str, new_weight: int):
"""Cập nhật traffic weight cho provider"""
self.providers[provider]["weight"] = new_weight
# Điều chỉnh weight cho các providers khác
total_weight = sum(p["weight"] for p in self.providers.values())
if total_weight > 0:
for p in self.providers:
self.providers[p]["weight"] = int(
(self.providers[p]["weight"] / total_weight) * 100
)
def select_provider(self, user_id: str = None) -> str:
"""Chọn provider dựa trên weight và optional user_id hash"""
if user_id:
# Deterministic routing cho cùng user
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
normalized = (hash_val % 100) + 1
else:
normalized = random.randint(1, 100)
cumulative = 0
for provider, config in self.providers.items():
cumulative += config["weight"]
if normalized <= cumulative:
return provider
return "holysheep" # Default fallback
async def call_llm(
self,
messages: List[Dict],
model: str,
user_id: str = None,
**kwargs
) -> Dict[str, Any]:
"""Gọi LLM với canary routing"""
provider = self.select_provider(user_id)
config = self.providers[provider]
start_time = time.time()
try:
# Import OpenAI client
from openai import AsyncOpenAI
client = AsyncOpenAI(
api_key=config["api_key"],
base_url=config["base_url"]
)
response = await client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
latency = (time.time() - start_time) * 1000 # ms
# Record metrics
self.metrics[provider]["requests"] += 1
self.metrics[provider]["latencies"].append(latency)
return {
"provider": provider,
"response": response,
"latency_ms": latency,
"error": None
}
except Exception as e:
self.metrics[provider]["errors"] += 1
logger.error(f"Lỗi khi gọi {provider}: {str(e)}")
# Fallback sang provider khác
fallback = "holysheep" if provider != "holysheep" else "openai"
return await self._fallback_call(fallback, messages, model, **kwargs)
async def _fallback_call(
self,
provider: str,
messages: List[Dict],
model: str,
**kwargs
) -> Dict[str, Any]:
"""Fallback khi provider chính lỗi"""
config = self.providers[provider]
from openai import AsyncOpenAI
client = AsyncOpenAI(
api_key=config["api_key"],
base_url=config["base_url"]
)
response = await client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
return {
"provider": f"{provider}_fallback",
"response": response,
"latency_ms": 0,
"error": None,
"fallback": True
}
def get_health_report(self) -> Dict:
"""Báo cáo sức khỏe của các providers"""
report = {}
for provider, metrics in self.metrics.items():
requests = metrics["requests"]
errors = metrics["errors"]
latencies = metrics["latencies"]
report[provider] = {
"total_requests": requests,
"error_count": errors,
"error_rate": errors / requests if requests > 0 else 0,
"avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)]
if latencies else 0
}
return report
Sử dụng CanaryRouter
router = CanaryRouter()
Tăng dần traffic lên HolySheep
router.update_weight("holysheep", 25) # 25% traffic
await asyncio.sleep(3600) # Theo dõi 1 giờ
Kiểm tra metrics
report = router.get_health_report()
print(json.dumps(report, indent=2))
Bước 3: Rotation Key và Retry Logic
import time
from threading import Lock
from collections import deque
class APIKeyManager:
"""Quản lý và rotate API keys một cách an toàn"""
def __init__(
self,
keys: List[str],
base_url: str = "https://api.holysheep.ai/v1",
requests_per_minute: int = 500
):
self.keys = deque(keys)
self.base_url = base_url
self.rpm_limit = requests_per_minute
# Rate limiting per key
self.key_usage = {
key: {"requests": 0, "reset_at": time.time() + 60}
for key in keys
}
self.lock = Lock()
def get_available_key(self) -> Optional[str]:
"""Lấy key còn quota"""
current_time = time.time()
for key in list(self.keys):
usage = self.key_usage[key]
# Reset counter nếu đã qua 1 phút
if current_time >= usage["reset_at"]:
usage["requests"] = 0
usage["reset_at"] = current_time + 60
# Kiểm tra quota
if usage["requests"] < self.rpm_limit:
return key
# Tất cả keys đều hết quota
return None
def record_request(self, key: str):
"""Ghi nhận request cho key"""
with self.lock:
if key in self.key_usage:
self.key_usage[key]["requests"] += 1
def rotate_key(self, failed_key: str):
"""Xoay key khi gặp lỗi hoặc rate limit"""
with self.lock:
# Loại bỏ key lỗi
self.keys = deque([k for k in self.keys if k != failed_key])
# Thêm lại vào cuối queue
self.keys.append(failed_key)
print(f"Đã xoay key: {failed_key[:10]}... -> các key còn lại: {len(self.keys)}")
class ResilientClient:
"""Client với retry logic và rate limiting"""
def __init__(
self,
key_manager: APIKeyManager,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
self.key_manager = key_manager
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
async def call_with_retry(
self,
messages: List[Dict],
model: str = "gpt-4.1",
**kwargs
) -> Dict:
"""Gọi API với retry logic mạnh mẽ"""
last_error = None
for attempt in range(self.max_retries):
key = self.key_manager.get_available_key()
if not key:
# Chờ cho đến khi có key
await asyncio.sleep(5)
continue
try:
self.key_manager.record_request(key)
response = await self._make_request(key, model, messages, **kwargs)
return response
except RateLimitError as e:
last_error = e
self.key_manager.rotate_key(key)
# Exponential backoff với jitter
delay = min(
self.base_delay * (2 ** attempt) + random.uniform(0, 1),
self.max_delay
)
await asyncio.sleep