AI应用已进入生产环境的关键时期。无论是电商平台的智能客服、企业知识库的RAG系统,还是个人开发者的创新项目,高可用性已成为不可或缺的技术要求。本稿では、HolySheep AIを活用したモデルサービスのヘルスチェック機構と自動フェイルオーバーの設計・実装について、実戦経験に基づいて解説します。
なぜヘルスチェックとフェイルオーバーが必要か
AI模型服务的可用性面临着诸多挑战:
- API延迟抖动:网络不稳定导致响应时间不可预测
- 服务超时:高负载时模型推理超时
- 配额限制:API调用配额耗尽导致服务中断
- 地域差异:不同地区访问质量差异显著
私自身、初めてAIカスタマーサポートシステムを導入した際、夜間のトラフィック急増でAPIがタイムアウトし、ユーザーからの苦情が殺到した経験があります。この教訓から、自動フェイルオーバー(Automatic Failover)アーキテクチャの重要性を痛感しました。
实战案例:电商AI客服系统
私が担当したECサイトのAIカスタマーサポートプロジェクトでは、以下の構成を採用しました:
- メインAPI:HolySheep AI(GPT-4.1対応、¥1=$1のコスト効率)
- フォールバック:予備のモデルサービス
- レイテンシ要件:P99 < 200ms
完整健康检查实现
"""
AI Model Service Health Check & Auto Failover System
支持 HolySheep AI / OpenAI 兼容 API
"""
import asyncio
import httpx
import time
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import logging
# Configure root logging at INFO level so info/warning/error messages from
# this module are printed (logger.debug output stays hidden).
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the health-check classes below.
logger = logging.getLogger(__name__)
class HealthStatus(Enum):
    """Coarse health classification of a model endpoint.

    The member values are the same strings that
    ``ModelServiceHealthChecker.get_endpoint_status`` reports in its
    ``"status"`` field.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass
class ModelEndpoint:
    """One OpenAI-compatible model backend plus its live health statistics.

    The configuration fields are set by the caller. The statistics fields
    start at neutral defaults and are updated in place by the health checker
    as probes complete.
    """

    # ---- connection / routing configuration ----
    name: str
    base_url: str          # API root; "/chat/completions" is appended to it
    api_key: str           # sent as a Bearer token
    model: str
    priority: int = 1      # higher value = preferred
    max_retries: int = 3
    timeout: float = 30.0  # seconds, per request

    # ---- runtime health statistics ----
    health_score: float = 100.0
    consecutive_failures: int = 0
    last_success_time: float = field(default_factory=time.time)
    last_failure_time: float = 0.0
    avg_latency_ms: float = 0.0
    request_count: int = 0

    # ---- capability flags ----
    supports_streaming: bool = True
    supports_function_call: bool = True
class HealthCheckResult:
    """Outcome of one health probe against a single endpoint.

    A new instance starts out unsuccessful, with zero latency and no error.
    The prober fills in ``success``, ``latency_ms`` and ``error_message``
    afterwards.
    """

    def __init__(self, endpoint: "ModelEndpoint"):
        self.endpoint = endpoint
        # Defaults describe a probe that has not succeeded (yet).
        self.success: bool = False
        self.latency_ms: float = 0.0
        self.error_message: Optional[str] = None
        # Wall-clock time at which this result object was created.
        self.timestamp: float = time.time()
class ModelServiceHealthChecker:
    """Probe AI model endpoints and pick the healthiest one for routing.

    Each registered ``ModelEndpoint`` is probed with a minimal 1-token chat
    completion. Probe outcomes update the endpoint's statistics in place
    (``health_score``, ``consecutive_failures``, ``avg_latency_ms``, ...).
    ``get_best_endpoint`` uses those statistics to choose a target.
    """

    # Consecutive failures at which an endpoint starts losing score and is
    # excluded from routing.
    _FAILURE_THRESHOLD = 3
    # Score penalty applied on each failure once the threshold is reached.
    _FAILURE_PENALTY = 20
    # health_score must exceed this for an endpoint to be routable.
    _MIN_ROUTABLE_SCORE = 50
    # Probe latency (ms) at or below which the latency score is a full 100.
    _FULL_SCORE_LATENCY_MS = 100.0
    # Smoothing factor for the per-endpoint latency EMA.
    _LATENCY_EMA_ALPHA = 0.2
    # Number of probe results retained per endpoint.
    _MAX_HISTORY = 100

    def __init__(self, check_interval: float = 30.0):
        """Create an empty checker.

        Args:
            check_interval: Seconds between probe rounds when ``run()`` is used.
        """
        self.endpoints: List["ModelEndpoint"] = []
        self.check_interval = check_interval
        self._running = False
        self._health_history: Dict[str, List["HealthCheckResult"]] = defaultdict(list)

    def add_endpoint(self, endpoint: "ModelEndpoint") -> None:
        """Register *endpoint*, keeping ``self.endpoints`` sorted by descending priority."""
        self.endpoints.append(endpoint)
        self.endpoints.sort(key=lambda ep: ep.priority, reverse=True)
        logger.info(f"Added endpoint: {endpoint.name} ({endpoint.base_url})")

    async def health_check_single(self, endpoint: "ModelEndpoint") -> "HealthCheckResult":
        """Probe *endpoint* once and update its health statistics.

        Sends a 1-token ``/chat/completions`` request. Transport and HTTP
        errors are never raised. They are recorded in the result's
        ``error_message`` and counted on the endpoint.

        Returns:
            The probe result. ``latency_ms`` is set only when an HTTP response
            was received.
        """
        result = HealthCheckResult(endpoint)
        # perf_counter is monotonic, so wall-clock adjustments cannot skew latency.
        start = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=endpoint.timeout) as client:
                response = await client.post(
                    f"{endpoint.base_url}/chat/completions",
                    headers={
                        "Authorization": f"Bearer {endpoint.api_key}",
                        "Content-Type": "application/json",
                    },
                    json={
                        "model": endpoint.model,
                        "messages": [{"role": "user", "content": "ping"}],
                        "max_tokens": 1,
                    },
                )
        except httpx.TimeoutException:
            result.error_message = "Timeout"
        except httpx.ConnectError as e:
            result.error_message = f"Connection error: {str(e)}"
        except Exception as e:  # a probe must report failures, not raise them
            result.error_message = f"Unexpected error: {str(e)}"
        else:
            result.latency_ms = (time.perf_counter() - start) * 1000
            if response.status_code == 200:
                result.success = True
                self._record_success(endpoint, result)
                return result
            result.error_message = f"HTTP {response.status_code}"
        self._handle_failure(endpoint, result)
        return result

    def _record_success(self, endpoint: "ModelEndpoint", result: "HealthCheckResult") -> None:
        """Reset the failure streak, then fold *result* into the latency EMA and the score."""
        endpoint.consecutive_failures = 0
        endpoint.last_success_time = time.time()
        if endpoint.avg_latency_ms == 0.0:
            # First sample seeds the EMA. Averaging against the initial 0.0
            # would report a latency far below the real one.
            endpoint.avg_latency_ms = result.latency_ms
        else:
            alpha = self._LATENCY_EMA_ALPHA
            endpoint.avg_latency_ms = (
                alpha * result.latency_ms + (1 - alpha) * endpoint.avg_latency_ms
            )
        self._update_health_score(endpoint, result)
        logger.debug(
            f"[{endpoint.name}] Health check OK - "
            f"Latency: {result.latency_ms:.2f}ms, "
            f"Score: {endpoint.health_score:.1f}"
        )

    def _handle_failure(self, endpoint: "ModelEndpoint", result: "HealthCheckResult") -> None:
        """Count a failed probe, penalising the score once the failure streak hits the threshold."""
        endpoint.consecutive_failures += 1
        endpoint.last_failure_time = time.time()
        if endpoint.consecutive_failures >= self._FAILURE_THRESHOLD:
            endpoint.health_score = max(0, endpoint.health_score - self._FAILURE_PENALTY)
        logger.warning(
            f"[{endpoint.name}] Health check FAILED - "
            f"Consecutive failures: {endpoint.consecutive_failures}, "
            f"Error: {result.error_message}"
        )

    def _update_health_score(self, endpoint: "ModelEndpoint", result: "HealthCheckResult") -> None:
        """Blend the latest probe into ``endpoint.health_score``, clamped to [0, 100].

        - ``latency_score`` is 100 for probes at or under 100 ms. Beyond that
          it loses one point per 2 ms.
        - ``history_bonus`` (0-10) rewards a low average latency: 10 at 0 ms,
          0 at 200 ms or more.
        - Their sum is mixed into the previous score with weight 0.3.
        """
        over_budget = max(0.0, result.latency_ms - self._FULL_SCORE_LATENCY_MS)
        latency_score = max(0.0, 100 - over_budget / 2)
        # The bonus must shrink as the average latency grows, so slow
        # endpoints do not out-score fast ones.
        history_bonus = max(0.0, 10 - endpoint.avg_latency_ms / 20)
        new_score = 0.7 * endpoint.health_score + 0.3 * (latency_score + history_bonus)
        endpoint.health_score = min(100.0, max(0.0, new_score))

    async def check_all_endpoints(self) -> Dict[str, Optional["HealthCheckResult"]]:
        """Probe all endpoints concurrently.

        Returns:
            Endpoint name mapped to its result, or None if the probe itself raised.
        """
        # Snapshot: add_endpoint() may run while we are awaiting the probes.
        endpoints = list(self.endpoints)
        results = await asyncio.gather(
            *(self.health_check_single(ep) for ep in endpoints),
            return_exceptions=True,
        )
        result_map: Dict[str, Optional["HealthCheckResult"]] = {}
        for ep, res in zip(endpoints, results):
            # BaseException also covers CancelledError, which gather returns as a value.
            if isinstance(res, BaseException):
                logger.error(f"Health check exception for {ep.name}: {res}")
                result_map[ep.name] = None
                continue
            result_map[ep.name] = res
            history = self._health_history[ep.name]
            history.append(res)
            if len(history) > self._MAX_HISTORY:
                del history[:-self._MAX_HISTORY]
        return result_map

    async def run(self) -> None:
        """Probe all endpoints every ``check_interval`` seconds until ``stop()`` is called."""
        self._running = True
        try:
            while self._running:
                await self.check_all_endpoints()
                await asyncio.sleep(self.check_interval)
        finally:
            self._running = False

    def stop(self) -> None:
        """Ask a running ``run()`` loop to exit after its current iteration."""
        self._running = False

    def get_best_endpoint(self) -> Optional["ModelEndpoint"]:
        """Return the best routable endpoint, or None if none qualifies.

        An endpoint is routable when its score exceeds 50 and it has fewer
        than 3 consecutive failures. Candidates are ranked by
        ``0.6 * health_score + 40 * priority``. Ties keep list order.
        """
        candidates = [
            ep for ep in self.endpoints
            if ep.health_score > self._MIN_ROUTABLE_SCORE
            and ep.consecutive_failures < self._FAILURE_THRESHOLD
        ]
        if not candidates:
            logger.error("No healthy endpoints available!")
            return None
        return max(candidates, key=lambda ep: ep.health_score * 0.6 + ep.priority * 40)

    def get_endpoint_status(self) -> List[Dict[str, Any]]:
        """Summarise every endpoint.

        The status is "healthy" above 70, "degraded" above 50, and
        "unhealthy" otherwise.
        """
        def label(score: float) -> str:
            if score > 70:
                return "healthy"
            if score > 50:
                return "degraded"
            return "unhealthy"

        return [
            {
                "name": ep.name,
                "url": ep.base_url,
                "health_score": ep.health_score,
                "avg_latency_ms": ep.avg_latency_ms,
                "consecutive_failures": ep.consecutive_failures,
                "last_success": ep.last_success_time,
                "status": label(ep.health_score),
            }
            for ep in self.endpoints
        ]
使用示例
async def demo():
    """Register two HolySheep AI endpoints, probe them once, and print the results."""
    checker = ModelServiceHealthChecker(check_interval=30)

    # Primary and backup share the same gateway and key; they differ only in
    # model and priority (higher priority is preferred).
    endpoint_specs = [
        ("HolySheep-Primary", "gpt-4.1", 10),
        ("HolySheep-Backup", "gpt-4o-mini", 5),
    ]
    for ep_name, ep_model, ep_priority in endpoint_specs:
        checker.add_endpoint(ModelEndpoint(
            name=ep_name,
            base_url="https://api.holysheep.ai/v1",  # HolySheep AI API
            api_key="YOUR_HOLYSHEEP_API_KEY",
            model=ep_model,
            priority=ep_priority,
            timeout=30.0,
        ))

    # One concurrent probe round; a None entry means the probe itself raised.
    outcomes = await checker.check_all_endpoints()
    for ep_name, outcome in outcomes.items():
        if not outcome:
            continue
        print(f"{ep_name}: {outcome.success}, {outcome.latency_ms:.2f}ms")

    chosen = checker.get_best_endpoint()
    if chosen:
        print(f"Best endpoint: {chosen.name}")
if __name__ == "__main__":
    # Script entry point: run the one-shot health-check demo on a fresh event loop.
    asyncio.run(demo())
自动故障切换实现
"""
AI Model Service Client with Auto Failover
完整的容错处理和重试机制
"""
import asyncio
import httpx
import time
from typing import Optional, List, Dict, Any, AsyncIterator
from dataclasses import dataclass
from contextlib import asynccontextmanager
import json
import logging
# Module-level logger used by the failover client below.
logger = logging.getLogger(__name__)
@dataclass
class RequestConfig:
    """Parameters for one chat-completion call, including its retry/fallback policy."""

    # ---- request payload ----
    model: str
    messages: List[Dict[str, str]]   # OpenAI-style {"role": ..., "content": ...} dicts
    temperature: float = 0.7
    max_tokens: int = 2048
    stream: bool = False

    # ---- transport and retry policy ----
    timeout: float = 60.0            # seconds, per HTTP request
    retry_count: int = 3             # attempts per model
    retry_delay: float = 1.0         # base back-off delay in seconds
    fallback_enabled: bool = True    # try fallback models after the requested one
@dataclass
class Response:
    """Provider-independent result of a chat-completion call."""

    content: str                               # assistant text ("" on failure)
    model: str                                 # model that produced (or last attempted) the answer
    provider: str
    latency_ms: float                          # end-to-end, including retries
    success: bool
    error: Optional[str] = None                # set when success is False
    usage: Optional[Dict[str, int]] = None     # token usage as reported by the API
class FailoverAwareAIClient:
    """
    AI client with retries and automatic model failover (HolySheep AI by default).

    Every call tries the requested model first. When fallback is enabled, it
    then tries the fallback models configured for that model in the tables
    below. Each model gets up to ``config.retry_count`` attempts. The
    caller's ``RequestConfig`` is never modified.
    """

    # Non-streaming fallback chains, keyed by the requested model.
    _FALLBACK_MODELS: Dict[str, List[str]] = {
        "gpt-4.1": ["gpt-4o-mini", "deepseek-v3.2"],
        "claude-sonnet-4.5": ["claude-haiku-3.5"],
        "gemini-2.5-flash": ["deepseek-v3.2"],
    }
    # Streaming fallback chains (used only when enable_stream_fallback is True).
    _STREAM_FALLBACK_MODELS: Dict[str, List[str]] = {
        "gpt-4.1": ["gpt-4o-mini"],
        "deepseek-v3.2": ["gemini-2.5-flash"],
    }

    def __init__(
        self,
        primary_key: str,
        fallback_key: Optional[str] = None,
        base_url: str = "https://api.holysheep.ai/v1",
        enable_stream_fallback: bool = True
    ):
        """
        Args:
            primary_key: API key for the primary endpoint.
            fallback_key: Optional second API key. When it differs from
                *primary_key*, a second endpoint is registered with it, and
                requests switch to it after a connection error on the primary.
            base_url: API root; "/chat/completions" is appended.
            enable_stream_fallback: Allow model fallback in
                ``stream_chat_completion``.
        """
        self.base_url = base_url
        self.primary_key = primary_key
        self.fallback_key = fallback_key or primary_key
        self.enable_stream_fallback = enable_stream_fallback

        supported_models = ["gpt-4.1", "claude-sonnet-4.5", "gemini-2.5-flash", "deepseek-v3.2"]
        self.endpoints = [
            {
                "key": primary_key,
                "name": "HolySheep-Primary",
                "models": supported_models,
                "priority": 1,
                "available": True
            }
        ]
        if self.fallback_key != primary_key:
            self.endpoints.append({
                "key": self.fallback_key,
                "name": "HolySheep-Fallback",
                "models": list(supported_models),
                "priority": 2,
                "available": True
            })

        # total/successful/failed count logical calls; total_attempts counts
        # individual HTTP attempts, including retries.
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "fallback_triggered": 0,
            "avg_latency_ms": 0,
            "total_attempts": 0
        }

        logger.info(
            f"Initialized FailoverAwareAIClient with HolySheep AI "
            f"(Rate: ¥1=$1, Latency: <50ms)"
        )

    @staticmethod
    def _candidate_models(
        model: str,
        fallback_enabled: bool,
        table: Dict[str, List[str]]
    ) -> List[str]:
        """Return *model* followed by its fallbacks from *table* (when enabled)."""
        candidates = [model]
        if fallback_enabled:
            candidates.extend(table.get(model, []))
        return candidates

    async def _make_request(
        self,
        endpoint: Dict[str, Any],
        config: "RequestConfig",
        *,
        model: Optional[str] = None,
        stream: Optional[bool] = None
    ) -> "httpx.Response":
        """POST one chat-completion request authenticated with ``endpoint['key']``.

        Args:
            endpoint: An entry of ``self.endpoints``.
            config: Request parameters.
            model: Model to send instead of ``config.model``.
            stream: Overrides ``config.stream`` when not None.

        Returns:
            The raw response; the status code is not checked here.
        """
        use_stream = config.stream if stream is None else stream
        headers = {
            "Authorization": f"Bearer {endpoint['key']}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": config.model if model is None else model,
            "messages": config.messages,
            "temperature": config.temperature,
            "max_tokens": config.max_tokens
        }
        if use_stream:
            headers["Accept"] = "text/event-stream"
            payload["stream"] = True
        async with httpx.AsyncClient(timeout=config.timeout) as client:
            return await client.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload
            )

    async def chat_completion(
        self,
        config: "RequestConfig"
    ) -> "Response":
        """
        Main entry point: a non-streaming chat completion with automatic failover.

        Per-attempt handling:
          * 200: parse and return the answer.
          * 429: switch to the next fallback model if one remains; otherwise
            back off (retry_delay * attempt number) and retry.
          * 5xx or timeout: back off and retry the same model.
          * other status codes: give up on this model and move to the next.
          * connection error: mark the endpoint unavailable and retry.

        The request is always sent non-streaming, because this method parses
        a JSON body.

        Returns:
            A ``Response``. On total failure ``success`` is False and ``error``
            carries the last error seen; request errors are never raised.
        """
        start_time = time.time()
        last_error: Optional[str] = None
        attempted_model = config.model
        self.metrics["total_requests"] += 1

        models = self._candidate_models(
            config.model, config.fallback_enabled, self._FALLBACK_MODELS
        )
        for index, model in enumerate(models):
            if index > 0:
                self.metrics["fallback_triggered"] += 1
            has_next_model = index + 1 < len(models)

            for retry in range(config.retry_count):
                attempted_model = model
                backoff = config.retry_delay * (retry + 1)
                endpoint = None
                try:
                    self.metrics["total_attempts"] += 1
                    endpoint = self._get_available_endpoint()
                    if not endpoint:
                        raise Exception("No available endpoints")

                    response = await self._make_request(
                        endpoint, config, model=model, stream=False
                    )

                    if response.status_code == 200:
                        data = response.json()
                        # content can be null (e.g. tool calls); normalise to str.
                        content = data["choices"][0]["message"]["content"] or ""
                        latency_ms = (time.time() - start_time) * 1000
                        # Count success only after the body parsed cleanly.
                        self.metrics["successful_requests"] += 1
                        self._update_avg_latency(latency_ms)
                        return Response(
                            content=content,
                            model=data.get("model", model),
                            provider="HolySheep AI",
                            latency_ms=latency_ms,
                            success=True,
                            usage=data.get("usage")
                        )
                    if response.status_code == 429:
                        last_error = "Rate limit exceeded"
                        if has_next_model:
                            # Quota/rate limit: another model is more likely to succeed.
                            logger.warning(f"Rate limit for model {model}, trying fallback...")
                            break
                        logger.warning(
                            f"Rate limit for model {model}, retry {retry + 1}/{config.retry_count}"
                        )
                        await asyncio.sleep(backoff)
                        continue
                    if response.status_code >= 500:
                        last_error = f"Server error: {response.status_code}"
                        await asyncio.sleep(backoff)
                        continue
                    last_error = f"Client error: {response.status_code}"
                    break

                except httpx.TimeoutException:
                    last_error = "Request timeout"
                    logger.warning(f"Timeout for model {model}, retry {retry + 1}/{config.retry_count}")
                    await asyncio.sleep(backoff)
                except httpx.ConnectError as e:
                    last_error = f"Connection error: {str(e)}"
                    logger.warning(f"Connection error: {e}")
                    # Disable only the endpoint that failed, so another key can take over.
                    self._mark_endpoint_unavailable(endpoint)
                    await asyncio.sleep(config.retry_delay)
                except Exception as e:
                    last_error = str(e)
                    logger.error(f"Unexpected error: {e}")
                    await asyncio.sleep(config.retry_delay)

        # Every model and retry failed.
        self.metrics["failed_requests"] += 1
        latency_ms = (time.time() - start_time) * 1000
        return Response(
            content="",
            model=attempted_model,
            provider="HolySheep AI",
            latency_ms=latency_ms,
            success=False,
            error=f"All retries failed. Last error: {last_error}"
        )

    async def stream_chat_completion(
        self,
        config: "RequestConfig"
    ) -> AsyncIterator[str]:
        """
        Streaming chat completion with failover; yields content deltas as they arrive.

        Tries the requested model, then its streaming fallbacks when both
        ``config.fallback_enabled`` and ``self.enable_stream_fallback`` are set.
        Retries and fallbacks happen only before any text has been yielded.

        Yields:
            Non-empty content chunks. A single ``""`` is yielded after every
            attempt has failed.

        Raises:
            The error that interrupted the stream, if it happened after content
            was already yielded. Restarting would repeat text the consumer has
            already received.
        """
        use_fallback = config.fallback_enabled and self.enable_stream_fallback
        models = self._candidate_models(
            config.model, use_fallback, self._STREAM_FALLBACK_MODELS
        )

        for index, model in enumerate(models):
            has_next_model = index + 1 < len(models)
            for _ in range(config.retry_count):
                yielded = False
                try:
                    endpoint = self._get_available_endpoint()
                    headers = {
                        "Authorization": f"Bearer {endpoint['key']}",
                        "Content-Type": "application/json",
                        "Accept": "text/event-stream"
                    }
                    payload = {
                        "model": model,
                        "messages": config.messages,
                        "temperature": config.temperature,
                        "max_tokens": config.max_tokens,
                        "stream": True
                    }
                    async with httpx.AsyncClient(
                        timeout=httpx.Timeout(config.timeout),
                        follow_redirects=True
                    ) as client:
                        async with client.stream(
                            "POST",
                            f"{self.base_url}/chat/completions",
                            headers=headers,
                            json=payload
                        ) as response:
                            if response.status_code == 200:
                                async for line in response.aiter_lines():
                                    if not line.startswith("data: "):
                                        continue
                                    data = line[6:]
                                    if data == "[DONE]":
                                        return
                                    try:
                                        parsed = json.loads(data)
                                    except json.JSONDecodeError:
                                        continue
                                    # Some providers send chunks with an empty
                                    # "choices" list (e.g. a final usage chunk).
                                    choices = parsed.get("choices") or []
                                    if not choices:
                                        continue
                                    content = (choices[0].get("delta") or {}).get("content")
                                    if content:
                                        yielded = True
                                        yield content
                                return  # stream ended without [DONE]; treat as complete
                            if response.status_code == 429:
                                if has_next_model:
                                    logger.warning("Rate limit in stream, trying fallback...")
                                    break
                                logger.warning("Rate limit in stream, retrying...")
                                await asyncio.sleep(config.retry_delay)
                                continue
                            break
                except Exception as e:
                    if yielded:
                        raise
                    logger.warning(f"Stream error: {e}, trying next model...")
                    await asyncio.sleep(config.retry_delay)
                    continue

        yield ""  # an empty chunk signals that every attempt failed

    def _get_available_endpoint(self) -> Optional[Dict[str, Any]]:
        """Return the first endpoint marked available.

        If every endpoint is marked unavailable, all are reset to available so
        requests keep flowing. Returns None only when no endpoints exist.
        """
        available = [ep for ep in self.endpoints if ep["available"]]
        if not available:
            for ep in self.endpoints:
                ep["available"] = True
            available = self.endpoints
        return available[0] if available else None

    def _mark_endpoint_unavailable(self, endpoint: Optional[Dict[str, Any]] = None) -> None:
        """Mark *endpoint* unavailable; with no argument, mark every endpoint."""
        targets = self.endpoints if endpoint is None else [endpoint]
        for ep in targets:
            ep["available"] = False

    def _update_avg_latency(self, latency_ms: float) -> None:
        """Fold *latency_ms* into the latency EMA (alpha 0.1), seeding it on the first sample."""
        if not self.metrics["avg_latency_ms"]:
            # Averaging the first sample against 0 would under-report latency.
            self.metrics["avg_latency_ms"] = latency_ms
            return
        alpha = 0.1
        self.metrics["avg_latency_ms"] = (
            alpha * latency_ms +
            (1 - alpha) * self.metrics["avg_latency_ms"]
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Return a copy of the metrics plus ``success_rate`` (percent of logical calls)."""
        return {
            **self.metrics,
            "success_rate": (
                self.metrics["successful_requests"] /
                max(1, self.metrics["total_requests"]) * 100
            )
        }
使用示例
async def main():
# 初始化客户端
client = FailoverAwareAIClient(
primary_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1"
)
# 标准请求示例
config = RequestConfig(
model="gpt-4.1",
messages=[
{"role": "system", "content": "你是专业的AI助手"},
{"role": "user", "content": "解释什么是RAG系统"}
],
temperature=0.7,
max_tokens=1000
)
response = await client.chat_completion(config)
if response.success:
print(f"Response from {response.model}:")
print(response.content)
print(f"\nLatency: {response.latency_ms:.2f}ms")
print(f"Usage: {response.usage}")
else:
print(f"Error: {response.error}")
# 查看指标
print(f"\nMetrics: {client.get