Trong bài viết này, chúng ta sẽ đi sâu vào việc xây dựng một hệ thống tìm kiếm thông minh cho enterprise knowledge base sử dụng vector embedding. Đây là bài toán cực kỳ phổ biến khi doanh nghiệp cần tìm kiếm nội dung semantically thay vì chỉ keyword matching truyền thống.
Tại sao cần Intelligent Search cho Knowledge Base?
Phương pháp tìm kiếm truyền thống dựa trên TF-IDF hoặc BM25 có những hạn chế nghiêm trọng:
- Không hiểu ngữ nghĩa: Không thể nhận ra "car" và "automobile" là cùng một khái niệm
- Không xử lý được synonyms: Người dùng tìm "làm sao để reset password" nhưng tài liệu ghi "hướng dẫn đặt lại mật khẩu"
- Precision/Recall thấp: Kết quả trả về thường không chính xác hoặc thiếu tài liệu liên quan
Kiến trúc tổng thể
Chúng ta sẽ xây dựng hệ thống với kiến trúc sau:
┌─────────────────────────────────────────────────────────────────┐
│ ENTERPRISE KNOWLEDGE BASE │
│ INTELLIGENT SEARCH SYSTEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ ┌───────────────────────┐ │
│ │ User │───▶│ Query │───▶│ Embedding API │ │
│ │ Input │ │ Preprocess │ │ (HolySheep AI) │ │
│ └──────────┘ └──────────────┘ └───────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌───────────────────────┐ │
│ │ Results │◀───│ Ranking & │◀───│ Vector Search │ │
│ │ Display │ │ Re-ranking │ │ (Pinecone/Qdrant) │ │
│ └──────────┘ └──────────────┘ └───────────────────────┘ │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌───────────────────────┐ │
│ │ Document │───▶│ Chunking & │───▶│ Batch Embedding │ │
│ │ Ingestion│ │ Preprocess │ │ Generation │ │
│ └──────────┘ └──────────────┘ └───────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Cấu hình HolySheep AI Embedding API
HolySheep AI cung cấp API endpoint hoàn toàn tương thích với OpenAI format, với độ trễ trung bình dưới 50ms và chi phí tiết kiệm đến 85% so với các provider khác. Bạn có thể đăng ký tại đây để nhận tín dụng miễn phí.
import os
from openai import OpenAI
Cấu hình HolySheep AI - KHÔNG sử dụng api.openai.com
client = OpenAI(
api_key=os.environ.get("YOUR_HOLYSHEEP_API_KEY"),
base_url="https://api.holysheep.ai/v1" # Endpoint chính thức
)
def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
"""
Generate embedding vector cho một đoạn text.
Args:
text: Đoạn văn bản cần tạo embedding
model: Model embedding (text-embedding-3-small, text-embedding-3-large)
Returns:
Vector embedding dưới dạng list[float]
"""
response = client.embeddings.create(
model=model,
input=text
)
return response.data[0].embedding
def batch_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""
Generate embeddings cho batch văn bản - TỐI ƯU cho production.
Supports up to 2048 tokens per request.
Args:
texts: List các đoạn text cần embed
model: Model embedding
Returns:
List các vector embeddings
"""
response = client.embeddings.create(
model=model,
input=texts # Batch processing - hiệu quả hơn nhiều
)
return [item.embedding for item in response.data]
Benchmark function
def benchmark_embedding(latencies: list[float]) -> dict:
"""Tính toán metrics hiệu suất embedding generation."""
import statistics
return {
"avg_latency_ms": statistics.mean(latencies),
"p50_latency_ms": statistics.median(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)],
"total_requests": len(latencies)
}
Document Processing Pipeline
Đây là module xử lý document từ knowledge base với chiến lược chunking thông minh:
from dataclasses import dataclass
from typing import Generator
import re
@dataclass
class DocumentChunk:
"""Represents a chunk of document with metadata."""
content: str
chunk_id: str
document_id: str
metadata: dict
embedding: list[float] = None
class IntelligentChunker:
"""
Smart text chunking strategy cho enterprise knowledge base.
Kết hợp nhiều phương pháp để tối ưu quality.
"""
def __init__(
self,
chunk_size: int = 512,
overlap: int = 64,
min_chunk_size: int = 128
):
self.chunk_size = chunk_size
self.overlap = overlap
self.min_chunk_size = min_chunk_size
def chunk_by_semantic_units(self, text: str) -> Generator[str, None, None]:
"""
Chunk document dựa trên semantic units (paragraphs, sections).
Giữ nguyên context và cấu trúc tài liệu.
"""
# Tách theo paragraphs
paragraphs = re.split(r'\n\s*\n', text)
current_chunk = []
current_size = 0
for para in paragraphs:
para_size = len(para)
# Nếu paragraph đơn lẻ lớn hơn chunk_size
if para_size > self.chunk_size:
if current_chunk:
yield self._combine_chunks(current_chunk)
current_chunk = []
# Split paragraph lớn thành sentences
sentences = re.split(r'(?<=[.!?])\s+', para)
for sent in sentences:
if current_size + len(sent) > self.chunk_size:
if current_size >= self.min_chunk_size:
yield self._combine_chunks(current_chunk)
current_chunk = [sent]
current_size = len(sent)
else:
current_chunk.append(sent)
current_size += len(sent)
# Nếu thêm paragraph vào sẽ vượt chunk_size
elif current_size + para_size > self.chunk_size:
if current_size >= self.min_chunk_size:
yield self._combine_chunks(current_chunk)
# Keep overlap
if self.overlap > 0 and current_chunk:
overlap_text = ' '.join(current_chunk)
if len(overlap_text) > self.overlap:
overlap_text = overlap_text[-self.overlap:]
current_chunk = [overlap_text]
current_size = len(overlap_text)
else:
current_chunk = []
current_size = 0
current_chunk.append(para)
current_size += para_size
else:
current_chunk.append(para)
current_size += para_size
# Yield remaining chunk
if current_chunk and current_size >= self.min_chunk_size:
yield self._combine_chunks(current_chunk)
def _combine_chunks(self, chunks: list[str]) -> str:
"""Combine list of text chunks into single string."""
return ' '.join(chunks)
class KnowledgeBaseProcessor:
"""Xử lý toàn bộ knowledge base: ingest, embed, store."""
def __init__(
self,
vector_store, # Pinecone, Qdrant, or Weaviate client
embedding_client: OpenAI,
batch_size: int = 100
):
self.vector_store = vector_store
self.embedding_client = embedding_client
self.chunker = IntelligentChunker()
self.batch_size = batch_size
async def ingest_document(
self,
document_id: str,
content: str,
metadata: dict
) -> list[DocumentChunk]:
"""
Ingest một document vào knowledge base với async processing.
"""
chunks_data = []
# Generate chunks
for idx, chunk_text in enumerate(self.chunker.chunk_by_semantic_units(content)):
chunk = DocumentChunk(
content=chunk_text,
chunk_id=f"{document_id}_{idx}",
document_id=document_id,
metadata={
**metadata,
"chunk_index": idx
}
)
chunks_data.append(chunk)
# Batch embedding generation
embeddings = await self._generate_embeddings_batch(chunks_data)
# Store to vector database
await self._store_chunks(chunks_data, embeddings)
return chunks_data
async def _generate_embeddings_batch(
self,
chunks: list[DocumentChunk]
) -> list[list[float]]:
"""Generate embeddings với batching và retry logic."""
all_embeddings = []
for i in range(0, len(chunks), self.batch_size):
batch = chunks[i:i + self.batch_size]
texts = [chunk.content for chunk in batch]
response = self.embedding_client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
batch_embeddings = [item.embedding for item in response.data]
all_embeddings.extend(batch_embeddings)
return all_embeddings
async def _store_chunks(
self,
chunks: list[DocumentChunk],
embeddings: list[list[float]]
):
"""Store chunks với embeddings vào vector database."""
vectors = []
for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
vectors.append({
"id": chunk.chunk_id,
"values": embedding,
"metadata": {
"content": chunk.content,
"document_id": chunk.document_id,
**chunk.metadata
}
})
await self.vector_store.upsert(vectors)
Concurrency Control và Rate Limiting
Đây là phần quan trọng nhất khi xử lý large-scale embedding generation. Chúng ta cần kiểm soát concurrency để tránh rate limit và optimize throughput:
import asyncio
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
import time
import logging
logger = logging.getLogger(__name__)
@dataclass
class RateLimitConfig:
"""Cấu hình rate limiting cho API calls."""
max_concurrent: int = 10 # Số request đồng thời tối đa
requests_per_minute: int = 3000 # RPM limit
retry_attempts: int = 3 # Số lần retry khi fail
retry_delay: float = 1.0 # Delay ban đầu giữa retries (seconds)
class SemaphoreBatcher:
"""
Async batch processor với concurrency control và rate limiting.
Đảm bảo không vượt quá API limits của HolySheep AI.
"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.semaphore = asyncio.Semaphore(config.max_concurrent)
self.request_timestamps: list[float] = []
self._lock = asyncio.Lock()
async def process_with_semaphore(
self,
items: list[str],
process_fn: Callable[[list[str]], Awaitable[list]]
) -> list:
"""
Process items với semaphore control và automatic batching.
"""
results = []
# Auto-batching: chia thành batches phù hợp với concurrency
batch_size = min(len(items), self.config.max_concurrent * 2)
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_results = await self._process_batch(batch, process_fn)
results.extend(batch_results)
return results
async def _process_batch(
self,
batch: list[str],
process_fn: Callable
) -> list:
"""Xử lý một batch với semaphore."""
async with self.semaphore:
# Check và enforce rate limit
await self._enforce_rate_limit()
try:
result = await process_fn(batch)
return result
except Exception as e:
logger.error(f"Batch processing failed: {e}")
return await self._retry_with_backoff(batch, process_fn)
async def _enforce_rate_limit(self):
"""
Enforce rate limit: requests_per_minute.
HolySheep AI có limit linh hoạt, nhưng best practice vẫn nên control.
"""
async with self._lock:
now = time.time()
# Loại bỏ timestamps cũ hơn 1 phút
self.request_timestamps = [
ts for ts in self.request_timestamps
if now - ts < 60
]
# Nếu đã đạt limit, chờ
if len(self.request_timestamps) >= self.config.requests_per_minute:
oldest = self.request_timestamps[0]
wait_time = 60 - (now - oldest) + 0.1
if wait_time > 0:
logger.info(f"Rate limit reached, waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
self.request_timestamps.append(time.time())
async def _retry_with_backoff(
self,
batch: list[str],
process_fn: Callable,
attempt: int = 0
) -> list:
"""Retry với exponential backoff."""
if attempt >= self.config.retry_attempts:
logger.error(f"Max retry attempts reached for batch of {len(batch)} items")
return []
delay = self.config.retry_delay * (2 ** attempt)
logger.warning(f"Retrying batch in {delay}s (attempt {attempt + 1})")
await asyncio.sleep(delay)
try:
return await process_fn(batch)
except Exception as e:
logger.error(f"Retry {attempt + 1} failed: {e}")
return await self._retry_with_backoff(batch, process_fn, attempt + 1)
class EnterpriseEmbeddingPipeline:
"""
Production-ready embedding pipeline cho enterprise knowledge base.
Supports incremental updates và real-time indexing.
"""
def __init__(self, api_key: str, config: RateLimitConfig = None):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.holysheep.ai/v1"
)
self.rate_limiter = SemaphoreBatcher(config or RateLimitConfig())
self.chunker = IntelligentChunker()
async def index_knowledge_base(
self,
documents: list[dict],
vector_store,
show_progress: bool = True