去年双十一,我负责的电商 AI 客服系统遭遇了前所未有的挑战——商品知识库膨胀至 280 万条向量,单日搜索请求峰值突破 50 万次。最初使用暴力匹配(Brute Force),P99 延迟高达 2.3 秒,用户体验断崖式下滑。经过三个月的技术选型和优化,最终在 HolySheep AI 的帮助下,实现了 P99 延迟低于 80ms、召回率 95%+ 的目标。本文将完整复盘这套百万级向量近似最近邻(ANN)搜索架构的实现过程。

一、业务场景与技术选型

我们的电商 RAG 系统需要解决的核心问题是:根据用户问题,从商品详情、用户评价、客服话术等文档中快速召回最相关的 Top-K 段落。每个文档片段经过 embedding 模型编码后,生成 float32 向量(下文代码默认使用的 text-embedding-3-large 输出 3072 维;若改用 text-embedding-3-small 则为 1536 维),存储在向量数据库中。查询时,用户问题必须用同一个模型编码为向量,再通过余弦相似度检索,返回最相似的文档片段。

技术选型时,我对比了业界主流方案:

最终,我选择了 HolySheep AI 作为 embedding + 近似最近邻搜索的统一方案。原因很简单:国内直连延迟 < 50ms,汇率优惠(¥1 = $1,相较官方节省 85%+),注册即送免费额度,2026 年主流模型定价透明(GPT-4.1 $8/MTok、DeepSeek V3.2 $0.42/MTok)。对于日均 50 万次查询的规模,月度成本可控。

二、完整代码实现

2.1 环境配置与初始化

"""
百万级向量 ANN 搜索完整实现
环境依赖:pip install requests numpy tiktoken
"""

import requests
import numpy as np
import hashlib
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import json
import time

# HolySheep AI 配置

HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY"  # Replace with your HolySheep API key


@dataclass
class SearchResult:
    """One hit returned by ``HolySheepANNClient.search``."""

    text: str
    score: float
    metadata: dict


@dataclass
class VectorDocument:
    """A document to be indexed, together with its metadata."""

    id: str
    # Placeholder: index_documents embeds ``text`` itself and never reads this.
    vector: List[float]
    text: str
    metadata: dict


