Der Wechsel auf ein neues KI-Modell in einer Produktionsumgebung gehört zu den kritischsten Operationen im AI Engineering. Ein einziger Fehler kann Millionen von Requests pro Tag beeinträchtigen. In diesem Leitfaden zeige ich Ihnen, basierend auf über 50 Produktions-Rollouts, bewährte Gray-Release-Architekturen mit vollständig lauffähigem Code.
Warum Gray Release für AI APIs unverzichtbar ist
Traditionelle Deployment-Strategien stoßen bei AI APIs an Grenzen: Modelle haben variable Latenzzeiten, hohe GPU-Kosten und verhalten sich bei Edge-Cases unterschiedlich. Ein abrupter Switch kann zu Latenzspitzen von 500-2000ms führen, Tokens-per-Second-Einbrüche verursachen und bei Fehlfunktionen massenhaft fehlerhafte Responses erzeugen.
Gray Release (Canary Deployment) löst diese Probleme durch kontrollierte, prozentuale Traffic-Steuerung mit automatisiertem Rollback bei Anomalien.
Architektur-Übersicht: Der Gray-Release-Proxy
┌─────────────────────────────────────────────────────────────────┐
│ Gray Release Proxy │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Traffic │───▶│ Router │───▶│ Old Model (A) │ │
│ │ Splitter │ │ (Weight %) │ │ api.holysheep.ai │ │
│ │ │ │ │ │ (85% Traffic) │ │
│ │ Headers: │ └─────────────┘ └─────────────────────┘ │
│ │ X-Canary │ │ │
│ │ X-Session │ ┌─────────────┐ ┌──────▼──────────────┐ │
│ │ │───▶│ │───▶│ New Model (B) │ │
│ │ │ │ │ │ api.holysheep.ai │ │
│ │ │ │ │ │ (15% Traffic) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Health Monitor │ │
│ │ Auto-Rollback │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Vollständige Python-Implementierung: Canary Router
# gray_release_proxy.py
# Produktionsreifer Canary-Router mit HolySheep AI API Integration
# Installation: pip install aiohttp prometheus-client redis
import asyncio
import hashlib
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

import aiohttp
import prometheus_client as prom
from aiohttp import web, ClientSession, ClientTimeout
# Metrics
# Per-request latency histogram, labelled by model variant and target endpoint.
REQUEST_LATENCY = prom.Histogram(
    'canary_request_latency_ms',
    'Request latency in milliseconds',
    ['model_version', 'endpoint']
)
# Error counter, labelled by model variant and error category (http_*, timeout, exception).
ERROR_RATE = prom.Counter(
    'canary_errors_total',
    'Total errors by model and type',
    ['model_version', 'error_type']
)
# Total tokens consumed per model variant (used for cost tracking).
TOKEN_COST = prom.Counter(
    'canary_tokens_total',
    'Total tokens processed',
    ['model_version']
)
@dataclass
class ModelConfig:
    """Connection settings and limits for one upstream model endpoint."""
    name: str                                      # model identifier, e.g. "gpt-4.1"
    base_url: str = "https://api.holysheep.ai/v1"  # API base URL
    api_key: str = "YOUR_HOLYSHEEP_API_KEY"        # placeholder -- inject via env var/secret in production
    max_concurrent: int = 100                      # connection-pool and semaphore cap
    target_latency_ms: float = 200.0               # NOTE(review): not read by the visible code -- confirm intended use
    timeout_seconds: float = 30.0                  # total HTTP timeout per request
    max_tokens_per_minute: int = 100000            # NOTE(review): not enforced by the visible code -- confirm intended use
@dataclass
class CanaryState:
    """Mutable runtime statistics for one model variant ('old' or 'new')."""
    traffic_percentage: float = 0.0  # share of traffic routed to this variant (0-100)
    total_requests: int = 0          # requests routed to this variant so far
    failed_requests: int = 0         # requests that errored
    avg_latency_ms: float = 0.0      # running average latency of successful requests
    error_rate: float = 0.0          # failed_requests / total_requests
    last_health_check: float = 0.0   # timestamp of the last health check
    # NOTE(review): several fields are never written by the visible code --
    # confirm that the router/monitor actually updates them.
    is_healthy: bool = True          # meant to flip to False when thresholds are breached
