作为 AI 应用开发者,你是否曾在深夜被天价账单惊醒?让我们先看一组 2026 年主流模型的价格数据:GPT-4.1 output $8/MTok、Claude Sonnet 4.5 output $15/MTok、Gemini 2.5 Flash output $2.50/MTok、DeepSeek V3.2 output $0.42/MTok。使用 HolySheep API 按 ¥1=$1 无损结算(官方汇率为 ¥7.3=$1),每月 100 万 token 的实际费用差距触目惊心:

我曾在某初创公司负责 AI 平台架构时,亲历过一次 API Key 泄露导致的账单风暴——黑客在 3 小时内消耗了近 200 美元额度的调用资源。这次惨痛教训让我深刻认识到:AI API 安全监控不是可选项,而是生存必需品。本文将分享我沉淀两年的一套完整的异常调用模式识别与自动封禁系统。

一、为什么你的 AI API 需要安全监控

当前 AI API 调用面临三大安全威胁:

我曾帮助一个日均调用量 50 万次的 AI SaaS 平台搭建监控系统,上线第一周就拦截了 3 起异常调用事件,避免潜在损失超过 $1,200。接下来的内容,我将手把手教你实现这套系统。

二、异常调用模式识别核心算法

异常模式识别需要从多个维度构建特征工程。我设计了一套基于滑动窗口的实时统计系统:

2.1 核心指标采集

import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Optional
import threading

@dataclass
class CallRecord:
    """单次 API 调用记录"""
    timestamp: float
    tokens_used: int
    endpoint: str
    model: str
    ip_address: str
    user_id: str

class MetricsCollector:
    """指标采集器 - 用于检测异常调用模式"""
    
    def __init__(self, window_seconds: int = 60):
        self.window_seconds = window_seconds
        self.call_history: Dict[str, deque] = defaultdict(
            lambda: deque(maxlen=10000)
        )
        self.lock = threading.Lock()
        
        # 异常阈值配置(可根据业务调整)
        self.thresholds = {
            'max_calls_per_minute': 100,
            'max_tokens_per_minute': 50000,
            'max_unique_ips_per_user': 5,
            'max_cost_per_day': 100.0,  # 美元
        }
    
    def record_call(self, api_key: str, record: CallRecord):
        """记录一次 API 调用"""
        with self.lock:
            self.call_history[api_key].append(record)
            self._cleanup_old_records(api_key)
    
    def _cleanup_old_records(self, api_key: str):
        """清理超过窗口期的记录"""
        cutoff_time = time.time() - self.window_seconds
        history = self.call_history[api_key]
        
        # 移除过期记录
        while history and history[0].timestamp < cutoff_time:
            history.popleft()
    
    def get_call_stats(self, api_key: str) -> Dict:
        """获取调用统计信息"""
        with self.lock:
            records = list(self.call_history[api_key])
        
        if not records:
            return {
                'call_count': 0,
                'total_tokens': 0,
                'unique_ips': set(),
                'avg_tokens_per_call': 0
            }
        
        return {
            'call_count': len(records),
            'total_tokens': sum(r.tokens_used for r in records),
            'unique_ips': set(r.ip_address for r in records),
            'avg_tokens_per_call': sum(r.tokens_used for r in records) / len(records),
            'last_call_time': max(r.timestamp for r in records),
            'requests_per_second': len(records) / self.window_seconds
        }

使用示例

collector = MetricsCollector(window_seconds=60) collector.record_call( 'sk-test-12345', CallRecord( timestamp=time.time(), tokens_used=500, endpoint='/v1/chat/completions', model='gpt-4.1', ip_address='203.0.113.45', user_id='user_001' ) ) stats = collector.get_call_stats('sk-test-12345') print(f"当前 QPS: {stats['requests_per_second']:.2f}")

2.2 异常检测引擎

import numpy as np
from enum import Enum
from typing import Tuple

class AnomalyType(Enum):
    """异常类型枚举"""
    NONE = "正常"
    HIGH_FREQUENCY = "高频调用"
    HIGH_TOKEN_USAGE = "Token 消耗异常"
    MULTI_IP_ABUSE = "多 IP 滥用"
    SPIKE_DETECTION = "流量突增"
    UNUSUAL_PATTERN = "异常模式"

