Trong bài viết này, chúng ta sẽ đi sâu vào việc xây dựng một hệ thống tìm kiếm thông minh cho enterprise knowledge base sử dụng vector embedding. Đây là bài toán cực kỳ phổ biến khi doanh nghiệp cần tìm kiếm nội dung semantically thay vì chỉ keyword matching truyền thống.

Tại sao cần Intelligent Search cho Knowledge Base?

Phương pháp tìm kiếm truyền thống dựa trên TF-IDF hoặc BM25 có những hạn chế nghiêm trọng:

Kiến trúc tổng thể

Chúng ta sẽ xây dựng hệ thống với kiến trúc sau:

┌─────────────────────────────────────────────────────────────────┐
│                    ENTERPRISE KNOWLEDGE BASE                      │
│                    INTELLIGENT SEARCH SYSTEM                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌───────────────────────┐   │
│  │  User    │───▶│  Query       │───▶│  Embedding API        │   │
│  │  Input   │    │  Preprocess  │    │  (HolySheep AI)       │   │
│  └──────────┘    └──────────────┘    └───────────┬───────────┘   │
│                                                  │               │
│                                                  ▼               │
│  ┌──────────┐    ┌──────────────┐    ┌───────────────────────┐   │
│  │ Results  │◀───│  Ranking &   │◀───│  Vector Search       │   │
│  │ Display  │    │  Re-ranking  │    │  (Pinecone/Qdrant)    │   │
│  └──────────┘    └──────────────┘    └───────────────────────┘   │
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌───────────────────────┐   │
│  │ Document │───▶│  Chunking &  │───▶│  Batch Embedding      │   │
│  │ Ingestion│    │  Preprocess  │    │  Generation           │   │
│  └──────────┘    └──────────────┘    └───────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Cấu hình HolySheep AI Embedding API

HolySheep AI cung cấp API endpoint hoàn toàn tương thích với OpenAI format, với độ trễ trung bình dưới 50mschi phí tiết kiệm đến 85% so với các provider khác. Bạn có thể đăng ký tại đây để nhận tín dụng miễn phí.

import os
from openai import OpenAI

Cấu hình HolySheep AI - KHÔNG sử dụng api.openai.com

client = OpenAI( api_key=os.environ.get("YOUR_HOLYSHEEP_API_KEY"), base_url="https://api.holysheep.ai/v1" # Endpoint chính thức ) def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]: """ Generate embedding vector cho một đoạn text. Args: text: Đoạn văn bản cần tạo embedding model: Model embedding (text-embedding-3-small, text-embedding-3-large) Returns: Vector embedding dưới dạng list[float] """ response = client.embeddings.create( model=model, input=text ) return response.data[0].embedding def batch_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]: """ Generate embeddings cho batch văn bản - TỐI ƯU cho production. Supports up to 2048 tokens per request. Args: texts: List các đoạn text cần embed model: Model embedding Returns: List các vector embeddings """ response = client.embeddings.create( model=model, input=texts # Batch processing - hiệu quả hơn nhiều ) return [item.embedding for item in response.data]

Benchmark function

def benchmark_embedding(latencies: list[float]) -> dict: """Tính toán metrics hiệu suất embedding generation.""" import statistics return { "avg_latency_ms": statistics.mean(latencies), "p50_latency_ms": statistics.median(latencies), "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)], "p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)], "total_requests": len(latencies) }

Document Processing Pipeline

Đây là module xử lý document từ knowledge base với chiến lược chunking thông minh:

from dataclasses import dataclass
from typing import Generator
import re

@dataclass
class DocumentChunk:
    """Represents a chunk of document with metadata."""
    content: str
    chunk_id: str
    document_id: str
    metadata: dict
    embedding: list[float] = None