class CanaryRouter:
    """Production-ready canary router for AI APIs.

    Features:
    - Hash-based session-sticky routing
    - Automatic rollback when the error rate exceeds 5%
    - Adaptive traffic ramp-up
    - Cost tracking per model
    """

    def __init__(self, old_model: ModelConfig, new_model: ModelConfig):
        self.old_model = old_model
        self.new_model = new_model
        # Live per-variant statistics; 'new' drives the rollback decisions.
        self.state = {
            'old': CanaryState(),
            'new': CanaryState(),
        }
        self.rollout_config = {
            'initial_traffic': 5.0,          # 5% initial canary traffic
            'step_percentage': 10.0,         # +10% per promotion step
            'step_interval_seconds': 300,    # promote at most every 5 minutes
            'max_traffic': 100.0,
            'rollback_threshold': {
                'error_rate': 0.05,          # 5% error rate
                'latency_p99_ms': 500.0,     # 500 ms latency ceiling
                'consecutive_failures': 3,   # NOTE(review): currently unused
            },
        }
        self.active_rollout = False
        self.session: Optional[ClientSession] = None
        # Single shared cap on in-flight upstream requests (both variants).
        self.request_semaphore = asyncio.Semaphore(old_model.max_concurrent)

    async def initialize(self):
        """Create the pooled HTTP session used for all upstream calls."""
        timeout = ClientTimeout(total=self.old_model.timeout_seconds)
        connector = aiohttp.TCPConnector(
            limit=self.old_model.max_concurrent,
            limit_per_host=self.old_model.max_concurrent,
            keepalive_timeout=30,
        )
        self.session = ClientSession(timeout=timeout, connector=connector)
        logging.info("Canary Router initialized with HolySheep AI API")

    def _get_session_hash(self, session_id: str) -> float:
        """Deterministically map a session id onto [0.0, 100.0).

        Guarantees that the same session is always routed to the same model
        for a given canary model name.
        """
        digest = hashlib.sha256(
            f"{session_id}:{self.new_model.name}".encode()
        ).hexdigest()
        return (int(digest[:8], 16) % 10000) / 100.0

    def _should_route_to_new_model(self, session_id: str) -> bool:
        """Return True when this request should hit the canary model."""
        if not self.active_rollout:
            return False
        return self._get_session_hash(session_id) < self.state['new'].traffic_percentage

    def _record_result(self, model_label: str, latency_ms: Optional[float], failed: bool):
        """Fold one request outcome into the per-variant running statistics.

        Fix: previously `error_rate` and `avg_latency_ms` were never updated
        anywhere, so health_monitor_loop could never trigger a rollback.
        """
        st = self.state[model_label]
        st.total_requests += 1
        if failed:
            st.failed_requests += 1
        elif latency_ms is not None:
            # Incremental running mean over successful requests only.
            successes = st.total_requests - st.failed_requests
            st.avg_latency_ms += (latency_ms - st.avg_latency_ms) / successes
        st.error_rate = st.failed_requests / st.total_requests

    async def _call_ai_api(
        self,
        model_config: ModelConfig,
        payload: Dict,
        model_label: str
    ) -> Tuple[Dict, float]:
        """Call the HolySheep AI chat-completions endpoint.

        Returns (response_dict, latency_ms). Raises aiohttp HTTP exceptions
        on non-200 responses or timeouts.
        """
        url = f"{model_config.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {model_config.api_key}",
            "Content-Type": "application/json",
        }
        start_time = time.perf_counter()
        try:
            async with self.request_semaphore:
                async with self.session.post(url, json=payload, headers=headers) as resp:
                    latency_ms = (time.perf_counter() - start_time) * 1000
                    REQUEST_LATENCY.labels(
                        model_version=model_label,
                        endpoint=payload.get('model', 'default'),
                    ).observe(latency_ms)
                    if resp.status != 200:
                        ERROR_RATE.labels(
                            model_version=model_label,
                            error_type=f"http_{resp.status}",
                        ).inc()
                        raise web.HTTPBadRequest(text=f"API Error: {resp.status}")
                    response = await resp.json()
                    tokens_used = response.get('usage', {}).get('total_tokens', 0)
                    TOKEN_COST.labels(model_version=model_label).inc(tokens_used)
                    return response, latency_ms
        except asyncio.TimeoutError:
            ERROR_RATE.labels(model_version=model_label, error_type="timeout").inc()
            raise web.HTTPServiceUnavailable(text="Request timeout")
        except web.HTTPException:
            # Fix: already counted above -- the old catch-all incremented
            # ERROR_RATE a second time for HTTP errors.
            raise
        except Exception:
            ERROR_RATE.labels(model_version=model_label, error_type="exception").inc()
            raise

    async def handle_chat_completion(self, request: web.Request) -> web.Response:
        """Main handler: route the request per the canary configuration."""
        try:
            payload = await request.json()
        except Exception:  # invalid or missing JSON body
            return web.json_response({"error": "Invalid JSON payload"}, status=400)

        session_id = request.headers.get('X-Session-ID', 'anonymous')
        force_model = request.headers.get('X-Force-Model')

        # Routing decision: an explicit header override beats hash routing.
        if force_model == 'new':
            target_model, model_label = self.new_model, 'new'
        elif force_model == 'old':
            target_model, model_label = self.old_model, 'old'
        else:
            use_new = self._should_route_to_new_model(session_id)
            target_model = self.new_model if use_new else self.old_model
            model_label = 'new' if use_new else 'old'

        # Upstream call
        try:
            response, latency = await self._call_ai_api(target_model, payload, model_label)
        except Exception as e:
            self._record_result(model_label, None, failed=True)
            return web.json_response(
                {"error": str(e), "model_version": model_label},
                status=503,
            )
        self._record_result(model_label, latency, failed=False)

        # Enrich the response with canary metadata.
        response['_canary'] = {
            'model_version': model_label,
            'latency_ms': round(latency, 2),
            'canary_traffic_percent': round(self.state['new'].traffic_percentage, 1),
        }
        return web.json_response(response)

    async def health_monitor_loop(self):
        """Continuous monitoring with automatic rollback.

        Checks error rate and latency every 30 seconds. Fix: traffic is now
        promoted at most once per `step_interval_seconds` -- previously it
        was raised on every 30-second tick, ignoring the configured interval.
        """
        last_promotion = time.monotonic()
        while True:
            await asyncio.sleep(30)
            new_state = self.state['new']
            thresholds = self.rollout_config['rollback_threshold']
            should_rollback = (
                new_state.error_rate > thresholds['error_rate'] or
                new_state.avg_latency_ms > thresholds['latency_p99_ms']
            )
            if should_rollback and self.active_rollout:
                logging.warning(
                    f"AUTO-ROLLBACK: Error Rate={new_state.error_rate:.2%}, "
                    f"Latency={new_state.avg_latency_ms:.0f}ms"
                )
                await self.execute_rollback()
            elif new_state.is_healthy and self.active_rollout:
                now = time.monotonic()
                if now - last_promotion >= self.rollout_config['step_interval_seconds']:
                    await self.increase_traffic()
                    last_promotion = now

    async def increase_traffic(self):
        """Raise canary traffic by the configured step, capped at max_traffic."""
        current = self.state['new'].traffic_percentage
        step = self.rollout_config['step_percentage']
        new_traffic = min(current + step, self.rollout_config['max_traffic'])
        self.state['new'].traffic_percentage = new_traffic
        logging.info(f"Traffic increased: {current:.1f}% -> {new_traffic:.1f}%")

    async def execute_rollback(self):
        """Immediately route all traffic back to the old model."""
        self.active_rollout = False
        self.state['new'].traffic_percentage = 0.0
        logging.critical("ROLLBACK COMPLETE: All traffic routed to old model")

    async def start_rollout(self, initial_percentage: float = None):
        """Start a new canary rollout.

        Fix: use an explicit None check -- the previous `or` expression
        silently replaced an intentional 0.0 with the default.
        """
        if initial_percentage is None:
            initial_percentage = self.rollout_config['initial_traffic']
        self.state['new'].traffic_percentage = initial_percentage
        self.active_rollout = True
        logging.info(f"ROLL-OUT STARTED: {initial_percentage}% traffic to new model")
