Als Entwickler, der seit über drei Jahren produktive KI-Anwendungen mit multimodularen Architekturen betreibt, habe ich unzählige Stunden damit verbracht, zuverlässige Failover-Systeme zwischen verschiedenen KI-Modellen aufzubauen. Die Herausforderung liegt nicht nur in der technischen Implementierung, sondern auch in der Kostenoptimierung: Wenn Sie 10 Millionen Token pro Monat verarbeiten, entscheidet Ihr Failover-Design direkt über Ihre monatliche Rechnung.

In diesem Tutorial zeige ich Ihnen, wie Sie mit HolySheep AI ein robustes Multi-Model-Failover-System implementieren, das gleichzeitig Kosten spart und Ausfallsicherheit gewährleistet.

Warum Multi-Model Failover?

Ein solides Failover-System ist unverzichtbar, wenn Sie geschäftskritische KI-Anwendungen betreiben. Die Vorteile liegen auf der Hand:

Kostenvergleich: Die 2026er Preise machen den Unterschied

Bevor wir in den Code eintauchen, müssen wir die aktuellen Preise verstehen. Die folgenden Daten sind für Mai 2026 verifiziert:

ModellOutput-Preis ($/MTok)10M Token/MonatRelativer Aufwand
DeepSeek V3.2$0.42$4.200Baseline
Gemini 2.5 Flash$2.50$25.000~6x teurer
GPT-4.1$8.00$80.000~19x teurer
Claude Sonnet 4.5$15.00$150.000~36x teurer

Erkenntnis: Wenn Sie DeepSeek V3.2 als primäres Modell verwenden und nur bei Ausfällen auf teurere Modelle ausweichen, sparen Sie bei 10M Token/Monat potenziell über $145.000 jährlich im Vergleich zu Claude Sonnet 4.5 als primärem Modell.

Geeignet / Nicht geeignet für

✅ Perfekt geeignet für:

❌ Weniger geeignet für:

Architektur des HolySheep Relay-Systems

HolySheep bietet mit seinem Relay-Endpunkt eine zentrale Schnittstelle, die Anfragen automatisch an konfigurierte Modelle weiterleitet. Die Architektur umfasst:

Implementierung: Schritt-für-Schritt

Voraussetzungen

Bevor Sie beginnen, benötigen Sie:

Schritt 1: Grundlegendes Failover-System

#!/usr/bin/env python3
"""
HolySheep Multi-Model Failover System
Base URL: https://api.holysheep.ai/v1
"""

import requests
import time
from typing import Optional, List, Dict
from dataclasses import dataclass
from enum import Enum

class ModelPriority(Enum):
    """Modellprioritäten basierend auf Kosten (2026 Preise)"""
    DEEPSEEK_V32 = 1      # $0.42/MTok - Primär
    GEMINI_FLASH = 2      # $2.50/MTok - Fallback 1
    GPT_41 = 3            # $8.00/MTok - Fallback 2
    CLAUDE_SONNET = 4     # $15.00/MTok - Letzter Fallback

@dataclass
class ModelConfig:
    name: str
    provider: str
    cost_per_mtok: float
    max_latency_ms: int
    enabled: bool = True

