凌晨三点,我的手机突然震动。监控告警显示:当前 Token 消耗速率 127,000 tokens/min,是昨日同时段的 340%。登录 HolySheheep AI 控制台查看,账单已悄然燃烧了 $47。这不是 DDoS 攻击,而是一段死循环代码在疯狂调用 Chat Completions 接口。

这次事件后,我花了整整两周构建了一套 Token 用量异常检测系统。今天把完整方案分享给你,包含统计学模型、规则引擎、Python 实战代码,以及 HolySheep AI 的低成本接入实践。

为什么需要 Token 用量异常检测?

在我维护的智能客服系统中,API 费用曾两次失控:一次是 Prompt 长度没做截断导致单次请求消耗 8000 tokens;另一次是重试逻辑没有指数退避,失败后疯狂堆积请求。 HolySheheep AI 的汇率优势(¥1=$1,官方¥7.3=$1)虽然已经帮我省了 85% 成本,但如果用量失控,省下来的钱分分钟被反噬。

异常检测的核心价值:

方案架构:统计模型 + 规则双引擎

我的检测系统采用双层架构:

两者结果取并集,任何一层触发即告警。

一、统计模型实现

1.1 标准差异常检测

最基础但最有效的方案:计算过去 N 个时间窗口的平均值和标准差,超出 3σ 即告警。

import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import httpx

@dataclass
class TokenUsage:
    timestamp: datetime
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class StatisticalAnomalyDetector:
    """基于统计学的 Token 用量异常检测器"""
    
    def __init__(self, window_size: int = 60, threshold_sigma: float = 3.0):
        """
        Args:
            window_size: 时间窗口数量(每个窗口5分钟)
            threshold_sigma: 标准差倍数,超过此值触发告警
        """
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma
        self.history: List[TokenUsage] = []
    
    def add_usage(self, usage: TokenUsage):
        """添加新的用量数据点"""
        self.history.append(usage)
        # 保持滑动窗口大小
        if len(self.history) > self.window_size * 2:
            self.history = self.history[-self.window_size:]
    
    def detect(self) -> Optional[dict]:
        """执行异常检测,返回告警信息或 None"""
        if len(self.history) < 10:
            return None  # 数据不足,跳过检测
        
        # 使用最近 window_size 个数据点
        recent = self.history[-self.window_size:]
        totals = np.array([u.total_tokens for u in recent])
        
        mean = np.mean(totals)
        std = np.std(totals)
        
        # 当前值
        current = totals[-1]
        
        # 计算 Z-Score
        if std > 0:
            z_score = (current - mean) / std
        else:
            z_score = 0 if current == mean else float('inf')
        
        # 异常判定
        if abs(z_score) > self.threshold_sigma:
            direction = "激增" if z_score > 0 else "骤降"
            return {
                "alert": True,
                "type": direction,
                "current_tokens": current,
                "mean_tokens": round(mean, 2),
                "std_tokens": round(std, 2),
                "z_score": round(z_score, 2),
                "severity": "HIGH" if abs(z_score) > 4 else "MEDIUM"
            }
        
        return None

使用示例

detector = StatisticalAnomalyDetector(window_size=60, threshold_sigma=3.0) print("统计异常检测器初始化完成,60个窗口(5小时),3σ阈值")

1.2 指数加权移动平均 (EWMA) 检测

标准差检测对突发异常敏感,但响应较慢。EWMA 通过赋予近期数据更高权重,能更快捕捉趋势变化。