class HolySheepANNClient:
    """Client for the HolySheep embedding and vector-index HTTP endpoints."""

    def __init__(self, api_key: str, model: str = "text-embedding-3-large"):
        """
        Args:
            api_key: bearer token sent with every request.
            model: embedding model name sent to ``/embeddings``.
        """
        self.api_key = api_key
        self.model = model
        self.base_url = HOLYSHEEP_BASE_URL
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        # Set of processed document IDs; not read or written by any method of
        # this class (kept for backward compatibility).
        self._processed_ids = set()

    def generate_document_id(self, text: str, metadata: dict) -> str:
        """Return a deterministic 16-hex-char ID derived from text + metadata.

        Metadata keys are sorted before hashing, so dicts with the same items
        in a different insertion order produce the same ID.
        """
        content = f"{text}:{json.dumps(metadata, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def embed_texts(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Embed ``texts`` in chunks of ``batch_size``; one vector per input.

        Raises:
            RuntimeError: on a non-200 response, or when a response does not
                contain exactly one embedding per input text.
        """
        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            payload = {
                "model": self.model,
                "input": batch,
            }
            response = requests.post(
                f"{self.base_url}/embeddings",
                headers=self.headers,
                json=payload,
                timeout=30,
            )
            if response.status_code != 200:
                raise RuntimeError(f"Embedding API 错误: {response.status_code} - {response.text}")

            data = response.json()["data"]
            # A short/long response would otherwise silently misalign vectors
            # with documents when the caller zips them together.
            if len(data) != len(batch):
                raise RuntimeError(
                    f"Embedding API 返回数量不匹配: 期望 {len(batch)} 条, 实际 {len(data)} 条"
                )
            # OpenAI-style responses tag each item with its input position;
            # honour it when present instead of relying on response order.
            if all("index" in item for item in data):
                data = sorted(data, key=lambda item: item["index"])
            all_embeddings.extend(item["embedding"] for item in data)

            # Throttle between requests (skipped after the last chunk).
            if i + batch_size < len(texts):
                time.sleep(0.1)

        return all_embeddings

    def index_documents(self, documents: List[VectorDocument]) -> Dict:
        """Embed ``documents`` and submit them to the HNSW index endpoint.

        Returns:
            The decoded JSON body of the ``/vectors/index`` response.

        Raises:
            RuntimeError: if embedding or indexing returns a non-200 status, so
                callers such as a batch importer can count the batch as failed.
        """
        texts = [doc.text for doc in documents]
        embeddings = self.embed_texts(texts)

        index_payload = {
            "documents": [
                {
                    "id": doc.id,
                    "vector": emb,
                    "text": doc.text,
                    "metadata": doc.metadata,
                }
                for doc, emb in zip(documents, embeddings)
            ],
            "index_type": "hnsw",   # HNSW graph index: accuracy/speed trade-off
            "ef_construction": 200,  # build-time candidate list; higher = better recall, slower build
            "m": 16,                 # HNSW links per node
        }
        response = requests.post(
            f"{self.base_url}/vectors/index",
            headers=self.headers,
            json=index_payload,
            timeout=60,
        )
        if response.status_code != 200:
            raise RuntimeError(f"索引 API 错误: {response.status_code} - {response.text}")
        return response.json()

    def search(
        self,
        query: str,
        top_k: int = 10,
        min_score: float = 0.7,
        filter_metadata: Optional[Dict] = None
    ) -> List[SearchResult]:
        """Approximate nearest-neighbour search for ``query``.

        Args:
            query: query text; embedded with the same model as the documents.
            top_k: maximum number of hits to request.
            min_score: similarity threshold forwarded to the server.
            filter_metadata: optional metadata filter forwarded as ``filter``.

        Returns:
            Hits from the response's ``hits`` list, in server order.

        Raises:
            RuntimeError: on a non-200 embedding or search response.
        """
        query_embedding = self.embed_texts([query])[0]

        search_payload = {
            "vector": query_embedding,
            "top_k": top_k,
            "min_score": min_score,
            "algorithm": "hnsw_approximate",
            "ef_search": 100,            # query-time candidate list; higher = more accurate
            "distance_metric": "cosine",
        }
        if filter_metadata:
            search_payload["filter"] = filter_metadata

        # Monotonic clock: wall-clock adjustments cannot skew the latency.
        start_time = time.perf_counter()
        response = requests.post(
            f"{self.base_url}/vectors/search",
            headers=self.headers,
            json=search_payload,
            timeout=10,
        )
        latency_ms = (time.perf_counter() - start_time) * 1000

        if response.status_code != 200:
            raise RuntimeError(f"搜索 API 错误: {response.status_code} - {response.text}")

        result = response.json()
        search_results = [
            SearchResult(
                text=hit["text"],
                score=hit["score"],
                metadata=hit.get("metadata", {}),
            )
            for hit in result.get("hits", [])
        ]

        print(f"查询「{query[:20]}...」返回 {len(search_results)} 条结果,延迟 {latency_ms:.1f}ms")
        return search_results

# 全局客户端实例(单例模式)

_client = None


def get_client() -> HolySheepANNClient:
    """Return the shared client, constructing it with ``API_KEY`` on first use."""
    global _client
    if _client is not None:
        return _client
    _client = HolySheepANNClient(api_key=API_KEY)
    return _client

2.2 百万级数据导入与增量更新

"""
百万级向量数据导入与增量更新策略
支持断点续传、批量提交、增量同步
"""

import concurrent.futures
from typing import Generator, Iterator
import logging

# Module logger plus a root handler at INFO level (basicConfig does nothing
# if the root logger already has handlers).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class MillionScaleVectorPipeline:
    """
    Bulk import pipeline for million-scale vector collections.

    Key points:
    1. Batches are indexed in parallel through a thread pool, with a bounded
       backlog of submitted-but-unfinished batches.
    2. Document IDs are derived from text + metadata via
       ``client.generate_document_id``, so the same record always maps to
       the same ID.
    3. Resumable imports: the count of source records already handed out as
       batches is stored in a JSON checkpoint file and skipped on the next run.
    """

    # Maximum number of submitted-but-unfinished batches in _import_pipeline.
    _MAX_PENDING_BATCHES = 10

    def __init__(
        self,
        client: HolySheepANNClient,
        batch_size: int = 500,
        max_workers: int = 4,
        checkpoint_file: str = "index_checkpoint.json"
    ):
        """
        Args:
            client: client used to build document IDs and index batches.
            batch_size: number of documents per indexing request.
            max_workers: thread-pool size used by the bulk import.
            checkpoint_file: path of the JSON file holding import progress.
        """
        self.client = client
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.checkpoint_file = checkpoint_file
        # Only "processed" is maintained by this class; "failed" and "last_id"
        # are persisted as-is.
        self.progress = {"processed": 0, "failed": 0, "last_id": None}

    def load_checkpoint(self) -> dict:
        """Return the saved checkpoint dict, or ``self.progress`` if no file exists."""
        import os

        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return self.progress

    def save_checkpoint(self):
        """Persist ``self.progress`` to the checkpoint file atomically.

        Writes to a temporary file and renames it over the target, so a crash
        mid-write cannot leave a truncated, unparseable checkpoint.
        """
        import os

        tmp_path = f"{self.checkpoint_file}.tmp"
        with open(tmp_path, 'w') as f:
            json.dump(self.progress, f)
        os.replace(tmp_path, self.checkpoint_file)

    def _advance_checkpoint(self, count: int) -> None:
        """Add ``count`` records to the processed total and persist it."""
        self.progress["processed"] += count
        self.save_checkpoint()

    def batch_generator(
        self,
        raw_documents: Iterator[dict]
    ) -> Generator[List[VectorDocument], None, None]:
        """
        Yield ``VectorDocument`` batches from a stream of raw records.

        ``raw_documents`` may be any iterable of ``{"text": ..., "metadata": ...}``
        dicts (CSV rows, JSONL lines, a DB cursor, ...). Records already counted
        in the checkpoint are skipped.

        NOTE: a batch is counted as processed when the consumer asks for the
        next batch — for ``_import_pipeline`` that is right after it has been
        submitted, not after it finished. Batches still in flight or that
        failed at crash time are therefore not re-imported on resume.
        """
        import itertools

        checkpoint = self.load_checkpoint()
        start_from = max(int(checkpoint.get("processed", 0)), 0)
        # Continue counting from the checkpoint; starting from 0 would make the
        # next save overwrite it with only this run's count, so a second resume
        # would re-import records that were already done.
        self.progress["processed"] = start_from

        batch = []
        for doc in itertools.islice(raw_documents, start_from, None):
            metadata = doc.get("metadata", {})
            batch.append(VectorDocument(
                id=self.client.generate_document_id(doc["text"], metadata),
                vector=[],  # filled by the client at indexing time
                text=doc["text"],
                metadata=metadata,
            ))

            if len(batch) >= self.batch_size:
                yield batch
                self._advance_checkpoint(len(batch))
                batch = []

        # Trailing partial batch; checkpoint it too so a finished import is
        # not partially repeated on the next run.
        if batch:
            yield batch
            self._advance_checkpoint(len(batch))

    def import_from_csv(self, csv_path: str) -> dict:
        """
        Import records from a CSV file.

        Expected columns: text, category, source, created_at (only ``text``
        is required; missing optional columns become "").
        """
        import csv

        def csv_generator():
            # newline='' lets the csv module handle quoted embedded newlines.
            with open(csv_path, 'r', encoding='utf-8', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    yield {
                        "text": row["text"],
                        "metadata": {
                            "category": row.get("category", ""),
                            "source": row.get("source", ""),
                            "created_at": row.get("created_at", "")
                        }
                    }

        return self._import_pipeline(csv_generator())

    @staticmethod
    def _tally(futures) -> Tuple[int, int]:
        """Sum the ``(succeeded, failed)`` results of finished batch futures."""
        imported = failed = 0
        for fut in futures:
            ok, bad = fut.result()
            imported += ok
            failed += bad
        return imported, failed

    def _import_pipeline(self, document_generator) -> dict:
        """Index every batch from ``document_generator`` in parallel; return stats."""
        total_imported = 0
        total_failed = 0
        submitted = 0
        start_time = time.time()

        # concurrent.futures.wait() returns sets, so track pending work as a
        # set throughout (the old list-based version crashed on .append()).
        pending = set()
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for batch in self.batch_generator(document_generator):
                pending.add(executor.submit(self._process_batch, batch))
                submitted += 1

                # Bound memory: keep at most _MAX_PENDING_BATCHES batches queued.
                if len(pending) >= self._MAX_PENDING_BATCHES:
                    done, pending = concurrent.futures.wait(
                        pending,
                        return_when=concurrent.futures.FIRST_COMPLETED
                    )
                    ok, bad = self._tally(done)
                    total_imported += ok
                    total_failed += bad

                logger.info("已提交 %d 个批次,总进度: %d 条", submitted, total_imported)

        # The executor has shut down, so every remaining future is finished.
        ok, bad = self._tally(pending)
        total_imported += ok
        total_failed += bad

        elapsed = time.time() - start_time

        return {
            "total_imported": total_imported,
            "total_failed": total_failed,
            "elapsed_seconds": elapsed,
            "throughput": total_imported / elapsed if elapsed > 0 else 0
        }

    def _process_batch(self, batch: List[VectorDocument]) -> Tuple[int, int]:
        """Index one batch; return ``(len(batch), 0)`` on success, ``(0, len(batch))`` on error."""
        try:
            self.client.index_documents(batch)
        except Exception as e:  # batch boundary: record the failure, keep going
            logger.error("批次索引失败: %s", e)
            return 0, len(batch)
        return len(batch), 0

    def incremental_update(
        self,
        new_documents: List[dict],
        deleted_ids: List[str]
    ) -> dict:
        """
        Add new documents and delete expired ones.

        Returns:
            ``{"added": n, "deleted": m}`` where ``added`` counts documents in
            successfully indexed batches and ``deleted`` is ``len(deleted_ids)``
            if the delete request returned 200, else 0.
        """
        new_docs = [
            VectorDocument(
                id=self.client.generate_document_id(doc["text"], doc.get("metadata", {})),
                vector=[],
                text=doc["text"],
                metadata=doc.get("metadata", {})
            )
            for doc in new_documents
        ]

        added = 0
        for i in range(0, len(new_docs), self.batch_size):
            success, _ = self._process_batch(new_docs[i:i + self.batch_size])
            added += success

        # Skip the round-trip entirely when there is nothing to delete.
        deleted = 0
        if deleted_ids:
            delete_response = requests.post(
                f"{self.client.base_url}/vectors/delete",
                headers=self.client.headers,
                json={"ids": deleted_ids},
                timeout=30
            )
            if delete_response.status_code == 200:
                deleted = len(deleted_ids)
            else:
                logger.error(
                    "删除请求失败: %s - %s",
                    delete_response.status_code,
                    delete_response.text,
                )

        return {"added": added, "deleted": deleted}


# 使用示例

if __name__ == "__main__":
    # Build the shared client and a pipeline with explicit tuning values.
    client = get_client()
    pipeline = MillionScaleVectorPipeline(
        client=client,
        batch_size=500,
        max_workers=4,
    )

    # Import the CSV knowledge base and report the run statistics.
    summary = pipeline.import_from_csv("knowledge_base.csv")
    imported, failed = summary['total_imported'], summary['total_failed']
    print(f"导入完成:成功 {imported} 条,失败 {failed} 条")
    print(f"耗时 {summary['elapsed_seconds']:.1f}s,吞吐量 {summary['throughput']:.1f} 条/秒")

三、电商 RAG 系统完整集成

"""
电商 AI 客服 RAG 系统完整实现
功能:用户问题 → 向量检索 → 上下文组装 → LLM 生成回答
支持多轮对话、意图识别、来源标注
"""

from typing import List, Optional, Dict
from collections import defaultdict
import re

class EcommerceRAGSystem:
    """
    电商 RAG 系统核心类
    集成 HolySheep AI 向量检索 + 大模型生成
    """
    
    def __init__(
        self,
        ann_client: HolySheepANNClient,
        llm_api_key: str = API_KEY,
        llm_model: str = "gpt-4.1"
    ):
        """Store retrieval/LLM settings and start with an empty dialogue history.

        Args:
            ann_client: client used for vector retrieval.
            llm_api_key: bearer token for the chat-completions call; the default
                is the module-level ``API_KEY`` as it was when the class was
                defined.
            llm_model: chat model name sent with each request.
        """
        # Retrieval side
        self.ann_client = ann_client
        # Generation side
        self.llm_model = llm_model
        self.llm_api_key = llm_api_key
        # Dialogue state: recorded turns, and how many of the most recent ones
        # get replayed into the prompt.
        self.max_history_length = 10
        self.conversation_history: List[Dict] = []
    
    def _build_system_prompt(self) -> str:
        """Return the fixed (instance-independent) system prompt for the support assistant."""
        # The literal below is used verbatim as the system message content;
        # any edit to it changes model behaviour.
        return """你是一个专业的电商智能客服助手,名为「小Holy」。

