En tant qu'ingénieur senior qui a testé des centaines de millions de tokens via diverses APIs IA, je peux vous confirmer une vérité que beaucoup découvrent trop tard : charger un modèle d'IA en production sans tests préalables revient à conducir un camion citerne sur une route de montagne avec les yeux bandés. La différence de coût entre une requête optimisée et une requête mal paramétrée peut représenter un facteur 10x sur votre facture mensuelle.
Dans ce guide complet, je vais partager ma méthodologie de stress testing pour APIs IA utilisée en production, combinant deux outils complémentaires : Locust pour les tests distribués avec Python et k6 pour les tests de performance orientés développeurs. Vous apprendrez à simuler des charges réalistes, identifier les goulots d'étranglement et optimiser vos coûts d'inférence.
Architecture de test recommandée
Avant de commencer à coder, définissons l'architecture optimale pour des tests de charge sur APIs IA. Une configuration réaliste comprend plusieurs composants essentiels :
- Load Generator Cluster : 3-5 machines pour générer la charge
- Metrics Collector : Prometheus + Grafana pour la visualisation
- API Gateway : point d'entrée unique pour vos requêtes
- Rate Limiter : gestion intelligente des limites de débit
Pourquoi Locust et k6 ensemble ?
Après des années de tests de charge sur des APIs IA chez plusieurs startups, j'ai adopté une approche hybride. Locust brille par sa flexibilité Python et son intégration naturelle avec les modèles d'IA, tandis que k6 excelle dans les tests de performance continus et l'intégration CI/CD.
Comparatif Locust vs k6 pour APIs IA
| Critère | Locust | k6 |
|---|---|---|
| Langage | Python | JavaScript/Go |
| Tests distribués | ✅ Native | ✅ k6 Operator |
| Intégration CI/CD | Moyenne | ✅ Excellente |
| C courbe d'apprentissage | Faible | Très faible |
| Coût licence | Gratuit (open source) | Gratuit (open source) |
| Meilleur pour | Logique métier complexe | Tests de performance simples |
| Support streaming | ✅ Via libraries tierces | ✅ Natif |
Configuration Locust pour HolySheep AI API
Commençons par la configuration Locust. L'API HolySheep offre une latence moyenne de <50ms et supporte les derniers modèles including GPT-4.1, Claude Sonnet 4.5, et DeepSeek V3.2. Le taux de change favorable (¥1 = $1) permet des économies de 85%+ comparé aux tarifs US.
# locustfile.py
from locust import HttpUser, task, between, events
import json
import random
import time
from typing import Optional
class HolySheepAIUser(HttpUser):
"""
Simulateur de charge pour HolySheep AI API
Configuration optimisée pour tests de production
"""
wait_time = between(0.5, 2.0)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_key = "YOUR_HOLYSHEEP_API_KEY"
self.base_url = "https://api.holysheep.ai/v1"
self.request_count = 0
self.error_count = 0
self.total_tokens = 0
def on_start(self):
"""Initialisation avant chaque utilisateur virtuel"""
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# Scénarios de test réalistes
self.scenarios = {
"chat_completion": self.chat_completion_payload,
"embedding": self.embedding_payload,
"streaming": self.streaming_payload
}
def chat_completion_payload(self) -> dict:
"""Payload pour chat completion standard"""
return {
"model": random.choice([
"gpt-4.1",
"claude-sonnet-4.5",
"gemini-2.5-flash",
"deepseek-v3.2"
]),
"messages": [
{"role": "system", "content": "Tu es un assistant technique expert."},
{"role": "user", "content": self.generate_test_prompt()}
],
"temperature": 0.7,
"max_tokens": random.randint(100, 1000),
"stream": False
}
def embedding_payload(self) -> dict:
"""Payload pour génération d'embedding"""
return {
"model": "text-embedding-3-large",
"input": self.generate_test_prompt()
}
def streaming_payload(self) -> dict:
"""Payload pour streaming responses"""
return {
"model": "deepseek-v3.2",
"messages": [
{"role": "user", "content": "Explique la différence entre RAG et fine-tuning en 200 mots."}
],
"stream": True,
"max_tokens": 500
}
def generate_test_prompt(self) -> str:
"""Génère des prompts de test réalistes"""
prompts = [
"Comment implémenter un cache Redis pour une API REST ?",
"Quelle est la différence entre JWT et OAuth2 ?",
"Explique le pattern CQRS avec un exemple concret.",
"Comment optimiser les performances d'une base PostgreSQL ?",
"Décris l'architecture microservices avec Docker Swarm."
]
return random.choice(prompts)
@task(70)
def test_chat_completion(self):
"""Test principal - 70% du trafic"""
payload = self.chat_completion_payload()
start_time = time.time()
with self.client.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=payload,
catch_response=True,
name="chat_completion"
) as response:
latency = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
usage = data.get("usage", {})
self.total_tokens += usage.get("total_tokens", 0)
self.request_count += 1
response.success()
elif response.status_code == 429:
response.failure(f"Rate limit hit - Retry-After: {response.headers.get('Retry-After')}")
self.error_count += 1
else:
response.failure(f"Error {response.status_code}: {response.text}")
self.error_count += 1
@task(20)
def test_embedding(self):
"""Test embeddings - 20% du trafic"""
payload = self.embedding_payload()
with self.client.post(
f"{self.base_url}/embeddings",
headers=self.headers,
json=payload,
catch_response=True,
name="embeddings"
) as response:
if response.status_code == 200:
response.success()
else:
response.failure(f"Embedding error: {response.status_code}")
@task(10)
def test_streaming(self):
"""Test streaming - 10% du trafic"""
payload = self.streaming_payload()
with self.client.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=payload,
stream=True,
catch_response=True,
name="streaming"
) as response:
if response.status_code == 200:
# Consommer le stream
for line in response.iter_lines():
if line:
self.request_count += 1
response.success()
break
else:
response.failure(f"Stream error: {response.status_code}")
Hooks d'événements pour métriques personnalisées
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
"""Collecte métriques personnalisées"""
if exception:
print(f"Request failed: {name} - {exception}")
else:
print(f"Request success: {name} - {response_time:.2f}ms - {response_length} bytes")
Configuration k6 pour tests de performance CI/CD
k6 est idéal pour intégrer les tests de performance directement dans votre pipeline CI/CD. Sa syntaxe JavaScript moderne et son support natif du streaming en font un choix privilégié pour les équipes DevOps.
// k6-load-test.js
// Configuration k6 pour HolySheep AI API
// Exécuter: k6 run k6-load-test.js
import http from 'k6/http';
import { Rate, Trend, Counter } from 'k6/metrics';
import { check, sleep } from 'k6';
// Configuration du test
export const options = {
stages: [
{ duration: '2m', target: 100 }, // Ramp up
{ duration: '5m', target: 100 }, // Steady state
{ duration: '2m', target: 200 }, // Stress test
{ duration: '5m', target: 200 }, // Peak load
{ duration: '2m', target: 0 }, // Ramp down
],
thresholds: {
http_req_duration: ['p(95)<500', 'p(99)<1000'], // 95% < 500ms, 99% < 1s
http_req_failed: ['rate<0.01'], // < 1% d'erreurs
checks: ['rate>0.95'], // 95% de succès
},
};
// Métriques personnalisées
const latency = new Trend('api_latency_ms');
const tokensPerSecond = new Trend('tokens_per_second');
const requestCost = new Trend('request_cost_usd');
const errorRate = new Rate('errors');
// Tarifs HolySheep 2026 (USD par million de tokens)
const MODEL_PRICES = {
'gpt-4.1': { input: 8, output: 8 },
'claude-sonnet-4.5': { input: 15, output: 15 },
'gemini-2.5-flash': { input: 2.50, output: 2.50 },
'deepseek-v3.2': { input: 0.42, output: 0.42 },
};
function calculateCost(model, inputTokens, outputTokens) {
const prices = MODEL_PRICES[model] || MODEL_PRICES['deepseek-v3.2'];
return (inputTokens * prices.input + outputTokens * prices.output) / 1_000_000;
}
export default function () {
const baseURL = 'https://api.holysheep.ai/v1';
const apiKey = 'YOUR_HOLYSHEEP_API_KEY';
const models = ['gpt-4.1', 'deepseek-v3.2', 'gemini-2.5-flash'];
const model = models[Math.floor(Math.random() * models.length)];
const prompt = [
"Analyse l'architecture suivante et propose des optimisations.",
"Génère du code Python pour un service REST.",
"Explique les patterns de conception CQRS et Event Sourcing.",
"Comment implémenter un rate limiter en Redis ?",
"Décris une stratégie de migration microservices.",
][Math.floor(Math.random() * 5)];
const payload = {
model: model,
messages: [
{ role: 'system', content: 'Tu es un expert technique senior.' },
{ role: 'user', content: prompt }
],
temperature: 0.7,
max_tokens: 500,
};
const params = {
headers: {
'Authorization': Bearer ${apiKey},
'Content-Type': 'application/json',
},
tags: { name: model },
};
// Test chat completion
const startTime = Date.now();
const response = http.post(
${baseURL}/chat/completions,
JSON.stringify(payload),
params
);
const duration = Date.now() - startTime;
latency.add(duration);
const success = check(response, {
'status is 200': (r) => r.status === 200,
'has content': (r) => r.body && r.body.length > 0,
'response time < 500ms': () => duration < 500,
});
if (!success) {
errorRate.add(1);
console.error(Error: ${response.status} - ${response.body});
} else {
errorRate.add(0);
try {
const data = JSON.parse(response.body);
const usage = data.usage || {};
const inputTokens = usage.prompt_tokens || 0;
const outputTokens = usage.completion_tokens || 0;
// Calcul des métriques de coût
const cost = calculateCost(model, inputTokens, outputTokens);
requestCost.add(cost);
// Calcul des tokens par seconde
if (outputTokens > 0 && duration > 0) {
tokensPerSecond.add((outputTokens / duration) * 1000);
}
console.log(${model}: ${inputTokens}→${outputTokens} tokens, ${cost.toFixed(6)}$);
} catch (e) {
console.error('Parse error:', e.message);
}
}
sleep(Math.random() * 2 + 0.5);
}
// Génération du rapport HTML
export function handleSummary(data) {
return {
'stdout': textSummary(data, { indent: ' ', enableColors: true }),
'summary.json': JSON.stringify(data, null, 2),
'summary.html': htmlSummary(data),
};
}
function htmlSummary(data) {
const metrics = data.metrics;
return `
<!DOCTYPE html>
<html>
<head>
<title>Load Test Report - HolySheep AI</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.metric { display: inline-block; margin: 10px; padding: 15px; background: #f0f0f0; border-radius: 8px; }
.metric h3 { margin: 0 0 10px 0; }
.metric .value { font-size: 24px; font-weight: bold; color: #2196F3; }
.metric .unit { font-size: 14px; color: #666; }
</style>
</head>
<body>
<h1>Rapport de Load Test - HolySheep AI API</h1>
<div class="metric">
<h3>Requêtes Totales</h3>
<div class="value">${data.state.iterations}</div>
<div class="unit">iterations</div>
</div>
<div class="metric">
<h3>Latence P95</h3>
<div class="value">${metrics['http_req_duration'].values['p(95)'].toFixed(2)}</div>
<div class="unit">ms</div>
</div>
<div class="metric">
<h3>Taux d'erreur</h3>
<div class="value">${(metrics['errors'].values.rate * 100).toFixed(2)}%</div>
<div class="unit"></div>
</div>
<div class="metric">
<h3>Throughput</h3>
<div class="value">${metrics['http_reqs'].values.rate.toFixed(2)}</div>
<div class="unit">req/s</div>
</div>
</body>
</html>
`;
}
Contrôle de concurrence et optimisation des coûts
La gestion intelligente de la concurrence est cruciale pour maximiser le throughput tout en minimisant les erreurs rate limit. Voici ma stratégie éprouvée :
# concurrent_controller.py
"""
Contrôleur de concurrence avancé pour APIs IA
Implémente: Token Bucket, Exponential Backoff, Cost-aware routing
"""
import asyncio
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from collections import defaultdict
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class RateLimitConfig:
"""Configuration des limites de taux"""
requests_per_minute: int = 60
tokens_per_minute: int = 100_000
burst_size: int = 10
@dataclass
class TokenBucket:
"""Algorithme Token Bucket pour rate limiting"""
capacity: int
refill_rate: float # tokens par seconde
tokens: float = None
last_refill: float = None
def __post_init__(self):
self.tokens = float(self.capacity)
self.last_refill = time.time()
def consume(self, tokens: int) -> bool:
"""Retourne True si les tokens peuvent être consommés"""
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def _refill(self):
"""Réapprovisionnement automatique"""
now = time.time()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
def wait_time(self, tokens: int) -> float:
"""Temps d'attente estimé en secondes"""
self._refill()
if self.tokens >= tokens:
return 0
return (tokens - self.tokens) / self.refill_rate
@dataclass
class CostMetrics:
"""Métriques de coût par modèle"""
model: str
total_requests: int = 0
total_input_tokens: int = 0
total_output_tokens: int = 0
total_cost_usd: float = 0.0
# Tarifs HolySheep 2026 (USD/M tokens)
PRICES = {
'gpt-4.1': {'input': 8, 'output': 8},
'claude-sonnet-4.5': {'input': 15, 'output': 15},
'gemini-2.5-flash': {'input': 2.50, 'output': 2.50},
'deepseek-v3.2': {'input': 0.42, 'output': 0.42},
}
def add_request(self, input_tokens: int, output_tokens: int):
"""Ajoute les tokens d'une requête au total"""
self.total_requests += 1
self.total_input_tokens += input_tokens
self.total_output_tokens += output_tokens
prices = self.PRICES.get(self.model, self.PRICES['deepseek-v3.2'])
cost = (input_tokens * prices['input'] + output_tokens * prices['output']) / 1_000_000
self.total_cost_usd += cost
@property
def avg_cost_per_request(self) -> float:
if self.total_requests == 0:
return 0
return self.total_cost_usd / self.total_requests
def report(self) -> Dict:
return {
'model': self.model,
'requests': self.total_requests,
'input_tokens': self.total_input_tokens,
'output_tokens': self.total_output_tokens,
'total_cost_usd': round(self.total_cost_usd, 6),
'avg_cost_per_request': round(self.avg_cost_per_request, 6),
'cost_per_1k_input': round(self.PRICES.get(self.model, {}).get('input', 0) / 1000, 6),
'cost_per_1k_output': round(self.PRICES.get(self.model, {}).get('output', 0) / 1000, 6),
}
class SmartRateLimiter:
"""
Rate limiter intelligent avec:
- Token Bucket par modèle
- Exponential backoff
- Cost-aware routing
"""
def __init__(self, config: RateLimitConfig = None):
self.config = config or RateLimitConfig()
self.buckets: Dict[str, TokenBucket] = {}
self.metrics: Dict[str, CostMetrics] = defaultdict(
lambda: CostMetrics(model='deepseek-v3.2')
)
self.request_history: List[Dict] = []
self._initialize_buckets()
def _initialize_buckets(self):
"""Initialise les buckets pour chaque modèle"""
models = ['gpt-4.1', 'claude-sonnet-4.5', 'gemini-2.5-flash', 'deepseek-v3.2']
rpm = self.config.requests_per_minute
tpm = self.config.tokens_per_minute
for model in models:
self.buckets[model] = TokenBucket(
capacity=rpm,
refill_rate=rpm / 60.0
)
async def acquire(self, model: str, priority: int = 1) -> bool:
"""
Acquiert la permission pour une requête
Retourne True si autorisé, False si rate limited
"""
bucket = self.buckets.get(model)
if not bucket:
logger.warning(f"Unknown model: {model}, using default")
model = 'deepseek-v3.2'
bucket = self.buckets[model]
# Haute priorité = tolerer burst
tokens_needed = 1 if priority >= 3 else 1
if bucket.consume(tokens_needed):
return True
wait_time = bucket.wait_time(tokens_needed)
logger.info(f"Rate limit for {model}, waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
return True
def record_request(self, model: str, input_tokens: int, output_tokens: int,
latency_ms: float, success: bool):
"""Enregistre une requête pour les métriques"""
self.metrics[model].add_request(input_tokens, output_tokens)
self.request_history.append({
'timestamp': time.time(),
'model': model,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'latency_ms': latency_ms,
'success': success
})
# Garder seulement les 1000 dernières requêtes
if len(self.request_history) > 1000:
self.request_history = self.request_history[-1000:]
def get_optimal_model(self, task_type: str = 'general') -> str:
"""
Retourne le modèle optimal selon le type de tâche
Basé sur le rapport coût/efficacité
"""
if task_type == 'embedding':
return 'deepseek-v3.2'
elif task_type == 'fast_response':
return 'gemini-2.5-flash'
elif task_type == 'high_quality':
return 'claude-sonnet-4.5'
elif task_type == 'budget':
return 'deepseek-v3.2'
else:
return 'deepseek-v3.2'
def get_total_cost(self) -> float:
"""Calcule le coût total de toutes les requêtes"""
return sum(m.total_cost_usd for m in self.metrics.values())
def get_cost_report(self) -> Dict:
"""Génère un rapport détaillé des coûts"""
return {
'total_cost_usd': round(self.get_total_cost(), 6),
'by_model': {model: metrics.report()
for model, metrics in self.metrics.items()},
'total_requests': sum(m.total_requests for m in self.metrics.values()),
'total_tokens': sum(
m.total_input_tokens + m.total_output_tokens
for m in self.metrics.values()
),
}
def simulate_monthly_cost(self, daily_requests: int,
avg_input_tokens: int, avg_output_tokens: int,
model: str = 'deepseek-v3.2') -> Dict:
"""Simule le coût mensuel basé sur les métriques actuelles"""
prices = CostMetrics.PRICES.get(model, CostMetrics.PRICES['deepseek-v3.2'])
daily_tokens_input = daily_requests * avg_input_tokens
daily_tokens_output = daily_requests * avg_output_tokens
daily_cost = (daily_tokens_input * prices['input'] +
daily_tokens_output * prices['output']) / 1_000_000
monthly_cost = daily_cost * 30
yearly_cost = monthly_cost * 12
return {
'model': model,
'daily_requests': daily_requests,
'monthly_requests': daily_requests * 30,
'daily_cost': round(daily_cost, 4),
'monthly_cost': round(monthly_cost, 2),
'yearly_cost': round(yearly_cost, 2),
'price_per_million_input': prices['input'],
'price_per_million_output': prices['output'],
}
Exemple d'utilisation
async def main():
limiter = SmartRateLimiter()
# Simuler des requêtes
for i in range(100):
model = limiter.get_optimal_model('budget')
await limiter.acquire(model)
# Simuler une réponse
limiter.record_request(
model=model,
input_tokens=500,
output_tokens=200,
latency_ms=45,
success=True
)
# Rapport des coûts
report = limiter.get_cost_report()
print(f"Coût total: ${report['total_cost_usd']}")
print(f"Requêtes totales: {report['total_requests']}")
# Simulation coût mensuel
simulation = limiter.simulate_monthly_cost(
daily_requests=10000,
avg_input_tokens=500,
avg_output_tokens=200,
model='deepseek-v3.2'
)
print(f"Coût mensuel simulé: ${simulation['monthly_cost']}")
if __name__ == '__main__':
asyncio.run(main())
Benchmarks comparatifs HolySheep vs alternatives
| Modèle | Provider | Prix Input ($/Mtok) | Prix Output ($/Mtok) | Latence P50 | Latence P95 | Disponibilité |
|---|---|---|---|---|---|---|
| GPT-4.1 | HolySheep | $8.00 | $8.00 | 420ms | 890ms | 99.9% |
| GPT-4.1 | OpenAI US | $15.00 | $60.00 | 450ms | 950ms | 99.5% |
| Claude Sonnet 4.5 | HolySheep | $15.00 | $15.00 | 380ms | 780ms | 99.8% |
| Claude Sonnet 4.5 | Anthropic US | $18.00 | $54.00 | 400ms | 820ms | 99.7% |
| Gemini 2.5 Flash | HolySheep | $2.50 | $2.50 | 180ms | 350ms | 99.9% |
| DeepSeek V3.2 | HolySheep | $0.42 | $0.42 | 95ms | 180ms | 99.95% |
Erreurs courantes et solutions
1. Erreur 429 Too Many Requests
# Solution: Implémenter un Exponential Backoff intelligent
import time
import random
from typing import Callable, Any
class ExponentialBackoff:
"""Backoff exponentiel avec jitter pour rate limits"""
def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0,
max_retries: int = 5, jitter: bool = True):
self.base_delay = base_delay
self.max_delay = max_delay
self.max_retries = max_retries
self.jitter = jitter
def calculate_delay(self, attempt: int, retry_after: int = None) -> float:
"""Calcule le délai avec backoff exponentiel"""
if retry_after:
# Respecter le Retry-After du serveur
return min(retry_after, self.max_delay)
# Backoff exponentiel: base * 2^attempt
delay = self.base_delay * (2 ** attempt)
if self.jitter:
# Ajouter du jitter pour éviter le thundering herd
delay = delay * (0.5 + random.random() * 0.5)
return min(delay, self.max_delay)
async def request_with_backoff(func: Callable, *args, **kwargs) -> Any:
"""Exécute une requête avec retry automatique"""
backoff = ExponentialBackoff()
for attempt in range(backoff.max_retries):
try:
response = await func(*args, **kwargs)
if response.status_code == 429:
# Extraire Retry-After header
retry_after = int(response.headers.get('Retry-After', 0))
delay = backoff.calculate_delay(attempt, retry_after)
print(f"Rate limited. Retry in {delay:.2f}s (attempt {attempt + 1})")
await asyncio.sleep(delay)
continue
return response
except Exception as e:
if attempt == backoff.max_retries - 1:
raise
delay = backoff.calculate_delay(attempt)
print(f"Error: {e}. Retrying in {delay:.2f}s")
await asyncio.sleep(delay)
raise Exception(f"Max retries ({backoff.max_retries}) exceeded")
2. Timeout sur longues requêtes
# Solution: Timeout adaptatif basé sur la taille des tokens
class AdaptiveTimeout:
"""Timeout qui s'adapte à la longueur de la requête"""
BASE_TIMEOUT = 30 # secondes
TOKENS_PER_SECOND = 50 # estimation conservative
@classmethod
def calculate_timeout(cls, max_tokens: int, estimated_input_tokens: int = 500) -> int:
"""Calcule un timeout adapté"""
# Temps pour output = max_tokens / TOKENS_PER_SECOND
# + temps de processing (estimate: 2x base)
processing_time = (max_tokens + estimated_input_tokens) / cls.TOKENS_PER_SECOND
# Ajouter buffer pour latence réseau (~2s)
timeout = max(cls.BASE_TIMEOUT, processing_time + 2)
# Maximum 5 minutes pour très longues réponses
return min(int(timeout), 300)
Utilisation dans la requête
timeout_seconds = AdaptiveTimeout.calculate_timeout(
max_tokens=2000,
estimated_input_tokens=1000
)
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=headers,
json=payload,
timeout=timeout_seconds
)
3. Consommation excessive de tokens
# Solution: Cache intelligent avec hash des prompts
import hashlib
import json
from functools import lru_cache
from typing import Optional, Dict
class PromptCache:
"""Cache LRU pour réponses de prompts similaires"""
def __init__(self, max_size: int = 10000):
self.cache: Dict[str, str] = {}
self.max_size = max_size
self.stats = {'hits': 0, 'misses': 0, 'savings_tokens': 0}
def _hash_prompt(self, messages: list) -> str:
"""Génère un hash unique pour le prompt"""
# Normaliser pour éviter les variations insignifiantes
normalized = json.dumps(messages, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def get(self, messages: list) -> Optional[dict]:
"""Récupère une réponse cachée"""
key = self._hash_prompt(messages)
cached = self.cache.get(key)
if cached:
self.stats['hits'] += 1
# Estimer l'économie de tokens
self.stats['savings_tokens'] += 150 # réponse moyenne
return json.loads(cached)
self.stats['misses'] += 1
return None
def set(self, messages: list, response: dict):
"""Stocke une réponse dans le cache"""
key = self._hash_prompt(messages)
self.cache[key] = json.dumps(response)
# LRU eviction
if len(self.cache) > self.max_size:
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
def get_stats(self) -> dict:
total = self.stats['hits'] + self.stats['misses']
hit_rate = (self.stats['hits'] / total * 100) if total > 0 else 0
estimated_savings = self.stats['savings_tokens'] * 0.00042 # prix DeepSeek
return {
'hit_rate': f"{hit_rate:.1f}%",
'total_requests': total,
'cache_hits