class EWMADetector:
    """指数加权移动平均异常检测器"""
    
    def __init__(self, span: int = 12, threshold_ratio: float = 2.0):
        """
        Args:
            span: EWMA 跨度,值越小对近期变化越敏感
            threshold_ratio: 当前值超过 EWMA 的多少倍触发告警
        """
        self.span = span
        self.threshold_ratio = threshold_ratio
        self.ewma: Optional[float] = None
        self.alpha = 2 / (span + 1)
        self.history: List[float] = []
    
    def update(self, value: float) -> Optional[dict]:
        """更新 EWMA 值并返回异常检测结果"""
        self.history.append(value)
        
        # 初始化 EWMA
        if self.ewma is None:
            self.ewma = value
            return None
        
        # 更新 EWMA: E_t = α * x_t + (1-α) * E_{t-1}
        self.ewma = self.alpha * value + (1 - self.alpha) * self.ewma
        
        # 异常检测:当前值超过 EWMA 的 threshold_ratio 倍
        if self.ewma > 0 and value > self.ewma * self.threshold_ratio:
            return {
                "alert": True,
                "current": value,
                "ewma": round(self.ewma, 2),
                "ratio": round(value / self.ewma, 2),
                "type": "SUDDEN_SPIKE",
                "message": f"Token消耗突增:当前{value},EWMA基准{round(self.ewma, 2)},比率{value/self.ewma:.1f}x"
            }
        
        return None

集成到实际监控系统

ewma_detector = EWMADetector(span=12, threshold_ratio=2.0) print("EWMA检测器就绪,span=12,对2倍以上突增敏感")

1.3 增量变化率检测

我最喜欢用的补充检测器,专门捕获环比增长率异常。

class RateOfChangeDetector:
    """Token消耗环比增长率检测器"""
    
    def __init__(self, min_samples: int = 5, 
                 max_growth_rate: float = 1.5,
                 max_drop_rate: float = 0.5):
        """
        Args:
            min_samples: 最小样本数
            max_growth_rate: 允许的最大增长率(150%)
            max_drop_rate: 允许的最小保留率(50%)
        """
        self.min_samples = min_samples
        self.max_growth_rate = max_growth_rate
        self.max_drop_rate = max_drop_rate
        self.history: List[float] = []
    
    def check(self, current_tokens: int) -> Optional[dict]:
        """检测增量变化率异常"""
        self.history.append(current_tokens)
        
        if len(self.history) < self.min_samples:
            return None
        
        # 计算最近 min_samples-1 个窗口的平均增长率
        recent = self.history[-(self.min_samples):]
        growth_rates = []
        
        for i in range(1, len(recent)):
            if recent[i-1] > 0:
                rate = recent[i] / recent[i-1]
                growth_rates.append(rate)
        
        if not growth_rates:
            return None
        
        avg_growth_rate = np.mean(growth_rates)
        
        # 检测异常增长
        if avg_growth_rate > self.max_growth_rate:
            return {
                "alert": True,
                "type": "EXCESSIVE_GROWTH",
                "avg_growth_rate": round(avg_growth_rate, 2),
                "threshold": self.max_growth_rate,
                "severity": "HIGH"
            }
        
        # 检测异常下降(可能服务中断)
        if avg_growth_rate < self.max_drop_rate:
            return {
                "alert": True,
                "type": "SEVERE_DROP",
                "avg_growth_rate": round(avg_growth_rate, 2),
                "threshold": self.max_drop_rate,
                "severity": "MEDIUM"
            }
        
        return None

print("增长率检测器配置完成,150%增长阈值,50%骤降阈值")

二、规则引擎实现

统计模型擅长捕捉"微妙"的异常,但对于明确的红线,我更信任规则引擎。以下是我生产环境中的规则配置:

from enum import Enum
from typing import Callable, Any
import yaml

class RuleType(Enum):
    THRESHOLD = "threshold"           # 绝对阈值
    RATE = "rate"                     # 变化率
    QUOTA = "quota"                   # 配额限制
    TIME_WINDOW = "time_window"       # 时间窗口累计

@dataclass
class AlertRule:
    name: str
    rule_type: RuleType
    condition: Callable[[dict], bool]
    severity: str
    message_template: str
    enabled: bool = True

