凌晨两点,你的监控面板突然报警——线上服务开始批量返回 429 Too Many Requests 错误。用户反馈 Claude 响应超时,技术团队紧急排查,最后发现是某个开发者误将压测脚本跑到了生产环境,在 30 秒内向 API 发起了 800 次请求。
这不是故事结局,而是你真正需要理解 Rate Limiting 的起点。作为同时运维多个 AI 项目的技术负责人,我见过太多团队因为不了解限流机制,在项目关键节点遭遇 API 服务商封禁,轻则损失数小时开发时间,重则影响产品上线计划。
本文将带你从源码级别理解 AI API 网关的限流实现,并提供可直接落地的 Python/Golang/Node.js 生产级代码方案。
为什么 AI API 必须配置 Rate Limiting
主流 AI API 服务商(OpenAI、Anthropic、Google 等)的免费层级通常限制在每分钟 60-150 次请求,而付费层级虽有更高配额,但单个 API Key 的 TPM(Tokens Per Minute)和 RPM(Requests Per Minute)限制依然存在。下表展示主流 AI 服务的官方限流参数:
| 服务商 | 免费层 RPM | 免费层 TPM | 付费层 RPM | 付费层 TPM |
|---|---|---|---|---|
| OpenAI GPT-4 | 50 | 15,000 | 500 | 120,000 |
| Anthropic Claude | 25 | 4,000 | 100 | 40,000 |
| Google Gemini | 60 | 60,000 | 300 | 1,000,000 |
| DeepSeek V3 | 120 | 20,000 | 1000 | 200,000 |
不配置客户端限流的后果:
- 触发 429 限流响应:API 返回 Retry-After 头,需要指数退避重试,增加延迟
- 账户被临时封禁:连续超限可能导致 1-24 小时的服务阻断
- Token 配额浪费:突发流量导致大量请求失败,已消耗 Token 无法退回
- 成本失控:无节制的并发请求在计费周期内产生超额账单
我曾见过某创业团队在产品首发日因为没有做客户端限流,单日 API 费用账单高达 23,000 元,远超当月预算。这不是技术问题,这是工程意识问题。
Rate Limiting 核心算法实现
生产环境的限流需要同时处理请求频率(RPM)和 Token 吞吐量(TPM),以下是三种主流算法的对比与 Python 实现:
1. 令牌桶算法(Token Bucket)—— 推荐方案
令牌桶是处理突发流量的最佳选择。系统以固定速率向桶中添加令牌,请求到达时消耗令牌,桶满时拒绝请求。这种算法允许一定程度的突发流量,同时保证长期平均速率不超限。
import time
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import asyncio
@dataclass
class TokenBucketRateLimiter:
    """
    Rate limiter enforcing both request-rate (RPM) and token-throughput
    (TPM) budgets over a rolling 60-second window.

    Note: despite the name, this is implemented as a sliding window of
    timestamps rather than a refill-based token bucket.
    """
    rpm_limit: int = 60        # max requests per rolling minute
    tpm_limit: int = 150000    # max tokens per rolling minute
    burst_size: int = 10       # reserved for burst shaping (currently unused)
    # Admission timestamps of requests inside the current window.
    _request_timestamps: deque = field(default_factory=deque, repr=False)
    # (timestamp, tokens) pairs so token usage expires together with its
    # request.  The original stored bare token counts and then compared
    # them against a *timestamp* cutoff, which drained the deque on every
    # cleanup and disabled TPM limiting entirely.
    _token_timestamps: deque = field(default_factory=deque, repr=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def _cleanup_old_timestamps(self, now: float, window: float = 60.0) -> None:
        """Drop records older than `window` seconds.  Caller holds _lock."""
        cutoff = now - window
        while self._request_timestamps and self._request_timestamps[0] < cutoff:
            self._request_timestamps.popleft()
        # Entries are (timestamp, tokens); compare on the timestamp.
        while self._token_timestamps and self._token_timestamps[0][0] < cutoff:
            self._token_timestamps.popleft()

    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> bool:
        """
        Try to obtain a rate-limit permit, blocking up to `timeout` seconds.

        Args:
            tokens: estimated token cost of the upcoming request
            timeout: maximum seconds to wait
        Returns:
            True when admitted (usage recorded); False on timeout.
        """
        deadline = time.time() + timeout
        while True:
            with self._lock:
                now = time.time()
                self._cleanup_old_timestamps(now)
                current_rpm = len(self._request_timestamps)
                current_tpm = sum(t for _, t in self._token_timestamps)
                if current_rpm < self.rpm_limit and current_tpm + tokens <= self.tpm_limit:
                    self._request_timestamps.append(now)
                    self._token_timestamps.append((now, tokens))
                    return True
                # Compute how long until the blocking window entry expires.
                wait_times = []
                if current_rpm >= self.rpm_limit:
                    wait_times.append(self._request_timestamps[0] + 60.0 - now)
                if current_tpm + tokens > self.tpm_limit:
                    if self._token_timestamps:
                        wait_times.append(self._token_timestamps[0][0] + 60.0 - now)
                    else:
                        # A single request larger than the whole TPM budget
                        # can never pass; poll until the caller's timeout.
                        wait_times.append(1.0)
                wait_time = min(wait_times) if wait_times else 1.0
            # Sleep with the lock RELEASED so other threads can progress,
            # and never sleep a negative duration or past the deadline.
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            time.sleep(min(max(wait_time, 0.01), remaining))

    def get_remaining(self) -> dict:
        """Snapshot of quota remaining/used in the current 60s window."""
        now = time.time()
        with self._lock:
            self._cleanup_old_timestamps(now)
            tpm_used = sum(t for _, t in self._token_timestamps)
            return {
                "rpm_remaining": self.rpm_limit - len(self._request_timestamps),
                "tpm_remaining": self.tpm_limit - tpm_used,
                "rpm_used": len(self._request_timestamps),
                "tpm_used": tpm_used,
            }
使用示例
# Example: gate an AI API call behind the local limiter.
limiter = TokenBucketRateLimiter(rpm_limit=500, tpm_limit=120000)
if not limiter.acquire(tokens=500):
    print("配额耗尽,建议等待后重试")
else:
    print("请求通过,正在调用 AI API...")
    # response = call_ai_api()
2. 滑动窗口算法(Sliding Window)—— 精度优先方案
import time
from typing import Dict, List, Tuple
from dataclasses import dataclass
import threading
@dataclass
class SlidingWindowRateLimiter:
    """
    Sliding-window rate limiter with exact request accounting.

    Suitable for latency-sensitive, high-concurrency callers.
    """
    max_requests: int             # cap within one window
    window_seconds: float = 60.0  # window length

    def __post_init__(self):
        # Internal state is deliberately NOT declared as dataclass fields:
        # the original declared them with `field(...)`, which was never
        # imported in this snippet and raised NameError at class creation.
        self._lock = threading.Lock()
        self._requests: List[float] = []

    def is_allowed(self) -> Tuple[bool, float]:
        """
        Check whether a request may proceed right now.

        Returns:
            (is_allowed, retry_after): allowed flag, and seconds to wait
            when denied (0.0 when allowed).  On success the request is
            recorded immediately.
        """
        now = time.time()
        with self._lock:
            # Evict records that fell out of the window.
            cutoff = now - self.window_seconds
            self._requests = [t for t in self._requests if t > cutoff]
            if len(self._requests) < self.max_requests:
                self._requests.append(now)
                return True, 0.0
            # The list is chronological, so [0] is the oldest entry
            # (O(1) instead of the original min() scan).
            oldest = self._requests[0]
            retry_after = oldest + self.window_seconds - now
            return False, max(0.0, retry_after)

    def wait_and_acquire(self, timeout: float = 60.0) -> bool:
        """Block until a slot frees up or `timeout` seconds elapse."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            allowed, wait_time = self.is_allowed()
            if allowed:
                return True
            sleep_time = min(wait_time, deadline - time.time())
            if sleep_time > 0:
                time.sleep(sleep_time)
        return False
class MultiDimensionalLimiter:
    """
    Enforces RPM and TPM budgets together via two sliding windows.

    NOTE: the TPM window still counts one unit per request rather than the
    real token cost — same approximation as the original implementation.
    """

    def __init__(self, rpm: int, tpm: int):
        self.rpm_limiter = SlidingWindowRateLimiter(rpm, 60.0)
        self.tpm_limiter = SlidingWindowRateLimiter(tpm, 60.0)
        # Serializes multi-window admission so the two windows stay coherent.
        self._lock = threading.Lock()

    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> Tuple[bool, str]:
        """
        Try to obtain an admission slot in both windows.

        Args:
            tokens: estimated token cost (currently informational only)
            timeout: max seconds to wait before giving up
        Returns:
            (success, reason): reason is "" on success, "timeout" otherwise.
        """
        deadline = time.time() + timeout
        while True:
            with self._lock:
                rpm_ok, rpm_wait = self.rpm_limiter.is_allowed()
                if rpm_ok:
                    tpm_ok, tpm_wait = self.tpm_limiter.is_allowed()
                    if tpm_ok:
                        # Both windows already recorded the admission inside
                        # is_allowed(); do NOT append again (the original
                        # double-counted every successful acquire here).
                        return True, ""
                    # Roll back the RPM slot consumed by this failed attempt
                    # so quota is not burned without a request being sent.
                    self.rpm_limiter._requests.pop()
                    wait_for = tpm_wait
                else:
                    wait_for = rpm_wait
            # Sleep with the lock RELEASED (the original slept while holding
            # it, stalling every other thread) and never a negative duration.
            remaining = deadline - time.time()
            if remaining <= 0:
                return False, "timeout"
            time.sleep(min(max(wait_for, 0.01), remaining))
实际使用场景:封装 API 调用
class AIRateLimitedClient:
    """AI API client wrapper that applies local rate limiting before each call."""

    def __init__(self, api_key: str, base_url: str, rpm: int = 500, tpm: int = 120000):
        self.api_key = api_key
        self.base_url = base_url
        self.limiter = MultiDimensionalLimiter(rpm, tpm)

    def chat_completion(self, messages: List[dict], timeout: float = 60.0) -> dict:
        """Send a chat request, blocking on the local limiter first."""
        # Rough estimate (~4 chars per token); use tiktoken in production.
        estimated_tokens = 100 + sum(len(m.get('content', '')) // 4 for m in messages)
        success, reason = self.limiter.acquire(tokens=estimated_tokens, timeout=timeout)
        if not success:
            raise RateLimitError(f"无法获取 API 配额: {reason}")
        return self._do_request(messages)

    def _do_request(self, messages: List[dict]) -> dict:
        """Perform the actual HTTP POST against the chat-completions endpoint."""
        import requests

        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o",
                "messages": messages,
                "max_tokens": 4096,
            },
            timeout=120,
        )
        if response.status_code == 429:
            retry_after = int(response.headers.get('Retry-After', 5))
            raise RateLimitError(f"API 限流,需要等待 {retry_after} 秒")
        response.raise_for_status()
        return response.json()
class RateLimitError(Exception):
    """Raised when a local or remote rate limit blocks a request."""
集成 HolySheep API 的最佳实践
我在多个生产项目中采用 HolySheep AI 作为主要中转服务,其核心优势在于:
- 汇率无损:¥1=$1,官方汇率为 ¥7.3=$1,使用 HolySheep 可节省超过 85% 的汇率损耗
- 国内直连:延迟低于 50ms,无需配置代理,适合国内开发者
- 统一接口:同时支持 OpenAI、Claude、Gemini、DeepSeek 等多个模型
以下是与 HolySheep 集成的完整代码示例:
import os
import time
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from collections import deque
import aiohttp
import httpx
============== 配置区 ==============
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY"  # API key obtained from HolySheep
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"  # HolySheep API endpoint
HolySheep 各模型价格参考($/M Tokens output)
# Reference output pricing per model, in $ per 1M tokens.
MODEL_PRICING = {
    "gpt-4o": 15.0,  # $15 / MTok
    "gpt-4o-mini": 2.50,  # $2.50 / MTok
    "claude-sonnet-4-20250514": 15.0,  # $15 / MTok
    "claude-3-5-sonnet-20241022": 15.0,
    "gemini-2.5-flash": 2.50,  # $2.50 / MTok
    "deepseek-chat": 0.42,  # $0.42 / MTok - very cost-effective
}
@dataclass
class HolySheepRateLimiter:
    """
    Async rate limiter for the HolySheep API enforcing per-minute request
    (RPM), per-minute token (TPM) and per-day token (TPD) budgets.
    """
    rpm_limit: int = 1000
    tpm_limit: int = 200000
    tpd_limit: int = 10000000  # daily token cap
    # Parallel deques: admission timestamps and their token costs; they are
    # always pushed/popped in lockstep.
    _minute_requests: deque = field(default_factory=deque, repr=False)
    _minute_tokens: deque = field(default_factory=deque, repr=False)
    # (timestamp, tokens) pairs: the original stored bare token counts and
    # compared them against a *timestamp* cutoff, which drained the deque
    # on every call and disabled the daily limit entirely.
    _daily_tokens: deque = field(default_factory=deque, repr=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    async def acquire(self, estimated_tokens: int = 1000) -> bool:
        """
        Wait until the request fits all three budgets, then record it.

        Raises:
            Exception: when the daily token budget is exhausted.
        """
        # Loop instead of recursing: the original awaited itself while still
        # holding the non-reentrant asyncio.Lock, which deadlocks.
        while True:
            wait = 0.0
            async with self._lock:
                now = time.time()
                minute_cutoff = now - 60
                day_cutoff = now - 86400
                # Expire per-minute records (both deques stay in lockstep).
                while self._minute_requests and self._minute_requests[0] < minute_cutoff:
                    self._minute_requests.popleft()
                    self._minute_tokens.popleft()
                # Expire daily records by their timestamp.
                while self._daily_tokens and self._daily_tokens[0][0] < day_cutoff:
                    self._daily_tokens.popleft()

                # Daily budget can never recover within this call: fail fast.
                if sum(t for _, t in self._daily_tokens) + estimated_tokens > self.tpd_limit:
                    raise Exception("每日 Token 配额已用尽,请明天重试")

                if len(self._minute_requests) >= self.rpm_limit:
                    # Wait for the oldest request to leave the window.
                    wait = max(0.0, self._minute_requests[0] + 60 - now)
                elif sum(self._minute_tokens) + estimated_tokens > self.tpm_limit:
                    wait = 1.0
                else:
                    # Admitted: record usage in all three ledgers.
                    self._minute_requests.append(now)
                    self._minute_tokens.append(estimated_tokens)
                    self._daily_tokens.append((now, estimated_tokens))
                    return True
            # Sleep with the lock RELEASED so other coroutines can progress.
            await asyncio.sleep(wait if wait > 0 else 0.05)
class HolySheepAIClient:
    """
    HolySheep AI API client with client-side rate limiting and
    exponential-backoff retries.
    """
    def __init__(
        self,
        api_key: str = HOLYSHEEP_API_KEY,
        base_url: str = HOLYSHEEP_BASE_URL,
        rpm: int = 1000,
        tpm: int = 200000
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.rate_limiter = HolySheepRateLimiter(rpm_limit=rpm, tpm_limit=tpm)
        # Retry configuration
        self.max_retries = 3
        self.retry_base_delay = 1.0  # base retry delay in seconds, doubled per attempt

    async def chat_completion(
        self,
        model: str,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Send a chat-completion request through the local rate limiter.

        Args:
            model: model name (e.g. 'gpt-4o', 'claude-sonnet-4-20250514')
            messages: chat message list
            temperature: sampling temperature
            max_tokens: cap on output tokens (omitted from payload when falsy)
            **kwargs: extra fields merged into the request payload

        Returns:
            The parsed JSON response body.

        Raises:
            AuthenticationError: on HTTP 401.
            httpx.HTTPStatusError: re-raised once retries are exhausted.
        """
        # Rough input-token estimate (~4 characters per token).
        estimated_input_tokens = sum(len(m.get('content', '')) for m in messages) // 4
        # Block until the local RPM/TPM budget admits this request.
        await self.rate_limiter.acquire(estimated_input_tokens)
        # Build the request payload.
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        if max_tokens:
            payload["max_tokens"] = max_tokens
        payload.update(kwargs)
        # Execute with retries: 429 sleeps per Retry-After (falling back to
        # exponential backoff), other HTTP errors back off then re-raise.
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    response = await client.post(
                        f"{self.base_url}/chat/completions",
                        headers={
                            "Authorization": f"Bearer {self.api_key}",
                            "Content-Type": "application/json"
                        },
                        json=payload
                    )
                    if response.status_code == 429:
                        # Server-side throttling - honour Retry-After when
                        # present, else exponential backoff.
                        retry_after = response.headers.get('Retry-After',
                            str(self.retry_base_delay * (2 ** attempt)))
                        await asyncio.sleep(float(retry_after))
                        continue
                    if response.status_code == 401:
                        raise AuthenticationError("API Key 无效或已过期,请检查 HolySheep 配置")
                    response.raise_for_status()
                    return response.json()
            except httpx.HTTPStatusError as e:
                # NOTE(review): this retries ALL HTTP status errors, including
                # non-transient 4xx responses — confirm whether that is intended.
                if attempt < self.max_retries - 1:
                    delay = self.retry_base_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    raise
        raise Exception("请求失败,已达到最大重试次数")

    async def batch_chat(
        self,
        requests: List[Dict[str, Any]],
        concurrency: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Fan out multiple chat requests with a concurrency cap.

        Each element of `requests` is a kwargs dict for chat_completion.
        Failures come back as exception objects inside the result list
        (gather is called with return_exceptions=True).
        """
        semaphore = asyncio.Semaphore(concurrency)
        async def limited_request(req: Dict[str, Any]) -> Dict[str, Any]:
            async with semaphore:
                return await self.chat_completion(**req)
        tasks = [limited_request(req) for req in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)
class AuthenticationError(Exception):
    """Raised when the API rejects the supplied credentials (HTTP 401)."""
============== 使用示例 ==============
async def main():
    """Demo: a single chat request followed by a concurrency-capped batch."""
    # Initialize the client
    client = HolySheepAIClient(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        rpm=1000,  # 1000 requests per minute
        tpm=200000  # 200K tokens per minute
    )
    try:
        # Single request
        response = await client.chat_completion(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "你是一个有帮助的助手"},
                {"role": "user", "content": "请解释什么是 Rate Limiting"}
            ],
            max_tokens=500
        )
        print(f"响应: {response['choices'][0]['message']['content']}")
        # Batch example: failed requests come back as exception objects.
        batch_requests = [
            {"model": "gpt-4o", "messages": [{"role": "user", "content": f"问题 {i}"}]}
            for i in range(10)
        ]
        results = await client.batch_chat(batch_requests, concurrency=3)
        for i, result in enumerate(results):
            if isinstance(result, dict):
                print(f"请求 {i} 成功: {result.get('choices', [{}])[0].get('message', {}).get('content', '')[:50]}...")
            else:
                print(f"请求 {i} 失败: {result}")
    except AuthenticationError as e:
        print(f"认证失败: {e}")
        print("请访问 https://www.holysheep.ai/register 获取有效的 API Key")
    except Exception as e:
        print(f"请求异常: {e}")


if __name__ == "__main__":
    asyncio.run(main())
常见报错排查
在实际部署中,限流相关的错误是开发者最常遇到的挑战。以下是三个核心报错场景及其完整解决方案:
报错 1:429 Too Many Requests
# 错误响应示例
HTTP/1.1 429 Too Many Requests
Retry-After: 5
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1735689600
{
"error": {
"message": "Rate limit exceeded for Completions API. ",
"type": "requests",
"code": "rate_limit_exceeded"
}
}
解决方案:实现指数退避重试
async def request_with_retry(client, payload, max_retries=5):
    """
    Call ``client.chat_completion`` with exponential backoff on HTTP 429.

    NOTE(review): assumes chat_completion returns a Response-like object
    exposing ``status_code`` and ``headers`` — confirm against the client
    actually used (some clients return the parsed JSON dict instead).

    Raises:
        Exception: when every attempt was throttled.
    """
    import random  # hoisted out of the retry loop (was re-imported per 429)

    for attempt in range(max_retries):
        response = await client.chat_completion(**payload)
        if response.status_code == 429:
            # Parse the server's Retry-After hint (default 1s).
            retry_after = int(response.headers.get('Retry-After', 1))
            # Exponential backoff plus random jitter to avoid a thundering herd.
            actual_delay = retry_after * (2 ** attempt) + random.uniform(0, 1)
            print(f"触发限流,等待 {actual_delay:.2f} 秒后重试(第 {attempt + 1} 次)")
            await asyncio.sleep(actual_delay)
            continue
        return response
    raise Exception(f"达到最大重试次数 {max_retries},请求失败")
报错 2:401 Unauthorized - Invalid API Key
# 错误响应
HTTP/1.1 401 Unauthorized
{
"error": {
"message": "Incorrect API key provided",
"type": "invalid_request_error",
"code": "invalid_api_key"
}
}
排查步骤
def validate_api_key(api_key: str) -> bool:
    """
    Validate an API key's presence, format, and length, then probe the
    HolySheep /models endpoint to confirm the server accepts it.

    Returns:
        True only when all checks pass and the probe answers 200;
        False otherwise (any other status, including 403/5xx).
    """
    import os
    # 1. Fall back to the environment when no key was passed in.
    api_key = api_key or os.environ.get('HOLYSHEEP_API_KEY')
    if not api_key:
        print("❌ 错误:HOLYSHEEP_API_KEY 环境变量未设置")
        print("请运行: export HOLYSHEEP_API_KEY='your-key-here'")
        return False
    # 2. Format check: accepted prefixes are 'sk-' and 'hs-'
    #    (the original comment/message disagreed with this check).
    if not api_key.startswith('sk-') and not api_key.startswith('hs-'):
        print("⚠️ 警告:API Key 格式可能不正确")
        print("HolySheep API Key 应以 'sk-' 或 'hs-' 开头")
    # 3. Length sanity check.
    if len(api_key) < 20:
        print(f"❌ 错误:API Key 长度异常({len(api_key)} 位)")
        return False
    # 4. Live probe against the models endpoint.
    import httpx
    try:
        response = httpx.get(
            "https://api.holysheep.ai/v1/models",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10
        )
        if response.status_code == 401:
            print("❌ 错误:API Key 无效或已过期")
            print("👉 请前往 https://www.holysheep.ai/register 重新获取")
            return False
        if response.status_code == 200:
            print("✅ API Key 验证通过")
            return True
    except httpx.ConnectError:
        print("❌ 连接失败:无法访问 HolySheep API")
        print("请检查网络代理配置或联系 [email protected]")
        return False
    # Any other status (403/5xx/...) is treated as not validated.
    return False
使用
import sys  # was missing: sys.exit below raised NameError in this snippet

# Abort with exit code 1 when the API key fails validation.
if not validate_api_key("YOUR_HOLYSHEEP_API_KEY"):
    sys.exit(1)
报错 3:ConnectionError / Timeout
# 错误类型
aiohttp.ClientConnectorError: Cannot connect to host api.holysheep.ai:443
httpx.ConnectTimeout: Connection timeout
完整重试包装器
import asyncio
import httpx
from typing import Any, Dict, Optional
import logging
# Module-wide logging configuration; INFO level surfaces retry warnings.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ResilientAIClient:
    """
    AI client with layered error handling and exponential-backoff retries.

    Retries cover transport-level failures only (connect/read timeouts,
    connection errors); HTTP status codes are mapped to exceptions once,
    in chat_completion, after a response arrives.
    """
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        # 60s overall budget, but only 10s to establish the connection.
        self.timeout = httpx.Timeout(60.0, connect=10.0)
        self.max_retries = 3

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> httpx.Response:
        """
        Issue one HTTP request with up to max_retries attempts.

        Args:
            method: HTTP verb, e.g. 'POST'.
            endpoint: path appended to base_url, e.g. '/chat/completions'.
            **kwargs: forwarded to httpx (json=, headers=, ...).

        Raises:
            ConnectionError: transport failure after retries are exhausted.
        """
        url = f"{self.base_url}{endpoint}"
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f"Bearer {self.api_key}"
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for attempt in range(self.max_retries):
                try:
                    response = await client.request(method, url, headers=headers, **kwargs)
                    return response
                except httpx.ConnectTimeout:
                    logger.warning(f"连接超时(第 {attempt + 1} 次重试)")
                    if attempt < self.max_retries - 1:
                        # Exponential backoff: 1s, 2s, 4s, ...
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise ConnectionError("无法连接到 HolySheep API,请检查网络")
                except httpx.ReadTimeout:
                    logger.warning(f"读取超时(请求处理时间过长,第 {attempt + 1} 次重试)")
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise ConnectionError("AI 模型响应超时,请尝试减少 max_tokens 参数")
                except httpx.ConnectError as e:
                    logger.warning(f"连接错误: {e}")
                    # A refused connection is treated as non-transient:
                    # fail immediately with diagnostic guidance.
                    if 'Connection refused' in str(e):
                        raise ConnectionError(
                            "无法连接 API,请检查:\n"
                            "1. API 地址是否正确: https://api.holysheep.ai/v1\n"
                            "2. 防火墙/代理设置\n"
                            "3. DNS 解析是否正常"
                        )
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise
                except Exception as e:
                    # Unknown failures are logged and propagated unchanged.
                    logger.error(f"未知错误: {type(e).__name__}: {e}")
                    raise
        raise Exception("请求失败,已达到最大重试次数")

    async def chat_completion(self, **payload) -> Dict[str, Any]:
        """
        POST a chat-completion request and map error statuses to exceptions.

        Raises:
            AuthenticationError: on HTTP 401.
            RateLimitError: on HTTP 429 (message carries Retry-After).
            APIError: on any other >= 400 status.
        """
        response = await self._make_request(
            'POST',
            '/chat/completions',
            json=payload
        )
        if response.status_code == 401:
            raise AuthenticationError("API Key 无效")
        if response.status_code == 429:
            retry_after = response.headers.get('Retry-After', '60')
            raise RateLimitError(f"触发限流,需等待 {retry_after} 秒")
        if response.status_code >= 400:
            raise APIError(f"API 错误 {response.status_code}: {response.text}")
        return response.json()
class APIError(Exception):
    """Non-rate-limit API failure (other 4xx/5xx responses)."""


class RateLimitError(Exception):
    """Raised when the upstream API reports throttling (HTTP 429)."""
Golang 实现版本
对于 Golang 项目,以下是等效的限流实现:
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/google/uuid"
)
// TokenBucketRateLimiter enforces RPM and TPM budgets over a rolling
// 60-second window (despite the name, it is window-based, not a
// refill-based token bucket).
type TokenBucketRateLimiter struct {
	mu        sync.Mutex  // guards every field below
	rpmLimit  int         // max requests per window
	tpmLimit  int         // max tokens per window
	requests  []time.Time // admission timestamps inside the window
	tokens    []int       // token cost per admitted request (parallel to requests)
	windowSec float64     // window length in seconds
}
// NewTokenBucketRateLimiter builds a limiter with the given per-minute
// request (rpm) and token (tpm) budgets over a 60-second window.
func NewTokenBucketRateLimiter(rpm, tpm int) *TokenBucketRateLimiter {
	limiter := &TokenBucketRateLimiter{windowSec: 60.0}
	limiter.rpmLimit = rpm
	limiter.tpmLimit = tpm
	limiter.requests = []time.Time{}
	limiter.tokens = []int{}
	return limiter
}
// cleanup drops request/token records older than the window.
// The caller must hold l.mu.
func (l *TokenBucketRateLimiter) cleanup(now time.Time) {
	cutoff := now.Add(-time.Duration(l.windowSec * float64(time.Second)))
	// Requests are appended chronologically, so the survivors form a
	// suffix: count how many leading entries have expired.
	expired := 0
	for expired < len(l.requests) && !l.requests[expired].After(cutoff) {
		expired++
	}
	// Keep the matching suffix of BOTH slices so each token cost stays
	// paired with its request. The original kept a *prefix* of tokens
	// after filtering requests, decoupling costs from their requests
	// and corrupting the TPM accounting.
	l.requests = append([]time.Time(nil), l.requests[expired:]...)
	l.tokens = append([]int(nil), l.tokens[expired:]...)
}
// Acquire blocks until the request fits both budgets or a 30-second
// deadline passes. It returns (true, 0) on success and
// (false, 30*time.Second) on timeout.
func (l *TokenBucketRateLimiter) Acquire(tokens int) (bool, time.Duration) {
	deadline := time.Now().Add(30 * time.Second)
	for {
		l.mu.Lock()
		now := time.Now()
		l.cleanup(now)

		currentTPM := 0
		for _, t := range l.tokens {
			currentTPM += t
		}
		// Admit when both budgets have room.
		if len(l.requests) < l.rpmLimit && currentTPM+tokens <= l.tpmLimit {
			l.requests = append(l.requests, now)
			l.tokens = append(l.tokens, tokens)
			l.mu.Unlock()
			return true, 0
		}

		// Wait until the oldest record leaves the window (at least 1s).
		waitTime := time.Second
		if len(l.requests) > 0 {
			if w := l.requests[0].Add(60 * time.Second).Sub(now); w > 0 {
				waitTime = w
			}
		}
		l.mu.Unlock()

		// Never sleep past the deadline (the original could oversleep by
		// up to a full minute before noticing the deadline had passed).
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return false, 30 * time.Second
		}
		if waitTime > remaining {
			waitTime = remaining
		}
		time.Sleep(waitTime)
	}
}
// HolySheepClient is an HTTP client for the HolySheep API that applies a
// local rate limiter before every call.
type HolySheepClient struct {
	APIKey     string                  // bearer token for the Authorization header
	BaseURL    string                  // API root, e.g. https://api.holysheep.ai/v1
	Limiter    *TokenBucketRateLimiter // client-side RPM/TPM gate
	HTTPClient *http.Client            // underlying transport
}
// NewHolySheepClient returns a client preconfigured for the public
// HolySheep endpoint: a 1000 RPM / 200K TPM local limiter and a
// 120-second request timeout.
func NewHolySheepClient(apiKey string) *HolySheepClient {
	transport := &http.Client{Timeout: 120 * time.Second}
	gate := NewTokenBucketRateLimiter(1000, 200000) // RPM: 1000, TPM: 200K
	return &HolySheepClient{
		APIKey:     apiKey,
		BaseURL:    "https://api.holysheep.ai/v1",
		Limiter:    gate,
		HTTPClient: transport,
	}
}
// ChatCompletion 聊天补全请求
func (c *HolySheepClient) ChatCompletion(ctx context.Context, model string, messages []map[string]string) (map[string]interface{}, error) {
// 估算 tokens
estimatedTokens := 0
for _, m := range messages {
estimatedTokens += len(m["content"]) / 4
}
// 获取限流许可
ok, wait := c.Limiter.Acquire(estimatedTokens)
if !ok {
return nil, fmt.Errorf("限流超时,需要等待 %v", wait)
}
// 构建请求 (实际使用可使用 golang.org/x/net/webhook 或自定义序列化)
payload := map[string]interface{}{
"model": model,
"messages": messages,
}
req, err := http.NewRequestWithContext(ctx, "POST", c.BaseURL+"/chat/completions", nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.APIKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("请求失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == 429 {
return nil, fmt.Errorf("