class AnomalyDetector:
    """基于统计的异常检测引擎"""
    
    def __init__(self, collector: MetricsCollector):
        self.collector = collector
        # 历史基线(用于计算标准差)
        self.baseline_stats: Dict[str, Dict] = {}
    
    def detect(self, api_key: str) -> Tuple[AnomalyType, float, str]:
        """
        检测异常,返回 (异常类型, 置信度, 详细描述)
        置信度范围 0.0 - 1.0
        """
        stats = self.collector.get_call_stats(api_key)
        
        if stats['call_count'] == 0:
            return AnomalyType.NONE, 0.0, "无调用记录"
        
        # 1. 高频调用检测
        freq_score = self._check_frequency(stats)
        if freq_score > 0.8:
            return AnomalyType.HIGH_FREQUENCY, freq_score, \
                f"QPS={stats['requests_per_second']:.2f}, 超过阈值"
        
        # 2. Token 消耗异常检测
        token_score = self._check_token_usage(stats)
        if token_score > 0.8:
            return AnomalyType.HIGH_TOKEN_USAGE, token_score, \
                f"单分钟消耗 {stats['total_tokens']} tokens, 超过阈值"
        
        # 3. 多 IP 滥用检测
        ip_score = self._check_multi_ip(stats)
        if ip_score > 0.7:
            return AnomalyType.MULTI_IP_ABUSE, ip_score, \
                f"检测到 {len(stats['unique_ips'])} 个不同 IP"
        
        # 4. 流量突增检测(相比历史基线)
        spike_score = self._check_spike(api_key, stats)
        if spike_score > 0.85:
            return AnomalyType.SPIKE_DETECTION, spike_score, \
                "流量相比基线突增超过 300%"
        
        return AnomalyType.NONE, 0.0, "调用模式正常"
    
    def _check_frequency(self, stats: Dict) -> float:
        """检测高频调用"""
        threshold = self.collector.thresholds['max_calls_per_minute']
        actual = stats['requests_per_second'] * 60
        
        if actual <= threshold:
            return 0.0
        
        # 使用 sigmoid 函数平滑计算置信度
        excess_ratio = (actual - threshold) / threshold
        return min(1.0, 1 / (1 + np.exp(-5 * (excess_ratio - 1))))
    
    def _check_token_usage(self, stats: Dict) -> float:
        """检测 Token 消耗异常"""
        threshold = self.collector.thresholds['max_tokens_per_minute']
        actual = stats['total_tokens']
        
        if actual <= threshold:
            return 0.0
        
        excess_ratio = (actual - threshold) / threshold
        return min(1.0, 1 / (1 + np.exp(-3 * (excess_ratio - 0.5))))
    
    def _check_multi_ip(self, stats: Dict) -> float:
        """检测多 IP 滥用"""
        threshold = self.collector.thresholds['max_unique_ips_per_user']
        actual = len(stats['unique_ips'])
        
        if actual <= threshold:
            return 0.0
        
        excess_ratio = (actual - threshold) / threshold
        return min(1.0, excess_ratio * 0.8)
    
    def _check_spike(self, api_key: str, stats: Dict) -> float:
        """检测流量突增"""
        if api_key not in self.baseline_stats:
            # 首次记录,建立基线
            self.baseline_stats[api_key] = {
                'avg_tokens': stats['total_tokens'],
                'std_tokens': stats['total_tokens'] * 0.1,  # 初始标准差 10%
                'samples': 1
            }
            return 0.0
        
        baseline = self.baseline_stats[api_key]
        current = stats['total_tokens']
        
        # 更新基线(指数移动平均)
        alpha = 0.2
        baseline['avg_tokens'] = alpha * current + (1 - alpha) * baseline['avg_tokens']
        baseline['samples'] += 1
        
        # 计算 Z-score
        z_score = (current - baseline['avg_tokens']) / max(baseline['std_tokens'], 1)
        
        if z_score <= 2:
            return 0.0
        
        # Z-score > 2 视为异常
        return min(1.0, (z_score - 2) / 3)

实际部署建议:置信度 > 0.7 触发告警,> 0.85 自动封禁

三、自动封禁系统实现

检测到异常后,需要及时执行封禁操作。我设计了一套分级封禁机制:

import json
import redis
from datetime import datetime, timedelta
from typing import Optional
import hashlib

