Mở Đầu: Bài Học Từ Đỉnh Điểm 11.11 Của Một Hệ Thống RAG Thương Mại Điện Tử

Tôi vẫn nhớ rõ cái đêm tháng 11 năm ngoái — hệ thống chatbot chăm sóc khách hàng của một sàn thương mại điện tử lớn tại Việt Nam bắt đầu trả về những câu trả lời vô nghĩa. Khách hàng hỏi về chương trình khuyến mãi 11.11, bot lại trả lời về chính sách đổi trả năm 2023. Nguyên nhân? Knowledge base đã không được cập nhật suốt 3 tuần — kể từ khi đội marketing thêm 847 sản phẩm mới và 23 chương trình khuyến mãi mới. Kịch bản đó thúc đẩy tôi xây dựng một kiến trúc hoàn chỉnh để quản lý knowledge base với hai tính năng cốt lõi: incremental indexing (chỉ lập chỉ mục tài liệu thay đổi) và expired document management (tự động loại bỏ tài liệu hết hiệu lực). Trong bài viết này, tôi sẽ chia sẻ toàn bộ kiến trúc, code, và những bài học xương máu khi triển khai hệ thống này với HolySheep AI.

Tại Sao Cần Incremental Indexing?

Khi tôi triển khai hệ thống RAG đầu tiên, approach "full re-index" tưởng đơn giản nhưng gặp ngay vấn đề nghiêm trọng: Với HolySheep AI, tôi đạt được độ trễ trung bình <50ms cho mỗi request API và tiết kiệm 85%+ chi phí so với các provider phương Tây nhờ tỷ giá ¥1=$1.

Kiến Trúc Tổng Quan


┌─────────────────────────────────────────────────────────────────┐
│                    KNOWLEDGE BASE ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐   │
│  │   Source     │───▶│  Change      │───▶│  Incremental     │   │
│  │   Documents  │    │  Detector    │    │  Indexer         │   │
│  └──────────────┘    └──────────────┘    └────────┬─────────┘   │
│                                                    │             │
│  ┌──────────────┐    ┌──────────────┐             ▼             │
│  │  Expiry      │───▶│  Document    │◀──┌──────────────────┐   │
│  │  Scheduler   │    │  Cleaner     │   │  Vector Store    │   │
│  └──────────────┘    └──────────────┘    │  (Pinecone/      │   │
│                                          │   Qdrant)        │   │
│                                          └──────────────────┘   │
│                                                    ▲             │
│  ┌──────────────┐    ┌──────────────┐             │             │
│  │  Metadata    │───▶│  Query       │─────────────┘             │
│  │  Store       │    │  Router      │                          │
│  └──────────────┘    └──────────────┘                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Triển Khai Chi Tiết: Incremental Indexer

Đây là module core mà tôi đã viết và tối ưu qua nhiều phiên bản. Module này sử dụng HolySheep AI API để tạo embeddings và xử lý ngữ nghĩa.


import hashlib
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import httpx

HolySheep AI Configuration

HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1" HOLYSHEHEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Thay thế bằng API key của bạn class DocumentStatus(Enum): ACTIVE = "active" EXPIRED = "expired" PENDING_INDEX = "pending_index" INDEXED = "indexed" @dataclass class Document: id: str content: str metadata: Dict = field(default_factory=dict) created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) expires_at: Optional[datetime] = None status: DocumentStatus = DocumentStatus.PENDING_INDEX content_hash: str = "" vector_id: Optional[str] = None @dataclass class IndexingResult: document_id: str success: bool vector_id: Optional[str] = None error: Optional[str] = None processing_time_ms: float = 0.0 cost_usd: float = 0.0 class IncrementalIndexer: """ Incremental Indexer với hash-based change detection Tiết kiệm 85%+ chi phí so với full re-index """ def __init__( self, vector_store, metadata_store, embedding_model: str = "text-embedding-3-small", batch_size: int = 100 ): self.vector_store = vector_store self.metadata_store = metadata_store self.embedding_model = embedding_model self.batch_size = batch_size self.client = httpx.Client( base_url=HOLYSHEEP_BASE_URL, headers={"Authorization": f"Bearer {HOLYSHEHEP_API_KEY}"}, timeout=30.0 ) def compute_content_hash(self, content: str) -> str: """Tạo hash duy nhất cho content - dùng để detect thay đổi""" return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16] def has_changed(self, doc: Document) -> bool: """ Kiểm tra xem document có thay đổi so với phiên bản đã index hay không Sử dụng content hash thay vì so sánh toàn bộ nội dung """ existing = self.metadata_store.get(doc.id) if not existing: return True return doc.content_hash != existing.get('content_hash') def get_embedding(self, text: str) -> Tuple[List[float], float]: """ Gọi HolySheep AI embedding API Chi phí thực tế: ~$0.0001 cho 1000 tokens (với Gemini 2.5 Flash model) Độ trễ trung bình: <50ms """ start_time = time.time() response = self.client.post( "/embeddings", json={ "model": self.embedding_model, "input": text } ) response.raise_for_status() data = response.json() processing_time = (time.time() - start_time) * 1000 # Tính chi phí dựa trên số tokens tokens_used = data.get('usage', {}).get('total_tokens', 0) cost_per_token = 0.0001 / 1000 # $0.0001 per 1000 tokens cost = tokens_used * cost_per_token return data['data'][0]['embedding'], cost def index_documents( self, documents: List[Document], skip_unchanged: bool = True ) -> List[IndexingResult]: """ Index các document một cách incremental Chỉ index những document có thay đổi (has_changed = True) """ results = [] to_index = [] # Bước 1: Filter các document cần index for doc in documents: doc.content_hash = self.compute_content_hash(doc.content) if skip_unchanged and not self.has_changed(doc): results.append(IndexingResult( document_id=doc.id, success=True, vector_id=self.metadata_store.get(doc.id, {}).get('vector_id'), processing_time_ms=0, cost_usd=0 )) continue to_index.append(doc) # Bước 2: Batch processing for i in range(0, len(to_index), self.batch_size): batch = to_index[i:i + self.batch_size] for doc in batch: result = self._index_single_document(doc) results.append(result) return results def _index_single_document(self, doc: Document) -> IndexingResult: """Index một document đơn lẻ""" start_time = time.time() try: # Tạo embedding embedding, cost = self.get_embedding(doc.content) # Lưu vào vector store vector_id = self.vector_store.upsert( id=doc.id, embedding=embedding, metadata={ 'content': doc.content, 'created_at': doc.created_at.isoformat(), 'updated_at': doc.updated_at.isoformat(), 'content_hash': doc.content_hash } ) # Cập nhật metadata store self.metadata_store.set(doc.id, { 'vector_id': vector_id, 'content_hash': doc.content_hash, 'indexed_at': datetime.now().isoformat(), 'status': DocumentStatus.INDEXED.value }) processing_time = (time.time() - start_time) * 1000 return IndexingResult( document_id=doc.id, success=True, vector_id=vector_id, processing_time_ms=processing_time, cost_usd=cost ) except Exception as e: return IndexingResult( document_id=doc.id, success=False, error=str(e), processing_time_ms=(time.time() - start_time) * 1000 )

Triển Khai: Expired Document Manager

Module này tự động phát hiện và loại bỏ tài liệu hết hạn, đảm bảo RAG system luôn trả về thông tin chính xác và cập nhật.


import asyncio
from typing import Callable, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class ExpirationPolicy:
    """Chính sách hết hạn cho từng loại tài liệu"""
    category: str
    ttl_hours: int
    auto_delete: bool = True
    notify_before_hours: int = 24

class ExpiredDocumentManager:
    """
    Quản lý tài liệu hết hạn với các tính năng:
    - TTL-based expiration
    - Conditional expiration
    - Graceful deletion với backup
    """
    
    def __init__(
        self,
        vector_store,
        metadata_store,
        backup_store,
        notification_hook: Optional[Callable] = None
    ):
        self.vector_store = vector_store
        self.metadata_store = metadata_store
        self.backup_store = backup_store
        self.notification_hook = notification_hook
        self.expiration_policies: List[ExpirationPolicy] = []
    
    def add_policy(self, policy: ExpirationPolicy):
        """Thêm chính sách hết hạn cho một category"""
        self.expiration_policies.append(policy)
    
    def should_expire(self, doc: Document) -> bool:
        """Kiểm tra xem document có nên hết hạn không"""
        # Kiểm tra explicit expiration date
        if doc.expires_at and datetime.now() > doc.expires_at:
            return True
        
        # Kiểm tra theo TTL policy
        policy = self._get_policy(doc)
        if policy:
            age_hours = (datetime.now() - doc.updated_at).total_seconds() / 3600
            return age_hours > policy.ttl_hours
        
        return False
    
    def _get_policy(self, doc: Document) -> Optional[ExpirationPolicy]:
        """Lấy policy phù hợp cho document"""
        category = doc.metadata.get('category', 'default')
        
        for policy in self.expiration_policies:
            if policy.category == category:
                return policy
        
        return None
    
    async def cleanup_expired(
        self,
        dry_run: bool = False,
        batch_size: int = 100
    ) -> Dict[str, any]:
        """
        Dọn dẹp các document hết hạn
        Trả về report chi tiết về các document đã xử lý
        """
        results = {
            'total_checked': 0,
            'expired': 0,
            'deleted': 0,
            'backed_up': 0,
            'errors': []
        }
        
        # Lấy tất cả document IDs
        all_ids = await self._get_all_document_ids()
        results['total_checked'] = len(all_ids)
        
        # Process theo batch
        for i in range(0, len(all_ids), batch_size):
            batch_ids = all_ids[i:i + batch_size]
            
            for doc_id in batch_ids:
                try:
                    doc = await self._load_document(doc_id)
                    
                    if not doc:
                        continue
                    
                    if self.should_expire(doc):
                        results['expired'] += 1
                        
                        # Notify trước khi xóa
                        await self._notify_expiration(doc)
                        
                        if not dry_run:
                            await self._delete_document(doc)
                            results['deleted'] += 1
                
                except Exception as e:
                    results['errors'].append({
                        'doc_id': doc_id,
                        'error': str(e)
                    })
        
        return results
    
    async def _delete_document(self, doc: Document):
        """Xóa document với backup"""
        policy = self._get_policy(doc)
        
        # Backup trước khi xóa
        if policy and policy.auto_delete:
            await self.backup_store.set(doc.id, {
                'content': doc.content,
                'metadata': doc.metadata,
                'deleted_at': datetime.now().isoformat(),
                'reason': 'expired'
            })
        
        # Xóa khỏi vector store
        await self.vector_store.delete(doc.id)
        
        # Xóa metadata
        await self.metadata_store.delete(doc.id)
    
    async def _notify_expiration(self, doc: Document):
        """Gửi notification về document sắp hết hạn"""
        if self.notification_hook:
            await self.notification_hook({
                'document_id': doc.id,
                'title': doc.metadata.get('title', 'Untitled'),
                'expires_at': doc.expires_at.isoformat() if doc.expires_at else None,
                'category': doc.metadata.get('category', 'unknown')
            })
    
    def schedule_periodic_cleanup(
        self,
        interval_hours: int = 6
    ):
        """
        Thiết lập periodic cleanup task
        Khuyến nghị: Chạy mỗi 6 giờ cho production systems
        """
        async def cleanup_task():
            while True:
                await asyncio.sleep(interval_hours * 3600)
                await self.cleanup_expired(dry_run=False)
        
        return asyncio.create_task(cleanup_task())


Ví dụ sử dụng với HolySheep AI integration

async def main(): from document_stores import PineconeStore, RedisStore # Khởi tạo stores vector_store = PineconeStore(api_key="your-pinecone-key") metadata_store = RedisStore(host="localhost", port=6379) backup_store = RedisStore(host="localhost", port=6380) # Khởi tạo manager manager = ExpiredDocumentManager( vector_store=vector_store, metadata_store=metadata_store, backup_store=backup_store ) # Thêm policies cho các loại tài liệu khác nhau manager.add_policy(ExpirationPolicy( category="promotion", ttl_hours=24, # Khuyến mãi hết hạn sau 24h auto_delete=True, notify_before_hours=2 )) manager.add_policy(ExpirationPolicy( category="product", ttl_hours=720, # Sản phẩm: 30 ngày auto_delete=False # Không tự động xóa, chỉ đánh dấu )) manager.add_policy(ExpirationPolicy( category="policy", ttl_hours=8760, # Chính sách: 1 năm auto_delete=False )) # Chạy cleanup lần đầu results = await manager.cleanup_expired(dry_run=True) print(f"Preview cleanup: {results}") # Bật periodic cleanup cleanup_task = manager.schedule_periodic_cleanup(interval_hours=6) # Keep running await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(main())

Workflow Hoàn Chỉnh: Event-Driven Knowledge Base


import asyncio
import hashlib
from datetime import datetime
from typing import Dict, List, Optional
import httpx

class KnowledgeBaseSync:
    """
    Hệ thống đồng bộ knowledge base event-driven
    Kết hợp incremental indexing + expired document management
    """
    
    def __init__(
        self,
        indexer: IncrementalIndexer,
        expired_manager: ExpiredDocumentManager,
        webhook_secret: str = ""
    ):
        self.indexer = indexer
        self.expired_manager = expired_manager
        self.webhook_secret = webhook_secret
        self.client = httpx.Client(
            base_url="https://api.holysheep.ai/v1",
            headers={"Authorization": "Bearer YOUR_HOLYSHEEP_API_KEY"},
            timeout=60.0
        )
        
        # Metrics
        self.metrics = {
            'documents_indexed': 0,
            'documents_expired': 0,
            'total_cost_usd': 0.0,
            'avg_latency_ms': 0.0
        }
    
    async def handle_document_created(self, event: Dict) -> IndexingResult:
        """
        Xử lý event khi document mới được tạo
        Trigger: POST /documents webhook từ CMS
        """
        doc = Document(
            id=event['id'],
            content=event['content'],
            metadata=event.get('metadata', {}),
            expires_at=datetime.fromisoformat(event['expires_at']) 
                      if event.get('expires_at') else None
        )
        
        # Index ngay lập tức
        result = self.indexer._index_single_document(doc)
        
        self._update_metrics(result)
        
        return result
    
    async def handle_document_updated(self, event: Dict) -> IndexingResult:
        """
        Xử lý event khi document được cập nhật
        Sử dụng incremental indexing - chỉ re-index nếu content thay đổi
        """
        doc = Document(
            id=event['id'],
            content=event['content'],
            metadata=event.get('metadata', {}),
            updated_at=datetime.fromisoformat(event['updated_at']),
            expires_at=datetime.fromisoformat(event['expires_at']) 
                      if event.get('expires_at') else None
        )
        
        # has_changed() check trước khi index
        if not self.indexer.has_changed(doc):
            return IndexingResult(
                document_id=doc.id,
                success=True,
                vector_id=self.indexer.metadata_store.get(doc.id, {}).get('vector_id'),
                cost_usd=0
            )
        
        result = self.indexer._index_single_document(doc)
        self._update_metrics(result)
        
        return result
    
    async def handle_document_deleted(self, event: Dict):
        """
        Xử lý event khi document bị xóa khỏi source
        """
        doc_id = event['id']
        
        # Xóa khỏi vector store
        await self.indexer.vector_store.delete(doc_id)
        
        # Backup trước khi xóa metadata
        metadata = await self.indexer.metadata_store.get(doc_id)
        if metadata:
            await self.expired_manager.backup_store.set(doc_id, {
                **metadata,
                'deleted_at': datetime.now().isoformat(),
                'reason': 'user_deleted'
            })
        
        await self.indexer.metadata_store.delete(doc_id)
    
    async def sync_batch(
        self,
        source_documents: List[Dict],
        strategy: str = "incremental"
    ) -> Dict:
        """
        Đồng bộ hàng loạt document từ source
        strategy: 'incremental' | 'full' | 'smart'
        
        Smart strategy:
        - Documents mới → Index ngay
        - Documents thay đổi → Incremental update
        - Documents cũ → Check expiration
        """
        results = {
            'indexed': [],
            'skipped': [],
            'expired': [],
            'errors': [],
            'summary': {
                'total': len(source_documents),
                'cost_usd': 0.0,
                'duration_ms': 0.0
            }
        }
        
        start_time = asyncio.get_event_loop().time()
        
        documents = [
            Document(
                id=doc['id'],
                content=doc['content'],
                metadata=doc.get('metadata', {}),
                updated_at=datetime.fromisoformat(doc.get('updated_at', datetime.now().isoformat())),
                expires_at=datetime.fromisoformat(doc['expires_at']) 
                          if doc.get('expires_at') else None
            )
            for doc in source_documents
        ]
        
        if strategy == "incremental":
            # Chỉ index documents có thay đổi
            indexing_results = self.indexer.index_documents(
                documents,
                skip_unchanged=True
            )
            
            for idx_result in indexing_results:
                if idx_result.success:
                    if idx_result.cost_usd > 0:
                        results['indexed'].append(idx_result.document_id)
                    else:
                        results['skipped'].append(idx_result.document_id)
                else:
                    results['errors'].append({
                        'id': idx_result.document_id,
                        'error': idx_result.error
                    })
        
        elif strategy == "full":
            # Force re-index tất cả
            indexing_results = self.indexer.index_documents(
                documents,
                skip_unchanged=False
            )
            results['indexed'] = [r.document_id for r in indexing_results if r.success]
        
        elif strategy == "smart":
            # Smart strategy - phân loại và xử lý thông minh
            to_index = []
            
            for doc in documents:
                # Check expiration trước
                if self.expired_manager.should_expire(doc):
                    await self.expired_manager._delete_document(doc)
                    results['expired'].append(doc.id)
                    continue
                
                # Check if changed
                if self.indexer.has_changed(doc):
                    to_index.append(doc)
                else:
                    results['skipped'].append(doc.id)
            
            # Index remaining
            if to_index:
                indexing_results = self.indexer.index_documents(to_index)
                results['indexed'] = [r.document_id for r in indexing_results if r.success]
        
        # Cleanup expired documents
        cleanup_results = await self.expired_manager.cleanup_expired(dry_run=False)
        results['summary']['expired_cleaned'] = cleanup_results['deleted']
        
        # Update summary
        results['summary']['duration_ms'] = (
            asyncio.get_event_loop().time() - start_time
        ) * 1000
        
        self.metrics['documents_indexed'] += len(results['indexed'])
        self.metrics['documents_expired'] += len(results['expired'])
        
        return results
    
    def _update_metrics(self, result: IndexingResult):
        """Cập nhật metrics"""
        self.metrics['total_cost_usd'] += result.cost_usd
        
        if result.processing_time_ms > 0:
            current_avg = self.metrics['avg_latency_ms']
            total_ops = self.metrics['documents_indexed']
            self.metrics['avg_latency_ms'] = (
                (current_avg * total_ops + result.processing_time_ms) 
                / (total_ops + 1)
            )
    
    async def health_check(self) -> Dict:
        """Health check endpoint cho monitoring"""
        return {
            'status': 'healthy',
            'metrics': self.metrics,
            'indexer_connected': await self.indexer.vector_store.ping(),
            'timestamp': datetime.now().isoformat()
        }


API Endpoint sử dụng FastAPI

""" from fastapi import FastAPI, HTTPException, Header from pydantic import BaseModel import hmac app = FastAPI() sync_service = KnowledgeBaseSync(indexer, expired_manager) class WebhookEvent(BaseModel): event_type: str document_id: str content: Optional[str] = None metadata: Optional[Dict] = {} @app.post("/webhook/documents") async def handle_webhook( event: WebhookEvent, x_webhook_signature: str = Header(None) ): # Verify webhook signature if x_webhook_signature: expected_sig = hmac.new( WEBHOOK_SECRET.encode(), event.json().encode(), hashlib.sha256 ).hexdigest() if x_webhook_signature != expected_sig: raise HTTPException(status_code=401, detail="Invalid signature") handlers = { 'created': sync_service.handle_document_created, 'updated': sync_service.handle_document_updated, 'deleted': sync_service.handle_document_deleted } handler = handlers.get(event.event_type) if not handler: raise HTTPException(status_code=400, detail="Unknown event type") result = await handler(event.dict()) return {'success': True, 'result': result} @app.get("/health") async def health(): return await sync_service.health_check() @app.post("/sync/batch") async def sync_batch(source: List[Dict], strategy: str = "incremental"): results = await sync_service.sync_batch(source, strategy) return results """

