凌晨三点,我的手机突然震动。监控告警显示:当前 Token 消耗速率 127,000 tokens/min,是昨日同时段的 340%。登录 HolySheep AI 控制台查看,账单已悄然燃烧了 $47。这不是 DDoS 攻击,而是一段死循环代码在疯狂调用 Chat Completions 接口。
这次事件后,我花了整整两周构建了一套 Token 用量异常检测系统。今天把完整方案分享给你,包含统计学模型、规则引擎、Python 实战代码,以及 HolySheep AI 的低成本接入实践。
为什么需要 Token 用量异常检测?
在我维护的智能客服系统中,API 费用曾两次失控:一次是 Prompt 长度没做截断导致单次请求消耗 8000 tokens;另一次是重试逻辑没有指数退避,失败后疯狂堆积请求。HolySheep AI 的汇率优势(¥1=$1,官方¥7.3=$1)虽然已经帮我省了 85% 成本,但如果用量失控,省下来的钱分分钟被反噬。
异常检测的核心价值:
- ✅ 实时发现 Token 暴增(可能存在 Prompt 泄漏或死循环)
- ✅ 识别 Token 骤降(接口异常、降级或被限流)
- ✅ 预测账单峰值,避免月底天价账单
- ✅ 捕获未授权调用(API Key 泄漏后的异常消费)
方案架构:统计模型 + 规则双引擎
我的检测系统采用双层架构:
- 统计模型层:基于历史数据自动学习正常消耗模式,适合检测渐进式异常
- 规则引擎层:人工定义硬性阈值,适合检测明确的红线违规
两者结果取并集,任何一层触发即告警。
一、统计模型实现
1.1 标准差异常检测
最基础但最有效的方案:计算过去 N 个时间窗口的平均值和标准差,超出 3σ 即告警。
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import httpx
@dataclass
class TokenUsage:
    """Token consumption recorded for a single sample."""

    timestamp: datetime  # when the sample was taken
    prompt_tokens: int  # tokens counted for the prompt
    completion_tokens: int  # tokens counted for the completion
    total_tokens: int  # total token count for the sample
class StatisticalAnomalyDetector:
    """Flag token-usage samples whose z-score exceeds a sigma threshold.

    The newest sample is scored against the mean and standard deviation of
    the latest ``window_size`` samples (the newest sample is itself part of
    that window).
    """

    def __init__(self, window_size: int = 60, threshold_sigma: float = 3.0):
        """
        Args:
            window_size: Number of most recent samples used for the
                statistics (the article assumes one sample per 5 minutes).
            threshold_sigma: Absolute z-score above which an alert fires.
        """
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma
        self.history: List["TokenUsage"] = []

    def add_usage(self, usage: "TokenUsage"):
        """Append a usage sample and keep the stored history bounded."""
        self.history.append(usage)
        # Trim lazily: only once the list holds twice the window, cut it back
        # to one window so trimming is not paid on every append.
        if len(self.history) > self.window_size * 2:
            self.history = self.history[-self.window_size:]

    def detect(self) -> Optional[dict]:
        """Score the newest sample.

        Returns:
            An alert dict (builtin ``int``/``float`` values only, so it can be
            passed to ``json.dumps``) when ``|z| > threshold_sigma``;
            ``None`` otherwise or when fewer than 10 samples are stored.
        """
        if len(self.history) < 10:
            return None  # not enough data for meaningful statistics

        recent = self.history[-self.window_size:]
        totals = np.array([u.total_tokens for u in recent])

        # Convert NumPy scalars to builtin numbers; np.int64 in particular is
        # rejected by json.dumps.
        mean = float(np.mean(totals))
        std = float(np.std(totals))
        current = totals[-1].item()

        if std > 0:
            z_score = float((current - mean) / std)
        else:
            # Zero spread: every sample in the window is identical.
            z_score = 0.0 if current == mean else float('inf')

        if abs(z_score) <= self.threshold_sigma:
            return None

        return {
            "alert": True,
            "type": "激增" if z_score > 0 else "骤降",
            "current_tokens": current,
            "mean_tokens": round(mean, 2),
            "std_tokens": round(std, 2),
            "z_score": round(z_score, 2),
            "severity": "HIGH" if abs(z_score) > 4 else "MEDIUM",
        }
