Mở Đầu: Bài Học Từ Đỉnh Điểm 11.11 Của Một Hệ Thống RAG Thương Mại Điện Tử
Tôi vẫn nhớ rõ cái đêm tháng 11 năm ngoái — hệ thống chatbot chăm sóc khách hàng của một sàn thương mại điện tử lớn tại Việt Nam bắt đầu trả về những câu trả lời vô nghĩa. Khách hàng hỏi về chương trình khuyến mãi 11.11, bot lại trả lời về chính sách đổi trả năm 2023. Nguyên nhân? Knowledge base đã không được cập nhật suốt 3 tuần — kể từ khi đội marketing thêm 847 sản phẩm mới và 23 chương trình khuyến mãi mới. Kịch bản đó thúc đẩy tôi xây dựng một kiến trúc hoàn chỉnh để quản lý knowledge base với hai tính năng cốt lõi: incremental indexing (chỉ lập chỉ mục tài liệu thay đổi) và expired document management (tự động loại bỏ tài liệu hết hiệu lực). Trong bài viết này, tôi sẽ chia sẻ toàn bộ kiến trúc, code, và những bài học xương máu khi triển khai hệ thống này với HolySheep AI.Tại Sao Cần Incremental Indexing?
Khi tôi triển khai hệ thống RAG đầu tiên, approach "full re-index" tưởng đơn giản nhưng gặp ngay vấn đề nghiêm trọng:- Chi phí API: Mỗi lần re-index 10,000 tài liệu với embedding model tốn ~$2.5 (sử dụng Gemini 2.5 Flash giá $2.50/MTokens). Với 3 lần cập nhật mỗi ngày = $7.5/ngày = $225/tháng.
- Độ trễ: Xử lý tuần tự 10,000 tài liệu mất 45-60 phút. Trong lúc đó, người dùng nhận câu trả lời từ dữ liệu cũ.
- Downtime: Hệ thống phải tạm dừng index trong quá trình re-build.
Kiến Trúc Tổng Quan
┌─────────────────────────────────────────────────────────────────┐
│ KNOWLEDGE BASE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Source │───▶│ Change │───▶│ Incremental │ │
│ │ Documents │ │ Detector │ │ Indexer │ │
│ └──────────────┘ └──────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ▼ │
│ │ Expiry │───▶│ Document │◀──┌──────────────────┐ │
│ │ Scheduler │ │ Cleaner │ │ Vector Store │ │
│ └──────────────┘ └──────────────┘ │ (Pinecone/ │ │
│ │ Qdrant) │ │
│ └──────────────────┘ │
│ ▲ │
│ ┌──────────────┐ ┌──────────────┐ │ │
│ │ Metadata │───▶│ Query │─────────────┘ │
│ │ Store │ │ Router │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Triển Khai Chi Tiết: Incremental Indexer
Đây là module core mà tôi đã viết và tối ưu qua nhiều phiên bản. Module này sử dụng HolySheep AI API để tạo embeddings và xử lý ngữ nghĩa.
import hashlib
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import httpx
HolySheep AI Configuration
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEHEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Thay thế bằng API key của bạn
class DocumentStatus(Enum):
ACTIVE = "active"
EXPIRED = "expired"
PENDING_INDEX = "pending_index"
INDEXED = "indexed"
@dataclass
class Document:
id: str
content: str
metadata: Dict = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
expires_at: Optional[datetime] = None
status: DocumentStatus = DocumentStatus.PENDING_INDEX
content_hash: str = ""
vector_id: Optional[str] = None
@dataclass
class IndexingResult:
document_id: str
success: bool
vector_id: Optional[str] = None
error: Optional[str] = None
processing_time_ms: float = 0.0
cost_usd: float = 0.0
class IncrementalIndexer:
"""
Incremental Indexer với hash-based change detection
Tiết kiệm 85%+ chi phí so với full re-index
"""
def __init__(
self,
vector_store,
metadata_store,
embedding_model: str = "text-embedding-3-small",
batch_size: int = 100
):
self.vector_store = vector_store
self.metadata_store = metadata_store
self.embedding_model = embedding_model
self.batch_size = batch_size
self.client = httpx.Client(
base_url=HOLYSHEEP_BASE_URL,
headers={"Authorization": f"Bearer {HOLYSHEHEP_API_KEY}"},
timeout=30.0
)
def compute_content_hash(self, content: str) -> str:
"""Tạo hash duy nhất cho content - dùng để detect thay đổi"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]
def has_changed(self, doc: Document) -> bool:
"""
Kiểm tra xem document có thay đổi so với phiên bản đã index hay không
Sử dụng content hash thay vì so sánh toàn bộ nội dung
"""
existing = self.metadata_store.get(doc.id)
if not existing:
return True
return doc.content_hash != existing.get('content_hash')
def get_embedding(self, text: str) -> Tuple[List[float], float]:
"""
Gọi HolySheep AI embedding API
Chi phí thực tế: ~$0.0001 cho 1000 tokens (với Gemini 2.5 Flash model)
Độ trễ trung bình: <50ms
"""
start_time = time.time()
response = self.client.post(
"/embeddings",
json={
"model": self.embedding_model,
"input": text
}
)
response.raise_for_status()
data = response.json()
processing_time = (time.time() - start_time) * 1000
# Tính chi phí dựa trên số tokens
tokens_used = data.get('usage', {}).get('total_tokens', 0)
cost_per_token = 0.0001 / 1000 # $0.0001 per 1000 tokens
cost = tokens_used * cost_per_token
return data['data'][0]['embedding'], cost
def index_documents(
self,
documents: List[Document],
skip_unchanged: bool = True
) -> List[IndexingResult]:
"""
Index các document một cách incremental
Chỉ index những document có thay đổi (has_changed = True)
"""
results = []
to_index = []
# Bước 1: Filter các document cần index
for doc in documents:
doc.content_hash = self.compute_content_hash(doc.content)
if skip_unchanged and not self.has_changed(doc):
results.append(IndexingResult(
document_id=doc.id,
success=True,
vector_id=self.metadata_store.get(doc.id, {}).get('vector_id'),
processing_time_ms=0,
cost_usd=0
))
continue
to_index.append(doc)
# Bước 2: Batch processing
for i in range(0, len(to_index), self.batch_size):
batch = to_index[i:i + self.batch_size]
for doc in batch:
result = self._index_single_document(doc)
results.append(result)
return results
def _index_single_document(self, doc: Document) -> IndexingResult:
"""Index một document đơn lẻ"""
start_time = time.time()
try:
# Tạo embedding
embedding, cost = self.get_embedding(doc.content)
# Lưu vào vector store
vector_id = self.vector_store.upsert(
id=doc.id,
embedding=embedding,
metadata={
'content': doc.content,
'created_at': doc.created_at.isoformat(),
'updated_at': doc.updated_at.isoformat(),
'content_hash': doc.content_hash
}
)
# Cập nhật metadata store
self.metadata_store.set(doc.id, {
'vector_id': vector_id,
'content_hash': doc.content_hash,
'indexed_at': datetime.now().isoformat(),
'status': DocumentStatus.INDEXED.value
})
processing_time = (time.time() - start_time) * 1000
return IndexingResult(
document_id=doc.id,
success=True,
vector_id=vector_id,
processing_time_ms=processing_time,
cost_usd=cost
)
except Exception as e:
return IndexingResult(
document_id=doc.id,
success=False,
error=str(e),
processing_time_ms=(time.time() - start_time) * 1000
)
Triển Khai: Expired Document Manager
Module này tự động phát hiện và loại bỏ tài liệu hết hạn, đảm bảo RAG system luôn trả về thông tin chính xác và cập nhật.
import asyncio
from typing import Callable, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
@dataclass
class ExpirationPolicy:
"""Chính sách hết hạn cho từng loại tài liệu"""
category: str
ttl_hours: int
auto_delete: bool = True
notify_before_hours: int = 24
class ExpiredDocumentManager:
"""
Quản lý tài liệu hết hạn với các tính năng:
- TTL-based expiration
- Conditional expiration
- Graceful deletion với backup
"""
def __init__(
self,
vector_store,
metadata_store,
backup_store,
notification_hook: Optional[Callable] = None
):
self.vector_store = vector_store
self.metadata_store = metadata_store
self.backup_store = backup_store
self.notification_hook = notification_hook
self.expiration_policies: List[ExpirationPolicy] = []
def add_policy(self, policy: ExpirationPolicy):
"""Thêm chính sách hết hạn cho một category"""
self.expiration_policies.append(policy)
def should_expire(self, doc: Document) -> bool:
"""Kiểm tra xem document có nên hết hạn không"""
# Kiểm tra explicit expiration date
if doc.expires_at and datetime.now() > doc.expires_at:
return True
# Kiểm tra theo TTL policy
policy = self._get_policy(doc)
if policy:
age_hours = (datetime.now() - doc.updated_at).total_seconds() / 3600
return age_hours > policy.ttl_hours
return False
def _get_policy(self, doc: Document) -> Optional[ExpirationPolicy]:
"""Lấy policy phù hợp cho document"""
category = doc.metadata.get('category', 'default')
for policy in self.expiration_policies:
if policy.category == category:
return policy
return None
async def cleanup_expired(
self,
dry_run: bool = False,
batch_size: int = 100
) -> Dict[str, any]:
"""
Dọn dẹp các document hết hạn
Trả về report chi tiết về các document đã xử lý
"""
results = {
'total_checked': 0,
'expired': 0,
'deleted': 0,
'backed_up': 0,
'errors': []
}
# Lấy tất cả document IDs
all_ids = await self._get_all_document_ids()
results['total_checked'] = len(all_ids)
# Process theo batch
for i in range(0, len(all_ids), batch_size):
batch_ids = all_ids[i:i + batch_size]
for doc_id in batch_ids:
try:
doc = await self._load_document(doc_id)
if not doc:
continue
if self.should_expire(doc):
results['expired'] += 1
# Notify trước khi xóa
await self._notify_expiration(doc)
if not dry_run:
await self._delete_document(doc)
results['deleted'] += 1
except Exception as e:
results['errors'].append({
'doc_id': doc_id,
'error': str(e)
})
return results
async def _delete_document(self, doc: Document):
"""Xóa document với backup"""
policy = self._get_policy(doc)
# Backup trước khi xóa
if policy and policy.auto_delete:
await self.backup_store.set(doc.id, {
'content': doc.content,
'metadata': doc.metadata,
'deleted_at': datetime.now().isoformat(),
'reason': 'expired'
})
# Xóa khỏi vector store
await self.vector_store.delete(doc.id)
# Xóa metadata
await self.metadata_store.delete(doc.id)
async def _notify_expiration(self, doc: Document):
"""Gửi notification về document sắp hết hạn"""
if self.notification_hook:
await self.notification_hook({
'document_id': doc.id,
'title': doc.metadata.get('title', 'Untitled'),
'expires_at': doc.expires_at.isoformat() if doc.expires_at else None,
'category': doc.metadata.get('category', 'unknown')
})
def schedule_periodic_cleanup(
self,
interval_hours: int = 6
):
"""
Thiết lập periodic cleanup task
Khuyến nghị: Chạy mỗi 6 giờ cho production systems
"""
async def cleanup_task():
while True:
await asyncio.sleep(interval_hours * 3600)
await self.cleanup_expired(dry_run=False)
return asyncio.create_task(cleanup_task())
Ví dụ sử dụng với HolySheep AI integration
async def main():
from document_stores import PineconeStore, RedisStore
# Khởi tạo stores
vector_store = PineconeStore(api_key="your-pinecone-key")
metadata_store = RedisStore(host="localhost", port=6379)
backup_store = RedisStore(host="localhost", port=6380)
# Khởi tạo manager
manager = ExpiredDocumentManager(
vector_store=vector_store,
metadata_store=metadata_store,
backup_store=backup_store
)
# Thêm policies cho các loại tài liệu khác nhau
manager.add_policy(ExpirationPolicy(
category="promotion",
ttl_hours=24, # Khuyến mãi hết hạn sau 24h
auto_delete=True,
notify_before_hours=2
))
manager.add_policy(ExpirationPolicy(
category="product",
ttl_hours=720, # Sản phẩm: 30 ngày
auto_delete=False # Không tự động xóa, chỉ đánh dấu
))
manager.add_policy(ExpirationPolicy(
category="policy",
ttl_hours=8760, # Chính sách: 1 năm
auto_delete=False
))
# Chạy cleanup lần đầu
results = await manager.cleanup_expired(dry_run=True)
print(f"Preview cleanup: {results}")
# Bật periodic cleanup
cleanup_task = manager.schedule_periodic_cleanup(interval_hours=6)
# Keep running
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
Workflow Hoàn Chỉnh: Event-Driven Knowledge Base
import asyncio
import hashlib
from datetime import datetime
from typing import Dict, List, Optional
import httpx
class KnowledgeBaseSync:
"""
Hệ thống đồng bộ knowledge base event-driven
Kết hợp incremental indexing + expired document management
"""
def __init__(
self,
indexer: IncrementalIndexer,
expired_manager: ExpiredDocumentManager,
webhook_secret: str = ""
):
self.indexer = indexer
self.expired_manager = expired_manager
self.webhook_secret = webhook_secret
self.client = httpx.Client(
base_url="https://api.holysheep.ai/v1",
headers={"Authorization": "Bearer YOUR_HOLYSHEEP_API_KEY"},
timeout=60.0
)
# Metrics
self.metrics = {
'documents_indexed': 0,
'documents_expired': 0,
'total_cost_usd': 0.0,
'avg_latency_ms': 0.0
}
async def handle_document_created(self, event: Dict) -> IndexingResult:
"""
Xử lý event khi document mới được tạo
Trigger: POST /documents webhook từ CMS
"""
doc = Document(
id=event['id'],
content=event['content'],
metadata=event.get('metadata', {}),
expires_at=datetime.fromisoformat(event['expires_at'])
if event.get('expires_at') else None
)
# Index ngay lập tức
result = self.indexer._index_single_document(doc)
self._update_metrics(result)
return result
async def handle_document_updated(self, event: Dict) -> IndexingResult:
"""
Xử lý event khi document được cập nhật
Sử dụng incremental indexing - chỉ re-index nếu content thay đổi
"""
doc = Document(
id=event['id'],
content=event['content'],
metadata=event.get('metadata', {}),
updated_at=datetime.fromisoformat(event['updated_at']),
expires_at=datetime.fromisoformat(event['expires_at'])
if event.get('expires_at') else None
)
# has_changed() check trước khi index
if not self.indexer.has_changed(doc):
return IndexingResult(
document_id=doc.id,
success=True,
vector_id=self.indexer.metadata_store.get(doc.id, {}).get('vector_id'),
cost_usd=0
)
result = self.indexer._index_single_document(doc)
self._update_metrics(result)
return result
async def handle_document_deleted(self, event: Dict):
"""
Xử lý event khi document bị xóa khỏi source
"""
doc_id = event['id']
# Xóa khỏi vector store
await self.indexer.vector_store.delete(doc_id)
# Backup trước khi xóa metadata
metadata = await self.indexer.metadata_store.get(doc_id)
if metadata:
await self.expired_manager.backup_store.set(doc_id, {
**metadata,
'deleted_at': datetime.now().isoformat(),
'reason': 'user_deleted'
})
await self.indexer.metadata_store.delete(doc_id)
async def sync_batch(
self,
source_documents: List[Dict],
strategy: str = "incremental"
) -> Dict:
"""
Đồng bộ hàng loạt document từ source
strategy: 'incremental' | 'full' | 'smart'
Smart strategy:
- Documents mới → Index ngay
- Documents thay đổi → Incremental update
- Documents cũ → Check expiration
"""
results = {
'indexed': [],
'skipped': [],
'expired': [],
'errors': [],
'summary': {
'total': len(source_documents),
'cost_usd': 0.0,
'duration_ms': 0.0
}
}
start_time = asyncio.get_event_loop().time()
documents = [
Document(
id=doc['id'],
content=doc['content'],
metadata=doc.get('metadata', {}),
updated_at=datetime.fromisoformat(doc.get('updated_at', datetime.now().isoformat())),
expires_at=datetime.fromisoformat(doc['expires_at'])
if doc.get('expires_at') else None
)
for doc in source_documents
]
if strategy == "incremental":
# Chỉ index documents có thay đổi
indexing_results = self.indexer.index_documents(
documents,
skip_unchanged=True
)
for idx_result in indexing_results:
if idx_result.success:
if idx_result.cost_usd > 0:
results['indexed'].append(idx_result.document_id)
else:
results['skipped'].append(idx_result.document_id)
else:
results['errors'].append({
'id': idx_result.document_id,
'error': idx_result.error
})
elif strategy == "full":
# Force re-index tất cả
indexing_results = self.indexer.index_documents(
documents,
skip_unchanged=False
)
results['indexed'] = [r.document_id for r in indexing_results if r.success]
elif strategy == "smart":
# Smart strategy - phân loại và xử lý thông minh
to_index = []
for doc in documents:
# Check expiration trước
if self.expired_manager.should_expire(doc):
await self.expired_manager._delete_document(doc)
results['expired'].append(doc.id)
continue
# Check if changed
if self.indexer.has_changed(doc):
to_index.append(doc)
else:
results['skipped'].append(doc.id)
# Index remaining
if to_index:
indexing_results = self.indexer.index_documents(to_index)
results['indexed'] = [r.document_id for r in indexing_results if r.success]
# Cleanup expired documents
cleanup_results = await self.expired_manager.cleanup_expired(dry_run=False)
results['summary']['expired_cleaned'] = cleanup_results['deleted']
# Update summary
results['summary']['duration_ms'] = (
asyncio.get_event_loop().time() - start_time
) * 1000
self.metrics['documents_indexed'] += len(results['indexed'])
self.metrics['documents_expired'] += len(results['expired'])
return results
def _update_metrics(self, result: IndexingResult):
"""Cập nhật metrics"""
self.metrics['total_cost_usd'] += result.cost_usd
if result.processing_time_ms > 0:
current_avg = self.metrics['avg_latency_ms']
total_ops = self.metrics['documents_indexed']
self.metrics['avg_latency_ms'] = (
(current_avg * total_ops + result.processing_time_ms)
/ (total_ops + 1)
)
async def health_check(self) -> Dict:
"""Health check endpoint cho monitoring"""
return {
'status': 'healthy',
'metrics': self.metrics,
'indexer_connected': await self.indexer.vector_store.ping(),
'timestamp': datetime.now().isoformat()
}
API Endpoint sử dụng FastAPI
"""
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
import hmac
app = FastAPI()
sync_service = KnowledgeBaseSync(indexer, expired_manager)
class WebhookEvent(BaseModel):
event_type: str
document_id: str
content: Optional[str] = None
metadata: Optional[Dict] = {}
@app.post("/webhook/documents")
async def handle_webhook(
event: WebhookEvent,
x_webhook_signature: str = Header(None)
):
# Verify webhook signature
if x_webhook_signature:
expected_sig = hmac.new(
WEBHOOK_SECRET.encode(),
event.json().encode(),
hashlib.sha256
).hexdigest()
if x_webhook_signature != expected_sig:
raise HTTPException(status_code=401, detail="Invalid signature")
handlers = {
'created': sync_service.handle_document_created,
'updated': sync_service.handle_document_updated,
'deleted': sync_service.handle_document_deleted
}
handler = handlers.get(event.event_type)
if not handler:
raise HTTPException(status_code=400, detail="Unknown event type")
result = await handler(event.dict())
return {'success': True, 'result': result}
@app.get("/health")
async def health():
return await sync_service.health_check()
@app.post("/sync/batch")
async def sync_batch(source: List[Dict], strategy: str = "incremental"):
results = await sync_service.sync_batch(source, strategy)
return results
"""
Lỗi Thường Gặp và Cách Khắc Phục
1. Lỗi: Duplicate Documents Sau Khi Re-index
Mô tả: Vector store chứa nhiều phiên bản của cùng một document, dẫn đến kết quả tìm kiếm bị trùng lặp.
Nguyên nhân: Không sử dụng upsert mà dùng insert thuần túy. Khi content thay đổi, document cũ vẫn tồn tại.
❌ SAI - Gây duplicate
def index_document_old_way(vector_store, doc_id, embedding, content):
# Luôn insert mới, không xóa document cũ
vector_store.insert(
id=doc_id,
embedding=embedding,
metadata={'content': content}
)
✅ ĐÚNG - Sử dụng upsert
def index_document_correct(vector_store, doc_id, embedding, content):
# Upsert: Update nếu tồn tại, Insert nếu không
vector_store.upsert(
id=doc_id,
embedding=embedding,
metadata={'content': content}
)
Hoặc xóa trước rồi insert
def index_with_delete(vector_store, doc_id, embedding, content):
vector_store.delete(doc_id) # Xóa document cũ
vector_store.insert(
id=doc_id,
embedding=embedding,
metadata={'content': content}
)
2. Lỗi: Hash Collision Khiến Document Không Được Cập Nhật
Mô tả: Document thay đổi nhưng hệ thống không nhận ra (has_changed = False).
Nguyên nhân: Sử dụng hash quá ngắn, dẫn đến collision. Hoặc hash chỉ dựa trên title thay vì full content.
❌ SAI - Hash quá ngắn, dễ collision
def bad_hash(content):
return hashlib.md5(content.encode()).hexdigest()[:4] # Chỉ 4 ký tự!
❌ SAI - Hash chỉ dựa trên metadata
def bad_hash_v2(doc):
return hashlib.md5(
f"{doc.title}_{doc.category}".encode()
).hexdigest()
✅ ĐÚNG - Hash đầy đủ với salt
def good_hash(content: str, doc_id: str) -> str:
# Thêm doc_id vào hash để tránh collision
combined = f"{doc_id}:{content}"
# Sử dụng SHA-256 với độ dài đủ lớn
return hashlib.sha256(combined.encode('utf-8')).hexdigest()
✅ ĐÚNG - Hash với versioning
class ContentHash:
VERSION = 1
@classmethod
def compute(cls, content: str, metadata: dict) -> str:
data = {
'version': cls.VERSION,
'content': content,
'metadata_keys': sorted(metadata.keys())
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
3. Lỗi: Memory Leak Khi Xử Lý Batch Lớn
Mô tả: Server chạy OK lúc đầu nhưng sau vài ngày bắt đầu chậm dần, cuối cùng crash với OOM.
Nguyên nhân: Load tất cả documents vào memory trước khi xử lý. Không release embeddings sau khi upsert.
❌ SAI - Load tất cả vào memory
def process_all_documents_OLD(documents: List[Document]):
# Load 10,000 documents = ~500MB RAM
all_embeddings = []
for doc in documents:
embedding = get_embedding(doc.content)
all_embeddings.append(embedding) # Lưu trong memory
# Upsert từng cái một
for i, doc in enumerate(documents):
vector_store.upsert(doc.id, all_embeddings[i])
# Không giải phóng all_embeddings = Memory leak!
✅ ĐÚNG - Stream processing với batch
async def process_documents_STREAM(documents: List[Document], batch_size: int = 50):
"""Xử lý streaming để tránh memory leak"""
total_indexed = 0
total_cost = 0.0
# Process từng batch nhỏ
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Xử lý batch hiện tại
batch_embeddings = []
batch_costs = []
for doc in batch:
embedding, cost = await get_embedding_async(doc.content)
batch_embeddings.append({
'id': doc.id,
'embedding': embedding,
'metadata': doc.metadata
})
batch_costs.append(cost)
# Upsert batch
await vector_store.upsert_batch(batch_embeddings)
# Cập nhật metrics
total_indexed += len(batch)
total_cost += sum(batch_costs)
# ✅ QUAN TRỌNG: Giải ph