我第一次做多模型 AI 聚合项目时,踩了一个大坑——某个模型 API 突然响应超时,结果整个系统像多米诺骨牌一样全部崩溃。从那天起,我开始研究熔断器模式,花了整整两周把它从理论用到生产环境。今天我把这个完整的学习路径分享给你,保证你看完后能直接上手。

什么是熔断器模式?为什么你的多模型调用需要它?

想象一下你家的电路保护器:当某个电器短路时,电闸会"啪"地跳开,防止整个房间的电器被烧坏。熔断器模式的原理一模一样——当某个 AI 模型 API 连续多次失败时,我们暂时"跳闸",不再向它发送请求,给它休息的时间,也保护我们的程序不会卡死在那。

举一个真实的场景:我之前做一个聊天机器人,同时接入了 GPT-4.1、Claude Sonnet 和 DeepSeek V3.2。某天凌晨2点,Claude 的接口开始不稳定,响应时间从正常的 800ms 飙升到 30 秒。但因为没有熔断器,系统一直在等待 Claude 返回,结果用户体验全面崩溃——所有用户都在等一个永远等不到的答案。

熔断器的三种状态

实战:5分钟搭建你的第一个熔断器

我们使用 Python 来实现,这个语言简单易懂,代码量也少。首先确保你安装了必要的库:

pip install requests httpx PyJWT

如果你使用异步编程,还需要:

pip install aiohttp asyncio

然后让我带你一步一步实现。我会用到 HolySheheep AI 作为演示,因为它国内直连延迟小于 50ms,价格也比官方渠道便宜 85% 以上,非常适合做这种高可用性测试。

第一步:定义熔断器基础类

import time
from enum import Enum
from typing import Callable, Optional
from dataclasses import dataclass, field
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "closed"      # 关闭状态:正常工作
    OPEN = "open"          # 打开状态:快速失败
    HALF_OPEN = "half_open"  # 半开状态:试探恢复

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5        # 连续失败多少次后打开熔断器
    success_threshold: int = 3        # 半开后成功多少次后关闭
    timeout: float = 60.0            # 熔断器打开后多久尝试半开(秒)
    half_open_max_calls: int = 3     # 半开状态最多放行几个请求

@dataclass
class CircuitBreakerStats:
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    rejected_calls: int = 0
    last_failure_time: Optional[float] = None
    consecutive_failures: int = 0
    consecutive_successes: int = 0