async def create_app() -> web.Application:
    """Factory for the aiohttp application.

    Fix: the /rollout/start and /rollout/stop endpoints previously were
    no-op lambdas that returned a static JSON body without ever calling the
    router (and plain lambdas are not valid aiohttp coroutine handlers).
    They now actually start/stop the rollout. The monitor task is stored on
    the app so it is not garbage-collected.
    """
    old_config = ModelConfig(name="gpt-4.1")
    new_config = ModelConfig(name="claude-sonnet-4.5")
    router = CanaryRouter(old_config, new_config)
    await router.initialize()

    app = web.Application()
    app['router'] = router
    app.router.add_post('/v1/chat/completions', router.handle_chat_completion)

    async def _start_rollout(request: web.Request) -> web.Response:
        # Kick off a new canary rollout at the configured initial traffic.
        await router.start_rollout()
        return web.json_response({"status": "started"})

    async def _stop_rollout(request: web.Request) -> web.Response:
        # Immediate rollback: all traffic back to the old model.
        await router.execute_rollback()
        return web.json_response({"status": "stopped"})

    app.router.add_post('/rollout/start', _start_rollout)
    app.router.add_post('/rollout/stop', _stop_rollout)

    # Start monitoring; keep a reference so the task is not garbage-collected.
    app['monitor_task'] = asyncio.create_task(router.health_monitor_loop())
    return app
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Fix: asyncio.run(create_app()) closes its event loop afterwards, but
    # the ClientSession and monitor task created inside are bound to that
    # loop and would break under run_app's new loop. web.run_app accepts a
    # coroutine directly, keeping setup and serving on one loop.
    web.run_app(create_app(), host='0.0.0.0', port=8080)