class RuleEngine:
    """基于规则的用量检测引擎"""
    
    def __init__(self):
        self.rules: List[AlertRule] = []
        self._init_default_rules()
    
    def _init_default_rules(self):
        """初始化默认规则(可根据业务调整)"""
        
        # 规则1:单分钟 Token 超过 100,000
        self.rules.append(AlertRule(
            name="单分钟用量超限",
            rule_type=RuleType.THRESHOLD,
            condition=lambda ctx: ctx.get("tokens_per_minute", 0) > 100_000,
            severity="HIGH",
            message_template="单分钟Token消耗{tokens_per_minute},超过阈值100,000"
        ))
        
        # 规则2:单小时消耗超过 2,000,000 tokens
        self.rules.append(AlertRule(
            name="小时累计超限",
            rule_type=RuleType.QUOTA,
            condition=lambda ctx: ctx.get("tokens_per_hour", 0) > 2_000_000,
            severity="HIGH",
            message_template="小时累计消耗{tokens_per_hour},超过配额2,000,000"
        ))
        
        # 规则3:单次请求 Token 超过 50,000(可能 Prompt 未截断)
        self.rules.append(AlertRule(
            name="单次请求量异常",
            rule_type=RuleType.THRESHOLD,
            condition=lambda ctx: ctx.get("tokens_per_request", 0) > 50_000,
            severity="MEDIUM",
            message_template="单次请求消耗{tokens_per_request},建议检查Prompt长度"
        ))
        
        # 规则4:连续失败率超过 20%
        self.rules.append(AlertRule(
            name="失败率过高",
            rule_type=RuleType.RATE,
            condition=lambda ctx: ctx.get("failure_rate", 0) > 0.2,
            severity="HIGH",
            message_template="请求失败率{failure_rate:.1%},超过20%阈值"
        ))
        
        # 规则5:每分钟请求数超过 500(防止频率滥用)
        self.rules.append(AlertRule(
            name="请求频率超限",
            rule_type=RuleType.RATE,
            condition=lambda ctx: ctx.get("requests_per_minute", 0) > 500,
            severity="MEDIUM",
            message_template="分钟请求数{requests_per_minute},超过500次阈值"
        ))
        
        print(f"规则引擎初始化完成,共加载 {len(self.rules)} 条规则")
    
    def evaluate(self, context: dict) -> List[dict]:
        """评估所有规则,返回触发的告警列表"""
        alerts = []
        
        for rule in self.rules:
            if not rule.enabled:
                continue
            
            try:
                if rule.condition(context):
                    alert = {
                        "rule_name": rule.name,
                        "severity": rule.severity,
                        "message": rule.message_template.format(**context)
                    }
                    alerts.append(alert)
            except Exception as e:
                print(f"规则 {rule.name} 执行出错: {e}")
        
        return alerts

规则引擎实例

rule_engine = RuleEngine() print("规则引擎已就绪,可通过 YAML 配置动态调整阈值")

三、HolySheheep AI 集成实战

在 HolySheheep AI 上构建监控系统有独特优势:¥1=$1 的汇率让我能用相同预算监控更多维度,国内直连 <50ms 的延迟让实时检测成为可能。

以下是与 HolySheheep AI API 深度集成的完整代码:

import asyncio
import json
from datetime import datetime
from collections import defaultdict
import hashlib