class IntelligentChunker:
    """
    Smart text chunking strategy cho enterprise knowledge base.
    Kết hợp nhiều phương pháp để tối ưu quality.
    """
    
    def __init__(
        self,
        chunk_size: int = 512,
        overlap: int = 64,
        min_chunk_size: int = 128
    ):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.min_chunk_size = min_chunk_size
    
    def chunk_by_semantic_units(self, text: str) -> Generator[str, None, None]:
        """
        Chunk document dựa trên semantic units (paragraphs, sections).
        Giữ nguyên context và cấu trúc tài liệu.
        """
        # Tách theo paragraphs
        paragraphs = re.split(r'\n\s*\n', text)
        
        current_chunk = []
        current_size = 0
        
        for para in paragraphs:
            para_size = len(para)
            
            # Nếu paragraph đơn lẻ lớn hơn chunk_size
            if para_size > self.chunk_size:
                if current_chunk:
                    yield self._combine_chunks(current_chunk)
                    current_chunk = []
                
                # Split paragraph lớn thành sentences
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sent in sentences:
                    if current_size + len(sent) > self.chunk_size:
                        if current_size >= self.min_chunk_size:
                            yield self._combine_chunks(current_chunk)
                        current_chunk = [sent]
                        current_size = len(sent)
                    else:
                        current_chunk.append(sent)
                        current_size += len(sent)
            
            # Nếu thêm paragraph vào sẽ vượt chunk_size
            elif current_size + para_size > self.chunk_size:
                if current_size >= self.min_chunk_size:
                    yield self._combine_chunks(current_chunk)
                
                # Keep overlap
                if self.overlap > 0 and current_chunk:
                    overlap_text = ' '.join(current_chunk)
                    if len(overlap_text) > self.overlap:
                        overlap_text = overlap_text[-self.overlap:]
                    current_chunk = [overlap_text]
                    current_size = len(overlap_text)
                else:
                    current_chunk = []
                    current_size = 0
                
                current_chunk.append(para)
                current_size += para_size
            else:
                current_chunk.append(para)
                current_size += para_size
        
        # Yield remaining chunk
        if current_chunk and current_size >= self.min_chunk_size:
            yield self._combine_chunks(current_chunk)
    
    def _combine_chunks(self, chunks: list[str]) -> str:
        """Combine list of text chunks into single string."""
        return ' '.join(chunks)

class KnowledgeBaseProcessor:
    """Xử lý toàn bộ knowledge base: ingest, embed, store."""
    
    def __init__(
        self,
        vector_store,  # Pinecone, Qdrant, or Weaviate client
        embedding_client: OpenAI,
        batch_size: int = 100
    ):
        self.vector_store = vector_store
        self.embedding_client = embedding_client
        self.chunker = IntelligentChunker()
        self.batch_size = batch_size
    
    async def ingest_document(
        self,
        document_id: str,
        content: str,
        metadata: dict
    ) -> list[DocumentChunk]:
        """
        Ingest một document vào knowledge base với async processing.
        """
        chunks_data = []
        
        # Generate chunks
        for idx, chunk_text in enumerate(self.chunker.chunk_by_semantic_units(content)):
            chunk = DocumentChunk(
                content=chunk_text,
                chunk_id=f"{document_id}_{idx}",
                document_id=document_id,
                metadata={
                    **metadata,
                    "chunk_index": idx
                }
            )
            chunks_data.append(chunk)
        
        # Batch embedding generation
        embeddings = await self._generate_embeddings_batch(chunks_data)
        
        # Store to vector database
        await self._store_chunks(chunks_data, embeddings)
        
        return chunks_data
    
    async def _generate_embeddings_batch(
        self,
        chunks: list[DocumentChunk]
    ) -> list[list[float]]:
        """Generate embeddings với batching và retry logic."""
        all_embeddings = []
        
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            texts = [chunk.content for chunk in batch]
            
            response = self.embedding_client.embeddings.create(
                model="text-embedding-3-small",
                input=texts
            )
            
            batch_embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(batch_embeddings)
        
        return all_embeddings
    
    async def _store_chunks(
        self,
        chunks: list[DocumentChunk],
        embeddings: list[list[float]]
    ):
        """Store chunks với embeddings vào vector database."""
        vectors = []
        
        for chunk, embedding in zip(chunks, embeddings):
            chunk.embedding = embedding
            vectors.append({
                "id": chunk.chunk_id,
                "values": embedding,
                "metadata": {
                    "content": chunk.content,
                    "document_id": chunk.document_id,
                    **chunk.metadata
                }
            })
        
        await self.vector_store.upsert(vectors)