class BanManager:
    """自动封禁管理器"""
    
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        # 连接 Redis 存储封禁列表(生产环境建议使用独立 Redis 实例)
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=True
        )
        
        # 封禁级别配置
        self.ban_levels = {
            'warning': {'duration': 0, 'action': 'alert_only'},           # 仅告警
            'temp': {'duration': 300, 'action': 'rate_limit'},            # 临时限流 5 分钟
            'short': {'duration': 3600, 'action': 'block'},               # 封禁 1 小时
            'long': {'duration': 86400, 'action': 'block'},               # 封禁 24 小时
            'permanent': {'duration': -1, 'action': 'block'},             # 永久封禁
        }
        
        # 自动封禁规则
        self.auto_ban_rules = [
            {'anomaly_score': 0.85, 'consecutive_count': 2, 'ban_level': 'short'},
            {'anomaly_score': 0.95, 'consecutive_count': 1, 'ban_level': 'long'},
        ]
    
    def is_banned(self, identifier: str) -> Tuple[bool, Optional[str]]:
        """
        检查是否被封禁
        返回 (是否封禁, 封禁级别)
        """
        # 支持按 API Key、IP、User ID 封禁
        for ban_type in ['key', 'ip', 'user']:
            key = f"ban:{ban_type}:{identifier}"
            ban_info = self.redis.get(key)
            
            if ban_info:
                info = json.loads(ban_info)
                return True, info.get('level', 'unknown')
        
        return False, None
    
    def ban(
        self,
        identifier: str,
        ban_type: str,  # 'key', 'ip', 'user'
        level: str,
        reason: str,
        admin_note: str = ""
    ) -> bool:
        """执行封禁"""
        if level not in self.ban_levels:
            raise ValueError(f"Unknown ban level: {level}")
        
        config = self.ban_levels[level]
        key = f"ban:{ban_type}:{identifier}"
        
        ban_info = {
            'level': level,
            'reason': reason,
            'admin_note': admin_note,
            'banned_at': datetime.utcnow().isoformat(),
            'expires_at': (
                datetime.utcnow() + timedelta(seconds=config['duration'])
            ).isoformat() if config['duration'] > 0 else None,
            'action': config['action']
        }
        
        # 写入 Redis
        ttl = config['duration'] if config['duration'] > 0 else 31536000  # 默认 1 年
        self.redis.setex(key, ttl, json.dumps(ban_info))
        
        # 记录操作日志(可接入日志系统)
        self._log_ban_action(identifier, ban_type, level, reason)
        
        return True
    
    def unban(self, identifier: str, ban_type: str) -> bool:
        """解除封禁"""
        key = f"ban:{ban_type}:{identifier}"
        return self.redis.delete(key) > 0
    
    def auto_ban_decision(
        self,
        identifier: str,
        ban_type: str,
        anomaly_scores: List[float]
    ) -> Optional[str]:
        """
        根据异常评分历史自动决定封禁级别
        返回 None 表示不封禁,否则返回封禁级别
        """
        max_score = max(anomaly_scores) if anomaly_scores else 0
        consecutive_count = len(anomaly_scores)
        
        for rule in self.auto_ban_rules:
            if (max_score >= rule['anomaly_score'] and 
                consecutive_count >= rule['consecutive_count']):
                self.ban(
                    identifier=identifier,
                    ban_type=ban_type,
                    level=rule['ban_level'],
                    reason=f"自动封禁: 异常评分={max_score:.2f}, "
                           f"连续异常={consecutive_count}次"
                )
                return rule['ban_level']
        
        return None
    
    def _log_ban_action(
        self,
        identifier: str,
        ban_type: str,
        level: str,
        reason: str
    ):
        """记录封禁操作日志"""
        log_key = f"logs:ban:{datetime.utcnow().strftime('%Y%m%d')}"
        log_entry = json.dumps({
            'timestamp': datetime.utcnow().isoformat(),
            'identifier': identifier,
            'type': ban_type,
            'level': level,
            'reason': reason
        })
        self.redis.rpush(log_key, log_entry)
        self.redis.expire(log_key, 86400 * 30)  # 保留 30 天

生产环境使用示例