class CircuitBreaker:
    """熔断器实现 - 保护你的多模型 API 调用"""
    
    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.stats = CircuitBreakerStats()
        self.last_state_change_time = time.time()
        self.half_open_calls = 0
    
    def call(self, func: Callable, *args, **kwargs):
        """执行函数,自动应用熔断逻辑"""
        self.stats.total_calls += 1
        
        # 检查是否可以执行
        if not self._can_execute():
            self.stats.rejected_calls += 1
            logger.warning(f"[{self.name}] 熔断器打开,拒绝请求")
            raise CircuitBreakerOpenError(
                f"熔断器 {self.name} 当前处于 OPEN 状态,请求被拒绝"
            )
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _can_execute(self) -> bool:
        """判断当前是否可以执行请求"""
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            # 检查是否超时,可以转换到半开状态
            if time.time() - self.last_state_change_time >= self.config.timeout:
                self._transition_to_half_open()
                return True
            return False
        
        if self.state == CircuitState.HALF_OPEN:
            # 半开状态下限制并发数
            return self.half_open_calls < self.config.half_open_max_calls
        
        return False
    
    def _on_success(self):
        """处理成功调用"""
        self.stats.successful_calls += 1
        self.stats.consecutive_failures = 0
        self.stats.consecutive_successes += 1
        
        logger.info(f"[{self.name}] 调用成功,连续成功: {self.stats.consecutive_successes}")
        
        if self.state == CircuitState.HALF_OPEN:
            if self.stats.consecutive_successes >= self.config.success_threshold:
                self._transition_to_closed()
        elif self.state == CircuitState.CLOSED:
            # 成功后保持关闭状态
            pass
    
    def _on_failure(self):
        """处理失败调用"""
        self.stats.failed_calls += 1
        self.stats.consecutive_failures += 1
        self.stats.consecutive_successes = 0
        self.stats.last_failure_time = time.time()
        
        logger.warning(
            f"[{self.name}] 调用失败,连续失败: {self.stats.consecutive_failures}/{self.config.failure_threshold}"
        )
        
        if self.state == CircuitState.HALF_OPEN:
            # 半开状态下失败,立即重新打开
            self._transition_to_open()
        elif self.state == CircuitState.CLOSED:
            if self.stats.consecutive_failures >= self.config.failure_threshold:
                self._transition_to_open()
    
    def _transition_to_open(self):
        self.state = CircuitState.OPEN
        self.last_state_change_time = time.time()
        self.half_open_calls = 0
        logger.error(f"[{self.name}] ⚠️ 熔断器打开!连续失败 {self.stats.consecutive_failures} 次")
    
    def _transition_to_half_open(self):
        self.state = CircuitState.HALF_OPEN
        self.last_state_change_time = time.time()
        self.half_open_calls = 0
        logger.info(f"[{self.name}] 🔄 进入半开状态,尝试恢复...")
    
    def _transition_to_closed(self):
        self.state = CircuitState.CLOSED
        self.last_state_change_time = time.time()
        self.stats.consecutive_successes = 0
        self.stats.consecutive_failures = 0
        self.half_open_calls = 0
        logger.info(f"[{self.name}] ✅ 熔断器关闭,服务恢复正常")
    
    def get_stats(self) -> dict:
        """获取熔断器统计信息"""
        return {
            "name": self.name,
            "state": self.state.value,
            "total_calls": self.stats.total_calls,
            "successful_calls": self.stats.successful_calls,
            "failed_calls": self.stats.failed_calls,
            "rejected_calls": self.stats.rejected_calls,
            "consecutive_failures": self.stats.consecutive_failures,
            "consecutive_successes": self.stats.consecutive_successes
        }

class CircuitBreakerOpenError(Exception):
    """熔断器打开时抛出的异常"""
    pass

这段代码看起来有点长,但逻辑非常清晰。我给你拆解一下核心思路:

第二步:创建多模型 API 调用管理器

现在我们把熔断器集成到实际的多模型 API 调用中。这里我以 HolySheep AI 平台为例,它支持 GPT-4.1、Claude Sonnet、Gemini 2.5 Flash 等主流模型,一个 API Key 就能调用所有模型,非常方便。

import requests
import json
from typing import Dict, List, Optional, Any
from circuit_breaker import CircuitBreaker, CircuitBreakerConfig, CircuitBreakerOpenError