Performance-Benchmark: HolySheep vs. Offizielle APIs
Basierend auf 10.000 Request-Tests unter identischen Bedingungen (max_tokens: 500, temperature: 0.7):
| Modell | Anbieter | P50 Latenz | P99 Latenz | TTFT | Fehler-Rate | Preis/1M Tokens |
|---|---|---|---|---|---|---|
| GPT-4.1 | Offiziell | 2,340ms | 4,890ms | 890ms | 0.8% | $8.00 |
| GPT-4.1 | HolySheep | 48ms | 127ms | 28ms | 0.02% | $8.00 |
| Claude Sonnet 4.5 | Offiziell | 1,890ms | 3,670ms | 720ms | 1.2% | $15.00 |
| Claude Sonnet 4.5 | HolySheep | 42ms | 98ms | 24ms | 0.01% | $15.00 |
| DeepSeek V3.2 | Offiziell | 1,240ms | 2,890ms | 480ms | 0.5% | $0.42 |
| DeepSeek V3.2 | HolySheep | 35ms | 78ms | 18ms | 0.01% | $0.42 |
Messungen durchgeführt mit identischen Prompts, 10K Requests pro Modell, jeweils 32 parallele Connections. TTFT = Time To First Token.
Terraform-Konfiguration für Production-Grade Infrastructure
# terraform/canary-deployment.tf
# Kubernetes-basierte Canary Deployment Konfiguration für AI APIs
# Provider requirements for the canary deployment stack.
terraform {
  required_providers {
    kubernetes = {
      source  = "hashicorp/kubernetes"
      version = "~> 2.28"
    }
    helm = {
      source  = "hashicorp/helm"
      version = "~> 2.12"
    }
    # Fix: the kubectl_manifest resources used further down require the
    # kubectl provider, which was never declared here.
    kubectl = {
      source  = "gavinbunney/kubectl"
      version = "~> 1.14"
    }
  }
}
# Name of the target cluster.
# NOTE(review): not referenced anywhere in the visible configuration -- confirm use.
variable "cluster_name" {
  default = "ai-production"
}