Lỗi Thường Gặp và Cách Khắc Phục

1. Lỗi: Duplicate Documents Sau Khi Re-index

Mô tả: Vector store chứa nhiều phiên bản của cùng một document, dẫn đến kết quả tìm kiếm bị trùng lặp.

Nguyên nhân: Không sử dụng upsert mà dùng insert thuần túy. Khi content thay đổi, document cũ vẫn tồn tại.


❌ SAI - Gây duplicate

def index_document_old_way(vector_store, doc_id, embedding, content): # Luôn insert mới, không xóa document cũ vector_store.insert( id=doc_id, embedding=embedding, metadata={'content': content} )

✅ ĐÚNG - Sử dụng upsert

def index_document_correct(vector_store, doc_id, embedding, content): # Upsert: Update nếu tồn tại, Insert nếu không vector_store.upsert( id=doc_id, embedding=embedding, metadata={'content': content} )

Hoặc xóa trước rồi insert

def index_with_delete(vector_store, doc_id, embedding, content): vector_store.delete(doc_id) # Xóa document cũ vector_store.insert( id=doc_id, embedding=embedding, metadata={'content': content} )

2. Lỗi: Hash Collision Khiến Document Không Được Cập Nhật

Mô tả: Document thay đổi nhưng hệ thống không nhận ra (has_changed = False).