class HolySheepMonitor:
    """HolySheheep AI Token 用量实时监控器"""
    
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            timeout=30.0
        )
        
        # 初始化检测器
        self.stat_detector = StatisticalAnomalyDetector(window_size=60)
        self.ewma_detector = EWMADetector(span=12)
        self.roc_detector = RateOfChangeDetector(min_samples=5)
        self.rule_engine = RuleEngine()
        
        # 统计状态
        self.minute_stats = defaultdict(int)  # 每分钟 token 数
        self.hour_stats = defaultdict(int)    # 每小时 token 数
        self.request_stats = []                # 单次请求统计
        self.failed_requests = 0
        self.total_requests = 0
        
        # 告警回调
        self.alert_callbacks = []
    
    def on_alert(self, callback):
        """注册告警回调函数"""
        self.alert_callbacks.append(callback)
    
    async def _send_alert(self, alert: dict):
        """触发告警"""
        alert["timestamp"] = datetime.now().isoformat()
        for callback in self.alert_callbacks:
            await callback(alert)
    
    async def log_request(self, prompt_tokens: int, completion_tokens: int,
                         response_time_ms: float, success: bool = True):
        """记录一次 API 调用"""
        total = prompt_tokens + completion_tokens
        self.total_requests += 1
        
        if not success:
            self.failed_requests += 1
        
        now = datetime.now()
        minute_key = now.strftime("%Y%m%d%H%M")
        hour_key = now.strftime("%Y%m%d%H")
        
        self.minute_stats[minute_key] += total
        self.hour_stats[hour_key] += total
        self.request_stats.append({
            "timestamp": now,
            "tokens": total,
            "prompt": prompt_tokens,
            "completion": completion_tokens,
            "latency_ms": response_time_ms
        })
        
        # 保持最近 2 小时数据
        self._cleanup_old_stats()
        
        # 执行多维度检测
        await self._run_detection()
    
    async def _run_detection(self):
        """运行所有检测器"""
        now = datetime.now()
        minute_key = now.strftime("%Y%m%d%H%M")
        hour_key = now.strftime("%Y%m%d%H")
        
        # 构建检测上下文
        context = {
            "tokens_per_minute": self.minute_stats.get(minute_key, 0),
            "tokens_per_hour": self.hour_stats.get(hour_key, 0),
            "tokens_per_request": self.request_stats[-1]["tokens"] if self.request_stats else 0,
            "failure_rate": self.failed_requests / max(self.total_requests, 1),
            "requests_per_minute": len([r for r in self.request_stats 
                                        if r["timestamp"].strftime("%Y%m%d%H%M") == minute_key])
        }
        
        # 1. 统计模型检测
        current_usage = TokenUsage(
            timestamp=now,
            prompt_tokens=self.request_stats[-1]["prompt"],
            completion_tokens=self.request_stats[-1]["completion"],
            total_tokens=context["tokens_per_minute"]
        )
        
        stat_result = self.stat_detector.detect()
        if stat_result and stat_result.get("alert"):
            await self._send_alert({
                **stat_result,
                "source": "STATISTICAL_MODEL",
                "rule": "STDEV_3SIGMA"
            })
        
        # 2. EWMA 检测
        ewma_result = self.ewma_detector.update(context["tokens_per_minute"])
        if ewma_result and ewma_result.get("alert"):
            await self._send_alert({
                **ewma_result,
                "source": "STATISTICAL_MODEL",
                "rule": "EWMA_RATIO"
            })
        
        # 3. 增长率检测
        roc_result = self.roc_detector.check(context["tokens_per_minute"])
        if roc_result and roc_result.get("alert"):
            await self._send_alert({
                **roc_result,
                "source": "STATISTICAL_MODEL",
                "rule": "RATE_OF_CHANGE"
            })
        
        # 4. 规则引擎检测
        rule_alerts = self.rule_engine.evaluate(context)
        for alert in rule_alerts:
            await self._send_alert({
                **alert,
                "source": "RULE_ENGINE"
            })
    
    def _cleanup_old_stats(self):
        """清理过期统计数据"""
        now = datetime.now()
        cutoff_minute = now.timestamp() - 7200  # 2小时前
        
        # 清理 minute_stats
        expired_keys = [k for k, v in self.minute_stats.items() 
                       if datetime.strptime(k, "%Y%m%d%H%M").timestamp() < cutoff_minute]
        for k in expired_keys:
            del self.minute_stats[k]
        
        # 清理 hour_stats
        cutoff_hour = now.timestamp() - 86400  # 24小时前
        expired_hour_keys = [k for k in self.hour_stats.keys()
                            if datetime.strptime(k, "%Y%m%d%H").timestamp() < cutoff_hour]
        for k in expired_hour_keys:
            del self.hour_stats[k]
        
        # 清理 request_stats
        self.request_stats = [r for r in self.request_stats 
                             if r["timestamp"].timestamp() > cutoff_minute]
    
    async def close(self):
        await self.client.aclose()

