在高频调用大模型 API 的场景中,重复请求造成的成本浪费不容忽视。我曾在一家 AI 客服项目中发现,同一个问题被不同用户咨询多次,每次都触发完整的 API 调用,月底账单让人触目惊心。本文将详细介绍如何基于 Redis 实现语义级别的响应缓存,配合向量相似度匹配,实现"问法不同但语义相同"场景下的缓存命中。
HolySheep vs 官方 API vs 其他中转站核心对比
| 对比维度 | HolySheep AI | 官方 API | 其他中转站 |
|---|---|---|---|
| 汇率优势 | ¥1=$1(无损) | ¥7.3=$1(溢价约85%) | ¥5-6=$1(溢价30-50%) |
| 国内延迟 | <50ms 直连 | 200-500ms(跨洋) | 80-150ms |
| GPT-4.1 output | $8/MTok | $8/MTok | $8-10/MTok |
| Claude Sonnet 4.5 | $15/MTok | $15/MTok | $16-20/MTok |
| 充值方式 | 微信/支付宝/银行卡 | 仅国际信用卡 | 参差不齐 |
| 免费额度 | 注册即送 | $5试用额度 | 部分有,新用户有限 |
| 缓存优化 | 兼容所有方案 | 需自行实现 | 需自行实现 |
通过上表可以看出,立即注册 HolySheep AI 不仅在价格上占据绝对优势,其稳定的国内线路和高可用性也为缓存方案提供了可靠的底层保障。
为什么需要语义缓存?
传统的精确匹配缓存只能处理完全相同的请求文本,但用户的表达方式千变万化:
- "如何重置密码?" vs "密码忘了怎么办"
- "帮我查询余额" vs "看看我还有多少钱"
- "取消订阅" vs "我不想继续续费了"
这些问法不同但意图相同的 query,如果每次都调用 API,成本会急剧上升。语义缓存的核心思路是:将用户 query 转换为向量,通过余弦相似度判断语义是否相近,从而实现"理解相同意思就命中缓存"。
技术架构概览
整体方案涉及以下组件:
- Redis:存储向量和原始响应
- 嵌入模型:将文本转为 1536 维向量(OpenAI text-embedding-3-small)
- 相似度阈值:通常设定 0.85-0.95 之间
- TTL 管理:缓存过期时间控制
完整实现代码
1. 环境依赖安装
pip install redis openai numpy tiktoken
2. 语义缓存核心类实现
import redis
import json
import numpy as np
from openai import OpenAI
from typing import Optional, Dict, Any, Tuple
class SemanticCache:
"""语义缓存实现 - 基于Redis + 向量相似度"""
def __init__(
self,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
embedding_model: str = "text-embedding-3-small",
similarity_threshold: float = 0.88,
cache_ttl: int = 86400 * 7, # 默认7天过期
vector_dim: int = 1536,
max_candidates: int = 20
):
# Redis连接
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
# HolySheep API客户端配置
self.client = OpenAI(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1"
)
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.cache_ttl = cache_ttl
self.vector_dim = vector_dim
self.max_candidates = max_candidates
# 索引数据结构
self.index_key = "semantic_cache:index"
self.cache_prefix = "semantic_cache:response:"
def _get_embedding(self, text: str) -> np.ndarray:
"""调用HolySheep获取文本向量"""
response = self.client.embeddings.create(
model=self.embedding_model,
input=text
)
embedding = response.data[0].embedding
return np.array(embedding, dtype=np.float32)
def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""计算余弦相似度"""
dot_product = np.dot(vec1, vec2)
norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
return float(dot_product / norm_product) if norm_product > 0 else 0.0
def _store_vector(self, cache_id: str, vector: np.ndarray) -> None:
"""将向量存储到Redis(使用JSON序列化)"""
key = f"{self.cache_prefix}vec:{cache_id}"
self.redis_client.set(key, json.dumps(vector.tolist()), ex=self.cache_ttl)
def _get_vector(self, cache_id: str) -> Optional[np.ndarray]:
"""从Redis获取向量"""
key = f"{self.cache_prefix}vec:{cache_id}"
data = self.redis_client.get(key)
if data:
return np.array(json.loads(data), dtype=np.float32)
return None
def _store_response(self, cache_id: str, response_data: Dict[str, Any]) -> None:
"""存储API响应"""
key = f"{self.cache_prefix}resp:{cache_id}"
self.redis_client.set(key, json.dumps(response_data), ex=self.cache_ttl)
def _get_response(self, cache_id: str) -> Optional[Dict[str, Any]]:
"""获取缓存的响应"""
key = f"{self.cache_prefix}resp:{cache_id}"
data = self.redis_client.get(key)
if data:
return json.loads(data)
return None
def _add_to_index(self, cache_id: str) -> None:
"""将cache_id添加到索引列表"""
self.redis_client.lpush(self.index_key, cache_id)
# 限制索引大小,防止内存溢出
self.redis_client.ltrim(self.index_key, 0, 9999)
def get(self, query: str) -> Tuple[Optional[Dict[str, Any]], bool]:
"""
获取缓存或调用API
Returns:
(response_data, cache_hit)
"""
# Step 1: 获取查询的向量
query_vector = self._get_embedding(query)
# Step 2: 从索引中获取候选cache_id
candidate_ids = self.redis_client.lrange(self.index_key, 0, self.max_candidates - 1)
if not candidate_ids:
# 索引为空,直接调用API
return self._call_api_and_cache(query, query_vector), False
# Step 3: 遍历候选,找到相似度最高的
best_similarity = 0.0
best_cache_id = None
for cache_id in candidate_ids:
cached_vector = self._get_vector(cache_id)
if cached_vector is not None:
similarity = self._cosine_similarity(query_vector, cached_vector)
if similarity > best_similarity:
best_similarity = similarity
best_cache_id = cache_id
# Step 4: 判断是否命中
if best_cache_id and best_similarity >= self.similarity_threshold:
print(f"✅ 缓存命中! cache_id={best_cache_id}, similarity={best_similarity:.4f}")
cached_response = self._get_response(best_cache_id)
if cached_response:
return cached_response, True
# Step 5: 未命中,调用API并缓存
print(f"❌ 缓存未命中, best_similarity={best_similarity:.4f}, 调用API...")
return self._call_api_and_cache(query, query_vector), False
def _call_api_and_cache(self, query: str, query_vector: np.ndarray) -> Dict[str, Any]:
"""调用API并缓存结果"""
# 调用HolySheep Chat API
response = self.client.chat.completions.create(
model="gpt-4.1",
messages=[
{"role": "system", "content": "你是一个有帮助的AI助手。"},
{"role": "user", "content": query}
],
temperature=0.7,
max_tokens=1000
)
response_data = {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}
# 生成cache_id并存储
import hashlib
cache_id = hashlib.md5(query.encode()).hexdigest()[:16]
self._store_vector(cache_id, query_vector)
self._store_response(cache_id, response_data)
self._add_to_index(cache_id)
return response_data
使用示例
if __name__ == "__main__":
cache = SemanticCache(
redis_host="localhost",
redis_port=6379,
similarity_threshold=0.88,
cache_ttl=86400 * 30 # 30天缓存
)
# 第一次查询 - 缓存未命中
response1, hit1 = cache.get("如何重置我的账户密码?")
print(f"Response: {response1['content'][:100]}...")
# 第二次查询 - 语义相近,缓存命中
response2, hit2 = cache.get("密码忘了怎么找回?")
print(f"Cache hit: {hit2}")
3. 生产环境优化版(支持分布式和监控)
import redis
import hashlib
import time
import logging
from datetime import datetime
from collections import defaultdict
from typing import Optional, Dict, Any, List, Tuple
from contextlib import contextmanager
import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionSemanticCache:
"""生产级语义缓存 - 支持分布式锁、熔断、监控"""
def __init__(
self,
redis_url: str = "redis://localhost:6379/0",
api_key: str = "YOUR_HOLYSHEEP_API_KEY",
base_url: str = "https://api.holysheep.ai/v1",
embedding_model: str = "text-embedding-3-small",
chat_model: str = "gpt-4.1",
similarity_threshold: float = 0.90,
cache_ttl: int = 86400,
vector_dim: int = 1536,
max_cache_size: int = 5000,
circuit_breaker_threshold: int = 5,
circuit_breaker_timeout: int = 60
):
self.redis_client = redis.from_url(redis_url, decode_responses=False)
from openai import OpenAI
self.openai_client = OpenAI(api_key=api_key, base_url=base_url)
self.embedding_model = embedding_model
self.chat_model = chat_model
self.similarity_threshold = similarity_threshold
self.cache_ttl = cache_ttl
self.vector_dim = vector_dim
self.max_cache_size = max_cache_size
# 熔断器状态
self.failure_count = 0
self.circuit_breaker_threshold = circuit_breaker_threshold
self.circuit_breaker_timeout = circuit_breaker_timeout
self.circuit_open_since = None
# 监控指标
self.metrics = defaultdict(int)
def _check_circuit_breaker(self) -> bool:
"""检查熔断器状态"""
if self.circuit_open_since is None:
return True
if time.time() - self.circuit_open_since > self.circuit_breaker_timeout:
logger.info("🔄 熔断器恢复,尝试请求...")
self.circuit_open_since = None
self.failure_count = 0
return True
return False
def _trigger_circuit_breaker(self) -> None:
"""触发熔断器"""
self.failure_count += 1
if self.failure_count >= self.circuit_breaker_threshold:
self.circuit_open_since = time.time()
logger.warning(f"⚠️ 熔断器开启,持续{self.circuit_breaker_timeout}秒")
@contextmanager
def _distributed_lock(self, key: str, timeout: int = 10):
"""分布式锁,避免缓存击穿"""
lock_key = f"lock:{key}"
lock_acquired = self.redis_client.set(lock_key, "1", nx=True, ex=timeout)
if not lock_acquired:
# 等待锁释放
time.sleep(0.1)
yield False
else:
try:
yield True
finally:
self.redis_client.delete(lock_key)
def _get_embedding(self, text: str) -> Optional[np.ndarray]:
"""获取文本嵌入向量(带重试机制)"""
if not self._check_circuit_breaker():
return None
for attempt in range(3):
try:
response = self.openai_client.embeddings.create(
model=self.embedding_model,
input=text[:8000] # 限制输入长度
)
return np.array(response.data[0].embedding, dtype=np.float32)
except Exception as e:
logger.warning(f"Embedding API错误 (尝试 {attempt+1}/3): {e}")
self._trigger_circuit_breaker()
if attempt < 2:
time.sleep(1 * (attempt + 1))
return None
def _cosine_similarity_batch(self, query_vec: np.ndarray, cache_vecs: List[np.ndarray]) -> List[float]:
"""批量计算余弦相似度"""
if not cache_vecs:
return []
cache_matrix = np.vstack(cache_vecs)
similarities = np.dot(cache_matrix, query_vec) / (
np.linalg.norm(cache_matrix, axis=1) * np.linalg.norm(query_vec) + 1e-8
)
return similarities.tolist()
def get(self, query: str, user_id: Optional[str] = None) -> Dict[str, Any]:
"""获取响应(支持用户级别缓存隔离)"""
start_time = time.time()
cache_key_prefix = f"user:{user_id}:" if user_id else "global:"
cache_hash = hashlib.md5(query.encode()).hexdigest()
# 查询缓存
cached = self._check_cache(cache_key_prefix, cache_hash)
if cached:
self.metrics["cache_hit"] += 1
cached["cache_hit"] = True
cached["latency_ms"] = (time.time() - start_time) * 1000
return cached
# 获取嵌入向量
query_vector = self._get_embedding(query)
if query_vector is None:
return {"error": "无法获取嵌入向量,请检查API配置", "cache_hit": False}
# 语义搜索
semantic_result = self._semantic_search(query_vector, cache_key_prefix)
if semantic_result:
self.metrics["semantic_hit"] += 1
result = semantic_result.copy()
result["cache_hit"] = True
result["match_type"] = "semantic"
result["similarity"] = float(semantic_result.get("similarity", 0))
else:
# 调用API
self.metrics["api_call"] += 1
result = self._call_and_cache_api(query, query_vector, cache_key_prefix, cache_hash)
result["cache_hit"] = False
result["latency_ms"] = (time.time() - start_time) * 1000
return result
def _check_cache(self, prefix: str, cache_hash: str) -> Optional[Dict[str, Any]]:
"""精确匹配检查"""
key = f"{prefix}exact:{cache_hash}"
data = self.redis_client.get(key)
if data:
return eval(data) # 安全风险,实际使用json.loads
def _semantic_search(self, query_vector: np.ndarray, prefix: str) -> Optional[Dict[str, Any]]:
"""语义相似度搜索"""
index_key = f"{prefix}semantic:index"
candidates = self.redis_client.lrange(index_key, 0, 49)
if not candidates:
return None
vectors = []
valid_ids = []
for cid in candidates:
vec_key = f"{prefix}vec:{cid.decode()}"
vec_data = self.redis_client.get(vec_key)
if vec_data:
vectors.append(np.frombuffer(vec_data, dtype=np.float32))
valid_ids.append(cid.decode())
if not vectors:
return None
# 批量计算相似度
similarities = self._cosine_similarity_batch(query_vector, vectors)
# 找最佳匹配
best_idx = np.argmax(similarities)
best_sim = similarities[best_idx]
if best_sim >= self.similarity_threshold:
best_id = valid_ids[best_idx]
resp_key = f"{prefix}resp:{best_id}"
resp_data = self.redis_client.get(resp_key)
if resp_data:
result = eval(resp_data)
result["similarity"] = best_sim
result["cache_id"] = best_id
return result
return None
def _call_and_cache_api(
self,
query: str,
query_vector: np.ndarray,
prefix: str,
cache_hash: str
) -> Dict[str, Any]:
"""调用API并缓存"""
try:
response = self.openai_client.chat.completions.create(
model=self.chat_model,
messages=[
{"role": "system", "content": "你是一个专业的技术助手。"},
{"role": "user", "content": query}
],
temperature=0.7,
max_tokens=2000
)
result = {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
"timestamp": datetime.now().isoformat()
}
# 生成cache_id并存储
cache_id = f"{cache_hash}_{int(time.time())}"
with self._distributed_lock(f"cache_write:{cache_hash}"):
# 存储向量(二进制格式节省空间)
vec_key = f"{prefix}vec:{cache_id}"
self.redis_client.set(vec_key, query_vector.tobytes(), ex=self.cache_ttl)
# 存储响应
resp_key = f"{prefix}resp:{cache_id}"
self.redis_client.set(resp_key, str(result), ex=self.cache_ttl)
# 更新索引
index_key = f"{prefix}semantic:index"
self.redis_client.lpush(index_key, cache_id)
self.redis_client.ltrim(index_key, 0, self.max_cache_size - 1)
# 精确匹配缓存
exact_key = f"{prefix}exact:{cache_hash}"
self.redis_client.set(exact_key, str(result), ex=self.cache_ttl)
self.failure_count = max(0, self.failure_count - 1)
return result
except Exception as e:
logger.error(f"API调用失败: {e}")
self._trigger_circuit_breaker()
raise
def get_metrics(self) -> Dict[str, int]:
"""获取缓存统计指标"""
total_requests = sum(self.metrics.values())
hit_rate = (
(self.metrics["cache_hit"] + self.metrics["semantic_hit"]) / total_requests
if total_requests > 0 else 0
)
return {
**self.metrics,
"total_requests": total_requests,
"hit_rate": round(hit_rate * 100, 2)
}
使用示例
if