作为在国内提供 AI API 服务的 SRE 工程师,我见过太多团队因为没有建立完善的 SLO 体系,在 API 调用失败时手忙脚乱。本文将详细介绍如何为 AI API 接入场景定义 SLO,并通过代码实现自动化追踪预警。

一、主流 AI API 服务商对比

在深入 SLO 实践之前,我们先通过对比表格快速了解各平台的核心差异:

对比维度 HolySheep AI 官方 API(OpenAI/Anthropic) 其他中转站
汇率优势 ¥1=$1 无损 ¥7.3=$1(溢价530%) ¥5-6=$1
国内延迟 <50ms 直连 200-500ms(需跨境) 80-200ms
充值方式 微信/支付宝/银行卡 国际信用卡 部分支持微信
GPT-4.1 输出价格 $8/MTok $15/MTok $9-12/MTok
Claude Sonnet 4.5 $15/MTok $15/MTok(汇率损耗) $17-20/MTok
Gemini 2.5 Flash $2.50/MTok $2.50/MTok(汇率损耗) $3-4/MTok
DeepSeek V3.2 $0.42/MTok 不支持 $0.5-1/MTok
免费额度 注册即送 $5(需信用卡) 少量或无

从上表可以看出,HolySheep AI 在国内使用场景下具有压倒性优势:汇率无损意味着成本直降85%以上,<50ms 的延迟对于实时应用至关重要,而微信/支付宝充值对国内开发者极为友好。

二、什么是 AI API SLO?为什么必须关注?

SLO(Service Level Objective,服务等级目标)是 SRE 领域的核心概念。对于 AI API 调用场景,SLO 包含以下几个关键维度:

在我过去参与的多个 AI 产品项目中,团队往往只关注功能是否正常,而忽视了 SLO 建设。直到某天 API 限流或超时导致线上故障,才意识到可观测性建设的重要性。

三、AI API SLO 定义与代码实现

3.1 使用 Prometheus + Grafana 构建 SLO 看板

以下是一个完整的 SLO 追踪方案,基于 Prometheus 指标采集和 Grafana 可视化展示:

import requests
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from datetime import datetime, timedelta
import threading

定义 Prometheus 指标

API_REQUEST_COUNT = Counter( 'ai_api_requests_total', 'Total AI API requests', ['model', 'status_code', 'provider'] ) API_REQUEST_LATENCY = Histogram( 'ai_api_request_duration_seconds', 'AI API request latency in seconds', ['model', 'provider'], buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0] ) API_ERROR_COUNT = Counter( 'ai_api_errors_total', 'Total AI API errors', ['model', 'error_type', 'provider'] ) CURRENT_SLO_GAUGE = Gauge( 'ai_api_current_slo_percentage', 'Current SLO availability percentage', ['provider'] ) class AISLOService: """ AI API SLO 追踪服务 用于监控 API 调用成功率、延迟分布、错误率等核心指标 """ def __init__(self, base_url="https://api.holysheep.ai/v1"): self.base_url = base_url self.api_key = "YOUR_HOLYSHEEP_API_KEY" self.slo_window_days = 30 # SLO 统计窗口:30天 self.error_budget = 0.001 # 0.1% 错误预算 # 内部计数器 self._total_requests = 0 self._failed_requests = 0 self._window_start = datetime.now() def call_api(self, model: str, prompt: str, temperature: float = 0.7) -> dict: """ 调用 AI API 并记录 SLO 指标 """ start_time = time.time() headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": model, "messages": [{"role": "user", "content": prompt}], "temperature": temperature, "max_tokens": 1000 } try: response = requests.post( f"{self.base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) latency = time.time() - start_time # 记录 Prometheus 指标 API_REQUEST_COUNT.labels( model=model, status_code=response.status_code, provider="holysheep" ).inc() API_REQUEST_LATENCY.labels( model=model, provider="holysheep" ).observe(latency) if response.status_code != 200: self._failed_requests += 1 API_ERROR_COUNT.labels( model=model, error_type=f"http_{response.status_code}", provider="holysheep" ).inc() # 更新 SLO 状态 self._total_requests += 1 self._update_slo_gauge() return { "status": "success" if response.status_code == 200 else "failed", "latency_ms": round(latency * 1000, 2), "response": response.json() if response.status_code == 200 else None } except requests.exceptions.Timeout: self._handle_error(model, "timeout") return {"status": "failed", "error": "Request timeout"} except requests.exceptions.ConnectionError as e: self._handle_error(model, "connection_error") return {"status": "failed", "error": f"Connection error: {str(e)}"} except Exception as e: self._handle_error(model, "unknown_error") return {"status": "failed", "error": str(e)} def _handle_error(self, model: str, error_type: str): """处理错误并记录指标""" self._total_requests += 1 self._failed_requests += 1 API_ERROR_COUNT.labels( model=model, error_type=error_type, provider="holysheep" ).inc() self._update_slo_gauge() def _update_slo_gauge(self): """更新当前 SLO 可用性指标""" if self._total_requests > 0: availability = (self._total_requests - self._failed_requests) / self._total_requests CURRENT_SLO_GAUGE.labels(provider="holysheep").set(availability * 100) def get_slo_report(self) -> dict: """ 生成 SLO 报告,包含错误预算消耗情况 """ current_slo = ((self._total_requests - self._failed_requests) / self._total_requests * 100) if self._total_requests > 0 else 100 # 计算错误预算 allowed_errors = self._total_requests * self.error_budget remaining_budget = allowed_errors - self._failed_requests budget_consumed = (self._failed_requests / allowed_errors * 100) if allowed_errors > 0 else 0 return { "provider": "HolySheep AI", "total_requests": self._total_requests, "failed_requests": self._failed_requests, "current_availability": f"{current_slo:.4f}%", "target_slo": "99.9%", "error_budget_remaining": max(0, remaining_budget), "budget_consumed_percentage": min(100, budget_consumed), "status": "healthy" if current_slo >= 99.9 else "warning" if current_slo >= 99 else "critical" }