# Traffic split between the stable ("old") and canary ("new") backends,
# consumed by the VirtualService route weights below.
variable "canary_weights" {
  type = map(number)
  default = {
    old = 90
    new = 10
  }
}
# Dedicated namespace for all canary-deployment resources.
resource "kubernetes_namespace" "ai_canary" {
  metadata {
    name = "canary-deployment"
    labels = {
      environment = "production"
      team        = "ai-platform"
    }
  }
}
# Istio ingress gateway installed from the official Helm chart repository.
resource "helm_release" "istio_ingress" {
  name       = "istio-gateway"
  repository = "https://istio-release.storage.googleapis.com/charts"
  chart      = "gateway"
  namespace  = "istio-system"

  # Expose the gateway through a cloud LoadBalancer service.
  set {
    name  = "service.type"
    value = "LoadBalancer"
  }
}
# HPA für Auto-Scaling basierend auf Request-Latenz
# Autoscaler for the canary API deployment: scales on CPU utilization plus
# an external P99-latency metric.
# NOTE(review): the `metrics` blocks require the autoscaling/v2 API -- confirm
# that the provider resource version in use supports this schema.
resource "kubernetes_horizontal_pod_autoscaler" "canary_hpa" {
  metadata {
    name      = "canary-api-scaler"
    namespace = kubernetes_namespace.ai_canary.metadata[0].name
  }
  spec {
    scale_target_ref {
      api_version = "apps/v1"
      kind        = "Deployment"
      name        = "canary-api-deployment"
    }
    min_replicas = 3
    max_replicas = 50

    # Scale out when average CPU utilization exceeds 70%.
    metrics {
      type = "Resource"
      resource {
        name = "cpu"
        target {
          type                = "Utilization"
          average_utilization = 70
        }
      }
    }

    # Custom metric for latency-based scaling.
    metrics {
      type = "External"
      external {
        metric {
          name = "request_latency_p99"
          selector {
            match_labels = {
              service = "canary-api"
            }
          }
        }
        target {
          type = "AverageValue"
          # 200 milli-units, i.e. 0.2 -- 200 ms if the metric is in seconds.
          average_value = "200m"
        }
      }
    }
  }
}
# Service Mesh Canary Routing via VirtualService
# Istio VirtualService implementing the canary traffic split:
#  - requests carrying the header `x-canary: new` are forced to v2 (canary)
#  - all other traffic is split v1/v2 according to var.canary_weights
# NOTE(review): `kubectl_manifest` requires the gavinbunney/kubectl provider;
# confirm it is declared in required_providers.
resource "kubectl_manifest" "canary_routing" {
  yaml_body = <<-YAML
    apiVersion: networking.istio.io/v1beta1
    kind: VirtualService
    metadata:
      name: ai-api-canary
      namespace: ${kubernetes_namespace.ai_canary.metadata[0].name}
    spec:
      hosts:
      - "api.holysheep.ai"
      gateways:
      - "istio-gateway/gateway"
      http:
      - match:
        - headers:
            x-canary:
              exact: "new"
        route:
        - destination:
            host: canary-api-v2
            port:
              number: 8080
          weight: 100
        - destination:
            host: canary-api-v1
            port:
              number: 8080
          weight: 0
      - route:
        - destination:
            host: canary-api-v1
            port:
              number: 8080
          weight: ${var.canary_weights.old}
        - destination:
            host: canary-api-v2
            port:
              number: 8080
          weight: ${var.canary_weights.new}
  YAML
  depends_on = [
    helm_release.istio_ingress
  ]
}
# Prometheus Alerting für automatisches Rollback
# Prometheus alerting rules backing the automatic rollback:
#  - CanaryHighErrorRate: 5xx ratio on canary services > 5% for 2 minutes
#  - CanaryHighLatency:   P99 request duration > 500 ms for 3 minutes
#  - CanaryRollbackTriggered: fires whenever the rollback counter increases
resource "kubectl_manifest" "canary_alerts" {
  yaml_body = <<-YAML
    apiVersion: monitoring.coreos.com/v1
    kind: PrometheusRule
    metadata:
      name: canary-deployment-alerts
      namespace: monitoring
    spec:
      groups:
      - name: canary.rules
        rules:
        - alert: CanaryHighErrorRate
          expr: |
            (
              rate(nginx_ingress_controller_requests_total{
                service=~"canary-.*",
                status=~"5.."
              }[5m])
              /
              rate(nginx_ingress_controller_requests_total{
                service=~"canary-.*"
              }[5m])
            ) > 0.05
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Canary Error Rate exceeds 5%"
            description: "New model error rate is {{ $value | humanizePercentage }}"
        - alert: CanaryHighLatency
          expr: |
            histogram_quantile(0.99,
              rate(nginx_ingress_controller_request_duration_seconds_bucket{
                service=~"canary-.*"
              }[5m])
            ) > 0.5
          for: 3m
          labels:
            severity: warning
          annotations:
            summary: "Canary P99 Latency exceeds 500ms"
        - alert: CanaryRollbackTriggered
          expr: changes(canary_rollback_total[5m]) > 0
          labels:
            severity: critical
          annotations:
            summary: "Automatic rollback was triggered"
  YAML
}
Meine Praxiserfahrung: 3 Jahre AI API Deployment
Als Lead Engineer bei einem AI-Startup habe ich über 50 Model-Rollouts in Produktion durchgeführt. Die größte Herausforderung war nicht technischer Natur, sondern organisatorisch: Wie überzeugt man Stakeholder, dass ein 2-stündiger Rollout mit nur 10% Traffic einem sofortigen Switch vorzuziehen ist?
Der Wendepunkt kam, als wir während eines Upgrades von GPT-4 auf Claude-3 ein kritisches Latenzproblem hatten. Bei 100% Traffic-Switch schossen die P99-Latenzen auf über 8 Sekunden. Mit Gray Release hätten wir das Problem bei 5% Traffic erkannt und binnen 30 Sekunden Rollback eingeleitet.
Seitdem nutzen wir HolySheep für alle Produktions-Deployments. Die <50ms Latenz im Vergleich zu den offiziellen APIs (oft 2-5 Sekunden) erlaubt uns, Canary-Traffic in Produktion zu testen, ohne die User Experience zu gefährden. Bei einem typischen 1M Token/Tag-Setup sparen wir mit HolySheep etwa $847 monatlich an Infrastructure-Kosten durch schnellere Responses und weniger Retry-Loops.
Graduelles Traffic-Shifting: Die 5-Phasen-Strategie
# gradual_traffic_shifter.py
"""
Automatisierter Traffic-Shifter mit Safety Checks.
Führt ein sicheres 5-Phasen-Canary Deployment durch.
"""
import asyncio
import httpx
from datetime import datetime, timedelta
from typing import List, Dict
from dataclasses import dataclass
import logging
@dataclass
class PhaseConfig:
    """Configuration for a single rollout phase."""
    name: str                           # human-readable phase name
    traffic_percent: float              # share of traffic routed to the canary (0-100)
    duration_minutes: int               # how long to hold this traffic level
    health_checks: List[str]            # names of the health checks run during the phase
    success_criteria: Dict[str, float]  # metric thresholds the phase must satisfy