使用示例

async def main(): # 初始化监控器(使用你的 HolySheheep API Key) monitor = HolySheheepMonitor( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" ) # 注册告警回调 async def handle_alert(alert): print(f"🚨 告警触发: {json.dumps(alert, ensure_ascii=False, indent=2)}") # 这里可以接入钉钉/企微/飞书通知 # await send_dingtalk(alert) monitor.on_alert(handle_alert) # 模拟 API 调用记录 await monitor.log_request( prompt_tokens=500, completion_tokens=150, response_time_ms=45.3, success=True ) print("✅ HolySheheep AI 监控器初始化完成") print(" - 国内直连延迟: <50ms") print(" - 支持 ¥1=$1 低成本监控") asyncio.run(main())

四、封装成通用中间件

我更推荐将监控封装成 OpenAI SDK 中间件,这样所有 API 调用自动被监控,无需修改业务代码:

from typing import Optional, Dict, Any, List, Iterator
import time
import logging

class HolySheepMonitoringMiddleware:
    """
    HolySheheep AI 监控中间件
    自动拦截所有 API 调用并记录 Token 消耗
    """
    
    def __init__(self, api_key: str, monitor: HolySheepMonitor):
        self.api_key = api_key
        self.monitor = monitor
        self.logger = logging.getLogger("HolySheepMonitor")
    
    async def chat_completion(self, messages: List[dict], 
                              model: str = "gpt-4o",
                              **kwargs) -> Dict[str, Any]:
        """包装 Chat Completions 调用"""
        start_time = time.time()
        
        try:
            response = await self.monitor.client.post(
                "/chat/completions",
                json={
                    "model": model,
                    "messages": messages,
                    **kwargs
                }
            )
            
            elapsed_ms = (time.time() - start_time) * 1000
            response.raise_for_status()
            data = response.json()
            
            # 提取 Token 消耗
            usage = data.get("usage", {})
            prompt_tokens = usage.get("prompt_tokens", 0)
            completion_tokens = usage.get("completion_tokens", 0)
            
            # 记录到监控器
            await self.monitor.log_request(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                response_time_ms=elapsed_ms,
                success=True
            )
            
            self.logger.info(
                f"ChatCompletion | model={model} | "
                f"prompt={prompt_tokens} | completion={completion_tokens} | "
                f"latency={elapsed_ms:.0f}ms"
            )
            
            return data
            
        except httpx.HTTPStatusError as e:
            elapsed_ms = (time.time() - start_time) * 1000
            await self.monitor.log_request(
                prompt_tokens=0,
                completion_tokens=0,
                response_time_ms=elapsed_ms,
                success=False
            )
            self.logger.error(f"API调用失败: {e.response.status_code}")
            raise

快速接入示例

monitor = HolySheepMonitor(api_key="YOUR_HOLYSHEEP_API_KEY") middleware = HolySheepMonitoringMiddleware( api_key="YOUR_HOLYSHEEP_API_KEY", monitor=monitor )

使用方式与标准 OpenAI SDK 完全一致

messages = [{"role": "user", "content": "你好"}]

response = await middleware.chat_completion(messages, model="gpt-4o")

print("中间件封装完成,零侵入接入现有项目")

常见报错排查

在我部署这套监控系统的过程中,遇到了几个典型问题,记录下来供你参考:

报错 1:401 Unauthorized - API Key 验证失败

这是最常见的错误,通常是 API Key 格式或权限问题。

# ❌ 错误写法
headers = {"Authorization": "YOUR_HOLYSHEEP_API_KEY"}  # 缺少 Bearer 前缀

✅ 正确写法

