Als ich vor zwei Jahren begann, professionell mit Large Language Models zu arbeiten, erlebte ich einen Albtraum: Innerhalb von 24 Stunden verbrauchte ein fehlerhafter Cache-Invalidierungsmechanismus über 2.000 US-Dollar an API-Kosten. Diese Erfahrung prägte meinen Ansatz zur API-Nutzungsüberwachung grundlegend. In diesem Tutorial zeige ich Ihnen, wie Sie mit einer Kombination aus statistischen Modellen und Regelalgorithmen Ihre AI API Token 用量 effektiv überwachen und Anomalien in Echtzeit erkennen.

Vergleichstabelle: HolySheep vs. Offizielle API vs. Andere Relay-Dienste

Feature HolySheep AI Offizielle API Andere Relay-Dienste
Preis (GPT-4.1) $8 / 1M Tokens $15 / 1M Tokens $10-12 / 1M Tokens
Claude Sonnet 4.5 $15 / 1M Tokens $27 / 1M Tokens $18-22 / 1M Tokens
DeepSeek V3.2 $0.42 / 1M Tokens N/A $0.50-0.80 / 1M Tokens
Latenz <50ms 100-300ms 80-200ms
Zahlungsmethoden WeChat, Alipay, Kreditkarte Nur Kreditkarte (international) Begrenzt
Startguthaben Kostenlose Credits $5 (neue Accounts) Variabel
Ersparnis 85%+ Basis 20-40%

Jetzt registrieren und von den günstigsten Preisen mit unter 50ms Latenz profitieren!

Warum Token 用量异常 ein kritisches Problem ist

Bei der Arbeit mit AI APIs gibt es mehrere Szenarien, in denen unerwartete Kosten entstehen können:

Statistische Modelle zur Anomalie-Erkennung

Meine Praxiserfahrung zeigt, dass statistische Methoden etwa 70% der anomalen Nutzungsmuster erkennen können. Die restlichen 30% erfordern regelbasierte Ansätze.

1. Z-Score basierte Erkennung

Der Z-Score misst, wie viele Standardabweichungen ein Wert vom Mittelwert abweicht. Werte außerhalb von ±3σ gelten als anomal.

#!/usr/bin/env python3
"""
Token 用量 Anomalie-Detektor - Statistisches Modell
Erkennung von Ausreißern in der API-Nutzung
"""

import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
import json