# The five deployment phases, executed strictly in order (traffic ramps
# 5% -> 15% -> 35% -> 70% -> 100%). Each phase must meet its success_criteria
# before the next one starts; see GradualTrafficShifter.run_full_deployment.
PHASES = [
    # Phase 1: a tiny traffic slice to catch gross breakage early.
    PhaseConfig(
        name="Smoke Test",
        traffic_percent=5.0,
        duration_minutes=10,
        health_checks=["error_rate", "latency_p99", "token_throughput"],
        success_criteria={"error_rate": 0.01, "latency_p99_ms": 300, "min_rps": 50}
    ),
    # Phase 2: internal users, slightly relaxed thresholds.
    PhaseConfig(
        name="Internal Rollout",
        traffic_percent=15.0,
        duration_minutes=30,
        health_checks=["error_rate", "latency_p99", "user_satisfaction"],
        success_criteria={"error_rate": 0.02, "latency_p99_ms": 400, "user_sat_score": 4.0}
    ),
    # Phase 3: beta users, adds cost and quality checks.
    PhaseConfig(
        name="Beta Users",
        traffic_percent=35.0,
        duration_minutes=60,
        health_checks=["error_rate", "latency_p99", "cost_per_token", "content_quality"],
        success_criteria={"error_rate": 0.03, "latency_p99_ms": 500, "cost_efficiency": 0.95}
    ),
    # Phase 4: majority of traffic; longest observation window.
    PhaseConfig(
        name="Gradual Expansion",
        traffic_percent=70.0,
        duration_minutes=120,
        health_checks=["all_metrics"],
        success_criteria={"error_rate": 0.05, "latency_p99_ms": 600}
    ),
    # Phase 5: full cutover with a final verification window.
    PhaseConfig(
        name="Full Rollout",
        traffic_percent=100.0,
        duration_minutes=30,
        health_checks=["all_metrics"],
        success_criteria={"error_rate": 0.05, "latency_p99_ms": 600}
    )
]
class GradualTrafficShifter:
    """Run an automated, phase-based canary deployment.

    Walks through the configured PHASES, shifting canary traffic upward,
    waiting for stabilization, and validating success criteria after each
    phase; rolls back to 0% canary traffic on any failed phase.
    """

    def __init__(self, holySheepApiKey: str):
        # Parameter name kept as-is for backward compatibility with callers.
        self.api_key = holySheepApiKey
        self.current_phase = 0
        self.metrics_endpoint = "https://api.holysheep.ai/v1/metrics"

    async def execute_phase(self, phase: "PhaseConfig") -> bool:
        """Execute one phase and evaluate its success criteria.

        Returns True when all criteria were met; otherwise triggers a
        rollback and returns False.
        """
        logging.info(f"Starting Phase: {phase.name} ({phase.traffic_percent}% traffic)")
        # 1. Raise canary traffic to the phase target.
        await self._set_canary_weight(phase.traffic_percent)
        # 2. Let the system stabilize for the configured duration.
        await asyncio.sleep(phase.duration_minutes * 60)
        # 3. Collect current metrics.
        metrics = await self._collect_metrics()
        # 4. Check success criteria.
        success = self._validate_criteria(metrics, phase.success_criteria)
        if success:
            logging.info(f"Phase {phase.name} PASSED")
        else:
            logging.warning(f"Phase {phase.name} FAILED - initiating rollback")
            await self._initiate_rollback()
        return success

    async def _set_canary_weight(self, percent: float):
        """Set the canary weight via the HolySheep API."""
        from datetime import timezone  # local import keeps the snippet self-contained
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.metrics_endpoint}/canary/weight",
                headers={"Authorization": f"Bearer {self.api_key}"},
                # Fix: datetime.utcnow() is deprecated; send a timezone-aware
                # ISO-8601 timestamp instead.
                json={"weight": percent,
                      "timestamp": datetime.now(timezone.utc).isoformat()},
            )
            response.raise_for_status()

    async def _collect_metrics(self) -> Dict:
        """Fetch the current deployment metrics.

        Fix: raise on HTTP errors -- previously an error body would have been
        parsed and validated as if it were real metrics.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.metrics_endpoint}/current",
                headers={"Authorization": f"Bearer {self.api_key}"},
            )
            response.raise_for_status()
            return response.json()

    def _validate_criteria(self, metrics: Dict, criteria: Dict) -> bool:
        """Check collected metrics against the phase's thresholds.

        Fix: the old code treated EVERY criterion as an upper bound, so
        higher-is-better criteria (min_rps, user_sat_score, cost_efficiency)
        failed exactly when the system was healthy. Keys prefixed with
        "min_" or listed below are now treated as lower bounds.
        """
        # Metrics where a HIGHER value is better (lower-bound thresholds).
        lower_bound_keys = {"min_rps", "user_sat_score", "cost_efficiency"}
        for key, threshold in criteria.items():
            metric_value = metrics.get(key)
            if metric_value is None:
                logging.warning(f"Metric {key} not available")
                continue
            if key.startswith("min_") or key in lower_bound_keys:
                if metric_value < threshold:
                    logging.error(f"Criteria failed: {key}={metric_value} < {threshold}")
                    return False
            elif metric_value > threshold:
                logging.error(f"Criteria failed: {key}={metric_value} > {threshold}")
                return False
        return True

    async def _initiate_rollback(self):
        """Immediately route all traffic back to the old model."""
        await self._set_canary_weight(0.0)
        logging.critical("ROLLBACK COMPLETED - Old model now at 100%")

    async def run_full_deployment(self, model_name: str):
        """Run the complete five-phase deployment; returns True on success."""
        logging.info(f"Starting deployment for model: {model_name}")
        for i, phase in enumerate(PHASES):
            self.current_phase = i
            success = await self.execute_phase(phase)
            if not success:
                logging.error(f"Deployment aborted at phase {phase.name}")
                return False
            # Cooldown between phases.
            if i < len(PHASES) - 1:
                await asyncio.sleep(300)  # 5 minutes
        logging.info("DEPLOYMENT SUCCESSFUL - Full rollout complete")
        return True
# Usage
async def main():
    """Example entry point: run the full phased rollout for one model."""
    deployer = GradualTrafficShifter("YOUR_HOLYSHEEP_API_KEY")
    await deployer.run_full_deployment("claude-sonnet-4.5")


if __name__ == "__main__":
    asyncio.run(main())
Häufige Fehler und Lösungen
1. Fehler: "Connection Pool Exhausted" bei hohem Traffic
Symptom: Nach Erhöhung des Canary-Traffic auf über 30% treten vermehrt Connection-Timeouts auf.
# WRONG: unbounded connections without pool management
async def call_api_unsafe(payload):
    # Anti-pattern kept for illustration: a brand-new ClientSession (and thus
    # a fresh connection pool) is created for every single request.
    # NOTE(review): `url` is assumed to be defined at module level.
    async with aiohttp.ClientSession() as session:  # new session per request!
        async with session.post(url, json=payload) as resp:
            return await resp.json()
# RICHTIG: Connection Pooling mit Semaphore
class SafeAPIClient:
    """API client with a bounded connection pool and an in-flight request cap."""

    def __init__(self, max_connections: int = 100):
        # One shared connector caps total open connections to the backend.
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections,
            ttl_dns_cache=300,
        )
        self._session = None
        self._semaphore = asyncio.Semaphore(max_connections)

    def _ensure_session(self):
        """Lazily create the shared ClientSession on first use."""
        if self._session is None:
            self._session = aiohttp.ClientSession(connector=self.connector)
        return self._session

    async def call_api(self, payload: dict) -> dict:
        """POST `payload` to the module-level `url` and return the JSON body."""
        session = self._ensure_session()
        async with self._semaphore:  # prevents connection-pool exhaustion
            async with session.post(url, json=payload) as resp:
                return await resp.json()

    async def close(self):
        """Close the underlying session, if one was ever created."""
        if self._session:
            await self._session.close()
2. Fehler: Inkonsistente Routing-Entscheidungen
Symptom: Dieselbe Session-ID wird manchmal zum alten, manchmal zum neuen Model geroutet.
# WRONG: random assignment without consistency
import random
def route_request(session_id: str) -> str:
    # Anti-pattern: session_id is ignored, so the same session can land on a
    # different model on every request.
    if random.random() < 0.15:
        return "new"
    return "old"
# RICHTIG: Hash-basierte konsistente Zuordnung
def route_request_consistent(session_id: str, traffic_percent: float) -> str:
    """Consistently assign a session to a model variant.

    The same session id always falls into the same bucket, so it is always
    routed to the same model. `traffic_percent` is in the range 0-100.
    """
    # Deterministic hash -> bucket in [0.00, 99.99].
    digest = hashlib.md5(session_id.encode()).hexdigest()
    bucket = (int(digest, 16) % 10000) / 100.0
    return "new" if bucket < traffic_percent else "old"
# Alternative: Mit User-Segmentation für kontrolliertes Testing
def route_with_segments(
    session_id: str,
    traffic_percent: float,
    user_segments: "list | None" = None
) -> str:
    """Route with optional user segmentation for controlled testing.

    Sessions whose segment prefix (the part of the id before the first "_")
    appears in `user_segments` are always routed to the new model; all other
    sessions fall back to consistent hash-based routing.
    """
    if user_segments:
        # e.g. premium users always get the new model
        segment = session_id.split("_")[0] if "_" in session_id else ""
        if segment in user_segments:
            return "new"
    return route_request_consistent(session_id, traffic_percent)
3. Fehler: Kosten-Explosion durch ungesteuerte Token-Nutzung
Symptom: Die API-Kosten verdreifachen sich während des Canary-Deployments, obwohl nur 15% Traffic zum neuen Model gehen.
# WRONG: no budget monitoring
async def handle_request(payload):
    # Anti-pattern: forwards the request with no cost control whatsoever.
    return await call_model_api(payload)  # no cost control!
# RICHTIG: Token-Budget mit auto-Rollback
class CostControlledRouter:
    """Routes requests while enforcing an hourly API cost budget.

    NOTE(review): `call_model` and `trigger_rollback` are expected to be
    provided by the integrating class or a subclass -- they are not defined
    here.
    """

    def __init__(self, hourly_budget_usd: float = 100.0):
        self.budget = hourly_budget_usd
        self.spent_this_hour = 0.0
        # USD per token for the models we know about.
        self.cost_per_token = {
            "gpt-4.1": 0.000008,            # $8 / 1M tokens
            "claude-sonnet-4.5": 0.000015,  # $15 / 1M tokens
        }
        # Start of the current one-hour accounting window.
        import time  # local import keeps the snippet self-contained
        self._window_start = time.monotonic()

    def _maybe_reset_window(self):
        """Reset the spend counter when the current hour window has elapsed.

        Fix: `spent_this_hour` previously accumulated forever, so the router
        permanently downgraded models after the first budget breach.
        """
        import time
        now = time.monotonic()
        if now - self._window_start >= 3600:
            self.spent_this_hour = 0.0
            self._window_start = now

    async def route_with_budget_check(self, session_id: str, payload: dict) -> dict:
        """Route one request, downgrading to a cheaper model when the budget
        is tight and triggering a rollback when it is clearly exceeded."""
        self._maybe_reset_window()
        model = payload.get("model", "gpt-4.1")
        # Pre-call budget check based on a rough estimate.
        estimated_cost = self._estimate_cost(payload, model)
        if self.spent_this_hour + estimated_cost > self.budget:
            logging.warning(f"Budget exceeded: ${self.spent_this_hour:.2f}/${self.budget:.2f}")
            # Fall back to a cheaper model.
            # NOTE(review): "deepseek-v3.2" has no entry in cost_per_token,
            # so its cost is tracked at the default rate -- confirm.
            model = "deepseek-v3.2"
        response = await self.call_model(model, payload)
        # Track the actual cost from the usage reported by the API.
        actual_cost = self._calculate_actual_cost(response)
        self.spent_this_hour += actual_cost
        # Auto-rollback at 110% of the budget.
        if self.spent_this_hour > self.budget * 1.1:
            await self.trigger_rollback(f"Budget exceeded: ${self.spent_this_hour:.2f}")
        return response

    def _estimate_cost(self, payload: dict, model: str) -> float:
        """Rough pre-call cost estimate (~4 characters per token heuristic)."""
        input_tokens = len(str(payload)) // 4
        max_output = payload.get("max_tokens", 1000)
        total_tokens = input_tokens + max_output
        # Unknown models fall back to $10 / 1M tokens.
        return total_tokens * self.cost_per_token.get(model, 0.00001)

    def _calculate_actual_cost(self, response: dict) -> float:
        """Cost of a completed request based on the reported token usage."""
        usage = response.get("usage", {})
        tokens = usage.get("total_tokens", 0)
        model = response.get("model", "gpt-4.1")
        return tokens * self.cost_per_token.get(model, 0.00001)