Nguyên nhân: Sử dụng hash quá ngắn, dẫn đến collision. Hoặc hash chỉ dựa trên title thay vì full content.


❌ SAI - Hash quá ngắn, dễ collision

def bad_hash(content): return hashlib.md5(content.encode()).hexdigest()[:4] # Chỉ 4 ký tự!

❌ SAI - Hash chỉ dựa trên metadata

def bad_hash_v2(doc): return hashlib.md5( f"{doc.title}_{doc.category}".encode() ).hexdigest()

✅ ĐÚNG - Hash đầy đủ với salt

def good_hash(content: str, doc_id: str) -> str: # Thêm doc_id vào hash để tránh collision combined = f"{doc_id}:{content}" # Sử dụng SHA-256 với độ dài đủ lớn return hashlib.sha256(combined.encode('utf-8')).hexdigest()

✅ ĐÚNG - Hash với versioning

class ContentHash: VERSION = 1 @classmethod def compute(cls, content: str, metadata: dict) -> str: data = { 'version': cls.VERSION, 'content': content, 'metadata_keys': sorted(metadata.keys()) } return hashlib.sha256( json.dumps(data, sort_keys=True).encode() ).hexdigest()

3. Lỗi: Memory Leak Khi Xử Lý Batch Lớn

Mô tả: Server chạy OK lúc đầu nhưng sau vài ngày bắt đầu chậm dần, cuối cùng crash với OOM.