class HolySheepRelayClient:
    """Multi-Model Failover Client für HolySheep API"""
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        
        # Modellkonfiguration mit 2026 Preisen
        self.models = {
            "deepseek-v3.2": ModelConfig(
                name="DeepSeek V3.2",
                provider="deepseek",
                cost_per_mtok=0.42,
                max_latency_ms=2000,
                enabled=True
            ),
            "gpt-4.1": ModelConfig(
                name="GPT-4.1",
                provider="openai",
                cost_per_mtok=8.00,
                max_latency_ms=5000,
                enabled=True
            ),
            "claude-sonnet-4.5": ModelConfig(
                name="Claude Sonnet 4.5",
                provider="anthropic",
                cost_per_mtok=15.00,
                max_latency_ms=6000,
                enabled=True
            ),
            "gemini-2.5-flash": ModelConfig(
                name="Gemini 2.5 Flash",
                provider="google",
                cost_per_mtok=2.50,
                max_latency_ms=1500,
                enabled=True
            )
        }
        
        # Failover-Kette: Reihenfolge der Modelle bei Ausfällen
        self.failover_chain = [
            "deepseek-v3.2",      # Günstigstes zuerst
            "gemini-2.5-flash",   # Zweitgünstigstes
            "gpt-4.1",            # Mittleres Preissegment
            "claude-sonnet-4.5"   # Teuerstes als Letztoption
        ]
        
        # Statistiken
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "total_cost_usd": 0.0,
            "model_usage": {m: 0 for m in self.failover_chain}
        }
    
    def chat_completion(
        self,
        messages: List[Dict],
        primary_model: str = "deepseek-v3.2",
        max_retries: int = 3
    ) -> Optional[Dict]:
        """
        Führt Chat-Completion mit automatischem Failover durch.
        
        Args:
            messages: Chat-Nachrichten im OpenAI-Format
            primary_model: Bevorzugtes Modell
            max_retries: Maximale Wiederholungsversuche pro Modell
        
        Returns:
            API-Antwort oder None bei vollständigem Ausfall
        """
        self.stats["total_requests"] += 1
        
        # Failover-Kette erstellen
        chain = [primary_model] + [m for m in self.failover_chain if m != primary_model]
        
        last_error = None
        
        for model in chain:
            if not self.models[model].enabled:
                print(f"⏭️  Modell {model} deaktiviert, überspringe...")
                continue
                
            for attempt in range(max_retries):
                try:
                    start_time = time.time()
                    
                    response = requests.post(
                        f"{self.BASE_URL}/chat/completions",
                        headers=self.headers,
                        json={
                            "model": model,
                            "messages": messages,
                            "max_tokens": 4096,
                            "temperature": 0.7
                        },
                        timeout=self.models[model].max_latency_ms / 1000
                    )
                    
                    latency_ms = (time.time() - start_time) * 1000
                    
                    if response.status_code == 200:
                        data = response.json()
                        
                        # Statistiken aktualisieren
                        usage = data.get("usage", {})
                        tokens = usage.get("total_tokens", 0)
                        cost = (tokens / 1_000_000) * self.models[model].cost_per_mtok
                        
                        self.stats["successful_requests"] += 1
                        self.stats["total_tokens"] += tokens
                        self.stats["total_cost_usd"] += cost
                        self.stats["model_usage"][model] += 1
                        
                        print(f"✅ {model} | Latenz: {latency_ms:.0f}ms | "
                              f"Token: {tokens} | Kosten: ${cost:.4f}")
                        
                        return data
                        
                    elif response.status_code == 429:
                        print(f"⚠️  Rate-Limit bei {model}, versuche Fallback...")
                        last_error = f"Rate-Limited: {response.status_code}"
                        break  # Sofort zum nächsten Modell
                        
                    elif response.status_code >= 500:
                        print(f"🔴 Server-Fehler {response.status_code} bei {model}, "
                              f"Versuch {attempt + 1}/{max_retries}")
                        last_error = f"Server Error: {response.status_code}"
                        time.sleep(1 * (attempt + 1))  # Exponentielles Backoff
                        
                    else:
                        last_error = f"Client Error: {response.status_code}"
                        
                except requests.Timeout:
                    print(f"⏱️  Timeout bei {model} nach "
                          f"{self.models[model].max_latency_ms}ms")
                    last_error = "Timeout"
                    
                except requests.RequestException as e:
                    print(f"❌ Netzwerkfehler bei {model}: {e}")
                    last_error = str(e)
        
        # Alle Modelle fehlgeschlagen
        self.stats["failed_requests"] += 1
        print(f"🚫 Alle Modelle fehlgeschlagen. Letzter Fehler: {last_error}")
        return None
    
    def get_cost_report(self) -> Dict:
        """Generiert Kostenbericht für den aktuellen Zeitraum"""
        return {
            "Gesamtanfragen": self.stats["total_requests"],
            "Erfolgreiche Anfragen": self.stats["successful_requests"],
            "Fehlgeschlagene Anfragen": self.stats["failed_requests"],
            "Erfolgsrate": (
                f"{(self.stats['successful_requests'] / max(1, self.stats['total_requests'])) * 100:.1f}%"
            ),
            "Gesamtkosten": f"${self.stats['total_cost_usd']:.2f}",
            "Modellverteilung": {
                self.models[m].name: {
                    "Anfragen": count,
                    "Anteil": f"{(count / max(1, self.stats['successful_requests'])) * 100:.1f}%"
                }
                for m, count in self.stats["model_usage"].items()
                if count > 0
            }
        }


