AI应用已进入生产环境的关键时期。无论是电商平台的智能客服、企业知识库的RAG系统,还是个人开发者的创新项目,高可用性已成为不可或缺的技术要求。本稿では、HolySheep AIを活用したモデルサービスのヘルスチェック機構と自動フェイルオーバーの設計・実装について、実践経験に基づいて解説します。

なぜ健康检查と故障切换が必要か

AI模型服务の可用性面临着诸多挑战:

私自身、初めてAIカスタマーサポートシステムを導入した際、夜間のトラフィック急増でAPIがタイムアウトし、ユーザーからの苦情が殺到した経験があります。この教訓から、自動フェイルオーバー(Automatic Failover)アーキテクチャの重要性を痛感しました。

实战案例:电商AI客服系统

私が担当したECサイトのAI客服プロジェクトでは、以下の構成を採用しました:

完整健康检查实现

"""
AI Model Service Health Check & Auto Failover System
支持 HolySheep AI / OpenAI 兼容 API
"""

import asyncio
import logging
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

import httpx

# Configure the root logger at import time with INFO as its level.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the health-check code in this script.
logger = logging.getLogger(__name__)


class HealthStatus(Enum):
    """Coarse health state of an endpoint.

    The member values are the lowercase strings "healthy", "degraded" and
    "unhealthy".
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class ModelEndpoint:
    """Configuration plus mutable health state for one model endpoint.

    The first four fields are required and identify the endpoint; the
    remaining fields have defaults. The health checker mutates the state
    fields (score, failure counters, timestamps, average latency) in place.
    """

    name: str
    # Base URL; the health checker POSTs to "<base_url>/chat/completions".
    base_url: str
    # Sent as "Authorization: Bearer <api_key>".
    api_key: str
    # Model name placed in the probe request payload.
    model: str
    # Higher values sort first and weigh more when picking the best endpoint.
    priority: int = 1
    # NOTE(review): not read by any code visible in this file.
    max_retries: int = 3
    # Passed as the httpx client timeout for health probes.
    timeout: float = 30.0
    # Health score; the checker clamps it to the range 0-100.
    health_score: float = 100.0
    # Incremented on each failed probe, reset to 0 on success.
    consecutive_failures: int = 0
    # time.time() of the last successful probe (defaults to creation time).
    last_success_time: float = field(default_factory=time.time)
    # time.time() of the last failed probe; 0.0 until a failure is recorded.
    last_failure_time: float = 0.0
    # Exponential moving average of successful probe latency, in ms.
    avg_latency_ms: float = 0.0
    # NOTE(review): never updated by any code visible in this file.
    request_count: int = 0

    # HolySheep AI-specific advantages
    # NOTE(review): these two flags are not read by any code visible here.
    supports_streaming: bool = True
    supports_function_call: bool = True


@dataclass(eq=False)
class HealthCheckResult:
    """Outcome of a single health probe against one endpoint.

    Declared as a dataclass so results get a readable ``repr`` in logs and
    follow the same style as ``ModelEndpoint``. ``eq=False`` keeps the
    identity-based equality and hashing of the original plain class.

    Only ``endpoint`` is required; the probe code fills in the other fields
    after the request completes.
    """

    # The endpoint that was probed.
    endpoint: "ModelEndpoint"
    # True only when the probe returned HTTP 200.
    success: bool = False
    # Probe round-trip time in milliseconds.
    latency_ms: float = 0.0
    # Short failure description; None when the probe succeeded.
    error_message: Optional[str] = None
    # time.time() at which the result object was created.
    timestamp: float = field(default_factory=time.time)


class ModelServiceHealthChecker:
    """Probe OpenAI-compatible chat endpoints and rank them by health.

    Each probe sends a one-token chat completion to an endpoint. A success
    feeds a latency-based sample into the endpoint's health score (0-100,
    exponentially smoothed). A failure increments the endpoint's consecutive
    failure counter; from ``FAILURE_THRESHOLD`` consecutive failures onward
    every further failure subtracts ``FAILURE_PENALTY`` points.

    The class attributes below are the tuning knobs of that scheme; override
    them on a subclass or an instance. With the defaults, a steady probe
    latency of 300 ms or more yields a sample of 0 and therefore drives the
    score toward 0, so slower providers need a larger
    ``FULL_SCORE_LATENCY_MS`` or a smaller ``LATENCY_PENALTY_PER_MS``.
    """

    # Consecutive failures at which an endpoint is penalised and excluded.
    FAILURE_THRESHOLD = 3
    # Points removed per failure once the threshold is reached.
    FAILURE_PENALTY = 20.0
    # Probes at or below this latency earn the full 100 latency points.
    FULL_SCORE_LATENCY_MS = 100.0
    # Latency points lost per millisecond above FULL_SCORE_LATENCY_MS.
    LATENCY_PENALTY_PER_MS = 0.5
    # Maximum bonus for a low historical average latency.
    HISTORY_BONUS_MAX = 10.0
    # Milliseconds of average latency that cost one bonus point.
    HISTORY_BONUS_DECAY_MS = 20.0
    # Smoothing factor of the average-latency moving average.
    LATENCY_EMA_ALPHA = 0.2
    # Weight of the newest sample in the health-score moving average.
    SCORE_EMA_WEIGHT = 0.3
    # An endpoint must score strictly above this to be selectable.
    MIN_SELECTABLE_SCORE = 50.0
    # An endpoint scoring strictly above this is reported as "healthy".
    HEALTHY_SCORE = 70.0
    # Number of probe results kept per endpoint.
    HISTORY_SIZE = 100

    def __init__(self, check_interval: float = 30.0):
        """Create an empty checker.

        Args:
            check_interval: Intended number of seconds between check rounds.
                It is only stored; this class does not schedule rounds itself.
        """
        self.endpoints: List[ModelEndpoint] = []
        self.check_interval = check_interval
        self._running = False
        # Bounded per-endpoint history; the deque drops the oldest entry
        # itself, so no manual trimming is needed.
        self._health_history: Dict[str, deque] = defaultdict(
            lambda: deque(maxlen=self.HISTORY_SIZE)
        )

    def add_endpoint(self, endpoint: "ModelEndpoint"):
        """Register an endpoint and keep the list sorted by priority (high first)."""
        self.endpoints.append(endpoint)
        self.endpoints.sort(key=lambda x: x.priority, reverse=True)
        logger.info(f"Added endpoint: {endpoint.name} ({endpoint.base_url})")

    async def health_check_single(self, endpoint: "ModelEndpoint") -> "HealthCheckResult":
        """Probe one endpoint and update its health state in place.

        Args:
            endpoint: The endpoint to probe.

        Returns:
            The probe result. Request errors are captured in
            ``error_message`` rather than raised.
        """
        result = HealthCheckResult(endpoint)
        # perf_counter is monotonic, so the measured latency cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()

        try:
            async with httpx.AsyncClient(timeout=endpoint.timeout) as client:
                response = await client.post(
                    f"{endpoint.base_url}/chat/completions",
                    headers={
                        "Authorization": f"Bearer {endpoint.api_key}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": endpoint.model,
                        "messages": [{"role": "user", "content": "ping"}],
                        "max_tokens": 1
                    }
                )
                # Stop the clock before the client is torn down.
                result.latency_ms = (time.perf_counter() - started) * 1000
        except httpx.TimeoutException:
            result.error_message = "Timeout"
        except httpx.ConnectError as exc:
            result.error_message = f"Connection error: {str(exc)}"
        except Exception as exc:
            # Probing is best-effort: any other error counts as a failure.
            result.error_message = f"Unexpected error: {str(exc)}"
        else:
            if response.status_code == 200:
                result.success = True
            else:
                result.error_message = f"HTTP {response.status_code}"

        if result.success:
            self._record_success(endpoint, result)
        else:
            if result.latency_ms == 0.0:
                # The request raised: report how long the attempt took
                # instead of a misleading 0 ms.
                result.latency_ms = (time.perf_counter() - started) * 1000
            self._handle_failure(endpoint, result)

        return result

    def _record_success(self, endpoint: "ModelEndpoint", result: "HealthCheckResult"):
        """Reset the failure streak and fold a successful probe into the stats."""
        endpoint.consecutive_failures = 0
        endpoint.last_success_time = time.time()
        self._update_latency_average(endpoint, result.latency_ms)
        self._update_health_score(endpoint, result)

        logger.debug(
            f"[{endpoint.name}] Health check OK - "
            f"Latency: {result.latency_ms:.2f}ms, "
            f"Score: {endpoint.health_score:.1f}"
        )

    def _update_latency_average(self, endpoint: "ModelEndpoint", latency_ms: float):
        """Update the endpoint's exponential moving average of latency.

        The first sample seeds the average directly. Averaging it with the
        initial 0.0 would under-report the latency by ``1 - alpha``.
        """
        if endpoint.avg_latency_ms <= 0:
            endpoint.avg_latency_ms = latency_ms
            return
        alpha = self.LATENCY_EMA_ALPHA
        endpoint.avg_latency_ms = (
            alpha * latency_ms + (1 - alpha) * endpoint.avg_latency_ms
        )

    def _handle_failure(self, endpoint: "ModelEndpoint", result: "HealthCheckResult"):
        """Record a failed probe and penalise the score after repeated failures."""
        endpoint.consecutive_failures += 1
        endpoint.last_failure_time = time.time()

        # Isolated failures are tolerated; only a streak lowers the score.
        if endpoint.consecutive_failures >= self.FAILURE_THRESHOLD:
            endpoint.health_score = max(
                0, endpoint.health_score - self.FAILURE_PENALTY
            )

        logger.warning(
            f"[{endpoint.name}] Health check FAILED - "
            f"Consecutive failures: {endpoint.consecutive_failures}, "
            f"Error: {result.error_message}"
        )

    def _update_health_score(self, endpoint: "ModelEndpoint", result: "HealthCheckResult"):
        """Blend a successful probe into the endpoint's health score.

        The sample is the latency score (full marks up to
        ``FULL_SCORE_LATENCY_MS``, then a linear penalty) plus a bonus that
        shrinks as the historical average latency grows. The result is
        clamped to the range 0-100.
        """
        excess_ms = max(0.0, result.latency_ms - self.FULL_SCORE_LATENCY_MS)
        latency_score = max(0.0, 100.0 - excess_ms * self.LATENCY_PENALTY_PER_MS)

        # A lower average latency earns a higher bonus.
        history_bonus = max(
            0.0,
            self.HISTORY_BONUS_MAX
            - endpoint.avg_latency_ms / self.HISTORY_BONUS_DECAY_MS,
        )

        weight = self.SCORE_EMA_WEIGHT
        new_score = (
            (1 - weight) * endpoint.health_score
            + weight * (latency_score + history_bonus)
        )
        endpoint.health_score = min(100, max(0, new_score))

    async def check_all_endpoints(self) -> Dict[str, Optional["HealthCheckResult"]]:
        """Probe every registered endpoint concurrently.

        Returns:
            A mapping from endpoint name to its probe result, or to ``None``
            when the probe task itself raised.
        """
        # Snapshot the list: add_endpoint() may re-sort self.endpoints while
        # the probes are awaited, which would mis-pair endpoints and results.
        endpoints = list(self.endpoints)
        outcomes = await asyncio.gather(
            *(self.health_check_single(ep) for ep in endpoints),
            return_exceptions=True,
        )

        result_map: Dict[str, Optional[HealthCheckResult]] = {}
        for ep, outcome in zip(endpoints, outcomes):
            # BaseException also covers asyncio.CancelledError, which gather
            # returns as a value when return_exceptions=True.
            if isinstance(outcome, BaseException):
                logger.error(f"Health check exception for {ep.name}: {outcome}")
                result_map[ep.name] = None
            else:
                result_map[ep.name] = outcome
                self._health_history[ep.name].append(outcome)

        return result_map

    def get_best_endpoint(self) -> Optional["ModelEndpoint"]:
        """Return the best selectable endpoint, or ``None`` if there is none.

        An endpoint is selectable when its score is above
        ``MIN_SELECTABLE_SCORE`` and its failure streak is below
        ``FAILURE_THRESHOLD``. Candidates are ranked by
        ``health_score * 0.6 + priority * 40``; ties go to the endpoint that
        comes first in ``self.endpoints``.
        """
        candidates = [
            ep for ep in self.endpoints
            if ep.health_score > self.MIN_SELECTABLE_SCORE
            and ep.consecutive_failures < self.FAILURE_THRESHOLD
        ]

        if not candidates:
            logger.error("No healthy endpoints available!")
            return None

        return max(
            candidates,
            key=lambda ep: ep.health_score * 0.6 + ep.priority * 40,
        )

    def _classify(self, score: float) -> str:
        """Map a health score to "healthy", "degraded" or "unhealthy"."""
        if score > self.HEALTHY_SCORE:
            return "healthy"
        if score > self.MIN_SELECTABLE_SCORE:
            return "degraded"
        return "unhealthy"

    def get_endpoint_status(self) -> List[Dict[str, Any]]:
        """Return one status dictionary per registered endpoint."""
        return [
            {
                "name": ep.name,
                "url": ep.base_url,
                "health_score": ep.health_score,
                "avg_latency_ms": ep.avg_latency_ms,
                "consecutive_failures": ep.consecutive_failures,
                "last_success": ep.last_success_time,
                "status": self._classify(ep.health_score),
            }
            for ep in self.endpoints
        ]


使用示例

async def demo():
    """Register two endpoints, run one health-check round and print the outcome."""
    checker = ModelServiceHealthChecker(check_interval=30)

    # Primary HolySheep AI node
    checker.add_endpoint(ModelEndpoint(
        name="HolySheep-Primary",
        base_url="https://api.holysheep.ai/v1",  # HolySheep AI API
        api_key="YOUR_HOLYSHEEP_API_KEY",
        model="gpt-4.1",
        priority=10,
        timeout=30.0,
    ))

    # Backup node
    checker.add_endpoint(ModelEndpoint(
        name="HolySheep-Backup",
        base_url="https://api.holysheep.ai/v1",
        api_key="YOUR_HOLYSHEEP_API_KEY",
        model="gpt-4o-mini",
        priority=5,
        timeout=30.0,
    ))

    # Run one round of health checks
    results = await checker.check_all_endpoints()
    for name, result in results.items():
        if result:
            print(f"{name}: {result.success}, {result.latency_ms:.2f}ms")

    # Pick the best endpoint
    best = checker.get_best_endpoint()
    if best:
        print(f"Best endpoint: {best.name}")


if __name__ == "__main__":
    asyncio.run(demo())

自动故障切换实现

"""
AI Model Service Client with Auto Failover
完整的容错处理和重试机制
"""

import asyncio
import httpx
import time
from typing import Optional, List, Dict, Any, AsyncIterator
from dataclasses import dataclass
from contextlib import asynccontextmanager
import json
import logging

# Module-level logger used by the failover client in this script.
logger = logging.getLogger(__name__)


@dataclass
class RequestConfig:
    """Parameters of one chat-completion request and its retry policy."""

    # Model to request first; fallback models may be tried after it.
    model: str
    # Chat messages, sent as the "messages" field of the payload.
    messages: List[Dict[str, str]]
    # Sent as the "temperature" field of the payload.
    temperature: float = 0.7
    # Sent as the "max_tokens" field of the payload.
    max_tokens: int = 2048
    # When true, the request asks for a server-sent event stream.
    stream: bool = False
    # Timeout passed to the httpx client.
    timeout: float = 60.0
    # Number of attempts made per model.
    retry_count: int = 3
    # Base delay, in seconds, slept between attempts.
    retry_delay: float = 1.0
    # When true, fallback models are tried after the requested one.
    fallback_enabled: bool = True


@dataclass
class Response:
    """Unified result of a chat-completion call, successful or not."""

    # Assistant message text; the client sets "" when the call failed.
    content: str
    # Model name associated with the result.
    model: str
    # Provider label; the client sets "HolySheep AI".
    provider: str
    # Elapsed time of the call in milliseconds.
    latency_ms: float
    # True when a completion was obtained.
    success: bool
    # Failure description; None on success.
    error: Optional[str] = None
    # The "usage" object of the API response, when present.
    usage: Optional[Dict[str, int]] = None


class FailoverAwareAIClient:
    """Chat-completion client with retries and model-level failover.

    A request is first attempted with the requested model, up to
    ``retry_count`` times, and then with each fallback model configured for
    it. The caller's ``RequestConfig`` is never modified.

    Metrics are counted per call: ``total_requests`` equals
    ``successful_requests + failed_requests`` for completed
    ``chat_completion`` calls, and ``fallback_triggered`` counts actual
    switches to a fallback model.
    """

    # Fallback chains for non-streaming requests, keyed by requested model.
    FALLBACK_MODELS = {
        "gpt-4.1": ["gpt-4o-mini", "deepseek-v3.2"],
        "claude-sonnet-4.5": ["claude-haiku-3.5"],
        "gemini-2.5-flash": ["deepseek-v3.2"],
    }
    # Fallback chains for streaming requests, keyed by requested model.
    STREAM_FALLBACK_MODELS = {
        "gpt-4.1": ["gpt-4o-mini"],
        "deepseek-v3.2": ["gemini-2.5-flash"],
    }
    # Smoothing factor of the average-latency moving average.
    LATENCY_EMA_ALPHA = 0.1

    def __init__(
        self,
        primary_key: str,
        fallback_key: Optional[str] = None,
        base_url: str = "https://api.holysheep.ai/v1",
        enable_stream_fallback: bool = True
    ):
        """Create a client.

        Args:
            primary_key: API key used by the built-in endpoint.
            fallback_key: Stored as ``self.fallback_key`` (defaults to
                ``primary_key``).
            base_url: Base URL of the OpenAI-compatible API.
            enable_stream_fallback: When false, streaming requests never
                switch to a fallback model.
        """
        self.base_url = base_url
        self.primary_key = primary_key
        # NOTE(review): fallback_key is stored but no endpoint uses it yet.
        self.fallback_key = fallback_key or primary_key
        self.enable_stream_fallback = enable_stream_fallback

        # Endpoint table; "available" is toggled by the failover helpers.
        self.endpoints = [
            {
                "key": primary_key,
                "name": "HolySheep-Primary",
                "models": ["gpt-4.1", "claude-sonnet-4.5", "gemini-2.5-flash", "deepseek-v3.2"],
                "priority": 1,
                "available": True
            }
        ]

        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "fallback_triggered": 0,
            "avg_latency_ms": 0
        }

        logger.info(
            "Initialized FailoverAwareAIClient with HolySheep AI "
            "(Rate: ¥1=$1, Latency: <50ms)"
        )

    @staticmethod
    def _models_to_try(primary: str, fallback_enabled: bool, fallback_map) -> List[str]:
        """Return the requested model followed by its fallbacks, if enabled."""
        models = [primary]
        if fallback_enabled:
            models.extend(fallback_map.get(primary, []))
        return models

    async def _make_request(
        self,
        endpoint: Dict[str, Any],
        config: "RequestConfig",
        model: Optional[str] = None
    ) -> "httpx.Response":
        """Send one chat-completion request.

        Args:
            endpoint: Endpoint entry whose "key" is used as the bearer token.
            config: Request parameters; read only.
            model: Model to request instead of ``config.model``. This lets
                callers try fallback models without modifying ``config``.

        Returns:
            The raw HTTP response.
        """
        headers = {
            "Authorization": f"Bearer {endpoint['key']}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": model or config.model,
            "messages": config.messages,
            "temperature": config.temperature,
            "max_tokens": config.max_tokens
        }

        if config.stream:
            headers["Accept"] = "text/event-stream"
            payload["stream"] = True

        async with httpx.AsyncClient(timeout=config.timeout) as client:
            return await client.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload
            )

    async def chat_completion(
        self,
        config: "RequestConfig"
    ) -> "Response":
        """Run a chat completion with retries and model failover.

        Rate limits (429), server errors (5xx), timeouts, connection errors
        and unexpected errors are retried. Any other status abandons the
        current model and moves on to the next one.

        Args:
            config: Request parameters; not modified.

        Returns:
            A ``Response``. On failure ``success`` is false, ``model`` is the
            originally requested model and ``error`` holds the last error.
        """
        started = time.perf_counter()
        last_error = None
        # Count the call once, so that success_rate is successes / calls.
        self.metrics["total_requests"] += 1

        models_to_try = self._models_to_try(
            config.model, config.fallback_enabled, self.FALLBACK_MODELS
        )

        for index, model in enumerate(models_to_try):
            if index > 0:
                # Count only a real switch to another model.
                self.metrics["fallback_triggered"] += 1
                logger.warning(f"Falling back to model {model}")

            for retry in range(config.retry_count):
                endpoint = None
                try:
                    endpoint = self._get_available_endpoint()
                    if endpoint is None:
                        raise RuntimeError("No available endpoints")

                    response = await self._make_request(endpoint, config, model=model)
                    latency_ms = (time.perf_counter() - started) * 1000
                    status = response.status_code

                    if status == 200:
                        # Parse before counting, so a malformed body is
                        # retried without being recorded as a success.
                        data = response.json()
                        content = data["choices"][0]["message"]["content"]

                        self.metrics["successful_requests"] += 1
                        self._update_avg_latency(latency_ms)

                        return Response(
                            # The API may return null content.
                            content=content or "",
                            model=data.get("model", model),
                            provider="HolySheep AI",
                            latency_ms=latency_ms,
                            success=True,
                            usage=data.get("usage")
                        )

                    if status == 429:
                        last_error = "Rate limit exceeded"
                        logger.warning(
                            f"Rate limit for model {model}, "
                            f"retry {retry + 1}/{config.retry_count}"
                        )
                        await asyncio.sleep(config.retry_delay * (retry + 1))
                        continue

                    if status >= 500:
                        last_error = f"Server error: {status}"
                        await asyncio.sleep(config.retry_delay * (retry + 1))
                        continue

                    # Other client errors: retrying the same request cannot
                    # help, so move on to the next model.
                    last_error = f"Client error: {status}"
                    break

                except httpx.TimeoutException:
                    last_error = "Request timeout"
                    logger.warning(f"Timeout for model {model}, retry {retry + 1}/{config.retry_count}")
                    await asyncio.sleep(config.retry_delay * (retry + 1))

                except httpx.ConnectError as e:
                    last_error = f"Connection error: {str(e)}"
                    logger.warning(f"Connection error: {e}")
                    # Disable only the endpoint that failed to connect.
                    self._mark_endpoint_unavailable(endpoint)
                    await asyncio.sleep(config.retry_delay)

                except Exception as e:
                    # Best-effort boundary: log, remember and retry.
                    last_error = str(e)
                    logger.error(f"Unexpected error: {e}")
                    await asyncio.sleep(config.retry_delay)

        self.metrics["failed_requests"] += 1
        latency_ms = (time.perf_counter() - started) * 1000

        return Response(
            content="",
            model=config.model,
            provider="HolySheep AI",
            latency_ms=latency_ms,
            success=False,
            error=f"All retries failed. Last error: {last_error}"
        )

    async def stream_chat_completion(
        self,
        config: "RequestConfig"
    ) -> AsyncIterator[str]:
        """Stream a chat completion, yielding text chunks as they arrive.

        Retries and fallback models are used only while nothing has been
        yielded. Once text has been emitted, an error ends the stream,
        because restarting would repeat the text the caller already
        received. Failure is signalled by a final empty string.

        Args:
            config: Request parameters; not modified.

        Yields:
            Non-empty content chunks, then ``""`` if the request failed.
        """
        allow_fallback = config.fallback_enabled and self.enable_stream_fallback
        models_to_try = self._models_to_try(
            config.model, allow_fallback, self.STREAM_FALLBACK_MODELS
        )

        for model in models_to_try:
            for retry in range(config.retry_count):
                emitted = False
                status = 0
                try:
                    endpoint = self._get_available_endpoint()
                    if endpoint is None:
                        raise RuntimeError("No available endpoints")

                    headers = {
                        "Authorization": f"Bearer {endpoint['key']}",
                        "Content-Type": "application/json",
                        "Accept": "text/event-stream"
                    }

                    payload = {
                        "model": model,
                        "messages": config.messages,
                        "temperature": config.temperature,
                        "max_tokens": config.max_tokens,
                        "stream": True
                    }

                    async with httpx.AsyncClient(
                        timeout=httpx.Timeout(config.timeout),
                        follow_redirects=True
                    ) as client:
                        async with client.stream(
                            "POST",
                            f"{self.base_url}/chat/completions",
                            headers=headers,
                            json=payload
                        ) as response:
                            status = response.status_code

                            if status == 200:
                                async for line in response.aiter_lines():
                                    if not line.startswith("data: "):
                                        continue
                                    data = line[6:]
                                    if data == "[DONE]":
                                        return
                                    try:
                                        parsed = json.loads(data)
                                    except json.JSONDecodeError:
                                        continue
                                    # Some chunks carry no choices or a null
                                    # delta; skip them instead of failing.
                                    choices = parsed.get("choices") or []
                                    if not choices:
                                        continue
                                    delta = choices[0].get("delta") or {}
                                    content = delta.get("content")
                                    if content:
                                        emitted = True
                                        yield content

                                return  # Stream finished successfully.

                except Exception as e:
                    if emitted:
                        # Retrying now would duplicate delivered text.
                        logger.error(f"Stream interrupted after partial output: {e}")
                        yield ""
                        return
                    logger.warning(f"Stream error: {e}, trying next model...")
                    await asyncio.sleep(config.retry_delay)
                    continue

                # Reached only for non-200 responses; the connection is
                # already closed here, so the sleep does not hold it open.
                if status == 429 or status >= 500:
                    logger.warning(
                        f"HTTP {status} in stream for model {model}, "
                        f"retry {retry + 1}/{config.retry_count}"
                    )
                    await asyncio.sleep(config.retry_delay)
                    continue

                break  # Other client errors: move on to the next model.

        yield ""  # An empty chunk signals failure.

    def _get_available_endpoint(self) -> Optional[Dict[str, Any]]:
        """Return the first available endpoint.

        When every endpoint is marked unavailable, all of them are re-enabled
        and the first one is returned. ``None`` is returned only when no
        endpoints are configured.
        """
        available = [ep for ep in self.endpoints if ep["available"]]
        if not available:
            for ep in self.endpoints:
                ep["available"] = True
            available = self.endpoints
        return available[0] if available else None

    def _mark_endpoint_unavailable(self, endpoint: Optional[Dict[str, Any]] = None):
        """Mark an endpoint as unavailable.

        Args:
            endpoint: The endpoint entry to disable. When omitted, every
                endpoint is disabled.
        """
        targets = self.endpoints if endpoint is None else [endpoint]
        for ep in targets:
            ep["available"] = False

    def _update_avg_latency(self, latency_ms: float):
        """Fold a latency sample into the exponential moving average.

        The first sample seeds the average directly. Averaging it with the
        initial 0 would under-report the latency by ``1 - alpha``.
        """
        previous = self.metrics["avg_latency_ms"]
        if not previous:
            self.metrics["avg_latency_ms"] = latency_ms
            return
        alpha = self.LATENCY_EMA_ALPHA
        self.metrics["avg_latency_ms"] = (
            alpha * latency_ms + (1 - alpha) * previous
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Return the metric counters plus ``success_rate`` in percent."""
        return {
            **self.metrics,
            "success_rate": (
                self.metrics["successful_requests"] /
                max(1, self.metrics["total_requests"]) * 100
            )
        }


使用示例

async def main(): # 初始化客户端 client = FailoverAwareAIClient( primary_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" ) # 标准请求示例 config = RequestConfig( model="gpt-4.1", messages=[ {"role": "system", "content": "你是专业的AI助手"}, {"role": "user", "content": "解释什么是RAG系统"} ], temperature=0.7, max_tokens=1000 ) response = await client.chat_completion(config) if response.success: print(f"Response from {response.model}:") print(response.content) print(f"\nLatency: {response.latency_ms:.2f}ms") print(f"Usage: {response.usage}") else: print(f"Error: {response.error}") # 查看指标 print(f"\nMetrics: {client.get