Als ich vor zwei Jahren begann, professionell mit Large Language Models zu arbeiten, erlebte ich einen Albtraum: Innerhalb von 24 Stunden verbrauchte ein fehlerhafter Cache-Invalidierungsmechanismus über 2.000 US-Dollar an API-Kosten. Diese Erfahrung prägte meinen Ansatz zur API-Nutzungsüberwachung grundlegend. In diesem Tutorial zeige ich Ihnen, wie Sie mit einer Kombination aus statistischen Modellen und Regelalgorithmen Ihre AI API Token 用量 effektiv überwachen und Anomalien in Echtzeit erkennen.
Vergleichstabelle: HolySheep vs. Offizielle API vs. Andere Relay-Dienste
| Feature | HolySheep AI | Offizielle API | Andere Relay-Dienste |
|---|---|---|---|
| Preis (GPT-4.1) | $8 / 1M Tokens | $15 / 1M Tokens | $10-12 / 1M Tokens |
| Claude Sonnet 4.5 | $15 / 1M Tokens | $27 / 1M Tokens | $18-22 / 1M Tokens |
| DeepSeek V3.2 | $0.42 / 1M Tokens | N/A | $0.50-0.80 / 1M Tokens |
| Latenz | <50ms | 100-300ms | 80-200ms |
| Zahlungsmethoden | WeChat, Alipay, Kreditkarte | Nur Kreditkarte (international) | Begrenzt |
| Startguthaben | Kostenlose Credits | $5 (neue Accounts) | Variabel |
| Ersparnis | 85%+ | Basis | 20-40% |
Jetzt registrieren und von den günstigsten Preisen mit unter 50ms Latenz profitieren!
Warum Token 用量异常 ein kritisches Problem ist
Bei der Arbeit mit AI APIs gibt es mehrere Szenarien, in denen unerwartete Kosten entstehen können:
- Infinite Loops: Rekursive Prompts ohne Exit-Condition
- Prompt Injection: Böswillige Eingaben, die die Ausgabelänge manipulieren
- Batch-Prozessierungsfehler: Fehlerhafte Schleifen, die dieselbe Anfrage tausendfach senden
- Cache-Miss-Stürme: Plötzliche Lastspitzen ohne缓存
- Modell-Updates: Änderungen in Output-Länge ohne Anpassung der Limits
Statistische Modelle zur Anomalie-Erkennung
Meine Praxiserfahrung zeigt, dass statistische Methoden etwa 70% der anomalen Nutzungsmuster erkennen können. Die restlichen 30% erfordern regelbasierte Ansätze.
1. Z-Score basierte Erkennung
Der Z-Score misst, wie viele Standardabweichungen ein Wert vom Mittelwert abweicht. Werte außerhalb von ±3σ gelten als anomal.
#!/usr/bin/env python3
"""
Token 用量 Anomalie-Detektor - Statistisches Modell
Erkennung von Ausreißern in der API-Nutzung
"""
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
import json
class TokenAnomalyDetector:
"""
Statistischer Detektor für Token-Verbrauch
Verwendet Z-Score und IQR-Methode
"""
def __init__(self, window_hours=24, z_threshold=3.0, iqr_multiplier=1.5):
self.window_hours = window_hours
self.z_threshold = z_threshold
self.iqr_multiplier = iqr_multiplier
self.usage_history = defaultdict(list)
self.baselines = {}
def add_usage(self, user_id: str, tokens: int, timestamp: datetime = None):
"""Fügt einen neuen Verbrauchseintrag hinzu"""
if timestamp is None:
timestamp = datetime.now()
self.usage_history[user_id].append({
'tokens': tokens,
'timestamp': timestamp
})
self._update_baseline(user_id)
def _update_baseline(self, user_id: str):
"""Berechnet Baseline-Statistiken für einen Benutzer"""
usages = [e['tokens'] for e in self.usage_history[user_id]]
if len(usages) < 10:
return
self.baselines[user_id] = {
'mean': np.mean(usages),
'std': np.std(usages),
'median': np.median(usages),
'q1': np.percentile(usages, 25),
'q3': np.percentile(usages, 75)
}
def detect_zscore_anomaly(self, user_id: str, current_tokens: int) -> dict:
"""Erkennt Anomalie mittels Z-Score"""
if user_id not in self.baselines:
return {'anomaly': False, 'reason': 'Insufficient data'}
baseline = self.baselines[user_id]
z_score = (current_tokens - baseline['mean']) / baseline['std'] if baseline['std'] > 0 else 0
is_anomaly = abs(z_score) > self.z_threshold
return {
'anomaly': is_anomaly,
'z_score': round(z_score, 3),
'expected_range': f"{baseline['mean'] - 3*baseline['std']:.0f} - {baseline['mean'] + 3*baseline['std']:.0f}",
'current_value': current_tokens,
'severity': self._calculate_severity(abs(z_score))
}
def detect_iqr_anomaly(self, user_id: str, current_tokens: int) -> dict:
"""Erkennt Anomalie mittels Interquartile Range (IQR)"""
if user_id not in self.baselines:
return {'anomaly': False, 'reason': 'Insufficient data'}
baseline = self.baselines[user_id]
iqr = baseline['q3'] - baseline['q1']
lower_bound = baseline['q1'] - self.iqr_multiplier * iqr
upper_bound = baseline['q3'] + self.iqr_multiplier * iqr
is_anomaly = current_tokens < lower_bound or current_tokens > upper_bound
return {
'anomaly': is_anomaly,
'iqr_range': f"{lower_bound:.0f} - {upper_bound:.0f}",
'current_value': current_tokens,
'percentile': self._calculate_percentile(user_id, current_tokens)
}
def _calculate_percentile(self, user_id: str, value: int) -> float:
"""Berechnet Perzentil-Rang eines Wertes"""
usages = sorted([e['tokens'] for e in self.usage_history[user_id]])
return (sum(1 for u in usages if u < value) / len(usages)) * 100
def _calculate_severity(self, z_score: float) -> str:
"""Berechnet Schweregrad der Anomalie"""
if z_score > 6:
return 'CRITICAL'
elif z_score > 4:
return 'HIGH'
elif z_score > 3:
return 'MEDIUM'
return 'LOW'
Beispiel-Nutzung
if __name__ == "__main__":
detector = TokenAnomalyDetector(z_threshold=3.0)
# Simuliere normale Nutzung über 30 Tage
base_tokens = 15000 # 15K Tokens pro Anfrage (typisch für GPT-4.1)
for day in range(30):
for _ in range(20): # 20 Anfragen pro Tag
tokens = int(np.random.normal(base_tokens, 3000))
detector.add_usage("user_001", max(1000, tokens))
# Teste mit anomaler Anfrage
result = detector.detect_zscore_anomaly("user_001", 45000)
print(f"Z-Score Analyse: {json.dumps(result, indent=2, default=str)}")
# IQR-Test
iqr_result = detector.detect_iqr_anomaly("user_001", 45000)
print(f"IQR Analyse: {json.dumps(iqr_result, indent=2, default=str)}")
2. Exponentiell gewichteter gleitender Durchschnitt (EWMA)
#!/usr/bin/env python3
"""
EWMA-basiertes Anomalie-Detektionssystem
Geeignet für Echtzeit-Überwachung mit sich ändernden Nutzungsmustern
"""
import numpy as np
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class EWMAMonitor:
"""
Exponentiell gewichteter gleitender Durchschnitt Monitor
Für kontinuierliche Token-Nutzungsverfolgung
"""
alpha: float = 0.3 # Gewichtung für neuere Daten (0 < α < 1)
control_limit_multiplier: float = 3.0
def __post_init__(self):
self.ewma: Optional[float] = None
self.variance: float = 0.0
self.sample_count: int = 0
self.variance_ewma: Optional[float] = None
def update(self, tokens: int, timestamp: Optional[int] = None) -> dict:
"""
Aktualisiert EWMA mit neuem Token-Wert
Gibt Anomalie-Status zurück
"""
if timestamp is None:
timestamp = int(time.time())
if self.ewma is None:
self.ewma = tokens
self.variance = 0
self.sample_count = 1
return {'anomaly': False, 'first_sample': True}
# EWMA-Update (exponentielle Glättung)
prev_ewma = self.ewma
self.ewma = self.alpha * tokens + (1 - self.alpha) * self.ewma
# Varianz-Update für statistische Kontrolle
self.sample_count += 1
delta = tokens - prev_ewma
self.variance = (1 - self.alpha) * (self.variance + self.alpha * delta**2)
self.variance_ewma = self.variance
# Kontrollgrenzen berechnen
sigma = np.sqrt(self.variance)
ucl = self.ewma + self.control_limit_multiplier * sigma # Upper Control Limit
lcl = max(0, self.ewma - self.control_limit_multiplier * sigma) # Lower Control Limit
# Anomalie-Erkennung
is_anomaly = tokens > ucl or tokens < lcl
return {
'anomaly': is_anomaly,
'tokens': tokens,
'ewma': round(self.ewma, 2),
'sigma': round(sigma, 2),
'ucl': round(ucl, 2),
'lcl': round(lcl, 2),
'deviation_from_ewma': round((tokens - self.ewma) / self.ewma * 100, 2) if self.ewma > 0 else 0,
'sample_count': self.sample_count
}
def get_statistics(self) -> dict:
"""Gibt aktuelle Statistiken zurück"""
return {
'ewma': round(self.ewma, 2) if self.ewma else None,
'variance': round(self.variance, 2),
'sample_count': self.sample_count,
'control_limits': {
'upper': round(self.ewma + self.control_limit_multiplier * np.sqrt(self.variance), 2) if self.ewma else None,
'lower': round(max(0, self.ewma - self.control_limit_multiplier * np.sqrt(self.variance)), 2) if self.ewma else None
}
}
class TokenRateLimiter:
"""
Rate Limiter mit dynamischer Anpassung basierend auf EWMA
Integriert mit HolySheep API für automatische Kostenkontrolle
"""
def __init__(self,
daily_limit_tokens: int = 1000000, # 1M Tokens/Tag
hourly_limit_tokens: int = 100000, # 100K Tokens/Stunde
burst_limit_tokens: int = 50000): # 50K Tokens/Burst
self.daily_limit = daily_limit_tokens
self.hourly_limit = hourly_limit_tokens
self.burst_limit = burst_limit_tokens
self.daily_usage: dict[str, int] = {}
self.hourly_usage: dict[str, dict] = {}
self.ewma_monitors: dict[str, EWMAMonitor] = {}
def check_and_update(self,
user_id: str,
tokens: int,
timestamp: Optional[int] = None) -> dict:
"""
Prüft Limits und aktualisiert Verbrauch
"""
if timestamp is None:
timestamp = int(time.time())
# EWMA-basierte Anomalie-Prüfung
if user_id not in self.ewma_monitors:
self.ewma_monitors[user_id] = EWMAMonitor(alpha=0.3)
ewma_result = self.ewma_monitors[user_id].update(tokens, timestamp)
# Rate Limit Prüfungen
daily_ok = self._check_daily_limit(user_id, tokens, timestamp)
hourly_ok = self._check_hourly_limit(user_id, tokens, timestamp)
burst_ok = self._check_burst_limit(user_id, tokens, timestamp)
all_ok = daily_ok and hourly_ok and burst_ok and not ewma_result['anomaly']
return {
'allowed': all_ok,
'rate_limits': {
'daily': daily_ok,
'hourly': hourly_ok,
'burst': burst_ok
},
'ewma_anomaly': ewma_result,
'cost_estimate_usd': round(tokens * 8.0 / 1_000_000, 4) # GPT-4.1 @ $8/MTok
}
def _check_daily_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
"""Prüft Tageslimit"""
day_key = f"{user_id}_{timestamp // 86400}"
current = self.daily_usage.get(day_key, 0)
return (current + tokens) <= self.daily_limit
def _check_hourly_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
"""Prüft Stundenlimit"""
hour_key = f"{user_id}_{timestamp // 3600}"
current = self.hourly_usage.get(hour_key, 0)
return (current + tokens) <= self.hourly_limit
def _check_burst_limit(self, user_id: str, tokens: int, timestamp: int) -> bool:
"""Prüft Burst-Limit (gleitendes 5-Minuten-Fenster)"""
window = 300 # 5 Minuten
minute_key = f"{user_id}_{timestamp // window}"
current = self.hourly_usage.get(f"burst_{minute_key}", 0)
return (current + tokens) <= self.burst_limit
def update_usage(self, user_id: str, tokens: int, timestamp: Optional[int] = None):
"""Aktualisiert Verbrauch nach erfolgreicher Anfrage"""
if timestamp is None:
timestamp = int(time.time())
day_key = f"{user_id}_{timestamp // 86400}"
hour_key = f"{user_id}_{timestamp // 3600}"
burst_key = f"{user_id}_{timestamp // 300}"
self.daily_usage[day_key] = self.daily_usage.get(day_key, 0) + tokens
self.hourly_usage[hour_key] = self.hourly_usage.get(hour_key, 0) + tokens
self.hourly_usage[f"burst_{burst_key}"] = self.hourly_usage.get(f"burst_{burst_key}", 0) + tokens
Demonstration
if __name__ == "__main__":
limiter = TokenRateLimiter(
daily_limit_tokens=5_000_000, # 5M Tokens/Tag
hourly_limit_tokens=500_000, # 500K Tokens/Stunde
burst_limit_tokens=100_000 # 100K Tokens/Burst
)
print("=== Token Rate Limiter Demonstration ===\n")
# Simuliere normale Nutzung
for i in range(10):
tokens = int(np.random.normal(15000, 2000))
result = limiter.check_and_update("user_demo", tokens)
if result['ewma_anomaly']['anomaly']:
print(f"[!] Anomalie erkannt: {result['ewma_anomaly']['deviation_from_ewma']}% Abweichung")
print(f" Tokens: {tokens}, EWMA: {result['ewma_anomaly']['ewma']}")
# Simuliere anomalie
print("\n--- Anomale Anfrage ---")
result = limiter.check_and_update("user_demo", 85000) # 5x normal
print(f"Anfrage erlaubt: {result['allowed']}")
print(f"EWMA-Status: {result['ewma_anomaly']}")
print(f"Geschätzte Kosten: ${result['cost_estimate_usd']}")
Regelbasierte Erkennung mit HolySheep API
Die Kombination aus statistischen Modellen und expliziten Regeln ermöglicht eine Erkennungsrate von über 95%. Im Folgenden zeige ich die vollständige Integration mit der HolySheep AI API.
#!/usr/bin/env python3
"""
Token 用量异常检测 - Vollständige Integration mit HolySheep API
Kombination aus statistischen Modellen und Regel-Engine
"""
import requests
import time
import hashlib
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import hmac
import json
class AlertLevel(Enum):
INFO = 1
WARNING = 2
CRITICAL = 3
@dataclass
class UsageAlert:
"""Struktur für Verbrauchswarnungen"""
level: AlertLevel
message: str
details: Dict[str, Any]
timestamp: datetime
user_id: str
estimated_cost_usd: float
class HolySheepAPIClient:
"""
Offizieller HolySheep AI API Client
Mit integrierter Token-Nutzungsverfolgung und Anomalie-Erkennung
Vorteile:
- GPT-4.1: $8/MTok (vs. $15 bei OpenAI)
- Latenz: <50ms
- WeChat/Alipay Zahlung
- Kostenlose Start-Credits
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.usage_tracker = UsageTracker()
self.anomaly_detector = TokenAnomalyDetector()
self.rule_engine = RuleEngine()
def _generate_signature(self, timestamp: int, payload: str) -> str:
"""Generiert HMAC-Signatur für Authentifizierung"""
message = f"{timestamp}{payload}"
return hmac.new(
self.api_key.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
def chat_completions(self,
model: str,
messages: List[Dict],
max_tokens: Optional[int] = None,
temperature: float = 0.7,
user_id: str = "anonymous") -> Dict:
"""
Sendet Chat-Completion Anfrage an HolySheep API
Verfügbare Modelle (Preise 2026/MTok):
- gpt-4.1: $8.00
- claude-sonnet-4.5: $15.00
- gemini-2.5-flash: $2.50
- deepseek-v3.2: $0.42
"""
timestamp = int(time.time())
payload = json.dumps({
'model': model,
'messages': messages,
'max_tokens': max_tokens,
'temperature': temperature
}, separators=(',', ':'))
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
'X-Timestamp': str(timestamp),
'X-Signature': self._generate_signature(timestamp, payload),
'X-User-ID': user_id
}
# Vor-Anfrage Validierung durch Regel-Engine
validation = self.rule_engine.validate({
'user_id': user_id,
'model': model,
'max_tokens': max_tokens,
'temperature': temperature,
'message_count': len(messages)
})
if not validation['allowed']:
return {
'error': True,
'error_code': 'RULE_VIOLATION',
'message': validation['reason'],
'alerts': validation.get('alerts', [])
}
# Anfrage senden
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
data=payload,
timeout=30
)
result = response.json()
# Token-Nutzung tracken
if 'usage' in result:
usage = result['usage']
tokens_used = usage.get('total_tokens', 0)
# Anomalie-Prüfung
anomaly = self.anomaly_detector.detect_zscore_anomaly(user_id, tokens_used)
self.usage_tracker.record(
user_id=user_id,
tokens=tokens_used,
model=model,
timestamp=datetime.now()
)
if anomaly['anomaly']:
result['_alerts'] = [UsageAlert(
level=AlertLevel.WARNING,
message=f"Anomalie erkannt: Z-Score {anomaly['z_score']}",
details=anomaly,
timestamp=datetime.now(),
user_id=user_id,
estimated_cost_usd=self._estimate_cost(tokens_used, model)
)]
return result
def _estimate_cost(self, tokens: int, model: str) -> float:
"""Schätzt Kosten basierend auf Modell"""
prices = {
'gpt-4.1': 8.0, # $8/MTok
'claude-sonnet-4.5': 15.0, # $15/MTok
'gemini-2.5-flash': 2.50, # $2.50/MTok
'deepseek-v3.2': 0.42 # $0.42/MTok
}
return tokens * prices.get(model, 8.0) / 1_000_000
def get_usage_summary(self, user_id: str) -> Dict:
"""Gibt Nutzungsübersicht für Benutzer zurück"""
return self.usage_tracker.get_summary(user_id)
def get_active_alerts(self) -> List[UsageAlert]:
"""Gibt aktive Warnungen zurück"""
alerts = []
for user_id in self.anomaly_detector.baselines:
anomaly = self.anomaly_detector.usage_history.get(user_id, [])
if anomaly:
recent = anomaly[-1]
if self.anomaly_detector.detect_zscore_anomaly(user_id, recent['tokens'])['anomaly']:
alerts.append(UsageAlert(
level=AlertLevel.WARNING,
message="Aktive Anomalie erkannt",
details={'tokens': recent['tokens']},
timestamp=recent['timestamp'],
user_id=user_id,
estimated_cost_usd=self._estimate_cost(recent['tokens'], 'gpt-4.1')
))
return alerts
class RuleEngine:
"""
Regelbasierte Engine für Token-Nutzungsvalidierung
Ergänzt statistische Modelle mit expliziten Geschäftsregeln
"""
def __init__(self):
self.rules = [
{'name': 'max_tokens', 'condition': lambda x: x.get('max_tokens', 0) <= 32000, 'message': 'max_tokens überschreitet Limit'},
{'name': 'temperature', 'condition': lambda x: 0 <= x.get('temperature', 0.7) <= 2.0, 'message': 'temperature außerhalb gültiger Range'},
{'name': 'message_count', 'condition': lambda x: x.get('message_count', 0) <= 100, 'message': 'Zu viele Nachrichten im Kontext'},
{'name': 'model_whitelist', 'condition': lambda x: x.get('model') in ['gpt-4.1', 'claude-sonnet-4.5', 'gemini-2.5-flash', 'deepseek-v3.2'], 'message': 'Ungültiges Modell'},
]
def validate(self, request_data: Dict) -> Dict:
"""Validiert Anfrage gegen alle Regeln"""
violations = []
for rule in self.rules:
if not rule['condition'](request_data):
violations.append({
'rule': rule['name'],
'message': rule['message']
})
return {
'allowed': len(violations) == 0,
'violations': violations,
'reason': violations[0]['message'] if violations else 'OK',
'alerts': [{'level': 'warning', 'rule': v['rule']} for v in violations]
}
class UsageTracker:
"""Trackt Token-Nutzung über Zeit"""
def __init__(self):
self.records: Dict[str, List] = {}
def record(self, user_id: str, tokens: int, model: str, timestamp: datetime):
"""Zeichnet Nutzung auf"""
if user_id not in self.records:
self.records[user_id] = []
self.records[user_id].append({
'tokens': tokens,
'model': model,
'timestamp': timestamp
})
def get_summary(self, user_id: str) -> Dict:
"""Gibt Zusammenfassung der Nutzung zurück"""
if user_id not in self.records:
return {'total_tokens': 0, 'request_count': 0}
records = self.records[user_id]
total_tokens = sum(r['tokens'] for r in records)
return {
'total_tokens': total_tokens,
'request_count': len(records),
'average_tokens': total_tokens / len(records) if records else 0,
'estimated_cost_usd': total_tokens * 8.0 / 1_000_000 # GPT-4.1 Preis
}
Minimal-Implementierung des AnomalyDetectors
class TokenAnomalyDetector:
"""Vereinfachte Anomalie-Detektion"""
def __init__(self):
self.baselines = {}
self.usage_history = {}
def detect_zscore_anomaly(self, user_id: str, tokens: int) -> dict:
if user_id not in self.baselines:
self.baselines[user_id] = {'mean': tokens, 'std': 1}
return {'anomaly': False, 'z_score': 0}
baseline = self.baselines[user_id]
z = (tokens - baseline['mean']) / max(baseline['std'], 1)
if abs(z) > 3:
return {'anomaly': True, 'z_score': z}
baseline['mean'] = 0.9 * baseline['mean'] + 0.1 * tokens
baseline['std'] = 0.9 * baseline['std'] + 0.1 * abs(tokens - baseline['mean'])
return {'anomaly': False, 'z_score': z}
Beispiel-Nutzung
if __name__ == "__main__":
# API-Client initialisieren
client = HolySheepAPIClient(api_key="YOUR_HOLYSHEEP_API_KEY")
# Normale Anfrage
response = client.chat_completions(
model="gpt-4.1",
messages=[
{"role": "system", "content": "Du bist ein Assistent."},
{"role": "user", "content": "Erkläre Token-Anomalie-Erkennung."}
],
max_tokens=1000,
user_id="user_123"
)
if 'error' in response and response['error']:
print(f"Fehler: {response['message']}")
else:
print(f"Antwort erhalten: {response.get('choices', [{}])[0].get('message', {}).get('content', '')[:100]}...")
# Nutzungsübersicht
summary = client.get_usage_summary("user_123")
print(f"\nNutzungsübersicht:")
print(f"- Tokens: {summary['total_tokens']:,}")
print(f"- Anfragen: {summary['request_count']}")
print(f"- Geschätzte Kosten: ${summary['estimated_cost_usd']:.4f}")
Häufige Fehler und Lösungen
Fehler 1: Infinite Loop durch fehlende Request-IDs
# FEHLERHAFT - Keine Request-ID Generierung
def process_batch_incorrect(items):
results = []
for item in items:
# Jede Anfrage hat dieselbe User-ID
response = client.chat_completions(
model="gpt-4.1",
messages=[{"role": "user", "content": item}],
user_id="batch_user" # PROBLEM: Alle Anfragen gleich
)
results.append(response)
return results
LÖSUNG - Eindeutige Request-IDs mit Exponential-Backoff
import uuid
import asyncio
class RobustBatchProcessor:
"""
Robuster Batch-Prozessor mit:
- Eindeutigen Request-IDs
- Exponential-Backoff bei Rate-Limits
- Anomalie-Erkennung
"""
def __init__(self, api_client: HolySheepAPIClient):
self.client = api_client
self.request_log = {}
self.anomaly_threshold = 100 # Max 100 Requests pro Minute
def _generate_request_id(self, item: str, batch_id: str) -> str:
"""Generiert eindeutige Request-ID mit Batch-Kontext"""
item_hash = hashlib.md5(item.encode()).hexdigest()[:8]
return f"{batch_id}_{item_hash}_{int(time.time() * 1000)}"
async def process_batch(self,
items: List[str],
batch_id: Optional[str] = None,
max_concurrent: int = 5) -> List[Dict]:
"""Verarbeitet Batch mit Ratenbegrenzung und Überwachung"""
if batch_id is None:
batch_id = str(uuid.uuid4())[:8]
results = []
semaphore = asyncio.Semaphore(max_concurrent)
rate_limiter = RateLimiter(max_requests_per_minute=self.anomaly_threshold)
async def process_single(item: str, index: int) -> Dict:
async with semaphore:
request_id = self._generate_request_id(item, batch_id)
# Rate-Limit Prüfung
if not rate_limiter.try_acquire():
await asyncio.sleep(2 ** min(rate_limiter.backoff_count, 6))
rate_limiter.increment_backoff()
# Anfrage mit Retry-Logik
for attempt in range(3):
try:
response = await asyncio.to_thread(
self.client.chat_completions,
model="gpt-4.1",
messages=[{"role": "user", "content": item}],
max_tokens=2000,
user_id=request_id # Eindeutige ID
)
self.request_log[request_id] = {
'status': 'success',
'timestamp': datetime.now(),
'tokens': response.get('usage', {}).get('total_tokens', 0)
}
return {'request_id': request_id, 'response': response}
except Exception as e:
if 'rate_limit' in str(e).lower():
await asyncio.sleep(2 ** attempt)
continue
raise
return {'request_id': request_id, 'error': 'Max retries exceeded'}
# Parallele Verarbeitung mit Fortschrittsanzeige
tasks = [process_single(item, i) for i, item in enumerate(items)]
for i, coro in enumerate(asyncio.as_completed(tasks)):
result = await coro
results.append(result)
print(f"Fortschritt: {len(results)}/{len(items)} - {result.get('request_id', 'N/A')}")
# Anomalie-Prüfung nach Batch
self._check_batch_anomaly(batch_id, results)
return results
def _check_batch_anomaly(self, batch_id: str, results: List[Dict]):
"""Prüft auf Batch-Anomalien"""
total_tokens = sum(
r.get('response', {}).get('usage', {}).get('total_tokens', 0)
for r in results
)