2 a.m., and your monitoring dashboard suddenly fires an alert: the production service has started returning 429 Too Many Requests errors in bulk. Users report Claude timeouts, the team scrambles to investigate, and the root cause turns out to be a developer who accidentally ran a load-testing script against production, firing 800 requests at the API within 30 seconds.

That is not the end of the story; it is the point at which you genuinely need to understand rate limiting. As a technical lead operating several AI projects at once, I have seen too many teams, unaware of how rate limiting works, get cut off by an API provider at a critical project milestone, losing anywhere from hours of development time to an entire launch schedule.

This article walks through how AI API gateways implement rate limiting at the source-code level, and provides production-grade Python/Golang/Node.js code you can put to use directly.

Why AI APIs Must Be Rate Limited

Mainstream AI API providers (OpenAI, Anthropic, Google, and others) typically cap free tiers at roughly 60-150 requests per minute. Paid tiers offer higher quotas, but per-key TPM (Tokens Per Minute) and RPM (Requests Per Minute) limits still apply. The table below shows representative rate-limit parameters for mainstream AI services:

Provider            Free-tier RPM   Free-tier TPM   Paid-tier RPM   Paid-tier TPM
OpenAI GPT-4        50              15,000          500             120,000
Anthropic Claude    25              4,000           100             40,000
Google Gemini       60              60,000          300             1,000,000
DeepSeek V3         120             20,000          1000            200,000
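A quick way to sanity-check these quotas: given an average request size, whichever of RPM and TPM runs out first is your real ceiling. A small sketch (the quota numbers are the illustrative paid-tier figures from the table above):

def effective_rpm(rpm_limit: int, tpm_limit: int, avg_tokens_per_request: int) -> int:
    """Return the sustainable requests-per-minute given both limits."""
    # The TPM budget caps how many average-sized requests fit in one minute
    tpm_capped = tpm_limit // avg_tokens_per_request
    return min(rpm_limit, tpm_capped)

# OpenAI GPT-4 paid tier from the table: 500 RPM, 120,000 TPM.
# At 1,000 tokens per request, TPM (not RPM) is the binding constraint:
print(effective_rpm(500, 120_000, 1_000))   # 120
print(effective_rpm(500, 120_000, 200))     # 500

In other words, for large prompts you can hit the TPM ceiling long before the advertised RPM, which is why the limiters below track both dimensions.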

What happens without client-side rate limiting:

I once watched a startup team launch without client-side rate limiting and rack up a single-day API bill of ¥23,000, far beyond their monthly budget. That is not a technical problem; it is an engineering-discipline problem.

Core Rate-Limiting Algorithms

Production rate limiting must handle both request frequency (RPM) and token throughput (TPM). Below is a comparison of the mainstream algorithms, with Python implementations:

1. Token Bucket: the recommended approach

The token bucket is the best choice for handling bursty traffic. The system adds tokens to a bucket at a fixed rate; each arriving request consumes tokens, and requests are rejected once the bucket is empty. This tolerates a degree of burst while guaranteeing that the long-term average rate stays within the limit.
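For reference, the textbook refill-based bucket fits in a few lines (the rate and capacity here are illustrative); the fuller class that follows instead keeps a 60-second log so it can enforce RPM and TPM together:

import time

class SimpleTokenBucket:
    """Textbook token bucket: refills at `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity        # start full, allowing an initial burst
        self.last = time.monotonic()

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = SimpleTokenBucket(rate=10, capacity=5)   # 10 req/s steady, bursts of 5
print([bucket.allow() for _ in range(6)])         # the 6th call exceeds the burst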

import time
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import asyncio


@dataclass
class TokenBucketRateLimiter:
    """
    令牌桶限流器 - 支持 RPM 和 TPM 双维度限流
    """
    rpm_limit: int = 60              # 每分钟最大请求数
    tpm_limit: int = 150000          # 每分钟最大 Token 数
    burst_size: int = 10             # 允许的突发请求数
    
    _request_timestamps: deque = field(default_factory=deque)
    _token_timestamps: deque = field(default_factory=deque)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _last_refill_time: float = field(default_factory=lambda: time.time())
    
    def __post_init__(self):
        self._lock = threading.Lock()
        self._request_timestamps = deque()
        self._token_timestamps = deque()
    
    def _cleanup_old_timestamps(self, now: float, window: float = 60.0):
        """清理 60 秒外的时间戳"""
        cutoff = now - window
        
        while self._request_timestamps and self._request_timestamps[0] < cutoff:
            self._request_timestamps.popleft()
        
        while self._token_timestamps and self._token_timestamps[0] < cutoff:
            self._token_timestamps.popleft()
    
    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> bool:
        """
        尝试获取限流许可
        
        Args:
            tokens: 本次请求预计消耗的 Token 数
            timeout: 最大等待时间(秒)
        
        Returns:
            True: 获取成功,可以发起请求
            False: 超时放弃
        """
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            with self._lock:
                now = time.time()
                self._cleanup_old_timestamps(now)
                
                current_rpm = len(self._request_timestamps)
                current_tpm = sum(self._token_timestamps)
                
                # Can the request be admitted immediately?
                if current_rpm < self.rpm_limit and current_tpm + tokens <= self.tpm_limit:
                    self._request_timestamps.append(now)
                    self._token_timestamps.append(tokens)
                    return True
                
                # Otherwise, compute how long to wait
                wait_times = []
                
                if current_rpm >= self.rpm_limit:
                    # Time until the oldest request ages out of the 60 s window
                    oldest_request = self._request_timestamps[0]
                    wait_times.append(oldest_request + 60.0 - now)
                
                if current_tpm + tokens > self.tpm_limit:
                    # Wait for enough token quota to be released
                    accumulated = 0
                    for t in self._token_timestamps:
                        accumulated += t
                        if accumulated + tokens > self.tpm_limit:
                            break
                    # Simplified: a full version would derive the wait time
                    # from the matching timestamps
                    wait_times.append(1.0)  # wait at least 1 second
            
            # Sleep with the lock released
            wait_time = min(wait_times) if wait_times else 1.0
            time.sleep(max(0.0, min(wait_time, deadline - time.time())))
        
        return False
    
    def get_remaining(self) -> dict:
        """获取当前配额剩余量"""
        now = time.time()
        with self._lock:
            self._cleanup_old_timestamps(now)
            return {
                "rpm_remaining": self.rpm_limit - len(self._request_timestamps),
                "tpm_remaining": self.tpm_limit - sum(self._token_timestamps),
                "rpm_used": len(self._request_timestamps),
                "tpm_used": sum(self._token_timestamps)
            }


Usage example

limiter = TokenBucketRateLimiter(rpm_limit=500, tpm_limit=120000)

if limiter.acquire(tokens=500):
    print("Request admitted, calling the AI API...")
    # response = call_ai_api()
else:
    print("Quota exhausted; wait and retry")

2. Sliding Window: the precision-first approach

import time
from typing import Dict, List, Tuple
from dataclasses import dataclass, field
import threading


@dataclass
class SlidingWindowRateLimiter:
    """
    Sliding-window rate limiter with finer-grained control.
    Suited to latency-sensitive, high-concurrency workloads.
    """
    max_requests: int
    window_seconds: float = 60.0
    
    _requests: List[float] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    
    def __post_init__(self):
        self._lock = threading.Lock()
        self._requests = []
    
    def is_allowed(self) -> Tuple[bool, float]:
        """
        Check whether a request is allowed.

        Returns:
            (is_allowed, retry_after): whether it is allowed, and seconds to wait if rejected
        """
        now = time.time()
        
        with self._lock:
            # Drop request records outside the window
            cutoff = now - self.window_seconds
            self._requests = [t for t in self._requests if t > cutoff]
            
            if len(self._requests) < self.max_requests:
                self._requests.append(now)
                return True, 0.0
            
            # How long until the earliest request expires
            oldest = min(self._requests)
            retry_after = oldest + self.window_seconds - now
            
            return False, max(0.0, retry_after)
    
    def wait_and_acquire(self, timeout: float = 60.0) -> bool:
        """等待直到获取许可或超时"""
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            allowed, wait_time = self.is_allowed()
            
            if allowed:
                return True
            
            sleep_time = min(wait_time, deadline - time.time())
            if sleep_time > 0:
                time.sleep(sleep_time)
        
        return False


class MultiDimensionalLimiter:
    """
    Multi-dimensional limiter: enforces RPM and TPM together.
    """
    def __init__(self, rpm: int, tpm: int):
        self.rpm_limiter = SlidingWindowRateLimiter(rpm, 60.0)
        self.tpm_limiter = SlidingWindowRateLimiter(tpm, 60.0)
        self._lock = threading.Lock()
    
    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> Tuple[bool, str]:
        """
        Try to obtain a rate-limit permit.

        Returns:
            (success, reason): whether it succeeded, and the failure reason
        """
        deadline = time.time() + timeout

        while time.time() < deadline:
            with self._lock:
                rpm_allowed, rpm_wait = self.rpm_limiter.is_allowed()

                if rpm_allowed:
                    # TPM limiting: each request counts as one slot here; a real
                    # implementation would weight the check by `tokens`. (If the
                    # TPM check fails, the RPM slot just consumed is not
                    # returned; an accepted simplification.)
                    tpm_allowed, tpm_wait = self.tpm_limiter.is_allowed()
                    if tpm_allowed:
                        return True, ""
                    wait = tpm_wait
                else:
                    wait = rpm_wait

            # Sleep with the lock released
            time.sleep(max(0.0, min(wait, deadline - time.time())))

        return False, "timeout"


A real-world use case: wrapping API calls

class AIRateLimitedClient: """ 带限流的 AI API 客户端 """ def __init__(self, api_key: str, base_url: str, rpm: int = 500, tpm: int = 120000): self.api_key = api_key self.base_url = base_url self.limiter = MultiDimensionalLimiter(rpm, tpm) def chat_completion(self, messages: List[dict], timeout: float = 60.0) -> dict: """发送聊天请求,自动处理限流""" # 估算 Token 数量(简化版,实际应使用 tiktoken) estimated_tokens = sum(len(m.get('content', '')) // 4 for m in messages) + 100 # 尝试获取限流许可 success, reason = self.limiter.acquire(tokens=estimated_tokens, timeout=timeout) if not success: raise RateLimitError(f"无法获取 API 配额: {reason}") # 发起实际请求 return self._do_request(messages) def _do_request(self, messages: List[dict]) -> dict: """实际执行 API 请求""" import requests headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": "gpt-4o", "messages": messages, "max_tokens": 4096 } response = requests.post( f"{self.base_url}/chat/completions", headers=headers, json=payload, timeout=120 ) if response.status_code == 429: retry_after = int(response.headers.get('Retry-After', 5)) raise RateLimitError(f"API 限流,需要等待 {retry_after} 秒") response.raise_for_status() return response.json() class RateLimitError(Exception): """限流异常""" pass

Best Practices for Integrating the HolySheep API

I use HolySheep AI as the primary relay service in several production projects. Below is a complete integration example:

import os
import time
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from collections import deque
import aiohttp
import httpx


============== Configuration ==============

HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # 从 HolySheep 获取的 API Key HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1" # HolySheep API 端点

HolySheep per-model pricing reference ($ per M output tokens)

MODEL_PRICING = {
    "gpt-4o": 15.0,                     # $15 / MTok
    "gpt-4o-mini": 2.50,                # $2.50 / MTok
    "claude-sonnet-4-20250514": 15.0,   # $15 / MTok
    "claude-3-5-sonnet-20241022": 15.0,
    "gemini-2.5-flash": 2.50,           # $2.50 / MTok
    "deepseek-chat": 0.42,              # $0.42 / MTok; best value per dollar
}


@dataclass
class HolySheepRateLimiter:
    """
    Rate limiter tuned for the HolySheep API.
    Defaults follow the HolySheep tier quotas.
    """
    rpm_limit: int = 1000
    tpm_limit: int = 200000
    tpd_limit: int = 10000000  # daily token limit

    _minute_requests: deque = field(default_factory=deque)
    _minute_tokens: deque = field(default_factory=deque)
    _daily_tokens: deque = field(default_factory=deque)  # (timestamp, tokens) pairs
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def __post_init__(self):
        self._lock = asyncio.Lock()

    async def acquire(self, estimated_tokens: int = 1000) -> bool:
        """
        Obtain a rate-limit permit asynchronously.
        """
        while True:
            async with self._lock:
                now = time.time()
                minute_cutoff = now - 60
                day_cutoff = now - 86400

                # Evict expired records
                while self._minute_requests and self._minute_requests[0] < minute_cutoff:
                    self._minute_requests.popleft()
                    self._minute_tokens.popleft()
                while self._daily_tokens and self._daily_tokens[0][0] < day_cutoff:
                    self._daily_tokens.popleft()

                # Check quotas
                current_rpm = len(self._minute_requests)
                current_tpm = sum(self._minute_tokens)
                current_tpd = sum(t for _, t in self._daily_tokens)

                if current_tpd + estimated_tokens > self.tpd_limit:
                    raise Exception("Daily token quota exhausted; try again tomorrow")

                if current_rpm < self.rpm_limit and current_tpm + estimated_tokens <= self.tpm_limit:
                    # Record this request
                    self._minute_requests.append(now)
                    self._minute_tokens.append(estimated_tokens)
                    self._daily_tokens.append((now, estimated_tokens))
                    return True

                if current_rpm >= self.rpm_limit:
                    wait = max(0.0, self._minute_requests[0] + 60 - now)
                else:
                    wait = 1.0  # TPM exceeded; re-check after a short pause

            # Sleep with the lock released
            await asyncio.sleep(wait)


class HolySheepAIClient:
    """
    HolySheep AI API client with full rate limiting and retries.
    """
    def __init__(
        self,
        api_key: str = HOLYSHEEP_API_KEY,
        base_url: str = HOLYSHEEP_BASE_URL,
        rpm: int = 1000,
        tpm: int = 200000
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.rate_limiter = HolySheepRateLimiter(rpm_limit=rpm, tpm_limit=tpm)

        # Retry configuration
        self.max_retries = 3
        self.retry_base_delay = 1.0  # base retry delay in seconds

    async def chat_completion(
        self,
        model: str,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Send a chat-completion request.

        Args:
            model: model name (e.g. 'gpt-4o', 'claude-sonnet-4-20250514')
            messages: message list
            temperature: sampling temperature
            max_tokens: maximum output tokens
        """
        # Estimate the token count
        estimated_input_tokens = sum(len(m.get('content', '')) for m in messages) // 4

        # Obtain a rate-limit permit
        await self.rate_limiter.acquire(estimated_input_tokens)

        # Build the request
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        if max_tokens:
            payload["max_tokens"] = max_tokens
        payload.update(kwargs)

        # Execute with retries
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    response = await client.post(
                        f"{self.base_url}/chat/completions",
                        headers={
                            "Authorization": f"Bearer {self.api_key}",
                            "Content-Type": "application/json"
                        },
                        json=payload
                    )

                if response.status_code == 429:
                    # Server-side rate limit: exponential backoff
                    retry_after = response.headers.get(
                        'Retry-After', str(self.retry_base_delay * (2 ** attempt)))
                    await asyncio.sleep(float(retry_after))
                    continue

                if response.status_code == 401:
                    raise AuthenticationError(
                        "API key invalid or expired; check your HolySheep configuration")

                response.raise_for_status()
                return response.json()

            except httpx.HTTPStatusError:
                if attempt < self.max_retries - 1:
                    delay = self.retry_base_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    raise

        raise Exception("Request failed after the maximum number of retries")

    async def batch_chat(
        self,
        requests: List[Dict[str, Any]],
        concurrency: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Batch requests with a concurrency cap.
        """
        semaphore = asyncio.Semaphore(concurrency)

        async def limited_request(req: Dict[str, Any]) -> Dict[str, Any]:
            async with semaphore:
                return await self.chat_completion(**req)

        tasks = [limited_request(req) for req in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)


class AuthenticationError(Exception):
    """Authentication error."""
    pass

============== Usage example ==============

async def main():
    # Initialize the client
    client = HolySheepAIClient(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        rpm=1000,    # 1000 requests per minute
        tpm=200000   # 200K tokens per minute
    )

    try:
        # Single request
        response = await client.chat_completion(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a helpful assistant"},
                {"role": "user", "content": "Explain what rate limiting is"}
            ],
            max_tokens=500
        )
        print(f"Response: {response['choices'][0]['message']['content']}")

        # Batch request example
        batch_requests = [
            {"model": "gpt-4o",
             "messages": [{"role": "user", "content": f"Question {i}"}]}
            for i in range(10)
        ]
        results = await client.batch_chat(batch_requests, concurrency=3)
        for i, result in enumerate(results):
            if isinstance(result, dict):
                content = result.get('choices', [{}])[0].get('message', {}).get('content', '')
                print(f"Request {i} succeeded: {content[:50]}...")
            else:
                print(f"Request {i} failed: {result}")

    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
        print("Visit https://www.holysheep.ai/register to obtain a valid API key")
    except Exception as e:
        print(f"Request error: {e}")


if __name__ == "__main__":
    asyncio.run(main())

Troubleshooting Common Errors

In real deployments, rate-limit errors are among the most common problems developers hit. Here are three core error scenarios and complete solutions:

Error 1: 429 Too Many Requests

# Example error response

HTTP/1.1 429 Too Many Requests

Retry-After: 5

X-RateLimit-Limit: 1000

X-RateLimit-Remaining: 0

X-RateLimit-Reset: 1735689600

{ "error": { "message": "Rate limit exceeded for Completions API. ", "type": "requests", "code": "rate_limit_exceeded" } }

Solution: exponential-backoff retry

import random


async def request_with_retry(client, payload, max_retries=5):
    """Retry on 429 with exponential backoff plus jitter.
    `client` here is assumed to be an httpx.AsyncClient."""
    for attempt in range(max_retries):
        response = await client.post("/chat/completions", json=payload)

        if response.status_code == 429:
            # Parse the Retry-After header
            retry_after = int(response.headers.get('Retry-After', 1))
            # Exponential backoff plus random jitter
            actual_delay = retry_after * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited; retrying in {actual_delay:.2f} s (attempt {attempt + 1})")
            await asyncio.sleep(actual_delay)
            continue

        return response

    raise Exception(f"Request failed after {max_retries} retries")

Error 2: 401 Unauthorized - Invalid API Key

# Error response

HTTP/1.1 401 Unauthorized

{ "error": { "message": "Incorrect API key provided", "type": "invalid_request_error", "code": "invalid_api_key" } }

Troubleshooting steps

def validate_api_key(api_key: str) -> bool:
    """
    Validate the API key's format and connectivity.
    """
    import os
    import httpx

    # 1. Check the environment variable
    api_key = api_key or os.environ.get('HOLYSHEEP_API_KEY')
    if not api_key:
        print("❌ Error: HOLYSHEEP_API_KEY environment variable is not set")
        print("Run: export HOLYSHEEP_API_KEY='your-key-here'")
        return False

    # 2. Check the key prefix (HolySheep keys start with 'sk-' or 'hs-')
    if not api_key.startswith('sk-') and not api_key.startswith('hs-'):
        print("⚠️ Warning: the API key format may be incorrect")
        print("HolySheep API keys should start with 'sk-' or 'hs-'")

    # 3. Check the key length
    if len(api_key) < 20:
        print(f"❌ Error: abnormal API key length ({len(api_key)} characters)")
        return False

    # 4. Test connectivity
    try:
        response = httpx.get(
            "https://api.holysheep.ai/v1/models",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10
        )
        if response.status_code == 401:
            print("❌ Error: API key invalid or expired")
            print("👉 Visit https://www.holysheep.ai/register to get a new one")
            return False
        if response.status_code == 200:
            print("✅ API key validated")
            return True
    except httpx.ConnectError:
        print("❌ Connection failed: cannot reach the HolySheep API")
        print("Check your network/proxy settings or contact [email protected]")
        return False

    return False

Usage

import sys

if not validate_api_key("YOUR_HOLYSHEEP_API_KEY"):
    sys.exit(1)

Error 3: ConnectionError / Timeout

# Error types

aiohttp.ClientConnectorError: Cannot connect to host api.holysheep.ai:443

httpx.ConnectTimeout: Connection timeout

A complete retry wrapper

import asyncio
import logging
from typing import Any, Dict

import httpx

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class APIError(Exception):
    """API-level business error."""
    pass


class RateLimitError(Exception):
    """Rate-limit error."""
    pass


class AuthenticationError(Exception):
    """Authentication error."""
    pass


class ResilientAIClient:
    """
    AI client with full error handling and retry logic.
    """
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = httpx.Timeout(60.0, connect=10.0)
        self.max_retries = 3

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
        """
        Generic request helper with full error handling.
        """
        url = f"{self.base_url}{endpoint}"
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f"Bearer {self.api_key}"

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for attempt in range(self.max_retries):
                try:
                    return await client.request(method, url, headers=headers, **kwargs)
                except httpx.ConnectTimeout:
                    logger.warning(f"Connect timeout (retry {attempt + 1})")
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise ConnectionError("Cannot reach the HolySheep API; check your network")
                except httpx.ReadTimeout:
                    logger.warning(f"Read timeout (slow response, retry {attempt + 1})")
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise ConnectionError("Model response timed out; try lowering max_tokens")
                except httpx.ConnectError as e:
                    logger.warning(f"Connection error: {e}")
                    if 'Connection refused' in str(e):
                        raise ConnectionError(
                            "Cannot connect to the API. Check:\n"
                            "1. The API address: https://api.holysheep.ai/v1\n"
                            "2. Firewall/proxy settings\n"
                            "3. DNS resolution"
                        )
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise
                except Exception as e:
                    logger.error(f"Unexpected error: {type(e).__name__}: {e}")
                    raise

        raise Exception("Request failed after the maximum number of retries")

    async def chat_completion(self, **payload) -> Dict[str, Any]:
        """
        Send a chat-completion request.
        """
        response = await self._make_request('POST', '/chat/completions', json=payload)

        if response.status_code == 401:
            raise AuthenticationError("Invalid API key")
        if response.status_code == 429:
            retry_after = response.headers.get('Retry-After', '60')
            raise RateLimitError(f"Rate limited; wait {retry_after} seconds")
        if response.status_code >= 400:
            raise APIError(f"API error {response.status_code}: {response.text}")

        return response.json()

The Golang Implementation

For Golang projects, here is an equivalent rate-limiter implementation:

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// TokenBucketRateLimiter is a token-bucket rate limiter (Go version)
type TokenBucketRateLimiter struct {
    mu          sync.Mutex
    rpmLimit    int
    tpmLimit    int
    requests    []time.Time
    tokens      []int
    windowSec   float64
}

// NewTokenBucketRateLimiter creates a limiter
func NewTokenBucketRateLimiter(rpm, tpm int) *TokenBucketRateLimiter {
    return &TokenBucketRateLimiter{
        rpmLimit:  rpm,
        tpmLimit:  tpm,
        requests:  make([]time.Time, 0),
        tokens:    make([]int, 0),
        windowSec: 60.0,
    }
}

// cleanup evicts records older than the window
func (l *TokenBucketRateLimiter) cleanup(now time.Time) {
    cutoff := now.Add(-time.Duration(l.windowSec * float64(time.Second)))
    
    // Evict expired entries, keeping requests and tokens paired
    newRequests := make([]time.Time, 0, len(l.requests))
    newTokens := make([]int, 0, len(l.tokens))
    for i, t := range l.requests {
        if t.After(cutoff) {
            newRequests = append(newRequests, t)
            newTokens = append(newTokens, l.tokens[i])
        }
    }
    l.requests = newRequests
    l.tokens = newTokens
}

// Acquire tries to obtain a rate-limit permit
func (l *TokenBucketRateLimiter) Acquire(tokens int) (bool, time.Duration) {
    deadline := time.Now().Add(30 * time.Second)
    
    for time.Now().Before(deadline) {
        l.mu.Lock()
        now := time.Now()
        l.cleanup(now)
        
        currentRPM := len(l.requests)
        var currentTPM int
        for _, t := range l.tokens {
            currentTPM += t
        }
        
        // Check quotas
        if currentRPM < l.rpmLimit && currentTPM + tokens <= l.tpmLimit {
            l.requests = append(l.requests, now)
            l.tokens = append(l.tokens, tokens)
            l.mu.Unlock()
            return true, 0
        }
        
        // Compute how long to wait
        var waitTime time.Duration = time.Second
        if len(l.requests) > 0 {
            oldest := l.requests[0]
            waitTime = oldest.Add(60 * time.Second).Sub(now)
            if waitTime < 0 {
                waitTime = time.Second
            }
        }
        l.mu.Unlock()
        
        time.Sleep(waitTime)
    }
    
    return false, 30 * time.Second
}

// HolySheepClient is a HolySheep API client
type HolySheepClient struct {
    APIKey     string
    BaseURL    string
    Limiter    *TokenBucketRateLimiter
    HTTPClient *http.Client
}

// NewHolySheepClient creates a client
func NewHolySheepClient(apiKey string) *HolySheepClient {
    return &HolySheepClient{
        APIKey:  apiKey,
        BaseURL: "https://api.holysheep.ai/v1",
        Limiter: NewTokenBucketRateLimiter(1000, 200000), // RPM: 1000, TPM: 200K
        HTTPClient: &http.Client{
            Timeout: 120 * time.Second,
        },
    }
}

// ChatCompletion sends a chat-completion request
func (c *HolySheepClient) ChatCompletion(ctx context.Context, model string, messages []map[string]string) (map[string]interface{}, error) {
    // Estimate the token count
    estimatedTokens := 0
    for _, m := range messages {
        estimatedTokens += len(m["content"]) / 4
    }
    
    // Obtain a rate-limit permit
    ok, wait := c.Limiter.Acquire(estimatedTokens)
    if !ok {
        return nil, fmt.Errorf("限流超时,需要等待 %v", wait)
    }
    
    // Build and serialize the request body
    body, err := json.Marshal(map[string]interface{}{
        "model":    model,
        "messages": messages,
    })
    if err != nil {
        return nil, err
    }
    
    req, err := http.NewRequestWithContext(ctx, "POST", c.BaseURL+"/chat/completions", bytes.NewReader(body))
    if err != nil {
        return nil, err
    }
    
    req.Header.Set("Authorization", "Bearer "+c.APIKey)
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := c.HTTPClient.Do(req)
    if err != nil {
        return nil, fmt.Errorf("请求失败: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode == 429 {
        return nil, fmt.Errorf("