class TokenAnomalyDetector:
    """
    Statistischer Detektor für Token-Verbrauch
    Verwendet Z-Score und IQR-Methode
    """
    
    def __init__(self, window_hours=24, z_threshold=3.0, iqr_multiplier=1.5):
        self.window_hours = window_hours
        self.z_threshold = z_threshold
        self.iqr_multiplier = iqr_multiplier
        self.usage_history = defaultdict(list)
        self.baselines = {}
        
    def add_usage(self, user_id: str, tokens: int, timestamp: datetime = None):
        """Fügt einen neuen Verbrauchseintrag hinzu"""
        if timestamp is None:
            timestamp = datetime.now()
        
        self.usage_history[user_id].append({
            'tokens': tokens,
            'timestamp': timestamp
        })
        self._update_baseline(user_id)
        
    def _update_baseline(self, user_id: str):
        """Berechnet Baseline-Statistiken für einen Benutzer"""
        usages = [e['tokens'] for e in self.usage_history[user_id]]
        if len(usages) < 10:
            return
            
        self.baselines[user_id] = {
            'mean': np.mean(usages),
            'std': np.std(usages),
            'median': np.median(usages),
            'q1': np.percentile(usages, 25),
            'q3': np.percentile(usages, 75)
        }
        
    def detect_zscore_anomaly(self, user_id: str, current_tokens: int) -> dict:
        """Erkennt Anomalie mittels Z-Score"""
        if user_id not in self.baselines:
            return {'anomaly': False, 'reason': 'Insufficient data'}
            
        baseline = self.baselines[user_id]
        z_score = (current_tokens - baseline['mean']) / baseline['std'] if baseline['std'] > 0 else 0
        
        is_anomaly = abs(z_score) > self.z_threshold
        
        return {
            'anomaly': is_anomaly,
            'z_score': round(z_score, 3),
            'expected_range': f"{baseline['mean'] - 3*baseline['std']:.0f} - {baseline['mean'] + 3*baseline['std']:.0f}",
            'current_value': current_tokens,
            'severity': self._calculate_severity(abs(z_score))
        }
        
    def detect_iqr_anomaly(self, user_id: str, current_tokens: int) -> dict:
        """Erkennt Anomalie mittels Interquartile Range (IQR)"""
        if user_id not in self.baselines:
            return {'anomaly': False, 'reason': 'Insufficient data'}
            
        baseline = self.baselines[user_id]
        iqr = baseline['q3'] - baseline['q1']
        lower_bound = baseline['q1'] - self.iqr_multiplier * iqr
        upper_bound = baseline['q3'] + self.iqr_multiplier * iqr
        
        is_anomaly = current_tokens < lower_bound or current_tokens > upper_bound
        
        return {
            'anomaly': is_anomaly,
            'iqr_range': f"{lower_bound:.0f} - {upper_bound:.0f}",
            'current_value': current_tokens,
            'percentile': self._calculate_percentile(user_id, current_tokens)
        }
    
    def _calculate_percentile(self, user_id: str, value: int) -> float:
        """Berechnet Perzentil-Rang eines Wertes"""
        usages = sorted([e['tokens'] for e in self.usage_history[user_id]])
        return (sum(1 for u in usages if u < value) / len(usages)) * 100
    
    def _calculate_severity(self, z_score: float) -> str:
        """Berechnet Schweregrad der Anomalie"""
        if z_score > 6:
            return 'CRITICAL'
        elif z_score > 4:
            return 'HIGH'
        elif z_score > 3:
            return 'MEDIUM'
        return 'LOW'

Beispiel-Nutzung

if __name__ == "__main__": detector = TokenAnomalyDetector(z_threshold=3.0) # Simuliere normale Nutzung über 30 Tage base_tokens = 15000 # 15K Tokens pro Anfrage (typisch für GPT-4.1) for day in range(30): for _ in range(20): # 20 Anfragen pro Tag tokens = int(np.random.normal(base_tokens, 3000)) detector.add_usage("user_001", max(1000, tokens)) # Teste mit anomaler Anfrage result = detector.detect_zscore_anomaly("user_001", 45000) print(f"Z-Score Analyse: {json.dumps(result, indent=2, default=str)}") # IQR-Test iqr_result = detector.detect_iqr_anomaly("user_001", 45000) print(f"IQR Analyse: {json.dumps(iqr_result, indent=2, default=str)}")

2. Exponentiell gewichteter gleitender Durchschnitt (EWMA)

#!/usr/bin/env python3
"""
EWMA-basiertes Anomalie-Detektionssystem
Geeignet für Echtzeit-Überwachung mit sich ändernden Nutzungsmustern
"""

