Als Entwickler, der seit über drei Jahren produktive KI-Anwendungen mit multimodularen Architekturen betreibt, habe ich unzählige Stunden damit verbracht, zuverlässige Failover-Systeme zwischen verschiedenen KI-Modellen aufzubauen. Die Herausforderung liegt nicht nur in der technischen Implementierung, sondern auch in der Kostenoptimierung: Wenn Sie 10 Millionen Token pro Monat verarbeiten, entscheidet Ihr Failover-Design direkt über Ihre monatliche Rechnung.
In diesem Tutorial zeige ich Ihnen, wie Sie mit HolySheep AI ein robustes Multi-Model-Failover-System implementieren, das gleichzeitig Kosten spart und Ausfallsicherheit gewährleistet.
Warum Multi-Model Failover?
Ein solides Failover-System ist unverzichtbar, wenn Sie geschäftskritische KI-Anwendungen betreiben. Die Vorteile liegen auf der Hand:
- Hohe Verfügbarkeit: Kein Single Point of Failure bei Modelldienstausfällen
- Kostenoptimierung: Automatische Auswahl des günstigsten verfügbaren Modells
- Latenzreduzierung: Failover auf Modelle mit niedrigerer Antwortzeit
- Resilienz: Überstehen von API-Limit-Überschreitungen und Rate-Limits
Kostenvergleich: Die 2026er Preise machen den Unterschied
Bevor wir in den Code eintauchen, müssen wir die aktuellen Preise verstehen. Die folgenden Daten sind für Mai 2026 verifiziert:
| Modell | Output-Preis ($/MTok) | 10M Token/Monat | Relativer Aufwand |
|---|---|---|---|
| DeepSeek V3.2 | $0.42 | $4.200 | Baseline |
| Gemini 2.5 Flash | $2.50 | $25.000 | ~6x teurer |
| GPT-4.1 | $8.00 | $80.000 | ~19x teurer |
| Claude Sonnet 4.5 | $15.00 | $150.000 | ~36x teurer |
Erkenntnis: Wenn Sie DeepSeek V3.2 als primäres Modell verwenden und nur bei Ausfällen auf teurere Modelle ausweichen, sparen Sie bei 10M Token/Monat potenziell über $145.000 jährlich im Vergleich zu Claude Sonnet 4.5 als primärem Modell.
Geeignet / Nicht geeignet für
✅ Perfekt geeignet für:
- Produktive KI-Anwendungen mit SLA-Anforderungen
- Batch-Verarbeitung mit hohem Token-Volumen
- Entwicklungsumgebungen mit Budget-Beschränkungen
- Apps, die sowohl Geschwindigkeit als auch Kosteneffizienz benötigen
- Teams, die WeChat oder Alipay für Zahlungen nutzen möchten
❌ Weniger geeignet für:
- Projekte mit extrem geringem Volumen (< 10.000 Token/Monat)
- Anwendungen, die ausschließlich ein einzelnes Modell erfordern (z.B. nur Claude-Features)
- Streng regulierte Branchen mit compliancespezifischen Modellvorgaben
Architektur des HolySheep Relay-Systems
HolySheep bietet mit seinem Relay-Endpunkt eine zentrale Schnittstelle, die Anfragen automatisch an konfigurierte Modelle weiterleitet. Die Architektur umfasst:
- Primary Model: Günstigstes Modell für reguläre Anfragen (DeepSeek V3.2)
- Fallback Chain: Zweitgünstigste bis teuerste Modelle in Prioritätsreihenfolge
- Health Monitoring: Automatische Erkennung von Ausfällen und Latenzproblemen
- Cost Tracker: Echtzeit-Überwachung der Token-Kosten
Implementierung: Schritt-für-Schritt
Voraussetzungen
Bevor Sie beginnen, benötigen Sie:
- Ein HolySheep AI Konto (inklusive kostenloser Start-Credits)
- Python 3.8+ mit
requestsundhttpxBibliotheken - Ihren HolySheep API-Key
Schritt 1: Grundlegendes Failover-System
#!/usr/bin/env python3
"""
HolySheep Multi-Model Failover System
Base URL: https://api.holysheep.ai/v1
"""
import requests
import time
from typing import Optional, List, Dict
from dataclasses import dataclass
from enum import Enum
class ModelPriority(Enum):
"""Modellprioritäten basierend auf Kosten (2026 Preise)"""
DEEPSEEK_V32 = 1 # $0.42/MTok - Primär
GEMINI_FLASH = 2 # $2.50/MTok - Fallback 1
GPT_41 = 3 # $8.00/MTok - Fallback 2
CLAUDE_SONNET = 4 # $15.00/MTok - Letzter Fallback
@dataclass
class ModelConfig:
name: str
provider: str
cost_per_mtok: float
max_latency_ms: int
enabled: bool = True
class HolySheepRelayClient:
"""Multi-Model Failover Client für HolySheep API"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Modellkonfiguration mit 2026 Preisen
self.models = {
"deepseek-v3.2": ModelConfig(
name="DeepSeek V3.2",
provider="deepseek",
cost_per_mtok=0.42,
max_latency_ms=2000,
enabled=True
),
"gpt-4.1": ModelConfig(
name="GPT-4.1",
provider="openai",
cost_per_mtok=8.00,
max_latency_ms=5000,
enabled=True
),
"claude-sonnet-4.5": ModelConfig(
name="Claude Sonnet 4.5",
provider="anthropic",
cost_per_mtok=15.00,
max_latency_ms=6000,
enabled=True
),
"gemini-2.5-flash": ModelConfig(
name="Gemini 2.5 Flash",
provider="google",
cost_per_mtok=2.50,
max_latency_ms=1500,
enabled=True
)
}
# Failover-Kette: Reihenfolge der Modelle bei Ausfällen
self.failover_chain = [
"deepseek-v3.2", # Günstigstes zuerst
"gemini-2.5-flash", # Zweitgünstigstes
"gpt-4.1", # Mittleres Preissegment
"claude-sonnet-4.5" # Teuerstes als Letztoption
]
# Statistiken
self.stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_tokens": 0,
"total_cost_usd": 0.0,
"model_usage": {m: 0 for m in self.failover_chain}
}
def chat_completion(
self,
messages: List[Dict],
primary_model: str = "deepseek-v3.2",
max_retries: int = 3
) -> Optional[Dict]:
"""
Führt Chat-Completion mit automatischem Failover durch.
Args:
messages: Chat-Nachrichten im OpenAI-Format
primary_model: Bevorzugtes Modell
max_retries: Maximale Wiederholungsversuche pro Modell
Returns:
API-Antwort oder None bei vollständigem Ausfall
"""
self.stats["total_requests"] += 1
# Failover-Kette erstellen
chain = [primary_model] + [m for m in self.failover_chain if m != primary_model]
last_error = None
for model in chain:
if not self.models[model].enabled:
print(f"⏭️ Modell {model} deaktiviert, überspringe...")
continue
for attempt in range(max_retries):
try:
start_time = time.time()
response = requests.post(
f"{self.BASE_URL}/chat/completions",
headers=self.headers,
json={
"model": model,
"messages": messages,
"max_tokens": 4096,
"temperature": 0.7
},
timeout=self.models[model].max_latency_ms / 1000
)
latency_ms = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
# Statistiken aktualisieren
usage = data.get("usage", {})
tokens = usage.get("total_tokens", 0)
cost = (tokens / 1_000_000) * self.models[model].cost_per_mtok
self.stats["successful_requests"] += 1
self.stats["total_tokens"] += tokens
self.stats["total_cost_usd"] += cost
self.stats["model_usage"][model] += 1
print(f"✅ {model} | Latenz: {latency_ms:.0f}ms | "
f"Token: {tokens} | Kosten: ${cost:.4f}")
return data
elif response.status_code == 429:
print(f"⚠️ Rate-Limit bei {model}, versuche Fallback...")
last_error = f"Rate-Limited: {response.status_code}"
break # Sofort zum nächsten Modell
elif response.status_code >= 500:
print(f"🔴 Server-Fehler {response.status_code} bei {model}, "
f"Versuch {attempt + 1}/{max_retries}")
last_error = f"Server Error: {response.status_code}"
time.sleep(1 * (attempt + 1)) # Exponentielles Backoff
else:
last_error = f"Client Error: {response.status_code}"
except requests.Timeout:
print(f"⏱️ Timeout bei {model} nach "
f"{self.models[model].max_latency_ms}ms")
last_error = "Timeout"
except requests.RequestException as e:
print(f"❌ Netzwerkfehler bei {model}: {e}")
last_error = str(e)
# Alle Modelle fehlgeschlagen
self.stats["failed_requests"] += 1
print(f"🚫 Alle Modelle fehlgeschlagen. Letzter Fehler: {last_error}")
return None
def get_cost_report(self) -> Dict:
"""Generiert Kostenbericht für den aktuellen Zeitraum"""
return {
"Gesamtanfragen": self.stats["total_requests"],
"Erfolgreiche Anfragen": self.stats["successful_requests"],
"Fehlgeschlagene Anfragen": self.stats["failed_requests"],
"Erfolgsrate": (
f"{(self.stats['successful_requests'] / max(1, self.stats['total_requests'])) * 100:.1f}%"
),
"Gesamtkosten": f"${self.stats['total_cost_usd']:.2f}",
"Modellverteilung": {
self.models[m].name: {
"Anfragen": count,
"Anteil": f"{(count / max(1, self.stats['successful_requests'])) * 100:.1f}%"
}
for m, count in self.stats["model_usage"].items()
if count > 0
}
}
======== VERWENDUNGSBEISPIEL ========
if __name__ == "__main__":
# API-Key konfigurieren
client = HolySheepRelayClient(api_key="YOUR_HOLYSHEEP_API_KEY")
# Test-Nachrichten
test_messages = [
{"role": "system", "content": "Du bist ein hilfreicher Assistent."},
{"role": "user", "content": "Erkläre Multi-Model-Failover in einem Satz."}
]
print("=" * 60)
print("HolySheep Multi-Model Failover Test")
print("=" * 60)
# Anfrage mit automatischem Failover
result = client.chat_completion(
messages=test_messages,
primary_model="deepseek-v3.2"
)
if result:
print(f"\n📝 Antwort: {result['choices'][0]['message']['content']}")
# Kostenbericht ausgeben
print("\n" + "=" * 60)
print("KOSTENBERICHT")
print("=" * 60)
for key, value in client.get_cost_report().items():
print(f"{key}: {value}")
Schritt 2: Erweitertes Health-Monitoring und Automatisches Switchen
#!/usr/bin/env python3
"""
Erweitertes HolySheep Relay mit Health-Monitoring
Automatische Modell-Selektion basierend auf Latenz und Verfügbarkeit
"""
import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from collections import deque
import statistics
class HealthMonitor:
"""Überwacht die Gesundheit jedes Modells in Echtzeit"""
def __init__(self, window_size: int = 20):
# Historische Daten für jedes Modell
self.latency_history: Dict[str, deque] = {
m: deque(maxlen=window_size) for m in [
"deepseek-v3.2", "gemini-2.5-flash",
"gpt-4.1", "claude-sonnet-4.5"
]
}
self.error_history: Dict[str, deque] = {
m: deque(maxlen=window_size) for m in [
"deepseek-v3.2", "gemini-2.5-flash",
"gpt-4.1", "claude-sonnet-4.5"
]
}
# Kosten-Tracking (2026 Preise)
self.cost_per_mtok = {
"deepseek-v3.2": 0.42,
"gemini-2.5-flash": 2.50,
"gpt-4.1": 8.00,
"claude-sonnet-4.5": 15.00
}
# Konfigurierbare Schwellenwerte
self.max_latency_p95_ms = 3000 # 95. Perzentil Latenz
self.max_error_rate = 0.15 # 15% Fehlerrate
self.cooldown_seconds = 60 # Wartezeit nach Deaktivierung
self.disabled_models: Dict[str, datetime] = {}
def record_latency(self, model: str, latency_ms: float):
"""Zeichnet Latenz-Messung auf"""
self.latency_history[model].append(latency_ms)
def record_success(self, model: str):
"""Zeichnet erfolgreiche Anfrage auf"""
self.error_history[model].append(0)
def record_error(self, model: str):
"""Zeichnet fehlgeschlagene Anfrage auf"""
self.error_history[model].append(1)
def get_model_health(self, model: str) -> Dict:
"""Berechnet Gesundheitsmetriken für ein Modell"""
latencies = list(self.latency_history[model])
errors = list(self.error_history[model])
if not latencies:
return {"status": "unknown", "score": 0.5}
avg_latency = statistics.mean(latencies)
p95_latency = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else max(latencies)
error_rate = sum(errors) / len(errors)
# Gesundheits-Score: 0-1 (1 = perfekt)
latency_score = max(0, 1 - (p95_latency / self.max_latency_p95_ms))
error_score = max(0, 1 - (error_rate / self.max_error_rate))
health_score = (latency_score * 0.4) + (error_score * 0.6)
status = "healthy"
if health_score < 0.3:
status = "critical"
elif health_score < 0.6:
status = "degraded"
return {
"status": status,
"score": round(health_score, 3),
"avg_latency_ms": round(avg_latency, 1),
"p95_latency_ms": round(p95_latency, 1),
"error_rate": round(error_rate * 100, 2),
"is_enabled": self.is_model_enabled(model)
}
def is_model_enabled(self, model: str) -> bool:
"""Prüft ob Modell aktiviert ist (Cooldown berücksichtigt)"""
if model not in self.disabled_models:
return True
disabled_until = self.disabled_models[model]
if datetime.now() > disabled_until:
del self.disabled_models[model]
return True
return False
def disable_model(self, model: str):
"""Deaktiviert ein Modell temporär"""
self.disabled_models[model] = datetime.now() + timedelta(
seconds=self.cooldown_seconds
)
print(f"🚫 Modell {model} für {self.cooldown_seconds}s deaktiviert")
def get_best_model(self, priority: str = "cost") -> Optional[str]:
"""
Wählt das beste verfügbare Modell basierend auf Strategie.
Args:
priority: "cost" (günstigstes) oder "performance" (schnellstes)
Returns:
Modellname oder None wenn keines verfügbar
"""
candidates = []
for model in self.latency_history.keys():
health = self.get_model_health(model)
if not health["is_enabled"] or health["status"] == "critical":
continue
candidates.append({
"model": model,
"health": health,
"cost": self.cost_per_mtok[model],
"latency": health["avg_latency_ms"]
})
if not candidates:
return None
if priority == "cost":
# Sortiere nach Kosten aufsteigend
candidates.sort(key=lambda x: x["cost"])
else:
# Sortiere nach Latenz aufsteigend
candidates.sort(key=lambda x: x["latency"])
return candidates[0]["model"]
class SmartRelayClient:
"""Intelligenter Relay-Client mit automatischer Modell-Selektion"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.health_monitor = HealthMonitor()
self.session: Optional[aiohttp.ClientSession] = None
async def _ensure_session(self):
"""Stellt sicher dass eine aiohttp-Session existiert"""
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
async def chat_completion_smart(
self,
messages: List[Dict],
strategy: str = "cost",
max_cost_per_1k_tokens: float = 1.00
) -> Optional[Dict]:
"""
Intelligente Chat-Completion mit automatischer Modell-Auswahl.
Args:
messages: Chat-Nachrichten
strategy: "cost" (Standard) oder "performance"
max_cost_per_1k_tokens: Budget-Limit
"""
await self._ensure_session()
# Wähle bestes Modell basierend auf Strategie
selected_model = self.health_monitor.get_best_model(priority=strategy)
if not selected_model:
print("❌ Kein verfügbares Modell gefunden")
return None
# Kostenprüfung
model_cost = self.health_monitor.cost_per_mtok[selected_model]
if model_cost > max_cost_per_1k_tokens * 1000:
print(f"⚠️ Modell {selected_model} über Budget "
f"(${model_cost}/MTok vs ${max_cost_per_1k_tokens * 1000}/MTok)")
# Wähle nächstgünstigeres Modell
candidates = [m for m in self.health_monitor.cost_per_mtok.keys()
if self.health_monitor.cost_per_mtok[m] <= max_cost_per_1k_tokens * 1000
and self.health_monitor.is_model_enabled(m)]
if candidates:
selected_model = min(candidates,
key=lambda m: self.health_monitor.cost_per_mtok[m])
try:
start_time = time.time()
async with self.session.post(
f"{self.BASE_URL}/chat/completions",
json={
"model": selected_model,
"messages": messages,
"max_tokens": 4096
},
timeout=aiohttp.ClientTimeout(total=30)
) as response:
latency_ms = (time.time() - start_time) * 1000
self.health_monitor.record_latency(selected_model, latency_ms)
if response.status == 200:
self.health_monitor.record_success(selected_model)
data = await response.json()
tokens = data.get("usage", {}).get("total_tokens", 0)
cost = (tokens / 1_000_000) * model_cost
print(f"✅ {selected_model} | {latency_ms:.0f}ms | "
f"${cost:.4f} | Score: "
f"{self.health_monitor.get_model_health(selected_model)['score']}")
return data
elif response.status == 429:
self.health_monitor.record_error(selected_model)
self.health_monitor.disable_model(selected_model)
else:
self.health_monitor.record_error(selected_model)
response.raise_for_status()
except Exception as e:
print(f"❌ Fehler mit {selected_model}: {e}")
self.health_monitor.record_error(selected_model)
if "timeout" in str(e).lower() or response.status >= 500:
self.health_monitor.disable_model(selected_model)
return None
async def get_health_dashboard(self) -> Dict:
"""Gibt vollständigen Gesundheitsbericht aller Modelle zurück"""
return {
"timestamp": datetime.now().isoformat(),
"models": {
model: self.health_monitor.get_model_health(model)
for model in self.health_monitor.latency_history.keys()
},
"recommendation": self.health_monitor.get_best_model(priority="cost")
}
async def close(self):
"""Schließt die HTTP-Session"""
if self.session and not self.session.closed:
await self.session.close()
======== ASYNC VERWENDUNGSBEISPIEL ========
async def main():
client = SmartRelayClient(api_key="YOUR_HOLYSHEEP_API_KEY")
messages = [
{"role": "system", "content": "Du bist ein hilfreicher KI-Assistent."},
{"role": "user", "content": "Was sind die Vorteile von Multi-Model-Failover?"}
]
print("=" * 60)
print("Smart Relay mit Health-Monitoring")
print("=" * 60)
# Intelligente Anfrage (kostenoptimiert)
result = await client.chat_completion_smart(
messages=messages,
strategy="cost"
)
if result:
print(f"\nAntwort: {result['choices'][0]['message']['content'][:200]}...")
# Gesundheits-Dashboard anzeigen
print("\n" + "=" * 60)
print("GESUNDHEITS-DASHBOARD")
print("=" * 60)
dashboard = await client.get_health_dashboard()
for model, health in dashboard["models"].items():
status_icon = "🟢" if health["status"] == "healthy" else \
"🟡" if health["status"] == "degraded" else "🔴"
print(f"{status_icon} {model}:")
print(f" Score: {health['score']} | "
f"P95 Latenz: {health['p95_latency_ms']}ms | "
f"Fehlerrate: {health['error_rate']}%")
print(f"\n💡 Empfohlenes Modell: {dashboard['recommendation']}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Häufige Fehler und Lösungen
Fehler 1: Rate-Limit-Schleife ohne Backoff
Problem: Bei 429-Fehlern geraten Anwendungen in eine Endlosschleife.
# ❌ FALSCH: Sofortige Wiederholung ohne Backoff
def broken_request():
while True:
response = requests.post(url, json=data)
if response.status_code == 429:
continue # Endlosschleife!
✅ RICHTIG: Exponentielles Backoff mit Jitter
def resilient_request_with_backoff(
client: HolySheepRelayClient,
messages: List[Dict],
max_total_wait: int = 60
):
"""
Anfrage mit exponentiellem Backoff bei Rate-Limits.
Maximale Wartezeit: base * (2^attempt) + random_jitter
Beispiel: 1s, 2s, 4s, 8s, 16s (bei base=1, max_attempts=5)
"""
base_delay = 1 # Sekunden
max_attempts = 5
total_waited = 0
for attempt in range(max_attempts):
response = requests.post(
f"{client.BASE_URL}/chat/completions",
headers=client.headers,
json={"model": "deepseek-v3.2", "messages": messages}
)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
# Berechne Wartezeit mit Jitter
delay = min(base_delay * (2 ** attempt), max_total_wait - total_waited)
import random
jitter = random.uniform(0, 0.5 * delay)
actual_delay = delay + jitter
if total_waited + actual_delay > max_total_wait:
print(f"⏹️ Maximale Wartezeit erreicht ({max_total_wait}s)")
break
print(f"⏳ Rate-Limited, warte {actual_delay:.1f}s...")
time.sleep(actual_delay)
total_waited += actual_delay
elif response.status_code >= 500:
# Server-Fehler: Kurze Pause, dann Fallback
print(f"🔴 Server-Fehler {response.status_code}, Fallback wird aktiviert")
break
else:
response.raise_for_status()
return None # Alle Versuche fehlgeschlagen
Fehler 2: Fehlende Timeout-Konfiguration
Problem: Unendliches Warten auf Antworten bei Modell-Ausfällen.
# ❌ FALSCH: Kein Timeout definiert
response = requests.post(url, json=data) # Hängt endlos!
✅ RICHTIG: Timeout basierend auf Modell-Latenz-Anforderungen
def create_timed_client(api_key: str) -> HolySheepRelayClient:
"""Erstellt Client mit modellspezifischen Timeouts"""
client = HolySheepRelayClient(api_key)
# Modell-spezifische Timeouts in Sekunden
model_timeouts = {
"deepseek-v3.2": 3, # Schnelles Modell
"gemini-2.5-flash": 5, # Mittlere Latenz
"gpt-4.1": 10, # Höhere Latenz akzeptabel
"claude-sonnet-4.5": 15 # Claude kann langsam sein
}
def timed_chat_completion(messages, model="deepseek-v3.2"):
timeout = model_timeouts.get(model, 10)
try:
response = requests.post(
f"{client.BASE_URL}/chat/completions",
headers=client.headers,
json={"model": model, "messages": messages},
timeout=timeout # HTTP-Timeout in Sekunden
)
response.raise_for_status()
return response.json()
except requests.Timeout:
print(f"⏱️ Timeout ({timeout}s) für {model}")
raise
except requests.ConnectionError as e:
print(f"🔌 Verbindungsfehler: {e}")
raise
client.timed_completion = timed_chat_completion
return client
Verwendung:
client = create_timed_client("YOUR_HOLYSHEEP_API_KEY")
try:
result = client.timed_completion(messages, model="deepseek-v3.2")
except (requests.Timeout, requests.ConnectionError) as e:
print(f"⚠️ Fallback wird aktiviert wegen: {e}")
# Hier Fallback-Logik implementieren
Fehler 3: Kosten-Tracking vernachlässigt
Problem: Unerwartet hohe Rechnungen, weil Token-Nutzung nicht überwacht wird.
# ❌ FALSCH: Keine Kostenkontrolle
response = requests.post(url, json={"model": "claude-sonnet-4.5", ...})
💸 Wer bezahlt das?
✅ RICHTIG: Kosten-Tracker mit Budget-Limits
class CostControlledRelayClient:
"""Relay-Client mit strikter Budget-Kontrolle"""
def __init__(self, api_key: str, monthly_budget_usd: float = 100.0):
self.client = HolySheepRelayClient(api_key)
self.monthly_budget = monthly_budget_usd
self.spent_this_month = 0.0
self.month_start = datetime.now()
# Kosten pro Modell (2026)
self.cost_per_mtok = {
"deepseek-v3.2": 0.42,
"gemini-2.5-flash": 2.50,
"gpt-4.1": 8.00,
"claude-sonnet-4.5": 15.00
}
def _check_budget(self, model: str, estimated_tokens: int) -> bool:
"""Prüft ob Budget für Anfrage ausreicht"""
# Reset am Monatsanfang
if datetime.now().month != self.month_start.month:
self.spent_this_month = 0.0
self.month_start = datetime.now()
estimated_cost = (estimated_tokens / 1_000_000) * \
self.cost_per_mtok[model]
if self.spent_this_month + estimated_cost > self.monthly_budget:
print(f"🚫 Budget überschritten! "
f"Spent: ${self.spent_this_month:.2f} / "
f"Budget: ${self.monthly_budget:.2f}")
return False
return True
def _update_spending(self, model: str, tokens: int):
"""Aktualisiert Ausgaben-Tracker"""
cost = (tokens / 1_000_000) * self.cost_per_mtok[model]
self.spent_this_month += cost
# Warnung bei 80% Budget-Ausschöpfung
budget_pct = (self.spent_this_month / self.monthly_budget) * 100
if budget_pct >= 80:
print(f"⚠️ Warnung: {budget_pct:.0f}% des Budgets verbraucht "
f"(${self.spent_this_month:.2f})")
def safe_completion(self, messages, model, max_tokens=1000):
"""Sichere Completion mit Budget-Prüfung"""
if not self._check_budget(model, max_tokens):
# Automatisch auf günstigeres Modell ausweichen
affordable_models = [
m for m, cost in self.cost_per_mtok.items()
if (self.spent_this_month + (max_tokens / 1_000_000) * cost)
<= self.monthly_budget
]
if not affordable_models:
raise ValueError("Kein Modell mehr im Budget!")
model = min(affordable_models, key=lambda m: self.cost_per_mtok[m])
print(f"🔄 Ausgewichen auf {model} (Budget-Grund)")
result = self.client.chat_completion(messages, primary_model=model)
if result:
tokens = result.get("usage", {}).get("total_tokens", 0)
self._update_spending(model, tokens)
return result
def