启动 Prometheus 指标服务器(默认端口 8000)

start_http_server(8000)

初始化 SLO 服务

slo_service = AISLOService()

示例调用

if __name__ == "__main__": print("=" * 50) print("AI API SLO 监控系统已启动") print("Prometheus 指标端点: http://localhost:8000/metrics") print("=" * 50) # 测试调用 result = slo_service.call_api( model="gpt-4.1", prompt="请用一句话解释什么是 SRE" ) print(f"调用结果: {result}") # 获取 SLO 报告 report = slo_service.get_slo_report() print(f"SLO 报告: {report}")

3.2 SLO 告警规则配置(Alertmanager)

以下是 Prometheus Alertmanager 的告警规则配置,当 SLO 低于阈值时自动触发告警:

groups:
  - name: ai_api_slo_alerts
    rules:
      # SLO 可用性告警(低于 99.9%)
      - alert: AIAvailabilityBelowSLO
        expr: ai_api_current_slo_percentage{provider="holysheep"} < 99.9
        for: 5m
        labels:
          severity: warning
          team: sre
        annotations:
          summary: "AI API 可用性低于 SLO 目标"
          description: "当前可用性: {{ $value }}%,目标: 99.9%"
      
      # 严重告警(低于 99%)
      - alert: AIAvailabilityCritical
        expr: ai_api_current_slo_percentage{provider="holysheep"} < 99
        for: 2m
        labels:
          severity: critical
          team: sre
        annotations:
          summary: "AI API 可用性严重低于 SLO"
          description: "当前可用性: {{ $value }}%,请立即检查!"
      
      # P99 延迟告警(超过 2000ms)
      - alert: AILatencyHigh
        expr: histogram_quantile(0.99, ai_api_request_duration_seconds_bucket) > 2
        for: 5m
        labels:
          severity: warning
          team: sre
        annotations:
          summary: "AI API P99 延迟过高"
          description: "P99 延迟: {{ $value | humanizeDuration }}"
      
      # 错误率突增告警
      - alert: AIErrorRateSpike
        expr: rate(ai_api_errors_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
          team: sre
        annotations:
          summary: "AI API 错误率突增"
          description: "5分钟内错误率: {{ $value | humanizePercentage }}"
      
      # 错误预算消耗告警(消耗超过 50%)
      - alert: AIErrorsBudgetExhausted
        expr: |
          (ai_api_errors_total / (ai_api_requests_total * 0.001)) > 50
        for: 10m
        labels:
          severity: warning
          team: sre
        annotations:
          summary: "AI API 错误预算消耗超过 50%"
          description: "错误预算已消耗: {{ $value | humanizePercentage }}"
      
      # 超时告警
      - alert: AIRequestTimeout
        expr: rate(ai_api_errors_total{error_type="timeout"}[5m]) > 0.1
        for: 1m
        labels:
          severity: warning
          team: sre
        annotations:
          summary: "AI API 请求超时频繁"
          description: "5分钟内超时次数: {{ $value | humanize }}次/秒"

四、常见报错排查

4.1 错误案例一:API Key 无效或未授权

# 错误响应示例
{
  "error": {
    "message": "Invalid API key provided",
    "type": "invalid_request_error",
    "code": "invalid_api_key"
  }
}

HTTP 状态码: 401 Unauthorized

排查步骤:

1. 检查 API Key 是否正确设置

2. 确认 API Key 未过期或被撤销

3. 验证 base_url 是否配置正确(应为 https://api.holysheep.ai/v1)

4. 检查请求头 Authorization 格式是否正确

正确配置示例:

import os API_KEY = os.environ.get("HOLYSHEEP_API_KEY") # 不要硬编码在代码中 headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" }

验证 API Key 有效性

def verify_api_key(api_key: str) -> bool: """验证 API Key 是否有效""" test_url = "https://api.holysheep.ai/v1/models" response = requests.get( test_url, headers={"Authorization": f"Bearer {api_key}"} ) return response.status_code == 200

4.2 错误案例二:限流(Rate Limit Exceeded)

# 错误响应示例
{
  "error": {
    "message": "Rate limit exceeded. Please retry after 5 seconds.",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded",
    "retry_after_ms": 5000
  }
}

HTTP 状态码: 429 Too Many Requests

解决方案一:实现指数退避重试

import time import random def call_with_retry(url: str, headers: dict, payload: dict, max_retries: int = 3): """带指数退避的重试机制""" for attempt in range(max_retries): try: response = requests.post(url, headers=headers, json=payload, timeout=30) if response.status_code == 429: # 获取重试时间 retry_after = response.headers.get('Retry-After', 5) wait_time = int(retry_after) * (2 ** attempt) + random.uniform(0, 1) print(f"触发限流,等待 {wait_time:.2f} 秒后重试...") time.sleep(wait_time) continue return response except requests.exceptions.RequestException as e: if attempt == max_retries - 1: raise wait_time = 2 ** attempt + random.uniform(0, 1) print(f"请求异常: {e},{wait_time:.2f}秒后重试...") time.sleep(wait_time)

解决方案二:使用信号量控制并发

import asyncio from threading import Semaphore class RateLimitedClient: """限流控制客户端""" def __init__(self, max_concurrent: int = 10, requests_per_second: int = 50): self.semaphore = Semaphore(max_concurrent) self.last_request_time = 0 self.min_interval = 1.0 / requests_per_second def call_api(self, url: str, headers: dict, payload: dict) -> dict: """带限流控制的 API 调用""" with self.semaphore: # 限速:确保请求间隔 now = time.time() time_since_last = now - self.last_request_time if time_since_last < self.min_interval: time.sleep(self.min_interval - time_since_last) self.last_request_time = time.time() response = requests.post(url, headers=headers, json=payload, timeout=30) return response

4.3 错误案例三:模型不支持或请求超时

# 模型不支持错误
{
  "error": {
    "message": "The model 'gpt-5' does not exist or you do not have access to it.",
    "type": "invalid_request_error",
    "code": "model_not_found"
  }
}

HTTP 状态码: 404 Not Found

解决方案:先获取可用模型列表

def list_available_models(api_key: str) -> list: """获取账号可用的模型列表""" url = "https://api.holysheep.ai/v1/models" headers = {"Authorization": f"Bearer {api_key}"} response = requests.get(url, headers=headers) if response.status_code == 200: models = response.json().get("data", []) return [m["id"] for m in models] else: raise Exception(f"获取模型列表失败: {response.text}")

请求超时错误

常见超时原因:

1. 网络延迟(HolySheep 国内 <50ms,通常不是问题)

2. 模型处理时间过长

3. 请求负载过大

解决方案:设置合理的超时时间

TIMEOUT_CONFIG = { "connect_timeout": 5, # 连接超时:5秒 "read_timeout": 60, # 读取超时:60秒(AI 生成可能需要较长时间) } def create_session_with_timeout(): """创建带超时配置的 requests Session""" session = requests.Session() adapter = requests.adapters.HTTPAdapter( max_retries=1, pool_connections=10, pool_maxsize=20 ) session.mount('https://', adapter) return session def call_with_timeout(url: str, headers: dict, payload: dict) -> dict: """带超时控制的 API 调用""" session = create_session_with_timeout() try: response = session.post( url, headers=headers, json=payload, timeout=(TIMEOUT_CONFIG["connect_timeout"], TIMEOUT_CONFIG["read_timeout"]) ) return response except requests.exceptions.Timeout: # 记录超时日志,便于排查 print(f"请求超时,payload token 数可能过大,建议减少 max_tokens") raise except requests.exceptions.ReadTimeout: print("读取超时,可能是服务端处理时间过长") raise

五、实战经验:我的 SLO 建设心得

在我负责的 AI 对话产品中,曾因 SLO 缺失导致过一次严重故障。当时我们直接调用官方 API,由于跨境延迟不稳定(200-500ms 波动),用户频繁遇到超时问题,但我们没有任何监控告警,直到客诉爆发才后知后觉。

后来迁移到 HolySheep AI 后,P99 延迟从 450ms 骤降至 45ms 以内,但我深知不能只依赖低延迟,必须建立完善的 SLO 体系。于是我们构建了 Prometheus + Grafana 监控体系,定义了三层告警