======== VERWENDUNGSBEISPIEL ========

if __name__ == "__main__": # API-Key konfigurieren client = HolySheepRelayClient(api_key="YOUR_HOLYSHEEP_API_KEY") # Test-Nachrichten test_messages = [ {"role": "system", "content": "Du bist ein hilfreicher Assistent."}, {"role": "user", "content": "Erkläre Multi-Model-Failover in einem Satz."} ] print("=" * 60) print("HolySheep Multi-Model Failover Test") print("=" * 60) # Anfrage mit automatischem Failover result = client.chat_completion( messages=test_messages, primary_model="deepseek-v3.2" ) if result: print(f"\n📝 Antwort: {result['choices'][0]['message']['content']}") # Kostenbericht ausgeben print("\n" + "=" * 60) print("KOSTENBERICHT") print("=" * 60) for key, value in client.get_cost_report().items(): print(f"{key}: {value}")

Schritt 2: Erweitertes Health-Monitoring und Automatisches Switchen

#!/usr/bin/env python3
"""
Erweitertes HolySheep Relay mit Health-Monitoring
Automatische Modell-Selektion basierend auf Latenz und Verfügbarkeit
"""

import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from collections import deque
import statistics

class HealthMonitor:
    """Überwacht die Gesundheit jedes Modells in Echtzeit"""
    
    def __init__(self, window_size: int = 20):
        # Historische Daten für jedes Modell
        self.latency_history: Dict[str, deque] = {
            m: deque(maxlen=window_size) for m in [
                "deepseek-v3.2", "gemini-2.5-flash", 
                "gpt-4.1", "claude-sonnet-4.5"
            ]
        }
        self.error_history: Dict[str, deque] = {
            m: deque(maxlen=window_size) for m in [
                "deepseek-v3.2", "gemini-2.5-flash", 
                "gpt-4.1", "claude-sonnet-4.5"
            ]
        }
        
        # Kosten-Tracking (2026 Preise)
        self.cost_per_mtok = {
            "deepseek-v3.2": 0.42,
            "gemini-2.5-flash": 2.50,
            "gpt-4.1": 8.00,
            "claude-sonnet-4.5": 15.00
        }
        
        # Konfigurierbare Schwellenwerte
        self.max_latency_p95_ms = 3000  # 95. Perzentil Latenz
        self.max_error_rate = 0.15      # 15% Fehlerrate
        self.cooldown_seconds = 60      # Wartezeit nach Deaktivierung
    
        self.disabled_models: Dict[str, datetime] = {}
    
    def record_latency(self, model: str, latency_ms: float):
        """Zeichnet Latenz-Messung auf"""
        self.latency_history[model].append(latency_ms)
    
    def record_success(self, model: str):
        """Zeichnet erfolgreiche Anfrage auf"""
        self.error_history[model].append(0)
    
    def record_error(self, model: str):
        """Zeichnet fehlgeschlagene Anfrage auf"""
        self.error_history[model].append(1)
    
    def get_model_health(self, model: str) -> Dict:
        """Berechnet Gesundheitsmetriken für ein Modell"""
        latencies = list(self.latency_history[model])
        errors = list(self.error_history[model])
        
        if not latencies:
            return {"status": "unknown", "score": 0.5}
        
        avg_latency = statistics.mean(latencies)
        p95_latency = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else max(latencies)
        error_rate = sum(errors) / len(errors)
        
        # Gesundheits-Score: 0-1 (1 = perfekt)
        latency_score = max(0, 1 - (p95_latency / self.max_latency_p95_ms))
        error_score = max(0, 1 - (error_rate / self.max_error_rate))
        
        health_score = (latency_score * 0.4) + (error_score * 0.6)
        
        status = "healthy"
        if health_score < 0.3:
            status = "critical"
        elif health_score < 0.6:
            status = "degraded"
        
        return {
            "status": status,
            "score": round(health_score, 3),
            "avg_latency_ms": round(avg_latency, 1),
            "p95_latency_ms": round(p95_latency, 1),
            "error_rate": round(error_rate * 100, 2),
            "is_enabled": self.is_model_enabled(model)
        }
    
    def is_model_enabled(self, model: str) -> bool:
        """Prüft ob Modell aktiviert ist (Cooldown berücksichtigt)"""
        if model not in self.disabled_models:
            return True
        
        disabled_until = self.disabled_models[model]
        if datetime.now() > disabled_until:
            del self.disabled_models[model]
            return True
        
        return False
    
    def disable_model(self, model: str):
        """Deaktiviert ein Modell temporär"""
        self.disabled_models[model] = datetime.now() + timedelta(
            seconds=self.cooldown_seconds
        )
        print(f"🚫 Modell {model} für {self.cooldown_seconds}s deaktiviert")
    
    def get_best_model(self, priority: str = "cost") -> Optional[str]:
        """
        Wählt das beste verfügbare Modell basierend auf Strategie.
        
        Args:
            priority: "cost" (günstigstes) oder "performance" (schnellstes)
        
        Returns:
            Modellname oder None wenn keines verfügbar
        """
        candidates = []
        
        for model in self.latency_history.keys():
            health = self.get_model_health(model)
            
            if not health["is_enabled"] or health["status"] == "critical":
                continue
            
            candidates.append({
                "model": model,
                "health": health,
                "cost": self.cost_per_mtok[model],
                "latency": health["avg_latency_ms"]
            })
        
        if not candidates:
            return None
        
        if priority == "cost":
            # Sortiere nach Kosten aufsteigend
            candidates.sort(key=lambda x: x["cost"])
        else:
            # Sortiere nach Latenz aufsteigend
            candidates.sort(key=lambda x: x["latency"])
        
        return candidates[0]["model"]