使用示例
# Example: a detector over 60 samples with a 3-sigma alert threshold.
detector = StatisticalAnomalyDetector(window_size=60, threshold_sigma=3.0)
print("统计异常检测器初始化完成,60个窗口(5小时),3σ阈值")
1.2 指数加权移动平均 (EWMA) 检测
标准差检测对突发的大幅异常敏感,但对渐进式的趋势变化响应较慢。EWMA 通过赋予近期数据更高权重,能更快捕捉趋势变化。
class EWMADetector:
    """Spike detector based on an exponentially weighted moving average."""

    def __init__(self, span: int = 12, threshold_ratio: float = 2.0):
        """
        Args:
            span: EWMA span; smaller values react faster to recent samples.
                The smoothing factor is ``alpha = 2 / (span + 1)``.
            threshold_ratio: Alert when a sample exceeds the EWMA baseline by
                more than this factor.
        """
        self.span = span
        self.threshold_ratio = threshold_ratio
        self.ewma: Optional[float] = None
        self.alpha = 2 / (span + 1)
        self.history: List[float] = []

    def update(self, value: float) -> Optional[dict]:
        """Feed one sample, update the EWMA and report a spike if any.

        The sample is compared against the EWMA as it was *before* this
        sample was absorbed. Comparing against the already-updated average
        lets a spike raise its own baseline, which hides it (and makes an
        alert impossible once ``alpha * threshold_ratio >= 1``).

        Returns:
            An alert dict when ``value > baseline * threshold_ratio``;
            ``None`` otherwise (including for the very first sample, which
            only seeds the average).
        """
        self.history.append(value)

        if self.ewma is None:
            self.ewma = value
            return None

        baseline = self.ewma
        # E_t = alpha * x_t + (1 - alpha) * E_{t-1}
        self.ewma = self.alpha * value + (1 - self.alpha) * baseline

        if baseline > 0 and value > baseline * self.threshold_ratio:
            return {
                "alert": True,
                "current": value,
                "ewma": round(baseline, 2),
                "ratio": round(value / baseline, 2),
                "type": "SUDDEN_SPIKE",
                "message": f"Token消耗突增:当前{value},EWMA基准{round(baseline, 2)},比率{value/baseline:.1f}x"
            }
        return None
集成到实际监控系统
# Example: EWMA detector with span 12 that alerts on a 2x ratio.
ewma_detector = EWMADetector(span=12, threshold_ratio=2.0)
print("EWMA检测器就绪,span=12,对2倍以上突增敏感")
1.3 增量变化率检测
我最喜欢用的补充检测器,专门捕获环比增长率异常。
class RateOfChangeDetector:
    """Detect abnormal sample-over-sample growth or drop in token usage."""

    def __init__(self, min_samples: int = 5,
                 max_growth_rate: float = 1.5,
                 max_drop_rate: float = 0.5):
        """
        Args:
            min_samples: Samples required before checking; also the number of
                most recent samples the average ratio is computed over.
            max_growth_rate: Upper bound for the average ratio
                ``sample[i] / sample[i-1]`` (1.5 means 150%).
            max_drop_rate: Lower bound for the same average ratio
                (0.5 means usage halves from one sample to the next).
        """
        self.min_samples = min_samples
        self.max_growth_rate = max_growth_rate
        self.max_drop_rate = max_drop_rate
        self.history: List[float] = []

    def check(self, current_tokens: int) -> Optional[dict]:
        """Record a sample and evaluate the average consecutive ratio.

        Returns:
            An ``EXCESSIVE_GROWTH`` or ``SEVERE_DROP`` alert dict, or ``None``
            when there is too little data or the ratio is within bounds.
        """
        self.history.append(current_tokens)
        if len(self.history) < self.min_samples:
            return None

        recent = self.history[-self.min_samples:]
        # Only the last ``min_samples`` values are ever read, so drop older
        # ones to keep memory bounded in a long-running process.
        if self.min_samples > 0:
            del self.history[:-self.min_samples]

        # Pairs whose previous value is 0 are skipped (ratio undefined).
        growth_rates = [
            cur / prev for prev, cur in zip(recent, recent[1:]) if prev > 0
        ]
        if not growth_rates:
            return None

        avg_growth_rate = float(np.mean(growth_rates))

        if avg_growth_rate > self.max_growth_rate:
            return {
                "alert": True,
                "type": "EXCESSIVE_GROWTH",
                "avg_growth_rate": round(avg_growth_rate, 2),
                "threshold": self.max_growth_rate,
                "severity": "HIGH"
            }

        # A collapsing ratio may indicate a service interruption.
        if avg_growth_rate < self.max_drop_rate:
            return {
                "alert": True,
                "type": "SEVERE_DROP",
                "avg_growth_rate": round(avg_growth_rate, 2),
                "threshold": self.max_drop_rate,
                "severity": "MEDIUM"
            }
        return None
