As the engineering lead who lived through a Singles' Day midnight flash-sale meltdown, I still remember that early morning: our AI customer service system collapsed under the 8,000 concurrent requests that flooded in at 23:59, and every smart recommendation on the shopping cart page came back as a timeout. We lost ¥2 million in GMV that night, and, worse, user trust hit rock bottom. Since then I have kept one rule: no AI feature ships without a full load test in Locust and k6. This article is the battle-tested playbook I distilled from that painful lesson.

Why Load Testing AI APIs Is Nothing Like Ordinary HTTP Testing

Many people are used to testing ordinary endpoints with Apache Bench (ab) or wrk, but that falls far short for AI APIs. AI chat APIs pose several unique challenges. First, response latency is unpredictable, ranging anywhere from 200ms to 30s. Second, token consumption is tied directly to response length, so you need to monitor token throughput rather than QPS alone. Third, streaming output (Server-Sent Events) means managing long-lived connections. I once ignored SSE connection reuse during a load test; by the end of the run, a flood of TIME_WAIT sockets had exhausted the machine's ephemeral ports and file descriptors and brought the load generator down.
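
For streaming workloads, the metric users actually feel is time-to-first-token (TTFT), which raw QPS numbers hide entirely. Below is a minimal sketch of measuring TTFT over SSE with the requests library, assuming an OpenAI-compatible /chat/completions endpoint that accepts "stream": true (the endpoint and model name here are placeholders, not a confirmed spec); reusing one Session per simulated user also avoids the TIME_WAIT pileup described above.

# ttft_probe.py - a hedged sketch, not production code
import time
import requests

# One Session per simulated user keeps connections alive and avoids
# accumulating TIME_WAIT sockets over a long test run.
session = requests.Session()

def measure_streaming_ttft(api_key: str, prompt: str) -> dict:
    """Measure time-to-first-token and total duration of one streaming call."""
    start = time.time()
    ttft = None
    chunks = 0
    resp = session.post(
        "https://api.holysheep.ai/v1/chat/completions",  # assumed endpoint
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "model": "gpt-4.1",
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,  # ask the server for Server-Sent Events
        },
        stream=True,  # let requests expose the raw event stream
        timeout=(5, 60),
    )
    resp.raise_for_status()
    for line in resp.iter_lines():
        # SSE events arrive as lines like: data: {...}  or  data: [DONE]
        if not line or not line.startswith(b"data: "):
            continue
        if ttft is None:
            ttft = time.time() - start  # first event ~= first token
        chunks += 1
    resp.close()
    return {"ttft_s": ttft, "total_s": time.time() - start, "chunks": chunks}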

Locust: A Python-Friendly Distributed Load Testing Tool

Locust's Core Strengths

Locust defines user behavior in pure Python, scales out horizontally in distributed mode, and ships with a real-time web UI that surfaces the core metrics at a glance: TPS, response time distribution, failure rate. Most importantly, it lets me model a real user's conversation flow in code instead of replaying a fixed request over and over.

Locust in Practice

# locustfile.py
from locust import HttpUser, task, between, events
import json
import random
import time

class AIChatUser(HttpUser):
    wait_time = between(1, 3)  # simulated user think time: 1-3 seconds
    
    def on_start(self):
        """初始化会话,配置 API 凭证"""
        self.api_key = "YOUR_HOLYSHEEP_API_KEY"
        self.base_url = "https://api.holysheep.ai/v1"
    
    @task(3)
    def chat_completion(self):
        """AI 对话任务 - 权重 3,执行频率最高"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        # Simulate a realistic distribution of user questions
        prompts = [
            "When will my order ship?",
            "Recommend a birthday gift for my parents",
            "What is the return process and how long does it take?",
            "How does this phone compare with the other model?",
            "Are there any student discounts?"
        ]
        
        payload = {
            "model": "gpt-4.1",
            "messages": [
                {"role": "user", "content": random.choice(prompts)}
            ],
            "max_tokens": 500,
            "temperature": 0.7
        }
        
        start_time = time.time()
        with self.client.post(
            f"{self.base_url}/chat/completions",
            json=payload,
            headers=headers,
            catch_response=True
        ) as response:
            duration = time.time() - start_time
            
            if response.status_code == 200:
                response.success()
            elif response.status_code == 429:
                response.failure(f"Rate limited - 耗时 {duration:.2f}s")
            else:
                response.failure(f"请求失败 {response.status_code} - 耗时 {duration:.2f}s")
    
    @task(1)
    def embedding_task(self):
        """文档向量化任务 - 权重 1,执行频率较低"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": "text-embedding-3-large",
            "input": "这是一段需要向量化的产品描述文本,用于 RAG 检索系统"
        }
        
        with self.client.post(
            f"{self.base_url}/embeddings",
            json=payload,
            headers=headers,
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Embedding失败: {response.status_code}")
# Distributed load test launch commands

# Single-machine quick run (headless mode)
locust -f locustfile.py \
  --host=https://api.holysheep.ai \
  --users=500 \
  --spawn-rate=20 \
  --run-time=15m \
  --headless \
  --html=report.html

# Distributed master node (serves the web console)
locust -f locustfile.py \
  --master \
  --master-bind-host 0.0.0.0 \
  --web-port 8089

# Distributed worker nodes (deploy on as many machines as needed)
locust -f locustfile.py \
  --worker \
  --master-host 192.168.1.100

# Kubernetes deployment
kubectl apply -f locust-worker-deployment.yaml

k6: A High-Performance JavaScript Load Testing Framework

Why k6 Scales Better for Large Tests

k6 is developed under Grafana Labs, and its core engine is written in Go, so a single load generator can push far more traffic than the Python-based Locust. Test scripts are written in JavaScript, which makes it very approachable for frontend engineers. Better still, k6 can stream results into the Prometheus + Grafana ecosystem, so it slots neatly into an existing monitoring stack.

k6 Core Script

// k6-load-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

// Custom metrics
const errorRate = new Rate('errors');
const responseTime = new Trend('ai_response_time');

export const options = {
  scenarios: {
    // Regular chat scenario: ramp users up gradually
    chat_ramp: {
      executor: 'ramping-vus',
      startVUs: 0,
      stages: [
        { duration: '2m', target: 300 },
        { duration: '5m', target: 300 },
        { duration: '2m', target: 0 },
      ],
    },
    // Burst traffic scenario: simulate a flash sale
    burst_test: {
      executor: 'constant-vus',
      vus: 500,
      duration: '3m',
    },
  },
  thresholds: {
    http_req_duration: ['p(95)<3000', 'p(99)<5000'],
    errors: ['rate<0.05'],
  },
};

const BASE_URL = 'https://api.holysheep.ai/v1';
const API_KEY = 'YOUR_HOLYSHEEP_API_KEY';

export default function () {
  const headers = {
    'Authorization': `Bearer ${API_KEY}`,
    'Content-Type': 'application/json',
  };

  // AI chat request
  const chatPayload = JSON.stringify({
    model: __ENV.MODEL || 'gpt-4.1',
    messages: [
      { role: 'user', content: 'Check the status and shipping info of my recent order' }
    ],
    max_tokens: 800,
    temperature: 0.7,
  });

  const chatStart = Date.now();
  const chatRes = http.post(
    `${BASE_URL}/chat/completions`,
    chatPayload,
    { headers }
  );

  responseTime.add(Date.now() - chatStart);

  const chatSuccess = check(chatRes, {
    'chat status 200': (r) => r.status === 200,
    'chat has content': (r) => {
      try {
        const body = JSON.parse(r.body);
        return body.choices && body.choices[0].message;
      } catch (e) {
        return false;
      }
    },
  });

  errorRate.add(!chatSuccess);
  
  // Simulate user think time between turns
  sleep(Math.random() * 2 + 1);
}
# Common k6 commands

# Run the test locally
k6 run k6-load-test.js

# Stream results to InfluxDB (pair with Grafana)
k6 run \
  --out influxdb=http://localhost:8086/k6 \
  k6-load-test.js

# Distributed test in the cloud (k6 Cloud)
k6 cloud k6-load-test.js

# Switch models via an environment variable
k6 run -e MODEL=deepseek-v3.2 k6-load-test.js

# Export metrics via Prometheus remote write
k6 run \
  --out experimental-prometheus-rw \
  --tag testid=holysheep-load \
  k6-load-test.js

Grafana Dashboard JSON

Import the official k6 dashboard from grafana.com: ID 10661

Designing a Hybrid Load Testing Architecture

My recommendation is a layered strategy: use Locust for end-to-end business scenario simulation (for example, a complete multi-turn customer service conversation) and k6 for high-concurrency benchmarking at the API layer. Used together, the two tools validate system capacity from both the business and the protocol dimension; a sketch of the conversation flow follows, then the docker-compose file that wires up the full rig.
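
To make the "end-to-end business scenario" concrete, here is a hedged sketch of a multi-turn support conversation in Locust using SequentialTaskSet, where each simulated user carries conversation history across turns the way a real chat session would. The endpoint, model, and prompts are placeholders mirroring the earlier locustfile, not a prescribed flow.

# multi_turn_flow.py - end-to-end conversation scenario sketch
from locust import HttpUser, SequentialTaskSet, task, between

class CustomerServiceFlow(SequentialTaskSet):
    """A scripted three-turn support conversation; turns run in order."""

    def on_start(self):
        self.history = []  # accumulated messages, like a real chat session

    def _ask(self, content: str):
        self.history.append({"role": "user", "content": content})
        resp = self.client.post(
            "/v1/chat/completions",
            json={"model": "gpt-4.1", "messages": self.history, "max_tokens": 300},
            headers={"Authorization": "Bearer YOUR_HOLYSHEEP_API_KEY"},
        )
        if resp.status_code == 200:
            reply = resp.json()["choices"][0]["message"]
            self.history.append(reply)  # keep context for the next turn

    @task
    def turn_1(self):
        self._ask("When will my order ship?")

    @task
    def turn_2(self):
        self._ask("Can I change the delivery address?")

    @task
    def turn_3(self):
        self._ask("Thanks, please confirm the change.")
        self.interrupt()  # end this session; the user starts a fresh one

class ConversationUser(HttpUser):
    host = "https://api.holysheep.ai"
    tasks = [CustomerServiceFlow]
    wait_time = between(2, 5)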

# docker-compose.yml - complete load testing environment
version: '3.8'

services:
  # Locust Master
  locust-master:
    image: locustio/locust:2.20.0
    ports:
      - "8089:8089"
    volumes:
      - ./locust:/mnt/locust
    command: -f /mnt/locust/locustfile.py --master --master-bind-host 0.0.0.0
    networks:
      - load-test-net

  # Locust workers (3 replicas)
  locust-worker:
    image: locustio/locust:2.20.0
    volumes:
      - ./locust:/mnt/locust
    command: -f /mnt/locust/locustfile.py --worker --master-host locust-master
    deploy:
      replicas: 3
    depends_on:
      - locust-master
    networks:
      - load-test-net

  # k6 Runner
  k6-runner:
    image: grafana/k6:0.49.0
    volumes:
      - ./k6:/mnt/k6
    environment:
      - K6_CLOUD_TOKEN=${K6_CLOUD_TOKEN}
    command: run -o cloud /mnt/k6/k6-load-test.js
    networks:
      - load-test-net

  # Prometheus
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - load-test-net

  # Grafana
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    networks:
      - load-test-net

networks:
  load-test-net:
    driver: bridge

Hands-On: Load Testing the HolySheep API

For these tests I chose HolySheep as the API provider, for three main reasons. First, the exchange rate: ¥1 = $1 settlement with no spread, which saves over 85% versus the official rate of $1 = ¥7.3. Second, direct domestic connectivity with latency under 50ms, which keeps proxy jitter from polluting the test data. Third, transparent pricing: GPT-4.1 at $8/MTok, Claude Sonnet 4.5 at $15/MTok, Gemini 2.5 Flash at $2.50/MTok, DeepSeek V3.2 at $0.42/MTok, plus free credits for new sign-ups. Below is the dedicated benchmark script I wrote for HolySheep:

# holy_sheep_benchmark.py
import requests
import time
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List

@dataclass
class RequestResult:
    latency: float
    success: bool
    tokens: int
    error: str = None

class HolySheepBenchmark:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.results: List[RequestResult] = []
    
    def chat_completion(self, model: str = "gpt-4.1") -> RequestResult:
        """单次对话请求"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": "请用50字介绍微服务架构"}],
            "max_tokens": 100
        }
        
        start = time.time()
        try:
            resp = requests.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                headers=headers,
                timeout=(5, 30)
            )
            latency = (time.time() - start) * 1000
            
            if resp.status_code == 200:
                data = resp.json()
                tokens = data.get('usage', {}).get('completion_tokens', 0)
                return RequestResult(latency, True, tokens)
            else:
                return RequestResult(latency, False, 0, f"HTTP {resp.status_code}")
        except Exception as e:
            return RequestResult(0, False, 0, str(e))
    
    def run_load_test(self, concurrency: int, duration_seconds: int):
        """并发压测主函数"""
        print(f"开始压测: 并发 {concurrency}, 持续 {duration_seconds}s")
        start_time = time.time()
        requests_count = 0
        
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
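            # NOTE: futures are submitted in fixed-size batches, so the offered
            # load is stepwise; concurrency briefly dips to zero between batches.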
            while time.time() - start_time < duration_seconds:
                futures = [
                    executor.submit(self.chat_completion)
                    for _ in range(concurrency)
                ]
                for future in as_completed(futures):
                    self.results.append(future.result())
                    requests_count += 1
        
        return self.generate_report(requests_count)
    
    def generate_report(self, total_requests: int):
        """生成压测报告"""
        latencies = [r.latency for r in self.results if r.success]
        errors = [r for r in self.results if not r.success]
        
        report = f"""
=== HolySheep API 压测报告 ===
总请求数: {total_requests}
成功数: {len(latencies)}
失败数: {len(errors)}
成功率: {len(latencies)/total_requests*100:.2f}%

响应延迟 (ms):
  平均: {statistics.mean(latencies):.2f}
  中位数: {statistics.median(latencies):.2f}
  P95: {statistics.quantiles(latencies, n=20)[18]:.2f}
  P99: {statistics.quantiles(latencies, n=100)[98]:.2f}

总 Token: {sum(r.tokens for r in self.results)}
========================"""
        return report

if __name__ == "__main__":
    benchmark = HolySheepBenchmark("YOUR_HOLYSHEEP_API_KEY")
    
    # Stepped load test: 50 -> 100 -> 200 concurrent workers
    for concurrency in [50, 100, 200]:
        report = benchmark.run_load_test(concurrency, 60)
        print(report)
        benchmark.results.clear()

Troubleshooting Common Errors

1. 429 Rate Limit Exceeded

This is the most common error under load. High concurrency trips the API's rate limiter, which returns 429 Too Many Requests. The fix is an exponential backoff retry mechanism:

# Retry decorator implementation
import time
import random
from functools import wraps

def exponential_backoff_retry(max_retries=5, base_delay=1):
    """指数退避重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    response = func(*args, **kwargs)
                    if response.status_code == 429:
                        # Backoff schedule: 1s, 2s, 4s, 8s, 16s (plus jitter)
                        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                        print(f"Rate limited, 等待 {delay:.2f