ban_manager = BanManager(redis_host='your-redis-host.com') result = ban_manager.is_banned('203.0.113.45') print(f"IP 封禁状态: {result[0]}, 级别: {result[1]}")

四、完整监控中间件实现

将检测和封禁逻辑整合为一个 FastAPI 中间件,实现生产级别的安全防护:

# security_middleware.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging

导入前述模块

from metrics_collector import MetricsCollector, CallRecord

from anomaly_detector import AnomalyDetector, AnomalyType

from ban_manager import BanManager

logger = logging.getLogger(__name__) class AISecurityMiddleware(BaseHTTPMiddleware): """ AI API 安全监控中间件 功能: 1. 实时采集调用指标 2. 检测异常调用模式 3. 触发自动封禁 4. 发送告警通知 """ def __init__( self, app, collector: MetricsCollector, detector: AnomalyDetector, ban_manager: BanManager, webhook_url: str = None # 告警 Webhook ): super().__init__(app) self.collector = collector self.detector = detector self.ban_manager = ban_manager self.webhook_url = webhook_url # 异常评分历史(用于判断连续异常) self.score_history: Dict[str, List[float]] = defaultdict(list) async def dispatch(self, request: Request, call_next): # 提取标识信息 api_key = request.headers.get('Authorization', '').replace('Bearer ', '') client_ip = request.client.host if request.client else 'unknown' # 1. 检查封禁状态 is_banned, ban_level = self.ban_manager.is_banned(api_key) if is_banned: logger.warning(f"封禁的 API Key 尝试访问: {api_key[:10]}... from {client_ip}") return JSONResponse( status_code=403, content={ 'error': 'Access denied', 'message': 'Your API key has been suspended', 'reason': ban_level } ) # IP 封禁检查 ip_banned, ip_level = self.ban_manager.is_banned(client_ip) if ip_banned: return JSONResponse( status_code=403, content={'error': 'IP address blocked'} ) # 2. 记录调用(使用 HolySheep API 时可获取详细 token 使用) request_body = await request.body() tokens_used = self._estimate_tokens(request_body) record = CallRecord( timestamp=time.time(), tokens_used=tokens_used, endpoint=str(request.url.path), model=self._extract_model(request_body), ip_address=client_ip, user_id=api_key[:20] # 简化用户标识 ) self.collector.record_call(api_key, record) # 3. 异常检测 anomaly_type, confidence, description = self.detector.detect(api_key) if anomaly_type != AnomalyType.NONE: # 记录评分历史 self.score_history[api_key].append(confidence) # 保留最近 10 次记录 if len(self.score_history[api_key]) > 10: self.score_history[api_key] = self.score_history[api_key][-10:] # 触发自动封禁 ban_level = self.ban_manager.auto_ban_decision( api_key, 'key', self.score_history[api_key] ) # 发送告警 await self._send_alert(api_key, client_ip, anomaly_type, confidence, description) if ban_level: logger.error( f"自动封禁: Key={api_key[:10]}..., " f"类型={anomaly_type.value}, 级别={ban_level}" ) return JSONResponse( status_code=429, content={ 'error': 'Rate limit exceeded', 'message': f'异常调用检测,自动封禁 {ban_level} 级别', 'details': description } ) # 4. 正常处理请求 response = await call_next(request) return response def _estimate_tokens(self, body: bytes) -> int: """估算 Token 消耗(简化版)""" try: data = json.loads(body) messages = data.get('messages', []) # 粗略估算:每 4 个字符约 1 token text = ''.join(m.get('content', '') for m in messages) return len(text) // 4 except: return 100 # 默认值 def _extract_model(self, body: bytes) -> str: """提取模型名称""" try: data = json.loads(body) return data.get('model', 'unknown') except: return 'unknown' async def _send_alert( self, api_key: str, ip: str, anomaly_type: AnomalyType, confidence: float, description: str ): """发送告警通知""" alert_msg = { 'type': 'security_alert', 'api_key_preview': api_key[:10] + '...', 'ip': ip, 'anomaly_type': anomaly_type.value, 'confidence': f"{confidence:.2%}", 'description': description, 'timestamp': datetime.utcnow().isoformat() } logger.warning(f"安全告警: {json.dumps(alert_msg)}") # 可接入飞书、钉钉、企业微信 Webhook if self.webhook_url: try: import httpx async with httpx