headers = {"Authorization": f"Bearer {api_key}"}

解决方案:

# 完整的请求头配置
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

验证 Key 是否有效

import httpx async def verify_api_key(api_key: str) -> bool: try: response = await httpx.AsyncClient().post( "https://api.holysheep.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json={ "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "test"}], "max_tokens": 5 }, timeout=10.0 ) return response.status_code == 200 except httpx.HTTPStatusError as e: if e.response.status_code == 401: print("❌ API Key 无效或已过期") return False raise

调用验证

is_valid = asyncio.run(verify_api_key("YOUR_HOLYSHEEP_API_KEY")) print(f"API Key 验证结果: {'✅ 有效' if is_valid else '❌ 无效'}")

报错 2:ConnectionError: timeout - 国内访问超时

从国内直接访问部分 API 服务可能出现连接超时。

# ❌ 默认超时设置导致长等待
client = httpx.AsyncClient(timeout=30.0)  # 实际可能需要更短

✅ 针对国内网络优化超时配置

client = httpx.AsyncClient( base_url="https://api.holysheep.ai/v1", timeout=httpx.Timeout( connect=5.0, # 连接超时 5 秒 read=30.0, # 读取超时 30 秒 write=10.0, # 写入超时 10 秒 pool=10.0 # 连接池超时 10 秒 ), limits=httpx.Limits(max_connections=100, max_keepalive_connections=20) )

✅ 添加重试机制

from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) async def resilient_request(url: str, **kwargs): try: response = await client.post(url, **kwargs) return response except httpx.TimeoutException: print("⏰ 请求超时,触发重试...") raise except httpx.ConnectError as e: print(f"🔌 连接失败: {e}") raise print("超时配置优化完成,建议使用 HolySheheep AI 国内直连 <50ms")

报错 3:QuotaExceededError - 超出配额限制

Token 消耗超限时返回此错误,需要立即检查用量并可能需要升级套餐。

# 监控配额使用情况
async def check_quota_usage(api_key: str):
    """检查当前配额使用情况"""
    try:
        response = await client.get(
            "/usage/current",
            headers={"Authorization": f"Bearer {api_key}"}
        )
        
        if response.status_code == 200:
            data = response.json()
            return {
                "total_tokens": data.get("total_tokens", 0),
                "used_tokens": data.get("used_tokens", 0),
                "remaining_tokens": data.get("remaining_tokens", 0),
                "usage_ratio": data.get("used_tokens", 0) / max(data.get("total_tokens", 1), 1)
            }
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            # 某些 API 可能不支持此端点
            return None
        raise

主动降级策略

async def chat_with_fallback(messages: List[dict], model: str): """带降级策略的聊天接口""" # 按价格排序的降级路径 # GPT-4.1 $8 → Gemini 2.5 Flash $2.50 → DeepSeek V3.2 $0.42 fallback_models = { "gpt-4.1": ["gemini-2.5-flash", "deepseek-v3.2"], "claude-sonnet-4.5": ["gemini-2.5-flash", "deepseek-v3.2"], "gpt-4o": ["gpt-4o-mini", "deepseek-v3.2"] } try: response = await client.post( "/chat/completions", json={"model": model, "messages": messages} ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: # Rate limit # 检查是否有降级路径 fallbacks = fallback_models.get(model, []) for fallback in fallbacks: try: print(f"⚠️ {model} 超出配额,尝试降级到 {fallback}") response = await client.post( "/chat/completions", json={"model": fallback, "messages": messages} ) return response.json() except: continue print("❌ 所有模型均超出配额") raise except Exception as e: print(f"❌ 请求异常: {e}") raise print("配额检查和降级策略已配置")

报错 4:数据漂移导致误报

统计模型可能因为业务正常波动(如营销活动)产生误报。

# 解决方案:引入人工标注的"白名单时段"
@dataclass
class WhitelistWindow:
    start_hour: int
    end_hour: int
    reason: str
    multiplier: float = 1.0  # 临时放宽阈值倍数

class AdaptiveDetector(StatisticalAnomalyDetector):
    """自适应异常检测器,支持白名单和阈值动态调整"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.whitelist: List[WhitelistWindow] = []
        self.baseline_multiplier: float = 1.0
    
    def add_whitelist(self, window: WhitelistWindow):
        """添加白名单时段"""
        self.whitelist.append(window)
    
    def is_whitelisted(self) -> bool:
        """检查当前是否在白名单时段"""
        now = datetime.now()
        current_hour = now.hour
        
        for window in self.whitelist:
            if window.start_hour <= current_hour < window.end_hour:
                return True
        return False
    
    def detect(self) -> Optional[dict]:
        """自适应异常检测"""
        # 白名单时段内放宽阈值
        if self.is_whitelisted():
            original_threshold = self.threshold_sigma
            self.threshold_sigma *= 2.0  # 白名单时段放宽 2 倍
            result = super().detect()
            self.threshold_sigma = original_threshold
            return result
        
        return super().detect()

使用示例:添加促销活动的白名单

detector = AdaptiveDetector(window_size=60, threshold_sigma=3.0) detector.add_whitelist(WhitelistWindow( start_hour=10, # 上午10点 end_hour=12, # 到中午12点 reason="双11促销活动", multiplier=1.5 )) print("自适应检测器已启用,自动识别白名单时段避免误报")

五、监控大屏与告警配置

检测到异常后,还需要有效的告警渠道。以下是我使用的告警架构:

import aiohttp
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertDispatcher:
    """告警分发器,支持多渠道通知"""
    
    def __init__(self):
        self.handlers = {}
    
    def register(self, channel: str, handler):
        self.handlers[channel] = handler
    
    async def dispatch(self, alert: dict, level: AlertLevel):
        """分发告警到各渠道"""
        level_str = level.value
        
        for channel, handler in self.handlers.items():
            try:
                await handler(alert, level_str)
            except Exception as e:
                print(f"渠道 {channel} 发送失败: {e}")
    
    async def send_dingtalk(self, webhook: str, alert: dict, level: str):
        """发送钉钉群消息"""
        message = {
            "msgtype": "markdown",
            "markdown": {
                "title": f"🚨 HolySheheep AI 告警 [{level.upper()}]",
                "text": f"""## Token 用量异常告警

**告警时间**: {alert.get('timestamp')}

**告警来源**: {alert.get('source')} / {alert.get('rule_name', alert.get('rule', 'N/A'))}

**严重程度**: {alert.get('severity', level.upper())}

**详情**:
- 当前 Token: {alert.get('current_tokens', alert.get('current', 'N/A'))}
- 平均基准: {alert.get('mean_tokens', alert.get('ewma', 'N/A'))}
- Z-Score: {alert.get('z_score', 'N/A')}

**处理建议**: 
1. 检查是否存在异常调用
2. 审查最近部署的代码变更
3. 联系 HolySheheep AI 支持: https://www.holysheep.ai/support
"""
            }
        }
        
        async with aiohttp.ClientSession() as session:
            await session.post(webhook, json=message)
    
    async def send_feishu(self, webhook: str, alert: dict, level: str):
        """发送飞书消息"""
        color = {"critical": "red", "error": "orange", "warning": "yellow"}.get(level, "grey")
        
        message = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"tag": "plain_text", "content": f"🚨 HolySheheep AI 告警"},
                    "template": color
                },
                "elements": [
                    {"tag": "div", "text": {"tag": "lark_md", 
                     "content": f"**告警来源**: {alert.get('source')}\n"
                               f"**详情**: {alert.get('message', alert.get('type'))}"}}
                ]
            }
        }
        
        async with aiohttp.ClientSession() as session:
            await session.post(webhook, json=message)

配置告警

dispatcher = AlertDispatcher() dispatcher.register("dingtalk", lambda a, l: dispatcher.send_dingtalk("YOUR