Concurrency Control và Rate Limiting

Đây là phần quan trọng nhất khi xử lý large-scale embedding generation. Chúng ta cần kiểm soát concurrency để tránh rate limit và optimize throughput:

import asyncio
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
import time
import logging

logger = logging.getLogger(__name__)

@dataclass
class RateLimitConfig:
    """Cấu hình rate limiting cho API calls."""
    max_concurrent: int = 10        # Số request đồng thời tối đa
    requests_per_minute: int = 3000 # RPM limit
    retry_attempts: int = 3         # Số lần retry khi fail
    retry_delay: float = 1.0        # Delay ban đầu giữa retries (seconds)

class SemaphoreBatcher:
    """
    Async batch processor với concurrency control và rate limiting.
    Đảm bảo không vượt quá API limits của HolySheep AI.
    """
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.request_timestamps: list[float] = []
        self._lock = asyncio.Lock()
    
    async def process_with_semaphore(
        self,
        items: list[str],
        process_fn: Callable[[list[str]], Awaitable[list]]
    ) -> list:
        """
        Process items với semaphore control và automatic batching.
        """
        results = []
        
        # Auto-batching: chia thành batches phù hợp với concurrency
        batch_size = min(len(items), self.config.max_concurrent * 2)
        
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            batch_results = await self._process_batch(batch, process_fn)
            results.extend(batch_results)
        
        return results
    
    async def _process_batch(
        self,
        batch: list[str],
        process_fn: Callable
    ) -> list:
        """Xử lý một batch với semaphore."""
        async with self.semaphore:
            # Check và enforce rate limit
            await self._enforce_rate_limit()
            
            try:
                result = await process_fn(batch)
                return result
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
                return await self._retry_with_backoff(batch, process_fn)
    
    async def _enforce_rate_limit(self):
        """
        Enforce rate limit: requests_per_minute.
        HolySheep AI có limit linh hoạt, nhưng best practice vẫn nên control.
        """
        async with self._lock:
            now = time.time()
            # Loại bỏ timestamps cũ hơn 1 phút
            self.request_timestamps = [
                ts for ts in self.request_timestamps
                if now - ts < 60
            ]
            
            # Nếu đã đạt limit, chờ
            if len(self.request_timestamps) >= self.config.requests_per_minute:
                oldest = self.request_timestamps[0]
                wait_time = 60 - (now - oldest) + 0.1
                if wait_time > 0:
                    logger.info(f"Rate limit reached, waiting {wait_time:.2f}s")
                    await asyncio.sleep(wait_time)
            
            self.request_timestamps.append(time.time())
    
    async def _retry_with_backoff(
        self,
        batch: list[str],
        process_fn: Callable,
        attempt: int = 0
    ) -> list:
        """Retry với exponential backoff."""
        if attempt >= self.config.retry_attempts:
            logger.error(f"Max retry attempts reached for batch of {len(batch)} items")
            return []
        
        delay = self.config.retry_delay * (2 ** attempt)
        logger.warning(f"Retrying batch in {delay}s (attempt {attempt + 1})")
        await asyncio.sleep(delay)
        
        try:
            return await process_fn(batch)
        except Exception as e:
            logger.error(f"Retry {attempt + 1} failed: {e}")
            return await self._retry_with_backoff(batch, process_fn, attempt + 1)

class EnterpriseEmbeddingPipeline:
    """
    Production-ready embedding pipeline cho enterprise knowledge base.
    Supports incremental updates và real-time indexing.
    """
    
    def __init__(self, api_key: str, config: RateLimitConfig = None):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.holysheep.ai/v1"
        )
        self.rate_limiter = SemaphoreBatcher(config or RateLimitConfig())
        self.chunker = IntelligentChunker()
    
    async def index_knowledge_base(
        self,
        documents: list[dict],
        vector_store,
        show_progress: bool = True