# Status message for the rate-of-change detector defaults.
print("增长率检测器配置完成,150%增长阈值,50%骤降阈值")
二、规则引擎实现
统计模型擅长捕捉"微妙"的异常,但对于明确的红线,我更信任规则引擎。以下是我生产环境中的规则配置:
from enum import Enum
from typing import Callable, Any
import yaml
class RuleType(Enum):
    """Categories of alert rules understood by the rule engine."""

    THRESHOLD = "threshold"      # absolute threshold
    RATE = "rate"                # rate of change / frequency
    QUOTA = "quota"              # quota limit
    TIME_WINDOW = "time_window"  # accumulated value over a time window
@dataclass
class AlertRule:
    """A single named rule evaluated against a metrics context dict."""

    name: str  # human-readable rule name, reported in alerts
    rule_type: "RuleType"  # category of the rule
    condition: Callable[[dict], bool]  # returns True when the rule fires
    severity: str  # severity label copied into the alert
    message_template: str  # str.format template filled from the context
    enabled: bool = True  # disabled rules are skipped during evaluation
class RuleEngine:
    """Rule-based usage checks with hard thresholds."""

    def __init__(self):
        """Create the engine and load the built-in default rules."""
        self.rules: List["AlertRule"] = []
        self._init_default_rules()

    def _init_default_rules(self):
        """Register the default rules (thresholds are meant to be tuned)."""
        # Rule 1: more than 100,000 tokens in one minute
        self.rules.append(AlertRule(
            name="单分钟用量超限",
            rule_type=RuleType.THRESHOLD,
            condition=lambda ctx: ctx.get("tokens_per_minute", 0) > 100_000,
            severity="HIGH",
            message_template="单分钟Token消耗{tokens_per_minute},超过阈值100,000"
        ))
        # Rule 2: more than 2,000,000 tokens in one hour
        self.rules.append(AlertRule(
            name="小时累计超限",
            rule_type=RuleType.QUOTA,
            condition=lambda ctx: ctx.get("tokens_per_hour", 0) > 2_000_000,
            severity="HIGH",
            message_template="小时累计消耗{tokens_per_hour},超过配额2,000,000"
        ))
        # Rule 3: a single request above 50,000 tokens (prompt likely not
        # truncated)
        self.rules.append(AlertRule(
            name="单次请求量异常",
            rule_type=RuleType.THRESHOLD,
            condition=lambda ctx: ctx.get("tokens_per_request", 0) > 50_000,
            severity="MEDIUM",
            message_template="单次请求消耗{tokens_per_request},建议检查Prompt长度"
        ))
        # Rule 4: failure rate above 20%
        self.rules.append(AlertRule(
            name="失败率过高",
            rule_type=RuleType.RATE,
            condition=lambda ctx: ctx.get("failure_rate", 0) > 0.2,
            severity="HIGH",
            message_template="请求失败率{failure_rate:.1%},超过20%阈值"
        ))
        # Rule 5: more than 500 requests per minute (frequency abuse guard)
        self.rules.append(AlertRule(
            name="请求频率超限",
            rule_type=RuleType.RATE,
            condition=lambda ctx: ctx.get("requests_per_minute", 0) > 500,
            severity="MEDIUM",
            message_template="分钟请求数{requests_per_minute},超过500次阈值"
        ))
        print(f"规则引擎初始化完成,共加载 {len(self.rules)} 条规则")

    def evaluate(self, context: dict) -> List[dict]:
        """Evaluate every enabled rule against ``context``.

        A rule whose condition raises is reported on stdout and skipped
        (best effort). A rule that fires but whose message template cannot
        be formatted still produces an alert, carrying the raw template, so
        a formatting problem never hides a triggered rule.

        Returns:
            One dict per triggered rule with the keys ``rule_name``,
            ``severity`` and ``message``.
        """
        alerts = []
        for rule in self.rules:
            if not rule.enabled:
                continue

            try:
                triggered = rule.condition(context)
            except Exception as e:
                print(f"规则 {rule.name} 执行出错: {e}")
                continue
            if not triggered:
                continue

            try:
                message = rule.message_template.format(**context)
            except Exception as e:
                print(f"规则 {rule.name} 执行出错: {e}")
                message = rule.message_template

            alerts.append({
                "rule_name": rule.name,
                "severity": rule.severity,
                "message": message,
            })
        return alerts