class SmartRelayClient:
    """Intelligenter Relay-Client mit automatischer Modell-Selektion"""
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.health_monitor = HealthMonitor()
        self.session: Optional[aiohttp.ClientSession] = None
    
    async def _ensure_session(self):
        """Stellt sicher dass eine aiohttp-Session existiert"""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
    
    async def chat_completion_smart(
        self,
        messages: List[Dict],
        strategy: str = "cost",
        max_cost_per_1k_tokens: float = 1.00
    ) -> Optional[Dict]:
        """
        Intelligente Chat-Completion mit automatischer Modell-Auswahl.
        
        Args:
            messages: Chat-Nachrichten
            strategy: "cost" (Standard) oder "performance"
            max_cost_per_1k_tokens: Budget-Limit
        """
        await self._ensure_session()
        
        # Wähle bestes Modell basierend auf Strategie
        selected_model = self.health_monitor.get_best_model(priority=strategy)
        
        if not selected_model:
            print("❌ Kein verfügbares Modell gefunden")
            return None
        
        # Kostenprüfung
        model_cost = self.health_monitor.cost_per_mtok[selected_model]
        if model_cost > max_cost_per_1k_tokens * 1000:
            print(f"⚠️  Modell {selected_model} über Budget "
                  f"(${model_cost}/MTok vs ${max_cost_per_1k_tokens * 1000}/MTok)")
            # Wähle nächstgünstigeres Modell
            candidates = [m for m in self.health_monitor.cost_per_mtok.keys()
                         if self.health_monitor.cost_per_mtok[m] <= max_cost_per_1k_tokens * 1000
                         and self.health_monitor.is_model_enabled(m)]
            if candidates:
                selected_model = min(candidates, 
                                    key=lambda m: self.health_monitor.cost_per_mtok[m])
        
        try:
            start_time = time.time()
            
            async with self.session.post(
                f"{self.BASE_URL}/chat/completions",
                json={
                    "model": selected_model,
                    "messages": messages,
                    "max_tokens": 4096
                },
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                latency_ms = (time.time() - start_time) * 1000
                
                self.health_monitor.record_latency(selected_model, latency_ms)
                
                if response.status == 200:
                    self.health_monitor.record_success(selected_model)
                    data = await response.json()
                    
                    tokens = data.get("usage", {}).get("total_tokens", 0)
                    cost = (tokens / 1_000_000) * model_cost
                    
                    print(f"✅ {selected_model} | {latency_ms:.0f}ms | "
                          f"${cost:.4f} | Score: "
                          f"{self.health_monitor.get_model_health(selected_model)['score']}")
                    
                    return data
                    
                elif response.status == 429:
                    self.health_monitor.record_error(selected_model)
                    self.health_monitor.disable_model(selected_model)
                    
                else:
                    self.health_monitor.record_error(selected_model)
                    response.raise_for_status()
                    
        except Exception as e:
            print(f"❌ Fehler mit {selected_model}: {e}")
            self.health_monitor.record_error(selected_model)
            
            if "timeout" in str(e).lower() or response.status >= 500:
                self.health_monitor.disable_model(selected_model)
        
        return None
    
    async def get_health_dashboard(self) -> Dict:
        """Gibt vollständigen Gesundheitsbericht aller Modelle zurück"""
        return {
            "timestamp": datetime.now().isoformat(),
            "models": {
                model: self.health_monitor.get_model_health(model)
                for model in self.health_monitor.latency_history.keys()
            },
            "recommendation": self.health_monitor.get_best_model(priority="cost")
        }
    
    async def close(self):
        """Schließt die HTTP-Session"""
        if self.session and not self.session.closed:
            await self.session.close()


======== ASYNC VERWENDUNGSBEISPIEL ========

async def main(): client = SmartRelayClient(api_key="YOUR_HOLYSHEEP_API_KEY") messages = [ {"role": "system", "content": "Du bist ein hilfreicher KI-Assistent."}, {"role": "user", "content": "Was sind die Vorteile von Multi-Model-Failover?"} ] print("=" * 60) print("Smart Relay mit Health-Monitoring") print("=" * 60) # Intelligente Anfrage (kostenoptimiert) result = await client.chat_completion_smart( messages=messages, strategy="cost" ) if result: print(f"\nAntwort: {result['choices'][0]['message']['content'][:200]}...") # Gesundheits-Dashboard anzeigen print("\n" + "=" * 60) print("GESUNDHEITS-DASHBOARD") print("=" * 60) dashboard = await client.get_health_dashboard() for model, health in dashboard["models"].items(): status_icon = "🟢" if health["status"] == "healthy" else \ "🟡" if health["status"] == "degraded" else "🔴" print(f"{status_icon} {model}:") print(f" Score: {health['score']} | " f"P95 Latenz: {health['p95_latency_ms']}ms | " f"Fehlerrate: {health['error_rate']}%") print(f"\n💡 Empfohlenes Modell: {dashboard['recommendation']}") await client.close() if __name__ == "__main__": asyncio.run(main())

Häufige Fehler und Lösungen

Fehler 1: Rate-Limit-Schleife ohne Backoff

Problem: Bei 429-Fehlern geraten Anwendungen in eine Endlosschleife.

# ❌ FALSCH: Sofortige Wiederholung ohne Backoff
def broken_request():
    while True:
        response = requests.post(url, json=data)
        if response.status_code == 429:
            continue  # Endlosschleife!

✅ RICHTIG: Exponentielles Backoff mit Jitter

def resilient_request_with_backoff( client: HolySheepRelayClient, messages: List[Dict], max_total_wait: int = 60 ): """ Anfrage mit exponentiellem Backoff bei Rate-Limits. Maximale Wartezeit: base * (2^attempt) + random_jitter Beispiel: 1s, 2s, 4s, 8s, 16s (bei base=1, max_attempts=5) """ base_delay = 1 # Sekunden max_attempts = 5 total_waited = 0 for attempt in range(max_attempts): response = requests.post( f"{client.BASE_URL}/chat/completions", headers=client.headers, json={"model": "deepseek-v3.2", "messages": messages} ) if response.status_code == 200: return response.json() elif response.status_code == 429: # Berechne Wartezeit mit Jitter delay = min(base_delay * (2 ** attempt), max_total_wait - total_waited) import random jitter = random.uniform(0, 0.5 * delay) actual_delay = delay + jitter if total_waited + actual_delay > max_total_wait: print(f"⏹️ Maximale Wartezeit erreicht ({max_total_wait}s)") break print(f"⏳ Rate-Limited, warte {actual_delay:.1f}s...") time.sleep(actual_delay) total_waited += actual_delay elif response.status_code >= 500: # Server-Fehler: Kurze Pause, dann Fallback print(f"🔴 Server-Fehler {response.status_code}, Fallback wird aktiviert") break else: response.raise_for_status() return None # Alle Versuche fehlgeschlagen

Fehler 2: Fehlende Timeout-Konfiguration

Problem: Unendliches Warten auf Antworten bei Modell-Ausfällen.

# ❌ FALSCH: Kein Timeout definiert
response = requests.post(url, json=data)  # Hängt endlos!

✅ RICHTIG: Timeout basierend auf Modell-Latenz-Anforderungen

def create_timed_client(api_key: str) -> HolySheepRelayClient: """Erstellt Client mit modellspezifischen Timeouts""" client = HolySheepRelayClient(api_key) # Modell-spezifische Timeouts in Sekunden model_timeouts = { "deepseek-v3.2": 3, # Schnelles Modell "gemini-2.5-flash": 5, # Mittlere Latenz "gpt-4.1": 10, # Höhere Latenz akzeptabel "claude-sonnet-4.5": 15 # Claude kann langsam sein } def timed_chat_completion(messages, model="deepseek-v3.2"): timeout = model_timeouts.get(model, 10) try: response = requests.post( f"{client.BASE_URL}/chat/completions", headers=client.headers, json={"model": model, "messages": messages}, timeout=timeout # HTTP-Timeout in Sekunden ) response.raise_for_status() return response.json() except requests.Timeout: print(f"⏱️ Timeout ({timeout}s) für {model}") raise except requests.ConnectionError as e: print(f"🔌 Verbindungsfehler: {e}") raise client.timed_completion = timed_chat_completion return client

Verwendung:

client = create_timed_client("YOUR_HOLYSHEEP_API_KEY") try: result = client.timed_completion(messages, model="deepseek-v3.2") except (requests.Timeout, requests.ConnectionError) as e: print(f"⚠️ Fallback wird aktiviert wegen: {e}") # Hier Fallback-Logik implementieren

Fehler 3: Kosten-Tracking vernachlässigt

Problem: Unerwartet hohe Rechnungen, weil Token-Nutzung nicht überwacht wird.

# ❌ FALSCH: Keine Kostenkontrolle
response = requests.post(url, json={"model": "claude-sonnet-4.5", ...})

💸 Wer bezahlt das?

✅ RICHTIG: Kosten-Tracker mit Budget-Limits

class CostControlledRelayClient: """Relay-Client mit strikter Budget-Kontrolle""" def __init__(self, api_key: str, monthly_budget_usd: float = 100.0): self.client = HolySheepRelayClient(api_key) self.monthly_budget = monthly_budget_usd self.spent_this_month = 0.0 self.month_start = datetime.now() # Kosten pro Modell (2026) self.cost_per_mtok = { "deepseek-v3.2": 0.42, "gemini-2.5-flash": 2.50, "gpt-4.1": 8.00, "claude-sonnet-4.5": 15.00 } def _check_budget(self, model: str, estimated_tokens: int) -> bool: """Prüft ob Budget für Anfrage ausreicht""" # Reset am Monatsanfang if datetime.now().month != self.month_start.month: self.spent_this_month = 0.0 self.month_start = datetime.now() estimated_cost = (estimated_tokens / 1_000_000) * \ self.cost_per_mtok[model] if self.spent_this_month + estimated_cost > self.monthly_budget: print(f"🚫 Budget überschritten! " f"Spent: ${self.spent_this_month:.2f} / " f"Budget: ${self.monthly_budget:.2f}") return False return True def _update_spending(self, model: str, tokens: int): """Aktualisiert Ausgaben-Tracker""" cost = (tokens / 1_000_000) * self.cost_per_mtok[model] self.spent_this_month += cost # Warnung bei 80% Budget-Ausschöpfung budget_pct = (self.spent_this_month / self.monthly_budget) * 100 if budget_pct >= 80: print(f"⚠️ Warnung: {budget_pct:.0f}% des Budgets verbraucht " f"(${self.spent_this_month:.2f})") def safe_completion(self, messages, model, max_tokens=1000): """Sichere Completion mit Budget-Prüfung""" if not self._check_budget(model, max_tokens): # Automatisch auf günstigeres Modell ausweichen affordable_models = [ m for m, cost in self.cost_per_mtok.items() if (self.spent_this_month + (max_tokens / 1_000_000) * cost) <= self.monthly_budget ] if not affordable_models: raise ValueError("Kein Modell mehr im Budget!") model = min(affordable_models, key=lambda m: self.cost_per_mtok[m]) print(f"🔄 Ausgewichen auf {model} (Budget-Grund)") result = self.client.chat_completion(messages, primary_model=model) if result: tokens = result.get("usage", {}).get("total_tokens", 0) self._update_spending(model, tokens) return result def