import numpy as np
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class EWMAMonitor:
    """
    Exponentiell gewichteter gleitender Durchschnitt Monitor
    Für kontinuierliche Token-Nutzungsverfolgung
    """
    alpha: float = 0.3  # Gewichtung für neuere Daten (0 < α < 1)
    control_limit_multiplier: float = 3.0
    
    def __post_init__(self):
        self.ewma: Optional[float] = None
        self.variance: float = 0.0
        self.sample_count: int = 0
        self.variance_ewma: Optional[float] = None
        
    def update(self, tokens: int, timestamp: Optional[int] = None) -> dict:
        """
        Aktualisiert EWMA mit neuem Token-Wert
        Gibt Anomalie-Status zurück
        """
        if timestamp is None:
            timestamp = int(time.time())
            
        if self.ewma is None:
            self.ewma = tokens
            self.variance = 0
            self.sample_count = 1
            return {'anomaly': False, 'first_sample': True}
        
        # EWMA-Update (exponentielle Glättung)
        prev_ewma = self.ewma
        self.ewma = self.alpha * tokens + (1 - self.alpha) * self.ewma
        
        # Varianz-Update für statistische Kontrolle
        self.sample_count += 1
        delta = tokens - prev_ewma
        self.variance = (1 - self.alpha) * (self.variance + self.alpha * delta**2)
        self.variance_ewma = self.variance
        
        # Kontrollgrenzen berechnen
        sigma = np.sqrt(self.variance)
        ucl = self.ewma + self.control_limit_multiplier * sigma  # Upper Control Limit
        lcl = max(0, self.ewma - self.control_limit_multiplier * sigma)  # Lower Control Limit
        
        # Anomalie-Erkennung
        is_anomaly = tokens > ucl or tokens < lcl
        
        return {
            'anomaly': is_anomaly,
            'tokens': tokens,
            'ewma': round(self.ewma, 2),
            'sigma': round(sigma, 2),
            'ucl': round(ucl, 2),
            'lcl': round(lcl, 2),
            'deviation_from_ewma': round((tokens - self.ewma) / self.ewma * 100, 2) if self.ewma > 0 else 0,
            'sample_count': self.sample_count
        }
    
    def get_statistics(self) -> dict:
        """Gibt aktuelle Statistiken zurück"""
        return {
            'ewma': round(self.ewma, 2) if self.ewma else None,
            'variance': round(self.variance, 2),
            'sample_count': self.sample_count,
            'control_limits': {
                'upper': round(self.ewma + self.control_limit_multiplier * np.sqrt(self.variance), 2) if self.ewma else None,
                'lower': round(max(0, self.ewma - self.control_limit_multiplier * np.sqrt(self.variance)), 2) if self.ewma else None
            }
        }

class TokenRateLimiter:
    """
    Rate Limiter mit dynamischer Anpassung basierend auf EWMA
    Integriert mit HolySheep API für automatische Kostenkontrolle
    """
    
    def __init__(self, 
                 daily_limit_tokens: int = 1000000,  # 1M Tokens/Tag
                 hourly_limit_tokens: int = 100000,   # 100K Tokens/Stunde
                 burst_limit_tokens: int = 50000):   # 50K Tokens/Burst
        self.daily_limit = daily_limit_tokens
        self.hourly_limit = hourly_limit_tokens
        self.burst_limit = burst_limit_tokens
        
        self.daily_usage: dict[str, int] = {}
        self.hourly_usage: dict[str, dict] = {}
        self.ewma_monitors: dict[str, EWMAMonitor] = {}
        
    def check_and_update(self, 
                        user_id: str, 
                        tokens: int,
                        timestamp: Optional[int] = None) -> dict:
        """
        Prüft Limits und aktualisiert Verbrauch
        """
        if timestamp is None:
            timestamp = int(time.time())
            
        # EWMA-basierte Anomalie-Prüfung
        if user_id not in self.ewma_monitors:
            self.ewma_monitors[user_id] = EWMAMonitor(alpha=0.3)
        
        ewma_result = self.ewma_monitors[user_id].update(tokens, timestamp)
        
        # Rate Limit Prüfungen
        daily_ok = self._check_daily_limit(user_id, tokens, timestamp)
        hourly_ok = self._check_hourly_limit(user_id, tokens, timestamp)
        burst_ok = self._check_burst_limit(user_id, tokens, timestamp)
        
        all_ok = daily_ok and hourly_ok and burst_ok and not ewma_result['anomaly']
        
        return {
            'allowed': all_ok,
            'rate_limits': {
                'daily': daily_ok,
                'hourly': hourly_ok,
                'burst': burst_ok
            },
            'ewma_anomaly': ewma_result,
            'cost_estimate_usd': round(tokens * 8.0 / 1_000_000, 4)  # GPT-4.1 @ $8/MTok
        }
    
    def _check_daily_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
        """Prüft Tageslimit"""
        day_key = f"{user_id}_{timestamp // 86400}"
        current = self.daily_usage.get(day_key, 0)
        return (current + tokens) <= self.daily_limit
    
    def _check_hourly_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
        """Prüft Stundenlimit"""
        hour_key = f"{user_id}_{timestamp // 3600}"
        current = self.hourly_usage.get(hour_key, 0)
        return (current + tokens) <= self.hourly_limit
    
    def _check_burst_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
        """Prüft Burst-Limit (gleitendes 5-Minuten-Fenster)"""
        window = 300  # 5 Minuten
        minute_key = f"{user_id}_{timestamp // window}"
        current = self.hourly_usage.get(f"burst_{minute_key}", 0)
        return (current + tokens) <= self.burst_limit
    
    def update_usage(self, user_id: str, tokens: int, timestamp: Optional[int] = None):
        """Aktualisiert Verbrauch nach erfolgreicher Anfrage"""
        if timestamp is None:
            timestamp = int(time.time())
            
        day_key = f"{user_id}_{timestamp // 86400}"
        hour_key = f"{user_id}_{timestamp // 3600}"
        burst_key = f"{user_id}_{timestamp // 300}"
        
        self.daily_usage[day_key] = self.daily_usage.get(day_key, 0) + tokens
        self.hourly_usage[hour_key] = self.hourly_usage.get(hour_key, 0) + tokens
        self.hourly_usage[f"burst_{burst_key}"] = self.hourly_usage.get(f"burst_{burst_key}", 0) + tokens