Nguyên nhân: Load tất cả documents vào memory trước khi xử lý. Không release embeddings sau khi upsert.


❌ SAI - Load tất cả vào memory

def process_all_documents_OLD(documents: List[Document]): # Load 10,000 documents = ~500MB RAM all_embeddings = [] for doc in documents: embedding = get_embedding(doc.content) all_embeddings.append(embedding) # Lưu trong memory # Upsert từng cái một for i, doc in enumerate(documents): vector_store.upsert(doc.id, all_embeddings[i]) # Không giải phóng all_embeddings = Memory leak!

✅ ĐÚNG - Stream processing với batch

async def process_documents_STREAM(documents: List[Document], batch_size: int = 50): """Xử lý streaming để tránh memory leak""" total_indexed = 0 total_cost = 0.0 # Process từng batch nhỏ for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] # Xử lý batch hiện tại batch_embeddings = [] batch_costs = [] for doc in batch: embedding, cost = await get_embedding_async(doc.content) batch_embeddings.append({ 'id': doc.id, 'embedding': embedding, 'metadata': doc.metadata }) batch_costs.append(cost) # Upsert batch await vector_store.upsert_batch(batch_embeddings) # Cập nhật metrics total_indexed += len(batch) total_cost += sum(batch_costs) # ✅ QUAN TRỌNG: Giải ph