规则引擎实例
# Module-level rule engine instance loaded with the default rules.
rule_engine = RuleEngine()
print("规则引擎已就绪,可通过 YAML 配置动态调整阈值")
三、HolySheep AI 集成实战
在 HolySheep AI 上构建监控系统有独特优势:¥1=$1 的汇率让我能用相同预算监控更多维度,国内直连 <50ms 的延迟让实时检测成为可能。
以下是与 HolySheep AI API 深度集成的完整代码:
import asyncio
import json
from datetime import datetime
from collections import defaultdict
import hashlib
class HolySheepMonitor:
    """Real-time token-usage monitor for the HolySheep AI API.

    Every logged request updates per-minute / per-hour counters and is then
    run through the statistical detectors and the rule engine; any resulting
    alert is passed to the registered callbacks.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        """
        Args:
            api_key: API key sent as a ``Bearer`` token.
            base_url: Base URL for the shared ``httpx.AsyncClient``.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            timeout=30.0
        )

        # Detectors
        self.stat_detector = StatisticalAnomalyDetector(window_size=60)
        self.ewma_detector = EWMADetector(span=12)
        self.roc_detector = RateOfChangeDetector(min_samples=5)
        self.rule_engine = RuleEngine()

        # Running statistics
        self.minute_stats = defaultdict(int)  # tokens per "%Y%m%d%H%M" key
        self.hour_stats = defaultdict(int)    # tokens per "%Y%m%d%H" key
        self.request_stats = []               # one dict per request
        self.failed_requests = 0
        self.total_requests = 0

        # Alert callbacks
        self.alert_callbacks = []

    def on_alert(self, callback):
        """Register a callback invoked with each alert dict.

        The callback may be a coroutine function or a plain function.
        """
        self.alert_callbacks.append(callback)

    async def _send_alert(self, alert: dict):
        """Stamp ``alert`` with the current time and call every callback."""
        alert["timestamp"] = datetime.now().isoformat()
        for callback in self.alert_callbacks:
            outcome = callback(alert)
            # Support plain (non-async) callbacks as well as coroutines.
            if hasattr(outcome, "__await__"):
                await outcome

    async def log_request(self, prompt_tokens: int, completion_tokens: int,
                          response_time_ms: float, success: bool = True):
        """Record one API call and run all detectors.

        Args:
            prompt_tokens: Prompt tokens consumed by the call.
            completion_tokens: Completion tokens consumed by the call.
            response_time_ms: Observed latency in milliseconds.
            success: ``False`` counts the call towards the failure rate.
        """
        total = prompt_tokens + completion_tokens
        self.total_requests += 1
        if not success:
            self.failed_requests += 1

        now = datetime.now()
        self.minute_stats[now.strftime("%Y%m%d%H%M")] += total
        self.hour_stats[now.strftime("%Y%m%d%H")] += total
        self.request_stats.append({
            "timestamp": now,
            "tokens": total,
            "prompt": prompt_tokens,
            "completion": completion_tokens,
            "latency_ms": response_time_ms
        })

        self._cleanup_old_stats()
        # Reuse the same instant so detection reads the bucket just written,
        # even when the call straddles a minute boundary.
        await self._run_detection(now)

    async def _run_detection(self, now: Optional[datetime] = None):
        """Run every detector against the current counters.

        NOTE(review): the detectors are fed on every request with the running
        per-minute total rather than once per closed window - confirm this is
        the intended sampling.

        Args:
            now: Instant used to select the minute/hour buckets; defaults to
                the current time.
        """
        if now is None:
            now = datetime.now()
        minute_key = now.strftime("%Y%m%d%H%M")
        hour_key = now.strftime("%Y%m%d%H")
        latest = self.request_stats[-1] if self.request_stats else None

        context = {
            "tokens_per_minute": self.minute_stats.get(minute_key, 0),
            "tokens_per_hour": self.hour_stats.get(hour_key, 0),
            "tokens_per_request": latest["tokens"] if latest else 0,
            "failure_rate": self.failed_requests / max(self.total_requests, 1),
            "requests_per_minute": sum(
                1 for r in self.request_stats
                if r["timestamp"].strftime("%Y%m%d%H%M") == minute_key
            )
        }

        # 1. Statistical model. The sample must be added to the detector
        #    before detect(); otherwise its history stays empty and it can
        #    never raise an alert.
        if latest is not None:
            self.stat_detector.add_usage(TokenUsage(
                timestamp=now,
                prompt_tokens=latest["prompt"],
                completion_tokens=latest["completion"],
                total_tokens=context["tokens_per_minute"]
            ))
        stat_result = self.stat_detector.detect()
        if stat_result and stat_result.get("alert"):
            await self._send_alert({
                **stat_result,
                "source": "STATISTICAL_MODEL",
                "rule": "STDEV_3SIGMA"
            })

        # 2. EWMA
        ewma_result = self.ewma_detector.update(context["tokens_per_minute"])
        if ewma_result and ewma_result.get("alert"):
            await self._send_alert({
                **ewma_result,
                "source": "STATISTICAL_MODEL",
                "rule": "EWMA_RATIO"
            })

        # 3. Rate of change
        roc_result = self.roc_detector.check(context["tokens_per_minute"])
        if roc_result and roc_result.get("alert"):
            await self._send_alert({
                **roc_result,
                "source": "STATISTICAL_MODEL",
                "rule": "RATE_OF_CHANGE"
            })

        # 4. Rule engine
        for alert in self.rule_engine.evaluate(context):
            await self._send_alert({
                **alert,
                "source": "RULE_ENGINE"
            })

    def _cleanup_old_stats(self):
        """Drop minute/request data older than 2 hours and hour data older
        than 24 hours."""
        now_ts = datetime.now().timestamp()
        cutoff_minute = now_ts - 7200   # 2 hours ago
        cutoff_hour = now_ts - 86400    # 24 hours ago

        # Collect keys first: a dict must not change size while iterated.
        expired_minutes = [
            k for k in self.minute_stats
            if datetime.strptime(k, "%Y%m%d%H%M").timestamp() < cutoff_minute
        ]
        for k in expired_minutes:
            del self.minute_stats[k]

        expired_hours = [
            k for k in self.hour_stats
            if datetime.strptime(k, "%Y%m%d%H").timestamp() < cutoff_hour
        ]
        for k in expired_hours:
            del self.hour_stats[k]

        self.request_stats = [
            r for r in self.request_stats
            if r["timestamp"].timestamp() > cutoff_minute
        ]

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()
使用示例
async def main():
    """Demo: create a monitor, register an alert printer, log one request.

    The monitor's HTTP client is always closed, even if logging fails.
    """
    # Create the monitor (use your own HolySheep API key)
    monitor = HolySheepMonitor(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        base_url="https://api.holysheep.ai/v1"
    )

    async def handle_alert(alert):
        """Print the alert; a chat-webhook notifier can be plugged in here."""
        print(f"🚨 告警触发: {json.dumps(alert, ensure_ascii=False, indent=2)}")

    monitor.on_alert(handle_alert)

    try:
        # Simulate one recorded API call
        await monitor.log_request(
            prompt_tokens=500,
            completion_tokens=150,
            response_time_ms=45.3,
            success=True
        )
        print("✅ HolySheheep AI 监控器初始化完成")
        print(" - 国内直连延迟: <50ms")
        print(" - 支持 ¥1=$1 低成本监控")
    finally:
        await monitor.close()