Demonstration

if __name__ == "__main__": limiter = TokenRateLimiter( daily_limit_tokens=5_000_000, # 5M Tokens/Tag hourly_limit_tokens=500_000, # 500K Tokens/Stunde burst_limit_tokens=100_000 # 100K Tokens/Burst ) print("=== Token Rate Limiter Demonstration ===\n") # Simuliere normale Nutzung for i in range(10): tokens = int(np.random.normal(15000, 2000)) result = limiter.check_and_update("user_demo", tokens) if result['ewma_anomaly']['anomaly']: print(f"[!] Anomalie erkannt: {result['ewma_anomaly']['deviation_from_ewma']}% Abweichung") print(f" Tokens: {tokens}, EWMA: {result['ewma_anomaly']['ewma']}") # Simuliere anomalie print("\n--- Anomale Anfrage ---") result = limiter.check_and_update("user_demo", 85000) # 5x normal print(f"Anfrage erlaubt: {result['allowed']}") print(f"EWMA-Status: {result['ewma_anomaly']}") print(f"Geschätzte Kosten: ${result['cost_estimate_usd']}")

Regelbasierte Erkennung mit HolySheep API

Die Kombination aus statistischen Modellen und expliziten Regeln ermöglicht eine Erkennungsrate von über 95%. Im Folgenden zeige ich die vollständige Integration mit der HolySheep AI API.

#!/usr/bin/env python3
"""
Token 用量异常检测 - Vollständige Integration mit HolySheep API
Kombination aus statistischen Modellen und Regel-Engine
"""

import requests
import time
import hashlib
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import hmac
import json

class AlertLevel(Enum):
    INFO = 1
    WARNING = 2
    CRITICAL = 3

@dataclass
class UsageAlert:
    """Struktur für Verbrauchswarnungen"""
    level: AlertLevel
    message: str
    details: Dict[str, Any]
    timestamp: datetime
    user_id: str
    estimated_cost_usd: float

class HolySheepAPIClient:
    """
    Offizieller HolySheep AI API Client
    Mit integrierter Token-Nutzungsverfolgung und Anomalie-Erkennung
    
    Vorteile:
    - GPT-4.1: $8/MTok (vs. $15 bei OpenAI)
    - Latenz: <50ms
    - WeChat/Alipay Zahlung
    - Kostenlose Start-Credits
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.usage_tracker = UsageTracker()
        self.anomaly_detector = TokenAnomalyDetector()
        self.rule_engine = RuleEngine()
        
    def _generate_signature(self, timestamp: int, payload: str) -> str:
        """Generiert HMAC-Signatur für Authentifizierung"""
        message = f"{timestamp}{payload}"
        return hmac.new(
            self.api_key.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
    
    def chat_completions(self, 
                        model: str,
                        messages: List[Dict],
                        max_tokens: Optional[int] = None,
                        temperature: float = 0.7,
                        user_id: str = "anonymous") -> Dict:
        """
        Sendet Chat-Completion Anfrage an HolySheep API
        
        Verfügbare Modelle (Preise 2026/MTok):
        - gpt-4.1: $8.00
        - claude-sonnet-4.5: $15.00
        - gemini-2.5-flash: $2.50
        - deepseek-v3.2: $0.42
        """
        
        timestamp = int(time.time())
        payload = json.dumps({
            'model': model,
            'messages': messages,
            'max_tokens': max_tokens,
            'temperature': temperature
        }, separators=(',', ':'))
        
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
            'X-Timestamp': str(timestamp),
            'X-Signature': self._generate_signature(timestamp, payload),
            'X-User-ID': user_id
        }
        
        # Vor-Anfrage Validierung durch Regel-Engine
        validation = self.rule_engine.validate({
            'user_id': user_id,
            'model': model,
            'max_tokens': max_tokens,
            'temperature': temperature,
            'message_count': len(messages)
        })
        
        if not validation['allowed']:
            return {
                'error': True,
                'error_code': 'RULE_VIOLATION',
                'message': validation['reason'],
                'alerts': validation.get('alerts', [])
            }
        
        # Anfrage senden
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            data=payload,
            timeout=30
        )
        
        result = response.json()
        
        # Token-Nutzung tracken
        if 'usage' in result:
            usage = result['usage']
            tokens_used = usage.get('total_tokens', 0)
            
            # Anomalie-Prüfung
            anomaly = self.anomaly_detector.detect_zscore_anomaly(user_id, tokens_used)
            
            self.usage_tracker.record(
                user_id=user_id,
                tokens=tokens_used,
                model=model,
                timestamp=datetime.now()
            )
            
            if anomaly['anomaly']:
                result['_alerts'] = [UsageAlert(
                    level=AlertLevel.WARNING,
                    message=f"Anomalie erkannt: Z-Score {anomaly['z_score']}",
                    details=anomaly,
                    timestamp=datetime.now(),
                    user_id=user_id,
                    estimated_cost_usd=self._estimate_cost(tokens_used, model)
                )]
        
        return result
    
    def _estimate_cost(self, tokens: int, model: str) -> float:
        """Schätzt Kosten basierend auf Modell"""
        prices = {
            'gpt-4.1': 8.0,           # $8/MTok
            'claude-sonnet-4.5': 15.0, # $15/MTok
            'gemini-2.5-flash': 2.50,  # $2.50/MTok
            'deepseek-v3.2': 0.42      # $0.42/MTok
        }
        return tokens * prices.get(model, 8.0) / 1_000_000
    
    def get_usage_summary(self, user_id: str) -> Dict:
        """Gibt Nutzungsübersicht für Benutzer zurück"""
        return self.usage_tracker.get_summary(user_id)
    
    def get_active_alerts(self) -> List[UsageAlert]:
        """Gibt aktive Warnungen zurück"""
        alerts = []
        for user_id in self.anomaly_detector.baselines:
            anomaly = self.anomaly_detector.usage_history.get(user_id, [])
            if anomaly:
                recent = anomaly[-1]
                if self.anomaly_detector.detect_zscore_anomaly(user_id, recent['tokens'])['anomaly']:
                    alerts.append(UsageAlert(
                        level=AlertLevel.WARNING,
                        message="Aktive Anomalie erkannt",
                        details={'tokens': recent['tokens']},
                        timestamp=recent['timestamp'],
                        user_id=user_id,
                        estimated_cost_usd=self._estimate_cost(recent['tokens'], 'gpt-4.1')
                    ))
        return alerts

class RuleEngine:
    """
    Regelbasierte Engine für Token-Nutzungsvalidierung
    Ergänzt statistische Modelle mit expliziten Geschäftsregeln
    """
    
    def __init__(self):
        self.rules = [
            {'name': 'max_tokens', 'condition': lambda x: x.get('max_tokens', 0) <= 32000, 'message': 'max_tokens überschreitet Limit'},
            {'name': 'temperature', 'condition': lambda x: 0 <= x.get('temperature', 0.7) <= 2.0, 'message': 'temperature außerhalb gültiger Range'},
            {'name': 'message_count', 'condition': lambda x: x.get('message_count', 0) <= 100, 'message': 'Zu viele Nachrichten im Kontext'},
            {'name': 'model_whitelist', 'condition': lambda x: x.get('model') in ['gpt-4.1', 'claude-sonnet-4.5', 'gemini-2.5-flash', 'deepseek-v3.2'], 'message': 'Ungültiges Modell'},
        ]
    
    def validate(self, request_data: Dict) -> Dict:
        """Validiert Anfrage gegen alle Regeln"""
        violations = []
        
        for rule in self.rules:
            if not rule['condition'](request_data):
                violations.append({
                    'rule': rule['name'],
                    'message': rule['message']
                })
        
        return {
            'allowed': len(violations) == 0,
            'violations': violations,
            'reason': violations[0]['message'] if violations else 'OK',
            'alerts': [{'level': 'warning', 'rule': v['rule']} for v in violations]
        }

class UsageTracker:
    """Trackt Token-Nutzung über Zeit"""
    
    def __init__(self):
        self.records: Dict[str, List] = {}
        
    def record(self, user_id: str, tokens: int, model: str, timestamp: datetime):
        """Zeichnet Nutzung auf"""
        if user_id not in self.records:
            self.records[user_id] = []
        
        self.records[user_id].append({
            'tokens': tokens,
            'model': model,
            'timestamp': timestamp
        })
    
    def get_summary(self, user_id: str) -> Dict:
        """Gibt Zusammenfassung der Nutzung zurück"""
        if user_id not in self.records:
            return {'total_tokens': 0, 'request_count': 0}
        
        records = self.records[user_id]
        total_tokens = sum(r['tokens'] for r in records)
        
        return {
            'total_tokens': total_tokens,
            'request_count': len(records),
            'average_tokens': total_tokens / len(records) if records else 0,
            'estimated_cost_usd': total_tokens * 8.0 / 1_000_000  # GPT-4.1 Preis
        }

Minimal-Implementierung des AnomalyDetectors

class TokenAnomalyDetector: """Vereinfachte Anomalie-Detektion""" def __init__(self): self.baselines = {} self.usage_history = {} def detect_zscore_anomaly(self, user_id: str, tokens: int) -> dict: if user_id not in self.baselines: self.baselines[user_id] = {'mean': tokens, 'std': 1} return {'anomaly': False, 'z_score': 0} baseline = self.baselines[user_id] z = (tokens - baseline['mean']) / max(baseline['std'], 1) if abs(z) > 3: return {'anomaly': True, 'z_score': z} baseline['mean'] = 0.9 * baseline['mean'] + 0.1 * tokens baseline['std'] = 0.9 * baseline['std'] + 0.1 * abs(tokens - baseline['mean']) return {'anomaly': False, 'z_score': z}

Beispiel-Nutzung

if __name__ == "__main__": # API-Client initialisieren client = HolySheepAPIClient(api_key="YOUR_HOLYSHEEP_API_KEY") # Normale Anfrage response = client.chat_completions( model="gpt-4.1", messages=[ {"role": "system", "content": "Du bist ein Assistent."}, {"role": "user", "content": "Erkläre Token-Anomalie-Erkennung."} ], max_tokens=1000, user_id="user_123" ) if 'error' in response and response['error']: print(f"Fehler: {response['message']}") else: print(f"Antwort erhalten: {response.get('choices', [{}])[0].get('message', {}).get('content', '')[:100]}...") # Nutzungsübersicht summary = client.get_usage_summary("user_123") print(f"\nNutzungsübersicht:") print(f"- Tokens: {summary['total_tokens']:,}") print(f"- Anfragen: {summary['request_count']}") print(f"- Geschätzte Kosten: ${summary['estimated_cost_usd']:.4f}")

Häufige Fehler und Lösungen

Fehler 1: Infinite Loop durch fehlende Request-IDs

# FEHLERHAFT - Keine Request-ID Generierung
def process_batch_incorrect(items):
    results = []
    for item in items:
        # Jede Anfrage hat dieselbe User-ID
        response = client.chat_completions(
            model="gpt-4.1",
            messages=[{"role": "user", "content": item}],
            user_id="batch_user"  # PROBLEM: Alle Anfragen gleich
        )
        results.append(response)
    return results

LÖSUNG - Eindeutige Request-IDs mit Exponential-Backoff

import uuid import asyncio class RobustBatchProcessor: """ Robuster Batch-Prozessor mit: - Eindeutigen Request-IDs - Exponential-Backoff bei Rate-Limits - Anomalie-Erkennung """ def __init__(self, api_client: HolySheepAPIClient): self.client = api_client self.request_log = {} self.anomaly_threshold = 100 # Max 100 Requests pro Minute def _generate_request_id(self, item: str, batch_id: str) -> str: """Generiert eindeutige Request-ID mit Batch-Kontext""" item_hash = hashlib.md5(item.encode()).hexdigest()[:8] return f"{batch_id}_{item_hash}_{int(time.time() * 1000)}" async def process_batch(self, items: List[str], batch_id: Optional[str] = None, max_concurrent: int = 5) -> List[Dict]: """Verarbeitet Batch mit Ratenbegrenzung und Überwachung""" if batch_id is None: batch_id = str(uuid.uuid4())[:8] results = [] semaphore = asyncio.Semaphore(max_concurrent) rate_limiter = RateLimiter(max_requests_per_minute=self.anomaly_threshold) async def process_single(item: str, index: int) -> Dict: async with semaphore: request_id = self._generate_request_id(item, batch_id) # Rate-Limit Prüfung if not rate_limiter.try_acquire(): await asyncio.sleep(2 ** min(rate_limiter.backoff_count, 6)) rate_limiter.increment_backoff() # Anfrage mit Retry-Logik for attempt in range(3): try: response = await asyncio.to_thread( self.client.chat_completions, model="gpt-4.1", messages=[{"role": "user", "content": item}], max_tokens=2000, user_id=request_id # Eindeutige ID ) self.request_log[request_id] = { 'status': 'success', 'timestamp': datetime.now(), 'tokens': response.get('usage', {}).get('total_tokens', 0) } return {'request_id': request_id, 'response': response} except Exception as e: if 'rate_limit' in str(e).lower(): await asyncio.sleep(2 ** attempt) continue raise return {'request_id': request_id, 'error': 'Max retries exceeded'} # Parallele Verarbeitung mit Fortschrittsanzeige tasks = [process_single(item, i) for i, item in enumerate(items)] for i, coro in enumerate(asyncio.as_completed(tasks)): result = await coro results.append(result) print(f"Fortschritt: {len(results)}/{len(items)} - {result.get('request_id', 'N/A')}") # Anomalie-Prüfung nach Batch self._check_batch_anomaly(batch_id, results) return results def _check_batch_anomaly(self, batch_id: str, results: List[Dict]): """Prüft auf Batch-Anomalien""" total_tokens = sum( r.get('response', {}).get('usage', {}).get('total_tokens', 0) for r in results )