Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm thực chiến về việc xây dựng hệ thống quản lý hội thoại AI chatbot với khả năng duy trì ngữ cảnh đa luồng (multi-turn context) và quản lý trạng thái phiên (session state) hiệu quả. Đây là những bài học tôi đã đúc kết từ hàng trăm dự án triển khai thực tế.
Vấn đề thực tế: Khi chatbot "quên" ngữ cảnh
Khi triển khai chatbot cho một hệ thống thương mại điện tử lớn tại Việt Nam, tôi gặp lỗi nghiêm trọng: ConversationContextOverflowError: maximum token limit exceeded after 12 messages. Người dùng phàn nàn rằng chatbot không nhớ họ đã hỏi gì ở tin nhắn trước đó, dẫn đến trải nghiệm rất tệ.
Sau 3 tuần debug và tối ưu, tôi đã xây dựng được một kiến trúc hoàn chỉnh để giải quyết vấn đề này. Hãy cùng tôi đi sâu vào chi tiết kỹ thuật.
Kiến trúc Tổng quan
Một hệ thống对话管理 hoàn chỉnh cần đảm bảo 3 thành phần cốt lõi:
- Context Window Management — Quản lý cửa sổ ngữ cảnh hiệu quả
- Session State Store — Lưu trữ trạng thái phiên với Redis/Memcached
- Message History Truncation — Cắt tỉa lịch sử hội thoại thông minh
Triển khai Context Manager với HolySheep AI
Dưới đây là implementation hoàn chỉnh sử dụng HolySheep AI — nền tảng có độ trễ trung bình dưới 50ms và chi phí chỉ từ $0.42/MTok với DeepSeek V3.2.
import requests
import json
import tiktoken
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from collections import deque
@dataclass
class Message:
role: str # 'user', 'assistant', 'system'
content: str
timestamp: datetime = None
metadata: dict = None
class ContextWindowManager:
"""
Quản lý cửa sổ ngữ cảnh với chiến lược truncation thông minh.
Encoding: cl100k_base (dùng cho GPT-4, Claude, DeepSeek)
"""
def __init__(self, max_tokens: int = 6000, model: str = "deepseek-chat"):
self.max_tokens = max_tokens
self.model = model
self.encoding = tiktoken.get_encoding("cl100k_base")
self.system_prompt = ""
self.message_history: deque = deque(maxlen=100)
def count_tokens(self, text: str) -> int:
"""Đếm số tokens trong text"""
return len(self.encoding.encode(text))
def set_system_prompt(self, prompt: str):
"""Thiết lập system prompt với token budget"""
self.system_prompt = prompt
print(f"[ContextManager] System prompt set: {self.count_tokens(prompt)} tokens")
def add_message(self, role: str, content: str, metadata: dict = None):
"""Thêm message vào history"""
msg = Message(
role=role,
content=content,
timestamp=datetime.now(),
metadata=metadata or {}
)
self.message_history.append(msg)
self._validate_context_window()
def _validate_context_window(self):
"""Kiểm tra và cắt tỉa context nếu vượt limit"""
total_tokens = self._calculate_total_tokens()
if total_tokens > self.max_tokens:
print(f"[ContextManager] Token limit exceeded: {total_tokens} > {self.max_tokens}")
self._smart_truncate()
def _calculate_total_tokens(self) -> int:
"""Tính tổng tokens của toàn bộ context"""
system_tokens = self.count_tokens(self.system_prompt)
history_tokens = sum(
self.count_tokens(f"{msg.role}: {msg.content}")
for msg in self.message_history
)
return system_tokens + history_tokens
def _smart_truncate(self):
"""
Chiến lược truncation thông minh:
1. Giữ lại system prompt
2. Giữ lại 2 tin nhắn đầu và cuối của user
3. Tính tổng summary từ các tin nhắn giữa
"""
if len(self.message_history) <= 4:
# Nếu quá ngắn, xóa từng message cũ nhất
while self._calculate_total_tokens() > self.max_tokens and self.message_history:
self.message_history.popleft()
return
# Giữ lại messages đầu và cuối
preserved_indices = [0, 1, -2, -1]
preserved_messages = [self.message_history[i] for i in preserved_indices if i < len(self.message_history)]
# Tạo summary cho các messages ở giữa
middle_messages = list(self.message_history)[2:-2]
if middle_messages:
summary_content = self._generate_summary(middle_messages)
summary_msg = Message(
role="system",
content=f"[Tóm tắt cuộc trò chuyện trước đó]: {summary_content}",
timestamp=datetime.now(),
metadata={"type": "summary"}
)
# Rebuild history với preserved + summary
self.message_history.clear()
# Thêm lại system messages gốc
for msg in preserved_messages:
if msg.role == "system" and msg.metadata.get("type") != "summary":
continue
self.message_history.append(msg)
# Thêm summary nếu có middle messages
if middle_messages:
self.message_history.appendleft(Message(
role="system",
content=self.system_prompt,
metadata={"type": "original_system"}
))
self.message_history.append(summary_msg)
def _generate_summary(self, messages: List[Message]) -> str:
"""Tạo summary cho các messages đã bị cắt"""
summary_parts = []
current_topic = ""
for msg in messages:
if msg.role == "user":
# Trích xuất essence của câu hỏi
preview = msg.content[:100] + "..." if len(msg.content) > 100 else msg.content
summary_parts.append(f"- User hỏi về: {preview}")
elif msg.role == "assistant":
if msg.metadata.get("intent"):
summary_parts.append(f"- Chatbot xử lý intent: {msg.metadata['intent']}")
return "; ".join(summary_parts[:5]) # Giới hạn 5 phần tử
def get_context_for_api(self) -> List[Dict]:
"""Format context thành định dạng API request"""
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
for msg in self.message_history:
if msg.metadata.get("type") == "summary":
messages.append({"role": "system", "content": msg.content})
else:
messages.append({"role": msg.role, "content": msg.content})
return messages
def get_context_info(self) -> Dict:
"""Debug info về current context"""
return {
"total_tokens": self._calculate_total_tokens(),
"max_tokens": self.max_tokens,
"message_count": len(self.message_history),
"utilization": f"{self._calculate_total_tokens() / self.max_tokens * 100:.1f}%"
}
Triển khai Session State với Redis
Để duy trì trạng thái phiên qua nhiều request, chúng ta cần một storage layer. Dưới đây là implementation với Redis integration:
import redis
import json
import hashlib
from typing import Any, Optional
from datetime import timedelta
class SessionStateStore:
"""
Quản lý trạng thái phiên với Redis
Features:
- Auto-expiration sau 30 phút không hoạt động
- Transaction support cho atomic operations
- Structured data với JSON serialization
"""
def __init__(self, redis_url: str = "redis://localhost:6379/0", ttl: int = 1800):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.ttl = ttl
self.key_prefix = "chatbot:session:"
def _generate_session_id(self, user_id: str, platform: str = "web") -> str:
"""Tạo deterministic session ID từ user info"""
raw = f"{user_id}:{platform}:{datetime.now().strftime('%Y%m%d')}"
return self.key_prefix + hashlib.sha256(raw.encode()).hexdigest()[:16]
def create_session(self, user_id: str, platform: str = "web", initial_data: dict = None) -> str:
"""Tạo phiên mới"""
session_id = self._generate_session_id(user_id, platform)
session_data = {
"user_id": user_id,
"platform": platform,
"created_at": datetime.now().isoformat(),
"last_activity": datetime.now().isoformat(),
"conversation_mode": "general",
"user_profile": {},
"cart": {"items": [], "total": 0},
"preferences": {"language": "vi", "tone": "friendly"},
"conversation_history": []
}
if initial_data:
session_data.update(initial_data)
self.redis.setex(
session_id,
self.ttl,
json.dumps(session_data)
)
print(f"[SessionStore] Created session: {session_id}")
return session_id
def get_session(self, session_id: str) -> Optional[dict]:
"""Lấy data của phiên"""
data = self.redis.get(session_id)
if data:
# Refresh TTL on access
self.redis.expire(session_id, self.ttl)
return json.loads(data)
return None
def update_session(self, session_id: str, updates: dict) -> bool:
"""Cập nhật session data với atomic operation"""
current = self.get_session(session_id)
if not current:
return False
# Deep merge updates
for key, value in updates.items():
if isinstance(value, dict) and key in current and isinstance(current[key], dict):
current[key].update(value)
else:
current[key] = value
current["last_activity"] = datetime.now().isoformat()
self.redis.setex(session_id, self.ttl, json.dumps(current))
return True
def append_conversation(self, session_id: str, role: str, content: str, metadata: dict = None):
"""Thêm message vào conversation history của session"""
session = self.get_session(session_id)
if not session:
return False
message_entry = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
# Giới hạn history trong session store (30 messages)
conversation = session.get("conversation_history", [])
if len(conversation) >= 30:
conversation = conversation[-25:] # Giữ 25 messages gần nhất
conversation.append(message_entry)
session["conversation_history"] = conversation
self.redis.setex(session_id, self.ttl, json.dumps(session))
return True
def set_conversation_mode(self, session_id: str, mode: str) -> bool:
"""Chuyển đổi chế độ hội thoại: general, shopping, support, booking"""
valid_modes = ["general", "shopping", "support", "booking", "technical"]
if mode not in valid_modes:
print(f"[SessionStore] Invalid mode: {mode}")
return False
return self.update_session(session_id, {"conversation_mode": mode})
def get_user_intent_from_history(self, session_id: str) -> Optional[str]:
"""Phân tích intent từ conversation history"""
session = self.get_session(session_id)
if not session:
return None
history = session.get("conversation_history", [])
# Đếm tần suất intents
intent_counts = {}
for msg in history:
if msg.get("metadata", {}).get("intent"):
intent = msg["metadata"]["intent"]
intent_counts[intent] = intent_counts.get(intent, 0) + 1
if intent_counts:
return max(intent_counts, key=intent_counts.get)
return None
def clear_session(self, session_id: str) -> bool:
"""Xóa phiên"""
result = self.redis.delete(session_id)
print(f"[SessionStore] Cleared session: {session_id}")
return result > 0
def get_session_stats(self) -> dict:
"""Lấy statistics của Redis"""
info = self.redis.info("stats")
return {
"total_connections": info.get("total_connections_received", 0),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0)
}
Chatbot Engine — Tích hợp HolySheep AI API
Đây là phần core của hệ thống, tích hợp HolySheep AI với chi phí tiết kiệm đến 85% so với OpenAI:
import requests
from requests.exceptions import ConnectionError, Timeout, HTTPError
import time
from typing import Optional
class HolySheepChatbot:
"""
Chatbot engine với HolySheep AI integration
Pricing 2026 (tham khảo):
- DeepSeek V3.2: $0.42/MTok (input), $0.42/MTok (output)
- GPT-4.1: $8/MTok (input), $8/MTok (output)
"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(
self,
api_key: str,
model: str = "deepseek-chat",
max_tokens: int = 2048,
temperature: float = 0.7
):
self.api_key = api_key
self.model = model
self.max_tokens = max_tokens
self.temperature = temperature
self.context_manager = ContextWindowManager(max_tokens=6000)
self.session_store = SessionStateStore()
self.request_count = 0
self.total_latency = 0
def _build_system_prompt(self, session_mode: str) -> str:
"""Xây dựng system prompt theo chế độ hội thoại"""
base_prompt = """Bạn là trợ lý AI thông minh, thân thiện.
Hãy trả lời bằng tiếng Việt, ngắn gọn và hữu ích.
Nếu không chắc chắn, hãy nói rõ giới hạn kiến thức của bạn.
Luôn hỏi clarifying questions nếu cần thiết."""
mode_prompts = {
"shopping": base_prompt + "\nBạn đang hỗ trợ mua sắm. Hãy tập trung vào sản phẩm, giá cả, và khuyến mãi.",
"support": base_prompt + "\nBạn đang hỗ trợ kỹ thuật. Hãy đưa ra giải pháp cụ thể và có thể thực hiện được.",
"booking": base_prompt + "\nBạn đang hỗ trợ đặt lịch. Hãy thu thập đầy đủ thông tin: ngày, giờ, dịch vụ.",
"technical": base_prompt + "\nBạn đang hỗ trợ kỹ thuật. Hãy sử dụng code examples khi phù hợp."
}
return mode_prompts.get(session_mode, base_prompt)
def initialize_session(self, user_id: str, platform: str = "web") -> str:
"""Khởi tạo phiên mới cho user"""
session_id = self.session_store.create_session(
user_id=user_id,
platform=platform,
initial_data={"model": self.model}
)
# Thiết lập system prompt
system_prompt = self._build_system_prompt("general")
self.context_manager.set_system_prompt(system_prompt)
print(f"[Chatbot] Session initialized: {session_id}")
return session_id
def send_message(
self,
session_id: str,
user_message: str,
metadata: dict = None
) -> dict:
"""
Gửi message và nhận response từ AI
Returns: {"success": bool, "response": str, "latency_ms": float, "error": str}
"""
start_time = time.time()
# Lấy session data
session = self.session_store.get_session(session_id)
if not session:
return {
"success": False,
"response": None,
"latency_ms": 0,
"error": "Session not found. Vui lòng khởi tạo phiên mới."
}
# Cập nhật system prompt theo mode
current_mode = session.get("conversation_mode", "general")
self.context_manager.set_system_prompt(self._build_system_prompt(current_mode))
# Thêm user message vào context
self.context_manager.add_message("user", user_message, metadata)
# Lấy context cho API
messages = self.context_manager.get_context_for_api()
# Gọi HolySheep AI API
try:
response = self._call_holysheep_api(messages)
if response.get("success"):
# Thêm assistant response vào context
self.context_manager.add_message(
"assistant",
response["content"],
{"intent": metadata.get("intent") if metadata else None}
)
# Cập nhật session store
self.session_store.append_conversation(
session_id, "user", user_message, metadata
)
self.session_store.append_conversation(
session_id, "assistant", response["content"]
)
latency_ms = (time.time() - start_time) * 1000
self.request_count += 1
self.total_latency += latency_ms
return {
"success": True,
"response": response["content"],
"latency_ms": round(latency_ms, 2),
"context_info": self.context_manager.get_context_info()
}
else:
return {
"success": False,
"response": None,
"latency_ms": (time.time() - start_time) * 1000,
"error": response.get("error", "Unknown error")
}
except ConnectionError as e:
return {
"success": False,
"response": None,
"latency_ms": (time.time() - start_time) * 1000,
"error": f"ConnectionError: Không thể kết nối đến API. Kiểm tra network."
}
except Timeout as e:
return {
"success": False,
"response": None,
"latency_ms": (time.time() - start_time) * 1000,
"error": f"Timeout: Request v