在高频调用大模型 API 的场景中,重复请求造成的成本浪费不容忽视。我曾在一家 AI 客服项目中发现,同一个问题被不同用户咨询多次,每次都触发完整的 API 调用,月底账单让人触目惊心。本文将详细介绍如何基于 Redis 实现语义级别的响应缓存,配合向量相似度匹配,实现"问法不同但语义相同"场景下的缓存命中。

HolySheep vs 官方 API vs 其他中转站核心对比

| 对比维度 | HolySheep AI | 官方 API | 其他中转站 |
| --- | --- | --- | --- |
| 汇率优势 | ¥1=$1(无损) | ¥7.3=$1(溢价约85%) | ¥5-6=$1(溢价30-50%) |
| 国内延迟 | <50ms 直连 | 200-500ms(跨洋) | 80-150ms |
| GPT-4.1 output | $8/MTok | $8/MTok | $8-10/MTok |
| Claude Sonnet 4.5 | $15/MTok | $15/MTok | $16-20/MTok |
| 充值方式 | 微信/支付宝/银行卡 | 仅国际信用卡 | 参差不齐 |
| 免费额度 | 注册即送 | $5试用额度 | 部分有,新用户有限 |
| 缓存优化 | 兼容所有方案 | 需自行实现 | 需自行实现 |

通过上表可以看出,HolySheep AI 不仅在价格上占据绝对优势,其稳定的国内线路和高可用性也为缓存方案提供了可靠的底层保障,感兴趣的读者可以立即注册体验。

为什么需要语义缓存?

传统的精确匹配缓存只能处理完全相同的请求文本,但用户的表达方式千变万化。例如,"如何重置我的账户密码?"和"密码忘了怎么找回?"字面上完全不同,问的却是同一件事。

这些问法不同但意图相同的 query,如果每次都调用 API,成本会急剧上升。语义缓存的核心思路是:将用户 query 转换为向量,通过余弦相似度判断语义是否相近,从而实现"理解相同意思就命中缓存"。

技术架构概览

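整体方案涉及以下组件:Redis(存储查询向量、API 响应和候选索引)、Embedding 模型(将 query 转换为向量,本文使用 text-embedding-3-small)、余弦相似度匹配(判断是否命中缓存)以及 Chat 模型(缓存未命中时生成回答,本文使用 gpt-4.1)。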

完整实现代码

1. 环境依赖安装

pip install redis openai numpy tiktoken

2. 语义缓存核心类实现

import redis
import json
import numpy as np
from openai import OpenAI
from typing import Optional, Dict, Any, Tuple

class SemanticCache:
    """Semantic response cache backed by Redis and embedding similarity.

    Each cached entry consists of three Redis items:

    * ``semantic_cache:response:vec:<cache_id>``  - the query embedding (JSON list)
    * ``semantic_cache:response:resp:<cache_id>`` - the chat response (JSON object)
    * an entry in the ``semantic_cache:index`` list, newest first

    ``cache_id`` is the first 16 hex characters of the MD5 of the query text,
    so an identical query always maps to the same entry.
    """

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0,
        embedding_model: str = "text-embedding-3-small",
        similarity_threshold: float = 0.88,
        cache_ttl: int = 86400 * 7,  # 7 days by default
        vector_dim: int = 1536,
        max_candidates: int = 20,
        api_key: str = "YOUR_HOLYSHEEP_API_KEY",
        base_url: str = "https://api.holysheep.ai/v1",
        chat_model: str = "gpt-4.1",
    ):
        """Create the Redis connection and the OpenAI-compatible API client.

        Args:
            redis_host: Redis server host.
            redis_port: Redis server port.
            redis_db: Redis logical database number.
            embedding_model: Model name passed to the embeddings endpoint.
            similarity_threshold: Minimum cosine similarity for a cache hit.
            cache_ttl: Expiry in seconds for stored vectors and responses.
            vector_dim: Expected embedding dimension (stored, not enforced).
            max_candidates: How many of the newest index entries are compared
                against the query on each lookup.
            api_key: API key for the OpenAI-compatible endpoint.
            base_url: Base URL of the OpenAI-compatible endpoint.
            chat_model: Model name used when the cache misses.
        """
        # decode_responses=True so Redis returns str, which json.loads and the
        # cache_id handling below rely on.
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )

        # Credentials and endpoint are parameters (defaults unchanged) so the
        # class can be configured without editing its source.
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )

        self.embedding_model = embedding_model
        self.chat_model = chat_model
        self.similarity_threshold = similarity_threshold
        self.cache_ttl = cache_ttl
        self.vector_dim = vector_dim
        self.max_candidates = max_candidates

        # Redis key layout
        self.index_key = "semantic_cache:index"
        self.cache_prefix = "semantic_cache:response:"

    @staticmethod
    def _make_cache_id(query: str) -> str:
        """Return the deterministic cache id (truncated MD5 hex) for *query*."""
        # Local import: the surrounding snippet does not import hashlib at
        # module level.
        import hashlib
        return hashlib.md5(query.encode()).hexdigest()[:16]

    def _get_embedding(self, text: str) -> np.ndarray:
        """Request the embedding of *text* and return it as a float32 array."""
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        embedding = response.data[0].embedding
        return np.array(embedding, dtype=np.float32)

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the cosine similarity of two vectors (0.0 if either is zero)."""
        dot_product = np.dot(vec1, vec2)
        norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        return float(dot_product / norm_product) if norm_product > 0 else 0.0

    def _store_vector(self, cache_id: str, vector: np.ndarray) -> None:
        """Store *vector* in Redis as a JSON list, expiring after ``cache_ttl``."""
        key = f"{self.cache_prefix}vec:{cache_id}"
        self.redis_client.set(key, json.dumps(vector.tolist()), ex=self.cache_ttl)

    def _get_vector(self, cache_id: str) -> Optional[np.ndarray]:
        """Load the stored vector for *cache_id*, or None if absent/expired."""
        key = f"{self.cache_prefix}vec:{cache_id}"
        data = self.redis_client.get(key)
        if data:
            return np.array(json.loads(data), dtype=np.float32)
        return None

    def _store_response(self, cache_id: str, response_data: Dict[str, Any]) -> None:
        """Store the API response as JSON, expiring after ``cache_ttl``."""
        key = f"{self.cache_prefix}resp:{cache_id}"
        self.redis_client.set(key, json.dumps(response_data), ex=self.cache_ttl)

    def _get_response(self, cache_id: str) -> Optional[Dict[str, Any]]:
        """Load the cached response for *cache_id*, or None if absent/expired."""
        key = f"{self.cache_prefix}resp:{cache_id}"
        data = self.redis_client.get(key)
        if data:
            return json.loads(data)
        return None

    def _add_to_index(self, cache_id: str) -> None:
        """Move *cache_id* to the front of the index list and cap its length."""
        # cache_id is deterministic per query, so re-caching the same query
        # (e.g. after its entry expired) would otherwise add a duplicate that
        # wastes a slot in the max_candidates window.
        self.redis_client.lrem(self.index_key, 0, cache_id)
        self.redis_client.lpush(self.index_key, cache_id)
        # Keep at most 10000 ids so the index cannot grow without bound.
        self.redis_client.ltrim(self.index_key, 0, 9999)

    def get(self, query: str) -> Tuple[Optional[Dict[str, Any]], bool]:
        """Return a response for *query*, served from cache when possible.

        Lookup order:
            1. exact match on the query hash (no embedding call needed);
            2. semantic match among the newest ``max_candidates`` index entries;
            3. otherwise call the chat API and cache the result.

        Returns:
            ``(response_data, cache_hit)``.
        """
        # Step 0: an identical query maps to the same cache_id, so it can be
        # answered without paying for an embedding request.
        exact_id = self._make_cache_id(query)
        exact_response = self._get_response(exact_id)
        if exact_response:
            print(f"✅ 缓存命中! cache_id={exact_id}, similarity={1.0:.4f}")
            return exact_response, True

        # Step 1: embed the query
        query_vector = self._get_embedding(query)

        # Step 2: fetch the newest candidate ids from the index
        candidate_ids = self.redis_client.lrange(self.index_key, 0, self.max_candidates - 1)

        if not candidate_ids:
            # Empty index: nothing to compare against
            return self._call_api_and_cache(query, query_vector), False

        # Step 3: find the most similar candidate
        best_similarity = 0.0
        best_cache_id = None

        for cache_id in candidate_ids:
            cached_vector = self._get_vector(cache_id)
            # Skip expired vectors, and vectors written with a different
            # embedding dimension (np.dot would raise on a shape mismatch).
            if cached_vector is None or cached_vector.shape != query_vector.shape:
                continue
            similarity = self._cosine_similarity(query_vector, cached_vector)
            if similarity > best_similarity:
                best_similarity = similarity
                best_cache_id = cache_id

        # Step 4: report a hit only once the response itself has been loaded;
        # the vector may outlive the response key.
        if best_cache_id and best_similarity >= self.similarity_threshold:
            cached_response = self._get_response(best_cache_id)
            if cached_response:
                print(f"✅ 缓存命中! cache_id={best_cache_id}, similarity={best_similarity:.4f}")
                return cached_response, True

        # Step 5: miss - call the API and cache the result
        print(f"❌ 缓存未命中, best_similarity={best_similarity:.4f}, 调用API...")
        return self._call_api_and_cache(query, query_vector), False

    def _call_api_and_cache(self, query: str, query_vector: np.ndarray) -> Dict[str, Any]:
        """Call the chat API for *query*, cache the result, and return it.

        The returned dict has the keys ``content``, ``model`` and ``usage``.
        """
        response = self.client.chat.completions.create(
            model=self.chat_model,
            messages=[
                {"role": "system", "content": "你是一个有帮助的AI助手。"},
                {"role": "user", "content": query}
            ],
            temperature=0.7,
            max_tokens=1000
        )

        response_data = {
            "content": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        }

        cache_id = self._make_cache_id(query)

        self._store_vector(cache_id, query_vector)
        self._store_response(cache_id, response_data)
        self._add_to_index(cache_id)

        return response_data


使用示例

if __name__ == "__main__":
    # Demo: two differently worded questions with the same intent.
    cache = SemanticCache(
        redis_host="localhost",
        redis_port=6379,
        similarity_threshold=0.88,
        cache_ttl=86400 * 30,  # keep entries for 30 days
    )

    # First query: nothing cached yet, so this is expected to call the API.
    response1, hit1 = cache.get("如何重置我的账户密码?")
    print(f"Response: {response1['content'][:100]}...")

    # Second query: worded differently; a hit depends on the embedding
    # similarity reaching similarity_threshold.
    response2, hit2 = cache.get("密码忘了怎么找回?")
    print(f"Cache hit: {hit2}")

3. 生产环境优化版(支持分布式和监控)

import hashlib
import json
import logging
import time
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import numpy as np
import redis

# Configure the root logger at INFO level as a module-level side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

class ProductionSemanticCache:
    """Semantic cache with exact-match lookup, a circuit breaker and counters.

    Redis key layout (``<prefix>`` is ``user:<user_id>:`` or ``global:``):

    * ``<prefix>exact:<md5>``     - response keyed by the exact query hash
    * ``<prefix>vec:<cache_id>``  - raw float32 bytes of the query embedding
    * ``<prefix>resp:<cache_id>`` - response for that embedding
    * ``<prefix>semantic:index``  - list of cache ids, newest first

    Responses are stored as JSON. Entries that cannot be parsed as a JSON
    object (for example written by an older, non-JSON version) are treated
    as cache misses.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        api_key: str = "YOUR_HOLYSHEEP_API_KEY",
        base_url: str = "https://api.holysheep.ai/v1",
        embedding_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-4.1",
        similarity_threshold: float = 0.90,
        cache_ttl: int = 86400,
        vector_dim: int = 1536,
        max_cache_size: int = 5000,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: int = 60
    ):
        """Create the Redis and API clients and reset breaker/metric state.

        Args:
            redis_url: Redis connection URL.
            api_key: API key for the OpenAI-compatible endpoint.
            base_url: Base URL of the OpenAI-compatible endpoint.
            embedding_model: Model name for the embeddings endpoint.
            chat_model: Model name used when the cache misses.
            similarity_threshold: Minimum cosine similarity for a semantic hit.
            cache_ttl: Expiry in seconds for every stored key.
            vector_dim: Expected embedding dimension (stored, not enforced).
            max_cache_size: Maximum length of each semantic index list.
            circuit_breaker_threshold: Failure count that opens the breaker.
            circuit_breaker_timeout: Seconds the breaker stays open.
        """
        # decode_responses=False: vectors are stored as raw bytes.
        self.redis_client = redis.from_url(redis_url, decode_responses=False)

        from openai import OpenAI
        self.openai_client = OpenAI(api_key=api_key, base_url=base_url)

        self.embedding_model = embedding_model
        self.chat_model = chat_model
        self.similarity_threshold = similarity_threshold
        self.cache_ttl = cache_ttl
        self.vector_dim = vector_dim
        self.max_cache_size = max_cache_size

        # Circuit breaker state (per process, not shared through Redis)
        self.failure_count = 0
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_timeout = circuit_breaker_timeout
        self.circuit_open_since = None

        # Counters: "cache_hit", "semantic_hit", "api_call"
        self.metrics = defaultdict(int)

    def _check_circuit_breaker(self) -> bool:
        """Return True if requests are allowed, False while the breaker is open.

        Once ``circuit_breaker_timeout`` seconds have passed since it opened,
        the breaker is closed again and the failure count is reset.
        """
        if self.circuit_open_since is None:
            return True

        if time.time() - self.circuit_open_since > self.circuit_breaker_timeout:
            logger.info("🔄 熔断器恢复,尝试请求...")
            self.circuit_open_since = None
            self.failure_count = 0
            return True

        return False

    def _trigger_circuit_breaker(self) -> None:
        """Record one failure and open the breaker when the threshold is reached."""
        self.failure_count += 1
        if self.failure_count >= self.circuit_breaker_threshold:
            self.circuit_open_since = time.time()
            logger.warning(f"⚠️ 熔断器开启,持续{self.circuit_breaker_timeout}秒")

    @contextmanager
    def _distributed_lock(self, key: str, timeout: int = 10):
        """Try to take a Redis ``SET NX EX`` lock; yield whether it was acquired.

        Yields:
            True when this caller holds the lock (it is deleted on exit),
            False when another holder exists. In that case the body still
            runs, so callers must check the yielded value.
        """
        lock_key = f"lock:{key}"
        lock_acquired = self.redis_client.set(lock_key, "1", nx=True, ex=timeout)

        if not lock_acquired:
            # Brief pause before reporting failure to the caller
            time.sleep(0.1)
            yield False
        else:
            try:
                yield True
            finally:
                # NOTE(review): the delete is unconditional; if the body runs
                # longer than `timeout`, this may remove a lock that another
                # worker has since acquired.
                self.redis_client.delete(lock_key)

    @staticmethod
    def _decode_payload(raw: Any) -> Optional[Dict[str, Any]]:
        """Parse a cached JSON response; return None if it is not a JSON object.

        Cached data is parsed with ``json.loads`` rather than ``eval`` so that
        content stored in Redis can never be executed as code.
        """
        try:
            payload = json.loads(raw)
        except (ValueError, TypeError):
            logger.warning("忽略无法解析的缓存条目(非JSON格式)")
            return None
        return payload if isinstance(payload, dict) else None

    def _get_embedding(self, text: str) -> Optional[np.ndarray]:
        """Return the float32 embedding of *text*, or None on failure.

        Makes up to three attempts with a growing delay between them. Returns
        None immediately while the circuit breaker is open.
        """
        if not self._check_circuit_breaker():
            return None

        for attempt in range(3):
            try:
                response = self.openai_client.embeddings.create(
                    model=self.embedding_model,
                    input=text[:8000]  # cap the input length
                )
                return np.array(response.data[0].embedding, dtype=np.float32)
            except Exception as e:
                logger.warning(f"Embedding API错误 (尝试 {attempt+1}/3): {e}")
                self._trigger_circuit_breaker()
                if attempt < 2:
                    time.sleep(1 * (attempt + 1))

        return None

    def _cosine_similarity_batch(self, query_vec: np.ndarray, cache_vecs: List[np.ndarray]) -> List[float]:
        """Return the cosine similarity of *query_vec* to each vector in *cache_vecs*.

        All vectors must have the same length as *query_vec*. A small epsilon
        in the denominator avoids division by zero.
        """
        if not cache_vecs:
            return []

        cache_matrix = np.vstack(cache_vecs)
        similarities = np.dot(cache_matrix, query_vec) / (
            np.linalg.norm(cache_matrix, axis=1) * np.linalg.norm(query_vec) + 1e-8
        )
        return similarities.tolist()

    def get(self, query: str, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Return a response for *query*, isolated per user when *user_id* is given.

        Tries the exact-match cache, then semantic search, then the chat API.

        Returns:
            The response dict extended with ``cache_hit`` and ``latency_ms``
            (plus ``match_type``/``similarity`` on a semantic hit), or a dict
            with ``error`` and ``cache_hit`` when no embedding is available.

        Raises:
            Exception: whatever the chat API call raises on a cache miss.
        """
        start_time = time.time()
        cache_key_prefix = f"user:{user_id}:" if user_id else "global:"
        cache_hash = hashlib.md5(query.encode()).hexdigest()

        # Exact-match lookup
        cached = self._check_cache(cache_key_prefix, cache_hash)
        if cached:
            self.metrics["cache_hit"] += 1
            cached["cache_hit"] = True
            cached["latency_ms"] = (time.time() - start_time) * 1000
            return cached

        # Embed the query
        query_vector = self._get_embedding(query)
        if query_vector is None:
            return {"error": "无法获取嵌入向量,请检查API配置", "cache_hit": False}

        # Semantic lookup
        semantic_result = self._semantic_search(query_vector, cache_key_prefix)

        if semantic_result:
            self.metrics["semantic_hit"] += 1
            result = semantic_result.copy()
            result["cache_hit"] = True
            result["match_type"] = "semantic"
            result["similarity"] = float(semantic_result.get("similarity", 0))
        else:
            # Miss: call the API
            self.metrics["api_call"] += 1
            result = self._call_and_cache_api(query, query_vector, cache_key_prefix, cache_hash)
            result["cache_hit"] = False

        result["latency_ms"] = (time.time() - start_time) * 1000
        return result

    def _check_cache(self, prefix: str, cache_hash: str) -> Optional[Dict[str, Any]]:
        """Return the response stored under the exact query hash, or None."""
        key = f"{prefix}exact:{cache_hash}"
        data = self.redis_client.get(key)
        if not data:
            return None
        return self._decode_payload(data)

    def _semantic_search(self, query_vector: np.ndarray, prefix: str) -> Optional[Dict[str, Any]]:
        """Return the best semantically matching cached response, or None.

        Compares against the 50 newest index entries. The returned dict is the
        cached response plus ``similarity`` and ``cache_id``.
        """
        index_key = f"{prefix}semantic:index"
        candidates = self.redis_client.lrange(index_key, 0, 49)

        if not candidates:
            return None

        vectors = []
        valid_ids = []

        for cid in candidates:
            cid_str = cid.decode() if isinstance(cid, bytes) else cid
            vec_data = self.redis_client.get(f"{prefix}vec:{cid_str}")
            if not vec_data:
                continue  # vector expired; the index entry is stale
            vec = np.frombuffer(vec_data, dtype=np.float32)
            # Vectors written with a different embedding dimension cannot be
            # stacked with the rest (np.vstack would raise), so skip them.
            if vec.shape != query_vector.shape:
                continue
            vectors.append(vec)
            valid_ids.append(cid_str)

        if not vectors:
            return None

        similarities = self._cosine_similarity_batch(query_vector, vectors)

        best_idx = int(np.argmax(similarities))
        best_sim = similarities[best_idx]

        if best_sim >= self.similarity_threshold:
            best_id = valid_ids[best_idx]
            resp_data = self.redis_client.get(f"{prefix}resp:{best_id}")

            if resp_data:
                result = self._decode_payload(resp_data)
                if result is not None:
                    result["similarity"] = best_sim
                    result["cache_id"] = best_id
                    return result

        return None

    def _call_and_cache_api(
        self,
        query: str,
        query_vector: np.ndarray,
        prefix: str,
        cache_hash: str
    ) -> Dict[str, Any]:
        """Call the chat API, store the result in Redis, and return it.

        Returns:
            Dict with ``content``, ``model``, ``usage`` and ``timestamp``.

        Raises:
            Exception: re-raised from the API call or Redis after recording
                a failure on the circuit breaker.
        """
        try:
            response = self.openai_client.chat.completions.create(
                model=self.chat_model,
                messages=[
                    {"role": "system", "content": "你是一个专业的技术助手。"},
                    {"role": "user", "content": query}
                ],
                temperature=0.7,
                max_tokens=2000
            )

            result = {
                "content": response.choices[0].message.content,
                "model": response.model,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens
                },
                "timestamp": datetime.now().isoformat()
            }

            # The timestamp suffix keeps ids distinct across re-caching
            cache_id = f"{cache_hash}_{int(time.time())}"
            # Serialize once as JSON; the read side parses with json.loads.
            serialized = json.dumps(result, ensure_ascii=False)

            with self._distributed_lock(f"cache_write:{cache_hash}") as acquired:
                if acquired:
                    # Vector as raw float32 bytes (compact)
                    vec_key = f"{prefix}vec:{cache_id}"
                    self.redis_client.set(vec_key, query_vector.tobytes(), ex=self.cache_ttl)

                    # Response for semantic lookups
                    resp_key = f"{prefix}resp:{cache_id}"
                    self.redis_client.set(resp_key, serialized, ex=self.cache_ttl)

                    # Index: newest first, capped at max_cache_size
                    index_key = f"{prefix}semantic:index"
                    self.redis_client.lpush(index_key, cache_id)
                    self.redis_client.ltrim(index_key, 0, self.max_cache_size - 1)

                    # Response for exact-match lookups
                    exact_key = f"{prefix}exact:{cache_hash}"
                    self.redis_client.set(exact_key, serialized, ex=self.cache_ttl)
                else:
                    # Another worker is already writing this query hash; skip
                    # to avoid duplicate index entries.
                    logger.info("缓存写入锁被占用,跳过本次写入")

            self.failure_count = max(0, self.failure_count - 1)
            return result

        except Exception as e:
            logger.error(f"API调用失败: {e}")
            self._trigger_circuit_breaker()
            raise

    def get_metrics(self) -> Dict[str, Any]:
        """Return the raw counters plus ``total_requests`` and ``hit_rate``.

        ``hit_rate`` is a percentage rounded to two decimals.
        """
        total_requests = sum(self.metrics.values())
        hit_rate = (
            (self.metrics["cache_hit"] + self.metrics["semantic_hit"]) / total_requests
            if total_requests > 0 else 0
        )

        return {
            **self.metrics,
            "total_requests": total_requests,
            "hit_rate": round(hit_rate * 100, 2)
        }


使用示例

if