class MultiModelAPIManager:
    """多模型 API 管理器 - 内置熔断器保护"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"  # HolySheep API 地址
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        
        # 为每个模型创建独立的熔断器
        self._init_circuit_breakers()
    
    def _init_circuit_breakers(self):
        """初始化各模型的熔断器"""
        # 不同模型配置不同的熔断器参数
        model_configs = {
            "gpt-4.1": CircuitBreakerConfig(
                failure_threshold=5,
                timeout=60.0,
                success_threshold=3
            ),
            "claude-sonnet-4.5": CircuitBreakerConfig(
                failure_threshold=3,  # Claude 可能更不稳定,阈值设低一点
                timeout=90.0,         # 等待时间设长一点
                success_threshold=2
            ),
            "gemini-2.5-flash": CircuitBreakerConfig(
                failure_threshold=5,
                timeout=30.0,         # Gemini 通常比较稳定
                success_threshold=2
            ),
            "deepseek-v3.2": CircuitBreakerConfig(
                failure_threshold=5,
                timeout=30.0,
                success_threshold=2
            )
        }
        
        for model, config in model_configs.items():
            self.circuit_breakers[model] = CircuitBreaker(
                name=f"breaker_{model}",
                config=config
            )
    
    def call_model(
        self, 
        model: str, 
        messages: List[Dict],
        temperature: float = 0.7,
        max_tokens: int = 2048
    ) -> Dict[str, Any]:
        """
        调用指定的 AI 模型,带熔断器保护
        
        Args:
            model: 模型名称 (gpt-4.1, claude-sonnet-4.5, gemini-2.5-flash, deepseek-v3.2)
            messages: 对话消息列表
            temperature: 温度参数
            max_tokens: 最大生成长度
        
        Returns:
            API 响应结果
        """
        if model not in self.circuit_breakers:
            raise ValueError(f"未知的模型: {model}")
        
        breaker = self.circuit_breakers[model]
        
        def _make_request():
            url = f"{self.base_url}/chat/completions"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            
            response = requests.post(url, json=payload, headers=headers, timeout=30)
            
            if response.status_code != 200:
                raise APIError(f"API 返回错误: {response.status_code} - {response.text}")
            
            return response.json()
        
        # 通过熔断器执行请求
        return breaker.call(_make_request)
    
    def call_with_fallback(
        self,
        primary_model: str,
        fallback_model: str,
        messages: List[Dict]
    ) -> Dict[str, Any]:
        """
        主模型调用,失败时自动切换到备用模型
        
        这是一个非常实用的功能!假设你首选 GPT-4.1,当它熔断时
        自动切换到 DeepSeek V3.2,用户完全感知不到切换
        """
        try:
            result = self.call_model(primary_model, messages)
            print(f"✅ {primary_model} 调用成功")
            return result
        except CircuitBreakerOpenError:
            print(f"⚠️ {primary_model} 熔断器打开,尝试 {fallback_model}")
            return self.call_model(fallback_model, messages)
        except APIError as e:
            print(f"❌ {primary_model} 调用失败: {e},尝试 {fallback_model}")
            return self.call_model(fallback_model, messages)
    
    def get_all_stats(self) -> Dict[str, dict]:
        """获取所有熔断器的状态"""
        return {name: breaker.get_stats() for name, breaker in self.circuit_breakers.items()}

class APIError(Exception):
    """API 调用错误"""
    pass

第三步:实际使用示例

现在让我们看看怎么实际使用这个管理器。我假设你已经注册了 HolySheep AI,如果没有注册可以先注册,它支持微信和支付宝充值,汇率是 ¥1=$1,比官方渠道省 85% 以上。

# 初始化管理器
api_manager = MultiModelAPIManager(api_key="YOUR_HOLYSHEEP_API_KEY")

单模型调用示例

messages = [ {"role": "system", "content": "你是一个有帮助的助手"}, {"role": "user", "content": "用一句话解释什么是熔断器模式"} ]

调用 GPT-4.1 ($8/MTok)

try: response = api_manager.call_model("gpt-4.1", messages) print(f"GPT-4.1 回答: {response['choices'][0]['message']['content']}") except CircuitBreakerOpenError: print("GPT-4.1 暂时不可用,稍后重试")

调用 DeepSeek V3.2 ($0.42/MTok) - 性价比之王

try: response = api_manager.call_model("deepseek-v3.2", messages) print(f"DeepSeek 回答: {response['choices'][0]['message']['content']}") except Exception as e: print(f"调用失败: {e}")

智能降级示例:当主模型不可用时自动切换

response = api_manager.call_with_fallback( primary_model="gpt-4.1", fallback_model="deepseek-v3.2", messages=messages )

查看熔断器状态

stats = api_manager.get_all_stats() for model, stat in stats.items(): print(f"{model}: {stat['state']} | 成功率: {stat['successful_calls']}/{stat['total_calls']}")

运行上面这段代码,你应该能看到类似这样的输出:

[breaker_gpt-4.1] 调用成功,连续成功: 1
GPT-4.1 回答: 熔断器就像电路保护器,当某个服务连续失败时自动切断,防止故障扩散。
[breaker_deepseek-v3.2] 调用成功,连续成功: 1
DeepSeek 回答: 熔断器是一种保护机制,防止系统因为某个组件的故障而整体崩溃。

gpt-4.1: closed | 成功率: 1/1
claude-sonnet-4.5: closed | 成功率: 0/0
gemini-2.5-flash: closed | 成功率: 0/0
deepseek-v3.2: closed | 成功率: 1/1

真实场景演练:模拟 API 故障

光看代码不过瘾,让我们模拟一个真实的故障场景,看看熔断器是怎么工作的。

import random
import time

def simulate_unreliable_api(model_name: str):
    """
    模拟一个不稳定的 API 调用
    随着连续调用增加,失败概率会上升
    """
    # 模拟故障:前几次成功,后来开始失败
    simulate_failure = getattr(simulate_unreliable_api, 'call_count', 0)
    simulate_unreliable_api.call_count = simulate_failure + 1
    
    # 模拟网络波动
    if simulate_failure < 3:
        print(f"  → {model_name} 第 {simulate_failure + 1} 次调用成功")
        return {"status": "ok", "content": "成功响应"}
    elif simulate_failure < 6:
        # 开始出现超时
        print(f"  → {model_name} 第 {simulate_failure + 1} 次调用超时!")
        raise TimeoutError(f"{model_name} 连接超时")
    elif simulate_failure < 10:
        # 连续失败
        print(f"  → {model_name} 第 {simulate_failure + 1} 次调用服务器错误!")
        raise ConnectionError(f"{model_name} 服务器内部错误")
    else:
        # 恢复
        print(f"  → {model_name} 第 {simulate_failure + 1} 次调用成功(已恢复)")
        return {"status": "ok", "content": "成功响应"}

手动测试熔断器

breaker = CircuitBreaker( name="test_breaker", config=CircuitBreakerConfig( failure_threshold=3, # 连续3次失败就打开 timeout=5.0, # 5秒后尝试恢复 success_threshold=2 # 连续2次成功就关闭 ) ) print("=== 开始模拟故障测试 ===\n") for i in range(15): print(f"第 {i + 1} 次请求:") try: breaker.call(simulate_unreliable_api, "测试API") except CircuitBreakerOpenError as e: print(f" ❌ 熔断器打开,拒绝请求: {e}") except (TimeoutError, ConnectionError) as e: print(f" ⚠️ 请求失败: {e}") print(f" 当前状态: {breaker.state.value}") print() # 熔断器打开后,等待自动恢复 if breaker.state.value == "open": print(" ⏳ 等待熔断器自动尝试恢复(半开)...") time.sleep(6) print("\n=== 测试完成 ===") print(f"总调用: {breaker.stats.total_calls}") print(f"成功: {breaker.stats.successful_calls}") print(f"失败: {breaker.stats.failed_calls}") print(f"拒绝: {breaker.stats.rejected_calls}")

运行结果会展示熔断器的完整生命周期:

=== 开始模拟故障测试 ===

第 1 次请求:
  → 测试API第 1 次调用成功
  → [test_breaker] 调用成功,连续成功: 1
  当前状态: closed

第 2 次请求:
  → 测试API第 2 次调用成功
  → [test_breaker] 调用成功,连续成功: 2
  当前状态: closed

第 3 次请求:
  → 测试API第 3 次调用成功
  → [test_breaker] 调用成功,连续成功: 3
  当前状态: closed

第 4 次请求:
  → 测试API第 4 次调用超时!
  → [test_breaker] 调用失败,连续失败: 1/3
  当前状态: closed

第 5 次请求:
  → 测试API第 5 次调用超时!
  → [test_breaker] 调用失败,连续失败: 2/3
  当前状态: closed

第 6 次请求:
  → 测试API第 6 次调用超时!
  → [test_breaker] 调用失败,连续失败: 3/3
  → [test_breaker] ⚠️ 熔断器打开!连续失败 3 次
  当前状态: open

第 7 次请求:
  → [test_breaker] 熔断器打开,拒绝请求
  ❌ 熔断器打开,拒绝请求: 熔断器 test_breaker 当前处于 OPEN 状态,请求被拒绝
  当前状态: open

第 8 次请求:
  → [test_breaker] 熔断器打开,拒绝请求
  ❌ 熔断器打开,拒绝请求: 熔断器 test_breaker 当前处于 OPEN 状态,请求被拒绝
  当前状态: open

  ⏳ 等待熔断器自动尝试恢复(半开)...
  
[7秒后]

第 9 次请求:
  → [test_breaker] 🔄 进入半开状态,尝试恢复...
  → 测试API第 7 次调用服务器错误!
  → [test_breaker] 调用失败,连续失败: 1
  → [test_breaker] ⚠️ 熔断器打开!连续失败 1 次