# Run the demo coroutine to completion; this executes when the module loads.
asyncio.run(main())
四、封装成通用中间件
我更推荐将监控封装成 OpenAI SDK 中间件,这样所有 API 调用自动被监控,无需修改业务代码:
from typing import Optional, Dict, Any, List, Iterator
import time
import logging
class HolySheepMonitoringMiddleware:
    """Wrapper around Chat Completions calls that records token usage.

    Each call is sent through the monitor's HTTP client and its token
    consumption (or failure) is reported to the monitor.
    """

    def __init__(self, api_key: str, monitor: "HolySheepMonitor"):
        """
        Args:
            api_key: API key (stored; requests use the monitor's client).
            monitor: Monitor providing ``client`` and ``log_request``.
        """
        self.api_key = api_key
        self.monitor = monitor
        self.logger = logging.getLogger("HolySheepMonitor")

    async def chat_completion(self, messages: List[dict],
                              model: str = "gpt-4o",
                              **kwargs) -> Dict[str, Any]:
        """Call ``/chat/completions`` and record the outcome.

        Args:
            messages: Chat messages to send.
            model: Model name.
            **kwargs: Extra fields merged into the request JSON body.

        Returns:
            The decoded JSON response.

        Raises:
            httpx.HTTPError: On a non-success status or a transport failure
                (timeout, connection error); the failure is recorded in the
                monitor before the exception is re-raised.
        """
        # perf_counter is monotonic, so latency is immune to clock changes.
        start_time = time.perf_counter()
        try:
            response = await self.monitor.client.post(
                "/chat/completions",
                json={
                    "model": model,
                    "messages": messages,
                    **kwargs
                }
            )
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            response.raise_for_status()
            data = response.json()
        except httpx.HTTPError as e:
            # Covers status errors and transport errors alike, so timeouts
            # also count towards the monitor's failure rate.
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            await self.monitor.log_request(
                prompt_tokens=0,
                completion_tokens=0,
                response_time_ms=elapsed_ms,
                success=False
            )
            if isinstance(e, httpx.HTTPStatusError):
                self.logger.error(f"API调用失败: {e.response.status_code}")
            else:
                self.logger.error(f"API调用失败: {e!r}")
            raise

        # "usage" may be absent or null; treat both as zero consumption.
        usage = data.get("usage") or {}
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)

        await self.monitor.log_request(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            response_time_ms=elapsed_ms,
            success=True
        )
        self.logger.info(
            f"ChatCompletion | model={model} | "
            f"prompt={prompt_tokens} | completion={completion_tokens} | "
            f"latency={elapsed_ms:.0f}ms"
        )
        return data