核心能力:
1. 回答商品相关问题(价格、规格、库存、物流等)
2. 提供购物建议和比价信息
3. 处理退换货、售后等常见问题

回答规范:
- 使用友好、亲切的语气
- 回复中必须标注信息来源,例如「根据商品详情页显示...」
- 如果信息不足以回答,明确告知用户「暂无该信息,建议联系人工客服」
- 对于涉及价格、促销等时效性信息,注明「以下信息仅供参考,请以实际为准」
- 单次回答不超过 200 字,突出重点

请根据以下检索到的上下文信息,回答用户问题。如果上下文中没有相关信息,直接说明无法回答。
"""
    
    def _assemble_context(
        self,
        search_results: List[SearchResult],
        max_context_length: int = 4000
    ) -> str:
        """Join retrieved passages into one context string for the LLM prompt.

        Passages are taken in descending ``score`` order, each prefixed with a
        header naming its source, category and similarity. Passages are added
        until the next one would push the joined string past
        ``max_context_length`` characters (a character budget, not tokens);
        that passage and all lower-ranked ones are dropped.

        Args:
            search_results: retrieval hits, in any order.
            max_context_length: upper bound, in characters, on the returned
                string, separators included.

        Returns:
            The selected passages joined by a ``---`` separator line, or ``""``
            if none fit.
        """
        separator = "\n---\n"
        context_parts = []
        current_length = 0

        # sorted() is stable (also with reverse=True), so equally scored hits
        # keep their retrieval order.
        for result in sorted(search_results, key=lambda r: r.score, reverse=True):
            source = result.metadata.get("source", "知识库")
            category = result.metadata.get("category", "通用")

            part = f"【来源:{source} | 分类:{category} | 相似度:{result.score:.2f}】\n{result.text}\n"

            # Account for the separator join() will insert before this part,
            # otherwise the result can overshoot the budget.
            added = len(part) + (len(separator) if context_parts else 0)
            if current_length + added > max_context_length:
                break

            context_parts.append(part)
            current_length += added

        return separator.join(context_parts)
    
    def _generate_response(
        self,
        user_question: str,
        context: str,
        conversation_history: List[Dict]
    ) -> str:
        """
        调用 LLM 生成回答
        使用 HolySheep AI API,汇率优势明显
        """
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": f"【上下文】\n{context}\n\n【用户问题】{user_question}"}
        ]
        
        # 添加对话历史(仅最近 N 轮)
        for hist in conversation_history[-self.max_history_length:]:
            messages.insert(1, {"role": "user", "content": hist["question"]})
            messages.insert(2, {"role": "assistant", "content": hist["answer"]})
        
        payload = {
            "model": self.llm_model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 500
        }
        
        response = requests.post(
            f"{HOLYSHEEP_BASE_URL}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.llm_api_key}",
                "