作为一名经历过双十一零点促销崩塌的技术负责人,我至今记得那个凌晨:AI 客服系统在 23:59 分涌入的 8000 并发请求面前彻底瘫痪,购物车页面的智能推荐全部返回超时错误。那一晚我们损失了 200 万 GMV,更重要的是用户体验的信任度跌至谷底。从那以后,我养成了一个习惯:任何 AI 功能上线前,必须用 Locust 和 k6 做完整的负载测试。今天这篇文章,就是我从血泪教训中总结出的实战方案。
为什么 AI API 负载测试与普通 HTTP 测试截然不同
很多人习惯用 Apache Bench(ab)或 wrk 测试普通接口,这对 AI API 来说是远远不够的。AI 对话 API 有几个独特的挑战:首先是响应延迟不可预测,从 200ms 到 30s 不等;其次是 Token 消耗与响应长度直接挂钩,需要监控吞吐量而非仅仅 QPS;最后是流式输出(Server-Sent Events)场景下的长连接管理。我曾经在压测时忽略了 SSE 的连接复用问题,导致测试结束时大量 TIME_WAIT 连接耗尽系统文件句柄,直接触发 OOM。
Locust:Python 友好的分布式压测利器
Locust 的核心优势
Locust 使用纯 Python 定义用户行为,支持分布式水平扩展,内置实时 Web UI 可以直观看到 TPS、响应时间分布、失败率等核心指标。最重要的是,它允许我用代码精确模拟真实用户的对话流程,而不是简单的固定请求重复。
Locust 实战代码
# locustfile.py
from locust import HttpUser, task, between, events
import json
import random
import time
class AIChatUser(HttpUser):
    """Simulated shopper talking to the AI customer-service API.

    Chat completions carry task weight 3 and embeddings weight 1, so chat
    traffic dominates roughly 3:1 — mirroring the production mix.
    """

    # Simulated user "think time" between tasks: 1-3 seconds.
    wait_time = between(1, 3)

    # Representative customer questions; one is chosen at random per request.
    PROMPTS = [
        "查一下我的订单什么时候发货?",
        "推荐一款适合送父母的生日礼物",
        "退货流程是什么?需要多长时间?",
        "这款手机和另一款对比有什么区别?",
        "学生有什么优惠政策?",
    ]

    def on_start(self):
        """Initialize the session: API credentials and endpoint base."""
        self.api_key = "YOUR_HOLYSHEEP_API_KEY"
        self.base_url = "https://api.holysheep.ai/v1"

    def _auth_headers(self):
        """Bearer-auth headers shared by every request this user makes."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    @task(3)
    def chat_completion(self):
        """Chat-completion task — weight 3, the most frequent operation."""
        payload = {
            "model": "gpt-4.1",
            "messages": [
                {"role": "user", "content": random.choice(self.PROMPTS)}
            ],
            "max_tokens": 500,
            "temperature": 0.7,
        }
        started = time.time()
        with self.client.post(
            f"{self.base_url}/chat/completions",
            json=payload,
            headers=self._auth_headers(),
            catch_response=True,
        ) as response:
            elapsed = time.time() - started
            if response.status_code == 200:
                response.success()
            elif response.status_code == 429:
                # Rate limiting is reported separately so it stands out in the UI.
                response.failure(f"Rate limited - 耗时 {elapsed:.2f}s")
            else:
                response.failure(f"请求失败 {response.status_code} - 耗时 {elapsed:.2f}s")

    @task(1)
    def embedding_task(self):
        """Document-embedding task — weight 1, runs less often."""
        payload = {
            "model": "text-embedding-3-large",
            "input": "这是一段需要向量化的产品描述文本,用于 RAG 检索系统",
        }
        with self.client.post(
            f"{self.base_url}/embeddings",
            json=payload,
            headers=self._auth_headers(),
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Embedding失败: {response.status_code}")
# Distributed load-test launch commands for Locust.

# Single-machine quick test (headless mode, HTML report written at the end)
locust -f locustfile.py \
  --host=https://api.holysheep.ai \
  --users=500 \
  --spawn-rate=20 \
  --run-time=15m \
  --headless \
  --html=report.html

# Distributed master node (serves the web console on :8089).
# FIX: Locust has no --bind-host/--port options; the web UI is configured
# with --web-host / --web-port.
locust -f locustfile.py \
  --master \
  --web-host 0.0.0.0 \
  --web-port 8089

# Distributed worker node (run on as many machines as needed)
locust -f locustfile.py \
  --worker \
  --master-host 192.168.1.100

# Kubernetes deployment of additional workers
kubectl apply -f locust-worker-deployment.yaml
k6:高性能 JavaScript 压测框架
k6 为何更适合大规模压测
k6 由 Grafana Labs 主导开发,核心引擎用 Go 编写,性能远超基于 Python 的 Locust。它使用 JavaScript 定义测试脚本,对前端工程师非常友好。更重要的是,k6 原生支持输出到 Prometheus + Grafana 生态,可以轻松集成到现有的监控体系中。
k6 核心代码实现
// k6-load-test.js
//
// Load test for the AI chat API. Two scenarios run in parallel: a gradual
// ramp of regular chat users and a constant-VU burst that simulates a
// flash-sale spike. Thresholds fail the run on slow p95/p99 or >5% errors.
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

// Custom metrics: overall error rate and end-to-end AI response time (ms).
const errorRate = new Rate('errors');
const responseTime = new Trend('ai_response_time');

export const options = {
  scenarios: {
    // Regular chat scenario: ramp up, hold, ramp down.
    chat_ramp: {
      executor: 'ramping-vus',
      startVUs: 0,
      stages: [
        { duration: '2m', target: 300 },
        { duration: '5m', target: 300 },
        { duration: '2m', target: 0 },
      ],
    },
    // Burst scenario: simulates a flash sale.
    // NOTE: both scenarios start at t=0 and overlap; set `startTime`
    // on burst_test if the phases should run sequentially instead.
    burst_test: {
      executor: 'constant-vus',
      vus: 500,
      duration: '3m',
    },
  },
  thresholds: {
    http_req_duration: ['p(95)<3000', 'p(99)<5000'],
    errors: ['rate<0.05'],
  },
};

const BASE_URL = 'https://api.holysheep.ai/v1';
const API_KEY = 'YOUR_HOLYSHEEP_API_KEY';

export default function () {
  const headers = {
    // FIX: the template literal was missing its backticks, which is a
    // syntax error ("Bearer ${API_KEY}" as a bare expression).
    'Authorization': `Bearer ${API_KEY}`,
    'Content-Type': 'application/json',
  };

  // AI chat-completion request; model is overridable via `-e MODEL=...`.
  const chatPayload = JSON.stringify({
    model: __ENV.MODEL || 'gpt-4.1',
    messages: [
      { role: 'user', content: '帮我查询最近的订单状态和物流信息' }
    ],
    max_tokens: 800,
    temperature: 0.7,
  });

  const chatStart = Date.now();
  const chatRes = http.post(
    // FIX: backticks restored here as well.
    `${BASE_URL}/chat/completions`,
    chatPayload,
    { headers }
  );
  responseTime.add(Date.now() - chatStart);

  const chatSuccess = check(chatRes, {
    'chat status 200': (r) => r.status === 200,
    'chat has content': (r) => {
      try {
        const body = JSON.parse(r.body);
        return body.choices && body.choices[0].message;
      } catch (e) {
        return false;
      }
    },
  });
  errorRate.add(!chatSuccess);

  // Simulated user think time between iterations (1-3 s).
  sleep(Math.random() * 2 + 1);
}
# Common k6 commands.

# Run the test locally
k6 run k6-load-test.js

# Stream results to InfluxDB (pair with Grafana for dashboards)
k6 run \
  --out influxdb=http://localhost:8086/k6 \
  k6-load-test.js

# Cloud-based distributed test (k6 Cloud)
k6 cloud k6-load-test.js

# Switch models via environment variable
k6 run -e MODEL=deepseek-v3.2 k6-load-test.js

# Export metrics to Prometheus.
# FIX: "--out prometheus" is not a built-in output; k6 ships a
# Prometheus remote-write output named experimental-prometheus-rw.
k6 run \
  --out experimental-prometheus-rw \
  --tag testid=holysheep-load \
  k6-load-test.js

# Grafana dashboard: import the official k6 dashboard (ID 10661)
混合压测架构设计
我建议采用分层压测策略:用 Locust 做端到端的业务场景模拟(比如完整的客服对话流程),用 k6 做 API 层的高并发基准测试。两种工具配合使用,可以从业务层和协议层两个维度验证系统容量。
# docker-compose.yml - complete load-testing environment
# (nesting reconstructed; the published snippet had lost all indentation)
version: '3.8'

services:
  # Locust master: serves the web UI on :8089 and coordinates workers.
  # FIX: --bind-host is not a Locust option and would abort the container;
  # the web UI bind address is --web-host.
  locust-master:
    image: locustio/locust:2.20.0
    ports:
      - "8089:8089"
    volumes:
      - ./locust:/mnt/locust
    command: -f /mnt/locust/locustfile.py --master --web-host 0.0.0.0
    networks:
      - load-test-net

  # Locust workers (3 replicas).
  # NOTE: deploy.replicas is honored by "docker compose" v2 / swarm mode;
  # classic docker-compose v1 ignores it.
  locust-worker:
    image: locustio/locust:2.20.0
    volumes:
      - ./locust:/mnt/locust
    command: -f /mnt/locust/locustfile.py --worker --master-host locust-master
    deploy:
      replicas: 3
    depends_on:
      - locust-master
    networks:
      - load-test-net

  # k6 runner streaming results to k6 Cloud
  k6-runner:
    image: grafana/k6:0.49.0
    volumes:
      - ./k6:/mnt/k6
    environment:
      - K6_CLOUD_TOKEN=${K6_CLOUD_TOKEN}
    command: run -o cloud /mnt/k6/k6-load-test.js
    networks:
      - load-test-net

  # Prometheus metrics store
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - load-test-net

  # Grafana dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    networks:
      - load-test-net

networks:
  load-test-net:
    driver: bridge
HolySheep API 压测实战
我在压测时选择了 HolySheep 作为 API 提供商,主要基于三个原因:首先是汇率优势,¥1=$1 无损结算,比官方 $1=¥7.3 节省超过 85%;其次是国内直连延迟低于 50ms,避免了代理抖动对测试数据的干扰;最后是透明的价格体系,GPT-4.1 $8/MTok、Claude Sonnet 4.5 $15/MTok、Gemini 2.5 Flash $2.50/MTok、DeepSeek V3.2 $0.42/MTok,新注册还送免费额度。下面是我针对 HolySheep 写的专用压测脚本:
# holy_sheep_benchmark.py
import requests
import time
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List
@dataclass
class RequestResult:
    """Outcome of a single benchmarked API request."""

    latency: float  # round-trip latency in milliseconds; 0 on transport failure
    success: bool   # True only for HTTP 200 responses
    tokens: int     # completion tokens reported in the API usage block
    # FIX: the default None did not match the declared plain `str` type;
    # quoted annotation keeps it lazy (no typing import needed here).
    error: "str | None" = None  # failure description; None on success
class HolySheepBenchmark:
    """Threaded load-test harness for the HolySheep chat-completions API.

    Collects one RequestResult per request into self.results and renders a
    text report with latency percentiles and token totals.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.results: List[RequestResult] = []

    def chat_completion(self, model: str = "gpt-4.1") -> RequestResult:
        """Issue a single chat request and capture its latency and outcome."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": "请用50字介绍微服务架构"}],
            "max_tokens": 100,
        }
        start = time.time()
        try:
            resp = requests.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                headers=headers,
                timeout=(5, 30),  # (connect, read) timeouts in seconds
            )
            latency = (time.time() - start) * 1000  # ms
            if resp.status_code == 200:
                data = resp.json()
                tokens = data.get('usage', {}).get('completion_tokens', 0)
                return RequestResult(latency, True, tokens)
            return RequestResult(latency, False, 0, f"HTTP {resp.status_code}")
        except Exception as e:  # connection errors, timeouts, bad JSON
            return RequestResult(0, False, 0, str(e))

    def run_load_test(self, concurrency: int, duration_seconds: int) -> str:
        """Fire batches of `concurrency` requests until the time budget ends.

        The wall clock is only checked between batches, so the run may
        overshoot duration_seconds by up to one batch. Returns the report.
        """
        print(f"开始压测: 并发 {concurrency}, 持续 {duration_seconds}s")
        start_time = time.time()
        requests_count = 0
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            while time.time() - start_time < duration_seconds:
                futures = [
                    executor.submit(self.chat_completion)
                    for _ in range(concurrency)
                ]
                for future in as_completed(futures):
                    self.results.append(future.result())
                    requests_count += 1
        return self.generate_report(requests_count)

    def generate_report(self, total_requests: int) -> str:
        """Render the collected results as a human-readable report.

        FIX: the naive version crashed with StatisticsError on empty/
        single-sample latency lists and ZeroDivisionError when
        total_requests was 0; both degenerate cases are now guarded.
        """
        latencies = [r.latency for r in self.results if r.success]
        errors = [r for r in self.results if not r.success]
        success_rate = (len(latencies) / total_requests * 100) if total_requests else 0.0
        if len(latencies) >= 2:
            avg = statistics.mean(latencies)
            med = statistics.median(latencies)
            # quantiles(n=20)[18] -> P95; quantiles(n=100)[98] -> P99
            p95 = statistics.quantiles(latencies, n=20)[18]
            p99 = statistics.quantiles(latencies, n=100)[98]
        else:
            # Too few samples for quantiles: report the lone value, or 0.
            avg = med = p95 = p99 = latencies[0] if latencies else 0.0
        report = f"""