快速接入示例
monitor = HolySheepMonitor(api_key="YOUR_HOLYSHEEP_API_KEY")
middleware = HolySheepMonitoringMiddleware(
    api_key="YOUR_HOLYSHEEP_API_KEY",
    monitor=monitor
)

# The call shape mirrors the standard OpenAI SDK.
messages = [{"role": "user", "content": "你好"}]


async def _quickstart():
    """Send one chat completion through the monitoring middleware."""
    return await middleware.chat_completion(messages, model="gpt-4o")


# ``await`` is only valid inside a coroutine, so the demo call is driven by
# asyncio.run instead of a bare module-level ``await``.
response = asyncio.run(_quickstart())
print("中间件封装完成,零侵入接入现有项目")
常见报错排查
在我部署这套监控系统的过程中,遇到了几个典型问题,记录下来供你参考:
报错 1:401 Unauthorized - API Key 验证失败
这是最常见的错误,通常是 API Key 格式或权限问题。
# ❌ Wrong: the raw key is sent without the "Bearer " scheme
headers = {"Authorization": "YOUR_HOLYSHEEP_API_KEY"} # missing Bearer prefix
✅ 正确写法
# ✅ Correct: the key is prefixed with the "Bearer " scheme
headers = {"Authorization": f"Bearer {api_key}"}
解决方案:
# Complete request header configuration
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
验证 Key 是否有效
import httpx
async def verify_api_key(api_key: str) -> bool:
    """Check an API key by sending a minimal chat completion request.

    Args:
        api_key: Key to verify.

    Returns:
        ``True`` when the API answers with HTTP 200, ``False`` otherwise
        (a 401 additionally prints an "invalid or expired" notice).

    Raises:
        httpx.RequestError: On transport failures such as timeouts.
    """
    # The context manager closes the client (and its connections) on exit.
    async with httpx.AsyncClient() as http:
        response = await http.post(
            "https://api.holysheep.ai/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": "test"}],
                "max_tokens": 5
            },
            timeout=10.0
        )

    # post() does not raise on error statuses, so inspect the code directly.
    if response.status_code == 401:
        print("❌ API Key 无效或已过期")
        return False
    return response.status_code == 200
