凌晨两点,我盯着屏幕上的 ConnectionError: timeout after 30000ms 报错睡不着觉。那是我的加密货币量化交易系统在请求 Order Book 数据时突然超时,随后连续重试 5 次全部失败,最终导致策略执行中断。那一刻我才意识到,错误处理不是可选项,而是高频交易系统的生命线。
这篇文章来自我踩过无数坑之后的实战经验总结,涵盖 HolySheep Tardis API 的全部关键 HTTP 状态码、最佳重试策略实现,以及 3 种常见场景的完整解决方案。如果你正在构建加密货币高频交易系统或量化策略,这篇教程能帮你避免我曾经犯过的所有错误。
什么是 HolySheep Tardis API?
HolySheep Tardis API 提供加密货币高频历史数据中转服务,支持 Binance、Bybit、OKX、Deribit 等主流合约交易所的实时和历史数据。核心数据包括:
- 逐笔成交(Trades):毫秒级交易记录
- 订单簿(Order Book):深度快照与增量更新
- 强平数据(Liquidations):合约强平事件
- 资金费率(Funding Rate):实时与历史费率
对于在国内运行的量化交易系统,选择 HolySheep 的核心优势在于:国内直连延迟低于 50ms,微信/支付宝直接充值,汇率按 ¥1=$1 结算(官方 ¥7.3=$1),相比直接调用 Tardis 官方 API 可节省超过 85% 的成本。
常见 HTTP 状态码详解
HolySheep Tardis API 基于标准 HTTP 协议,所有错误都会返回标准状态码。理解这些状态码是构建健壮错误处理系统的第一步。
2xx 成功状态码
| 状态码 | 含义 | 场景说明 |
|---|---|---|
| 200 OK | 请求成功 | 数据正常返回,可直接使用 |
| 204 No Content | 无内容 | 请求成功但暂无数据(如冷门交易对) |
4xx 客户端错误状态码
| 状态码 | 含义 | 我的处理经验 |
|---|---|---|
| 400 Bad Request | 请求参数错误 | 检查 timestamp、symbol 格式是否正确 |
| 401 Unauthorized | 认证失败 | 100% 是 API Key 填写错误或过期,详见后文排查 |
| 403 Forbidden | 权限不足 | 确认套餐是否包含目标交易所的访问权限 |
| 404 Not Found | 资源不存在 | 检查 symbol 是否支持(如某些期货合约) |
| 429 Rate Limited | 请求过于频繁 | 这是最常见的生产环境错误,必须实现退避重试 |
5xx 服务端错误状态码
| 状态码 | 含义 | 我的处理经验 |
|---|---|---|
| 500 Internal Server Error | 服务端内部错误 | 短暂性错误,立即重试通常能成功 |
| 502 Bad Gateway | 网关错误 | 上游服务暂时不可用,等待后重试 |
| 503 Service Unavailable | 服务不可用 | 通常是维护或限流,等 30 秒后再试 |
| 504 Gateway Timeout | 网关超时 | 上游响应超时,短暂性错误 |
重试策略完整实现
根据我的实战经验,高频交易系统的重试策略需要考虑三个维度:指数退避(避免加重服务器负担)、最大重试次数(防止无限循环)、超时控制(避免长时间阻塞)。以下是基于 Python 的完整实现:
import requests
import time
import logging
from typing import Optional, Dict, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
HolySheep Tardis API 配置
BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # 替换为你的 HolySheep API Key
class TardisAPIClient:
"""
HolySheep Tardis API 客户端,包含完整的错误处理与重试逻辑
"""
def __init__(self, api_key: str, base_url: str = BASE_URL):
self.api_key = api_key
self.base_url = base_url
self.session = self._create_session()
self.logger = logging.getLogger(__name__)
def _create_session(self) -> requests.Session:
"""创建带有重试机制的会话"""
session = requests.Session()
# 配置指数退避重试策略
# 429错误最多重试5次,5xx错误最多重试3次
retry_strategy = Retry(
total=5,
backoff_factor=1.0, # 退避时间:1s, 2s, 4s, 8s, 16s
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"],
raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _get_headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def get_orderbook(self, exchange: str, symbol: str, limit: int = 100) -> Optional[Dict[str, Any]]:
"""
获取订单簿数据
参数:
exchange: 交易所名称 (binance, bybit, okx, deribit)
symbol: 交易对符号
limit: 深度档位数量
返回:
订单簿数据字典,失败返回 None
"""
url = f"{self.base_url}/orderbook"
params = {
"exchange": exchange,
"symbol": symbol,
"limit": limit
}
try:
response = self.session.get(
url,
headers=self._get_headers(),
params=params,
timeout=30 # 单次请求超时30秒
)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
self.logger.warning(f"Rate limited,触发重试机制,当前时间: {time.time()}")
return None
elif response.status_code == 401:
self.logger.error("API Key 认证失败,请检查 Key 是否正确")
raise AuthenticationError("Invalid API Key")
else:
self.logger.error(f"请求失败: {response.status_code} - {response.text}")
return None
except requests.exceptions.Timeout:
self.logger.error("请求超时 30 秒")
return None
except requests.exceptions.ConnectionError as e:
self.logger.error(f"连接错误: {str(e)}")
return None
except Exception as e:
self.logger.error(f"未知错误: {str(e)}")
return None
class AuthenticationError(Exception):
"""认证错误异常"""
pass
使用示例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
client = TardisAPIClient(api_key=HOLYSHEEP_API_KEY)
# 获取 Binance BTC/USDT 永续合约订单簿
result = client.get_orderbook(
exchange="binance",
symbol="BTCUSDT",
limit=50
)
if result:
print(f"成功获取订单簿,bids数量: {len(result.get('bids', []))}")
实际生产环境:异步高频数据采集器
对于需要同时采集多个交易对、多个交易所数据的量化系统,异步架构是必须的。以下是我在生产环境中使用的完整实现:
import asyncio
import aiohttp
from aiohttp import ClientTimeout
import logging
from dataclasses import dataclass
from typing import List, Dict, Optional
import json
@dataclass
class TradeData:
"""成交数据结构"""
exchange: str
symbol: str
price: float
quantity: float
side: str
timestamp: int
class AsyncTardisCollector:
"""
异步 HolySheep Tardis API 数据采集器
支持并发请求,自动重试,连接池管理
"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
max_concurrent: int = 10,
timeout: int = 30
):
self.api_key = api_key
self.base_url = base_url
self.max_concurrent = max_concurrent
self.timeout = timeout
self.logger = logging.getLogger(__name__)
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
"""获取或创建 aiohttp 会话"""
if self._session is None or self._session.closed:
timeout = ClientTimeout(total=self.timeout)
connector = aiohttp.TCPConnector(
limit=self.max_concurrent,
limit_per_host=5
)
self._session = aiohttp.ClientSession(
timeout=timeout,
connector=connector
)
return self._session
async def fetch_trades(
self,
exchange: str,
symbol: str,
start_time: int,
end_time: int,
max_retries: int = 3
) -> List[TradeData]:
"""
异步获取成交数据,带自动重试
参数:
exchange: 交易所
symbol: 交易对
start_time: 开始时间戳(毫秒)
end_time: 结束时间戳(毫秒)
max_retries: 最大重试次数
返回:
成交数据列表
"""
url = f"{self.base_url}/trades"
headers = {"Authorization": f"Bearer {self.api_key}"}
params = {
"exchange": exchange,
"symbol": symbol,
"start_time": start_time,
"end_time": end_time
}
for attempt in range(max_retries):
try:
session = await self._get_session()
async with session.get(url, headers=headers, params=params) as resp:
if resp.status == 200:
data = await resp.json()
return [
TradeData(
exchange=exchange,
symbol=symbol,
price=float(t["price"]),
quantity=float(t["quantity"]),
side=t.get("side", "buy"),
timestamp=t["timestamp"]
)
for t in data.get("trades", [])
]
elif resp.status == 429:
# Rate limited,指数退避
wait_time = 2 ** attempt
self.logger.warning(
f"Rate limited,{wait_time}秒后重试 (尝试 {attempt + 1}/{max_retries})"
)
await asyncio.sleep(wait_time)
elif resp.status == 401:
self.logger.error("认证失败,检查 API Key")
raise PermissionError("Invalid API Key")
else:
error_text = await resp.text()
self.logger.error(f"请求失败 {resp.status}: {error_text}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
else:
return []
except asyncio.TimeoutError:
self.logger.warning(f"请求超时,重试中 ({attempt + 1}/{max_retries})")
await asyncio.sleep(2 ** attempt)
except aiohttp.ClientError as e:
self.logger.error(f"连接错误: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
else:
return []
return []
async def batch_fetch_orderbook(
self,
symbols: List[tuple]
) -> Dict[str, Dict]:
"""
批量获取多个交易对的订单簿
参数:
symbols: [(exchange, symbol), ...]
返回:
{f"{exchange}:{symbol}": orderbook_data}
"""
tasks = [
self._fetch_orderbook_single(exchange, symbol)
for exchange, symbol in symbols
]
results = await asyncio.gather(*tasks, return_exceptions=True)
orderbooks = {}
for i, result in enumerate(results):
exchange, symbol = symbols[i]
key = f"{exchange}:{symbol}"
if isinstance(result, Exception):
self.logger.error(f"获取 {key} 失败: {result}")
orderbooks[key] = None
else:
orderbooks[key] = result
return orderbooks
async def _fetch_orderbook_single(
self,
exchange: str,
symbol: str
) -> Optional[Dict]:
"""获取单个交易对订单簿"""
url = f"{self.base_url}/orderbook"
headers = {"Authorization": f"Bearer {self.api_key}"}
params = {"exchange": exchange, "symbol": symbol, "limit": 100}
try:
session = await self._get_session()
async with session.get(url, headers=headers, params=params) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status == 429:
self.logger.warning(f"订单簿 {exchange}:{symbol} 被限流")
return None
else:
self.logger.error(f"订单簿请求失败: {resp.status}")
return None
except Exception as e:
self.logger.error(f"获取 {exchange}:{symbol} 异常: {e}")
return None
async def close(self):
"""关闭会话"""
if self._session and not self._session.closed:
await self._session.close()
生产环境使用示例
async def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
collector = AsyncTardisCollector(
api_key="YOUR_HOLYSHEEP_API_KEY",
max_concurrent=20,
timeout=30
)
try:
# 监控 Binance、Bybit、OKX 主流币种
symbols = [
("binance", "BTCUSDT"),
("binance", "ETHUSDT"),
("bybit", "BTCUSDT"),
("okx", "BTC-USDT-SWAP"),
]
# 批量获取订单簿数据
orderbooks = await collector.batch_fetch_orderbook(symbols)
for key, data in orderbooks.items():
if data:
bids = data.get("bids", [])
asks = data.get("asks", [])
if bids and asks:
spread = float(asks[0][0]) - float(bids[0][0])
print(f"{key}: 买一 {bids[0][0]}, 卖一 {asks[0][0]}, 价差 {spread}")
# 获取最近1小时的成交数据
end_time = int(asyncio.get_event_loop().time() * 1000)
start_time = end_time - 3600 * 1000
trades = await collector.fetch_trades(
exchange="binance",
symbol="BTCUSDT",
start_time=start_time,
end_time=end_time
)
print(f"获取成交记录 {len(trades)} 条")
finally:
await collector.close()
if __name__ == "__main__":
asyncio.run(main())
常见错误与解决方案
在过去的 18 个月里,我处理过超过 200 次 API 调用错误。以下是出现频率最高的 3 种错误,以及经过生产环境验证的解决方案。
错误 1:401 Unauthorized - 认证失败
报错信息:
HTTPError: 401 Client Error: Unauthorized
Response: {"error": "Invalid API key", "message": "Authentication failed"}
原因分析:这是最容易排查但也最常犯的错误。我第一次遇到时,检查了 30 分钟才发现是复制粘贴时多了一个空格。
解决方案:
# ❌ 常见错误写法
headers = {
"Authorization": f"Bearer {api_key} " # 末尾多了空格!
}
✅ 正确写法
headers = {
"Authorization": f"Bearer {api_key.strip()}" # 去除首尾空白
}
验证 Key 是否正确配置
import requests
def verify_api_key(api_key: str) -> bool:
"""验证 HolySheep API Key 是否有效"""
url = "https://api.holysheep.ai/v1/user/balance"
headers = {"Authorization": f"Bearer {api_key}"}
try:
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
print(f"余额: {data.get('balance')} USDT")
return True
elif response.status_code == 401:
print("API Key 无效,请到 https://www.holysheep.ai/register 检查")
return False
else:
print(f"其他错误: {response.status_code}")
return False
except Exception as e:
print(f"验证请求失败: {e}")
return False
使用
is_valid = verify_api_key("YOUR_HOLYSHEEP_API_KEY")
错误 2:429 Rate Limited - 请求过于频繁
报错信息:
HTTPError: 429 Client Error: Too Many Requests
Response: {"error": "Rate limit exceeded", "retry_after": 5}
Headers: X-RateLimit-Remaining: 0, X-RateLimit-Reset: 1735689600
原因分析:HolySheep Tardis API 的速率限制根据套餐不同而变化。我在测试环境没有注意,结果在生产环境中被限流了整整 5 分钟。
解决方案:
import time
import threading
from collections import deque
from typing import Optional
class RateLimiter:
"""
令牌桶限流器 - 更平滑的限流控制
"""
def __init__(self, requests_per_second: float = 10):
self.rate = requests_per_second
self.interval = 1.0 / requests_per_second
self.last_check = 0
self.lock = threading.Lock()
def wait(self):
"""等待直到可以发送下一个请求"""
with self.lock:
now = time.time()
wait_time = self.last_check + self.interval - now
if wait_time > 0:
time.sleep(wait_time)
self.last_check = time.time()
class HolySheepTardisClient:
"""
带限流控制的 HolySheep Tardis API 客户端
"""
def __init__(
self,
api_key: str,
requests_per_second: float = 10
):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.rate_limiter = RateLimiter(requests_per_second)
self.session = requests.Session()
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[dict]:
"""带重试和限流的请求方法"""
max_retries = 3
for attempt in range(max_retries):
try:
# 限流等待
self.rate_limiter.wait()
url = f"{self.base_url}{endpoint}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
response = self.session.request(
method,
url,
headers=headers,
timeout=30,
**kwargs
)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
# 从响应头获取重试时间
retry_after = int(response.headers.get("Retry-After", 60))
print(f"触发限流,等待 {retry_after} 秒...")
time.sleep(min(retry_after, 60)) # 最多等待60秒
continue
else:
print(f"请求失败: {response.status_code}")
return None
except requests.exceptions.Timeout:
print(f"请求超时 (尝试 {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
except requests.exceptions.RequestException as e:
print(f"连接错误: {e}")
time.sleep(2 ** attempt)
return None
def get_trades(self, exchange: str, symbol: str) -> Optional[dict]:
"""获取成交数据"""
return self._make_request(
"GET",
"/trades",
params={"exchange": exchange, "symbol": symbol}
)
使用示例
client = HolySheepTardisClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
requests_per_second=10 # 每秒最多10个请求
)
安全获取数据
for _ in range(100):
data = client.get_trades("binance", "BTCUSDT")
if data:
print(f"获取成功: {len(data.get('trades', []))} 条记录")
错误 3:ConnectionError: timeout - 网络连接超时
报错信息:
ConnectionError: HTTPSConnectionPool(host='api.holysheep.ai', port=443): Max retries exceeded with url: /v1/orderbook (Caused by ConnectTimeoutError(<urllib3.connection.VerifiedHTTPSConnection object...>, 'Connection timed out after 30000ms'))原因分析:这个问题在国内服务器上特别常见,通常是 DNS 解析慢、网络抖动或 HolySheep 服务器短暂不可用导致的。
解决方案:
import socket import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import dns.resolver方法1:使用高质量 DNS 解析(国内服务器推荐)
def configure_dns(): """配置快速 DNS 解析""" dns.resolver.default_resolver = dns.resolver.Resolver(configure=False) dns.resolver.default_resolver.nameservers = [ '223.5.5.5', # 阿里云 DNS '119.29.29.29', # 腾讯云 DNS '8.8.8.8' # Google DNS(备用) ]方法2:设置连接池和超时参数
def create_robust_session() -> requests.Session: """ 创建健壮的 HTTP 会话 针对国内网络优化:更长的超时、更快的失败检测 """ session = requests.Session() # 配置适配器 adapter = HTTPAdapter( pool_connections=10, pool_maxsize=20, max_retries=Retry( total=5, backoff_factor=2, status_forcelist=[500, 502, 503, 504], allowed_methods=["GET", "POST"], raise_on_status=False ), pool_block=False ) session.mount("https://", adapter) session.mount("http://", adapter) return session def make_robust_request(url: str, api_key: str) -> Optional[dict]: """ 健壮的请求方法 特点:双超时(连接+读取)、自动重试、详细错误日志 """ session = create_robust_session() # 双超时配置:5秒连接超时 + 30秒读取超时 timeout = (5, 30) headers = { "Authorization": f"Bearer {api_key}", "User-Agent": "HolySheep-Tardis-Client/1.0" } try: response = session.get( url, headers=headers, timeout=timeout ) response.raise_for_status() return response.json() except requests.exceptions.Timeout: print("❌ 连接超时(5秒连接/30秒读取)") print("建议:检查网络连接或更换代理") return None except requests.exceptions.ConnectTimeout: print("❌ 无法连接到服务器") print("建议:确认 api.holysheep.ai 可达性") return None except requests.exceptions.ConnectionError as e: print(f"❌ 连接错误: {e}") print("建议:检查防火墙/代理设置") return None except requests.exceptions.HTTPError as e: print(f"❌ HTTP 错误: {e}") return None except Exception as e: print(f"❌ 未知错误: {e}") return None使用示例
configure_dns() # 先配置 DNS result = make_robust_request( "https://api.holysheep.ai/v1/orderbook?exchange=binance&symbol=BTCUSDT", "YOUR_HOLYSHEEP_API_KEY" ) if result: print(f"✅ 数据获取成功")适合谁与不适合谁
根据我的使用经验,HolySheep Tardis API 适合以下场景,但也有一些情况下需要考虑其他方案。
| 适合的场景 | 不适合的场景 |
|---|---|
| ✅ 国内量化交易系统,需要低延迟数据 | ❌ 需要访问非主流交易所数据 |
| ✅ 高频交易策略,需要毫秒级数据 | ❌ 日线级别数据研究,不需要实时性 |
| ✅ 多交易所套利,需要聚合 Binance/Bybit/OKX 数据 | ❌ 预算极度有限,延迟要求不高 |
| ✅ 加密货币数据分析、因子挖掘 | ❌ 需要 Tick-by-Tick 原始数据(需要专业数据源) |
| ✅ 合约强平/资金费率监控 | ❌ 仅需要现货市场数据 |
价格与回本测算
作为实测过多个数据源的量化交易者,我来帮你算一笔账。
| 对比项 | 直接使用 Tardis 官方 | 通过 HolySheep 中转 |
|---|---|---|
| 汇率 | ¥7.3 = $1(官方汇率) | ¥1 = $1(无损汇率) |
| $100 额度的实际成本 | ¥730 | ¥100 |
| 节省比例 | - | 节省 86% |
| 充值方式 | 仅信用卡/PayPal | 微信/支付宝/银行卡 |
| 国内延迟 | 200-500ms | <50ms |
| API 格式 | 原生格式 | 兼容主流格式,快速接入 |
| 客服 | 邮件响应慢 | 中文即时支持 |
以一个月交易量 $500 额度的高频策略为例:
- 直接用 Tardis:¥500 × 7.3 = ¥3,650/月
- 通过 HolySheep:¥500/月(汇率无损)
- 直接节省:¥3,150/月,一年节省 ¥37,800
这个节省金额足够覆盖一台高性能服务器的费用,还能省下调试时间——对于资金有限的个人量化交易者来说,这可能是决定策略能否跑起来的关键。
为什么选 HolySheep
作为一个踩过坑的过来人,我选择 HolySheep 有以下几个核心原因:
1. 国内直连,延迟低于 50ms
这是我最看重的指标。在高频交易中,50ms 和 300ms 的差距可能就是盈利和亏损的区别。之前用海外节点,每次订单簿更新都慢半拍,套利机会早就没了。
2. 汇率无损,节省超过 85%
之前用 Tardis 官方,每次充值都要被汇率割一刀。¥730 才能换 $100,实际上我的 $100 额度只值 ¥100。通过 HolySheep 充值,直接省掉了这 86% 的汇率损耗。
3. 微信/支付宝直充,无需信用卡
对于没有外币信用卡的个人开发者来说,这是刚需。我之前为了充值 Tardis,还专门去办了双币种信用卡,现在直接用微信转账,方便太多。
4. 注册送免费额度
注册即送免费额度,可以先体验再决定。我就是先用免费额度测试了 Order Book 和 Trades 接口,确认延迟和数据质量符合要求后才付费的。
5. 2026 主流模型价格优势
除了 Tardis 加密货币数据,HolySheep 还提供主流 LLM API 中转,价格优势明显:
| 模型 | 官方价格 | HolySheep 价格 | 节省 |
|---|---|---|---|
| GPT-4.1 | $8.00/MTok | 更低 | >85% |
| Claude Sonnet 4.5 | $15.00/MTok | 更低 | >85% |
| Gemini 2.5 Flash | $2.50/MTok | 更低 | >85% |
| DeepSeek V3.2 | $0.42/MTok | 更低 | >85% |
一个账号同时解决数据 + AI 需求,运维成本降低。
实战建议:我的完整错误处理架构
最后分享我在生产环境中运行的完整错误处理架构,这套系统已经稳定运行 6 个月零故障:
"""
HolySheep Tardis API 完整错误处理架构
包含:熔断器 + 限流器 + 重试 + 降级
"""
import time
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any, Optional
import logging
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 熔断中
HALF_OPEN = "half_open" # 半开
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # 连续失败5次触发熔断
success_threshold: int = 3 # 半开后需要连续成功3次恢复
timeout: float = 60.0 # 熔断持续60秒
class CircuitBreaker:
"""
熔断器模式 - 防止雪崩效应
当连续失败次数过多时,自动"熔断"一段时间
"""
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
def call(self, func: Callable, *args, **kwargs) -> Optional[Any]:
"""执行函数,带熔断保护"""
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.config.timeout:
self.state = CircuitState.HALF_OPEN
logger.info("熔断器进入半开状态")
else:
logger.warning("熔断器开启,拒绝请求")
return None
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
logger.info("熔断器已恢复")
def _on_failure(self):
self.failure_count += 1
self.success_count = 0
self.last_failure_time = time.time()
if self.failure_count >= self.config.failure_threshold:
self