=== HolySheep API 压测报告 ===
总请求数: {total_requests}
成功数: {len(latencies)}
失败数: {len(errors)}
成功率: {success_rate:.2f}%
响应延迟 (ms):
平均: {avg:.2f}
中位数: {med:.2f}
P95: {p95:.2f}
P99: {p99:.2f}
总 Token: {sum(r.tokens for r in self.results)}
========================"""
        return report
if __name__ == "__main__":
    # Step-load profile: 50 -> 100 -> 200 concurrent workers, 60 s per stage.
    benchmark = HolySheepBenchmark("YOUR_HOLYSHEEP_API_KEY")
    for level in (50, 100, 200):
        print(benchmark.run_load_test(level, 60))
        benchmark.results.clear()  # isolate stats between stages
常见报错排查
1. 429 Rate Limit 超限
这是压测时最常见的错误。高并发请求触发了 API 的速率限制,返回 429 Too Many Requests。解决方案是实现指数退避重试机制:
# 重试装饰器实现
import time
import random
from functools import wraps
def exponential_backoff_retry(max_retries=5, base_delay=1):
"""指数退避重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
response = func(*args, **kwargs)
if response.status_code == 429:
# 计算退避时间: 1s, 2s, 4s, 8s, 16s
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, 等待 {delay:.2f