调用验证
# Run the verification coroutine and print the outcome.
is_valid = asyncio.run(verify_api_key("YOUR_HOLYSHEEP_API_KEY"))
print(f"API Key 验证结果: {'✅ 有效' if is_valid else '❌ 无效'}")
报错 2:ConnectionError: timeout - 国内访问超时
从国内直接访问部分 API 服务可能出现连接超时。
# ❌ A single coarse timeout can lead to long waits
client = httpx.AsyncClient(timeout=30.0) # a shorter value may be needed in practice
✅ 针对国内网络优化超时配置
# ✅ Client with per-phase timeouts and explicit connection-pool limits
client = httpx.AsyncClient(
base_url="https://api.holysheep.ai/v1",
timeout=httpx.Timeout(
connect=5.0, # connect timeout: 5 s
read=30.0, # read timeout: 30 s
write=10.0, # write timeout: 10 s
pool=10.0 # pool acquisition timeout: 10 s
),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
✅ 添加重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def resilient_request(url: str, **kwargs):
    """POST to ``url`` with up to 3 attempts and exponential back-off.

    Timeouts and connection errors are reported on stdout and re-raised so
    that the ``retry`` decorator can schedule the next attempt.

    Args:
        url: Request URL, passed to the module-level ``client``.
        **kwargs: Forwarded unchanged to ``client.post``.

    Returns:
        The response object returned by ``client.post``.
    """
    try:
        return await client.post(url, **kwargs)
    except httpx.TimeoutException:
        print("⏰ 请求超时,触发重试...")
        raise
    except httpx.ConnectError as e:
        print(f"🔌 连接失败: {e}")
        raise
print("超时配置优化完成,建议使用 HolySheheep AI 国内直连 <50ms")
报错 3:QuotaExceededError - 超出配额限制
Token 消耗超限时返回此错误,需要立即检查用量并可能需要升级套餐。
# 监控配额使用情况
async def check_quota_usage(api_key: str):
    """Fetch the current quota usage from ``/usage/current``.

    Args:
        api_key: Key sent as a ``Bearer`` token.

    Returns:
        A dict with ``total_tokens``, ``used_tokens``, ``remaining_tokens``
        and ``usage_ratio``; ``None`` when the endpoint answers 404 (not
        supported) or with a non-200 success status.

    Raises:
        httpx.HTTPStatusError: For any other error status.
    """
    response = await client.get(
        "/usage/current",
        headers={"Authorization": f"Bearer {api_key}"}
    )

    if response.status_code == 200:
        data = response.json()
        used = data.get("used_tokens", 0)
        return {
            "total_tokens": data.get("total_tokens", 0),
            "used_tokens": used,
            "remaining_tokens": data.get("remaining_tokens", 0),
            # max(..., 1) guards against division by zero.
            "usage_ratio": used / max(data.get("total_tokens", 1), 1)
        }

    if response.status_code == 404:
        # Some deployments may not expose this endpoint.
        return None

    # get() never raises on error statuses by itself; surface them here
    # rather than silently returning None.
    response.raise_for_status()
    return None
主动降级策略
async def chat_with_fallback(messages: List[dict], model: str):
    """Chat completion that downgrades to cheaper models on HTTP 429.

    Args:
        messages: Chat messages to send.
        model: Preferred model name.

    Returns:
        The decoded JSON response of the first model that succeeds.

    Raises:
        httpx.HTTPStatusError: The original error when the status is not 429
            or when every fallback model fails as well.
        Exception: Any other failure of the primary request, after it has
            been reported on stdout.
    """
    # Downgrade paths ordered by price:
    # GPT-4.1 $8 -> Gemini 2.5 Flash $2.50 -> DeepSeek V3.2 $0.42
    fallback_models = {
        "gpt-4.1": ["gemini-2.5-flash", "deepseek-v3.2"],
        "claude-sonnet-4.5": ["gemini-2.5-flash", "deepseek-v3.2"],
        "gpt-4o": ["gpt-4o-mini", "deepseek-v3.2"]
    }

    try:
        response = await client.post(
            "/chat/completions",
            json={"model": model, "messages": messages}
        )
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code != 429:  # only rate limits are downgraded
            raise

        for fallback in fallback_models.get(model, []):
            print(f"⚠️ {model} 超出配额,尝试降级到 {fallback}")
            try:
                response = await client.post(
                    "/chat/completions",
                    json={"model": fallback, "messages": messages}
                )
                # Without this check an error body would be returned as if
                # the fallback had succeeded.
                response.raise_for_status()
                return response.json()
            except (httpx.HTTPError, ValueError):
                continue

        print("❌ 所有模型均超出配额")
        raise
    except Exception as e:
        print(f"❌ 请求异常: {e}")
        raise
# Status message once the quota check and fallback helpers are defined.
print("配额检查和降级策略已配置")
报错 4:数据漂移导致误报
统计模型可能因为业务正常波动(如营销活动)产生误报。
# 解决方案:引入人工标注的"白名单时段"
@dataclass
class WhitelistWindow:
    """An hour-of-day range during which detection should be relaxed."""

    start_hour: int  # first hour of the window (inclusive)
    end_hour: int  # hour at which the window ends (exclusive)
    reason: str  # free-text justification, e.g. a marketing campaign
    multiplier: float = 1.0  # temporary threshold relaxation factor
class AdaptiveDetector(StatisticalAnomalyDetector):
    """Statistical detector that relaxes its threshold in whitelisted hours."""

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``StatisticalAnomalyDetector``."""
        super().__init__(*args, **kwargs)
        self.whitelist: List["WhitelistWindow"] = []
        self.baseline_multiplier: float = 1.0

    def add_whitelist(self, window: "WhitelistWindow"):
        """Register a whitelisted hour window."""
        self.whitelist.append(window)

    def is_whitelisted(self) -> bool:
        """Return True when the current local hour is inside any window.

        Windows are half-open ``[start_hour, end_hour)``. A window whose
        start is greater than its end (e.g. 22 -> 2) wraps past midnight.
        """
        current_hour = datetime.now().hour
        for window in self.whitelist:
            if window.start_hour <= window.end_hour:
                if window.start_hour <= current_hour < window.end_hour:
                    return True
            elif current_hour >= window.start_hour or current_hour < window.end_hour:
                return True
        return False

    def detect(self) -> Optional[dict]:
        """Run detection, doubling the sigma threshold in whitelisted hours.

        NOTE(review): the relaxation factor is fixed at 2.0; the window's own
        ``multiplier`` field is not applied - confirm which is intended.
        """
        if not self.is_whitelisted():
            return super().detect()

        original_threshold = self.threshold_sigma
        self.threshold_sigma = original_threshold * 2.0
        try:
            return super().detect()
        finally:
            # Always restore the threshold, even if detection raises, so the
            # relaxation can never become permanent.
            self.threshold_sigma = original_threshold
使用示例:添加促销活动的白名单
# Example: whitelist a two-hour promotional window on an adaptive detector.
detector = AdaptiveDetector(window_size=60, threshold_sigma=3.0)
detector.add_whitelist(WhitelistWindow(
start_hour=10, # from 10:00
end_hour=12, # until 12:00
reason="双11促销活动",
multiplier=1.5
))
print("自适应检测器已启用,自动识别白名单时段避免误报")
五、监控大屏与告警配置
检测到异常后,还需要有效的告警渠道。以下是我使用的告警架构:
import aiohttp
from enum import Enum
class AlertLevel(Enum):
    """Severity levels used when dispatching alerts."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
class AlertDispatcher:
    """Fan alerts out to registered notification channels."""

    def __init__(self):
        """Start with no registered channels."""
        self.handlers = {}

    def register(self, channel: str, handler):
        """Register ``handler`` (an async callable taking ``alert, level``)
        under ``channel``, replacing any previous handler of that name."""
        self.handlers[channel] = handler

    async def dispatch(self, alert: dict, level: "AlertLevel"):
        """Send ``alert`` to every channel.

        A failing channel is reported on stdout and does not prevent the
        remaining channels from being notified.
        """
        level_str = level.value
        for channel, handler in self.handlers.items():
            try:
                await handler(alert, level_str)
            except Exception as e:
                print(f"渠道 {channel} 发送失败: {e}")

    async def _post_json(self, webhook: str, payload: dict):
        """POST ``payload`` as JSON and raise on an HTTP error status."""
        async with aiohttp.ClientSession() as session:
            # The context manager releases the response; raise_for_status
            # turns a rejected webhook into an exception that dispatch()
            # can report instead of failing silently.
            async with session.post(webhook, json=payload) as response:
                response.raise_for_status()

    async def send_dingtalk(self, webhook: str, alert: dict, level: str):
        """Send the alert to a DingTalk group as a markdown message."""
        message = {
            "msgtype": "markdown",
            "markdown": {
                "title": f"🚨 HolySheheep AI 告警 [{level.upper()}]",
                "text": f"""## Token 用量异常告警
**告警时间**: {alert.get('timestamp')}
**告警来源**: {alert.get('source')} / {alert.get('rule_name', alert.get('rule', 'N/A'))}
**严重程度**: {alert.get('severity', level.upper())}
**详情**:
- 当前 Token: {alert.get('current_tokens', alert.get('current', 'N/A'))}
- 平均基准: {alert.get('mean_tokens', alert.get('ewma', 'N/A'))}
- Z-Score: {alert.get('z_score', 'N/A')}
**处理建议**:
1. 检查是否存在异常调用
2. 审查最近部署的代码变更
3. 联系 HolySheheep AI 支持: https://www.holysheep.ai/support
"""
            }
        }
        await self._post_json(webhook, message)

    async def send_feishu(self, webhook: str, alert: dict, level: str):
        """Send the alert to Feishu as an interactive card."""
        # Card header colour by level; unknown levels fall back to grey.
        color = {"critical": "red", "error": "orange", "warning": "yellow"}.get(level, "grey")
        message = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"tag": "plain_text", "content": "🚨 HolySheheep AI 告警"},
                    "template": color
                },
                "elements": [
                    {"tag": "div", "text": {"tag": "lark_md",
                        "content": f"**告警来源**: {alert.get('source')}\n"
                                   f"**详情**: {alert.get('message', alert.get('type'))}"}}
                ]
            }
        }
        await self._post_json(webhook, message)
配置告警
dispatcher = AlertDispatcher()
dispatcher.register("dingtalk",
lambda a, l: dispatcher.send_dingtalk("YOUR