En tant qu'ingénieur senior qui a testé des centaines de millions de tokens via diverses APIs IA, je peux vous confirmer une vérité que beaucoup découvrent trop tard : charger un modèle d'IA en production sans tests préalables revient à conducir un camion citerne sur une route de montagne avec les yeux bandés. La différence de coût entre une requête optimisée et une requête mal paramétrée peut représenter un facteur 10x sur votre facture mensuelle.

Dans ce guide complet, je vais partager ma méthodologie de stress testing pour APIs IA utilisée en production, combinant deux outils complémentaires : Locust pour les tests distribués avec Python et k6 pour les tests de performance orientés développeurs. Vous apprendrez à simuler des charges réalistes, identifier les goulots d'étranglement et optimiser vos coûts d'inférence.

Architecture de test recommandée

Avant de commencer à coder, définissons l'architecture optimale pour des tests de charge sur APIs IA. Une configuration réaliste comprend plusieurs composants essentiels :

Pourquoi Locust et k6 ensemble ?

Après des années de tests de charge sur des APIs IA chez plusieurs startups, j'ai adopté une approche hybride. Locust brille par sa flexibilité Python et son intégration naturelle avec les modèles d'IA, tandis que k6 excelle dans les tests de performance continus et l'intégration CI/CD.

Comparatif Locust vs k6 pour APIs IA

CritèreLocustk6
LangagePythonJavaScript/Go
Tests distribués✅ Native✅ k6 Operator
Intégration CI/CDMoyenne✅ Excellente
C courbe d'apprentissageFaibleTrès faible
Coût licenceGratuit (open source)Gratuit (open source)
Meilleur pourLogique métier complexeTests de performance simples
Support streaming✅ Via libraries tierces✅ Natif

Configuration Locust pour HolySheep AI API

Commençons par la configuration Locust. L'API HolySheep offre une latence moyenne de <50ms et supporte les derniers modèles including GPT-4.1, Claude Sonnet 4.5, et DeepSeek V3.2. Le taux de change favorable (¥1 = $1) permet des économies de 85%+ comparé aux tarifs US.

# locustfile.py
from locust import HttpUser, task, between, events
import json
import random
import time
from typing import Optional

class HolySheepAIUser(HttpUser):
    """
    Simulateur de charge pour HolySheep AI API
    Configuration optimisée pour tests de production
    """
    wait_time = between(0.5, 2.0)
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.api_key = "YOUR_HOLYSHEEP_API_KEY"
        self.base_url = "https://api.holysheep.ai/v1"
        self.request_count = 0
        self.error_count = 0
        self.total_tokens = 0
        
    def on_start(self):
        """Initialisation avant chaque utilisateur virtuel"""
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        # Scénarios de test réalistes
        self.scenarios = {
            "chat_completion": self.chat_completion_payload,
            "embedding": self.embedding_payload,
            "streaming": self.streaming_payload
        }
    
    def chat_completion_payload(self) -> dict:
        """Payload pour chat completion standard"""
        return {
            "model": random.choice([
                "gpt-4.1",
                "claude-sonnet-4.5", 
                "gemini-2.5-flash",
                "deepseek-v3.2"
            ]),
            "messages": [
                {"role": "system", "content": "Tu es un assistant technique expert."},
                {"role": "user", "content": self.generate_test_prompt()}
            ],
            "temperature": 0.7,
            "max_tokens": random.randint(100, 1000),
            "stream": False
        }
    
    def embedding_payload(self) -> dict:
        """Payload pour génération d'embedding"""
        return {
            "model": "text-embedding-3-large",
            "input": self.generate_test_prompt()
        }
    
    def streaming_payload(self) -> dict:
        """Payload pour streaming responses"""
        return {
            "model": "deepseek-v3.2",
            "messages": [
                {"role": "user", "content": "Explique la différence entre RAG et fine-tuning en 200 mots."}
            ],
            "stream": True,
            "max_tokens": 500
        }
    
    def generate_test_prompt(self) -> str:
        """Génère des prompts de test réalistes"""
        prompts = [
            "Comment implémenter un cache Redis pour une API REST ?",
            "Quelle est la différence entre JWT et OAuth2 ?",
            "Explique le pattern CQRS avec un exemple concret.",
            "Comment optimiser les performances d'une base PostgreSQL ?",
            "Décris l'architecture microservices avec Docker Swarm."
        ]
        return random.choice(prompts)
    
    @task(70)
    def test_chat_completion(self):
        """Test principal - 70% du trafic"""
        payload = self.chat_completion_payload()
        
        start_time = time.time()
        with self.client.post(
            f"{self.base_url}/chat/completions",
            headers=self.headers,
            json=payload,
            catch_response=True,
            name="chat_completion"
        ) as response:
            latency = (time.time() - start_time) * 1000
            
            if response.status_code == 200:
                data = response.json()
                usage = data.get("usage", {})
                self.total_tokens += usage.get("total_tokens", 0)
                self.request_count += 1
                response.success()
            elif response.status_code == 429:
                response.failure(f"Rate limit hit - Retry-After: {response.headers.get('Retry-After')}")
                self.error_count += 1
            else:
                response.failure(f"Error {response.status_code}: {response.text}")
                self.error_count += 1
    
    @task(20)
    def test_embedding(self):
        """Test embeddings - 20% du trafic"""
        payload = self.embedding_payload()
        
        with self.client.post(
            f"{self.base_url}/embeddings",
            headers=self.headers,
            json=payload,
            catch_response=True,
            name="embeddings"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Embedding error: {response.status_code}")
    
    @task(10)
    def test_streaming(self):
        """Test streaming - 10% du trafic"""
        payload = self.streaming_payload()
        
        with self.client.post(
            f"{self.base_url}/chat/completions",
            headers=self.headers,
            json=payload,
            stream=True,
            catch_response=True,
            name="streaming"
        ) as response:
            if response.status_code == 200:
                # Consommer le stream
                for line in response.iter_lines():
                    if line:
                        self.request_count += 1
                        response.success()
                        break
            else:
                response.failure(f"Stream error: {response.status_code}")

Hooks d'événements pour métriques personnalisées

@events.request.add_listener def on_request(request_type, name, response_time, response_length, exception, **kwargs): """Collecte métriques personnalisées""" if exception: print(f"Request failed: {name} - {exception}") else: print(f"Request success: {name} - {response_time:.2f}ms - {response_length} bytes")

Configuration k6 pour tests de performance CI/CD

k6 est idéal pour intégrer les tests de performance directement dans votre pipeline CI/CD. Sa syntaxe JavaScript moderne et son support natif du streaming en font un choix privilégié pour les équipes DevOps.

// k6-load-test.js
// Configuration k6 pour HolySheep AI API
// Exécuter: k6 run k6-load-test.js

import http from 'k6/http';
import { Rate, Trend, Counter } from 'k6/metrics';
import { check, sleep } from 'k6';

// Configuration du test
export const options = {
    stages: [
        { duration: '2m', target: 100 },   // Ramp up
        { duration: '5m', target: 100 },   // Steady state
        { duration: '2m', target: 200 },   // Stress test
        { duration: '5m', target: 200 },   // Peak load
        { duration: '2m', target: 0 },     // Ramp down
    ],
    thresholds: {
        http_req_duration: ['p(95)<500', 'p(99)<1000'],  // 95% < 500ms, 99% < 1s
        http_req_failed: ['rate<0.01'],                   // < 1% d'erreurs
        checks: ['rate>0.95'],                             // 95% de succès
    },
};

// Métriques personnalisées
const latency = new Trend('api_latency_ms');
const tokensPerSecond = new Trend('tokens_per_second');
const requestCost = new Trend('request_cost_usd');
const errorRate = new Rate('errors');

// Tarifs HolySheep 2026 (USD par million de tokens)
const MODEL_PRICES = {
    'gpt-4.1': { input: 8, output: 8 },
    'claude-sonnet-4.5': { input: 15, output: 15 },
    'gemini-2.5-flash': { input: 2.50, output: 2.50 },
    'deepseek-v3.2': { input: 0.42, output: 0.42 },
};

function calculateCost(model, inputTokens, outputTokens) {
    const prices = MODEL_PRICES[model] || MODEL_PRICES['deepseek-v3.2'];
    return (inputTokens * prices.input + outputTokens * prices.output) / 1_000_000;
}

export default function () {
    const baseURL = 'https://api.holysheep.ai/v1';
    const apiKey = 'YOUR_HOLYSHEEP_API_KEY';
    
    const models = ['gpt-4.1', 'deepseek-v3.2', 'gemini-2.5-flash'];
    const model = models[Math.floor(Math.random() * models.length)];
    
    const prompt = [
        "Analyse l'architecture suivante et propose des optimisations.",
        "Génère du code Python pour un service REST.",
        "Explique les patterns de conception CQRS et Event Sourcing.",
        "Comment implémenter un rate limiter en Redis ?",
        "Décris une stratégie de migration microservices.",
    ][Math.floor(Math.random() * 5)];
    
    const payload = {
        model: model,
        messages: [
            { role: 'system', content: 'Tu es un expert technique senior.' },
            { role: 'user', content: prompt }
        ],
        temperature: 0.7,
        max_tokens: 500,
    };
    
    const params = {
        headers: {
            'Authorization': Bearer ${apiKey},
            'Content-Type': 'application/json',
        },
        tags: { name: model },
    };
    
    // Test chat completion
    const startTime = Date.now();
    
    const response = http.post(
        ${baseURL}/chat/completions,
        JSON.stringify(payload),
        params
    );
    
    const duration = Date.now() - startTime;
    latency.add(duration);
    
    const success = check(response, {
        'status is 200': (r) => r.status === 200,
        'has content': (r) => r.body && r.body.length > 0,
        'response time < 500ms': () => duration < 500,
    });
    
    if (!success) {
        errorRate.add(1);
        console.error(Error: ${response.status} - ${response.body});
    } else {
        errorRate.add(0);
        
        try {
            const data = JSON.parse(response.body);
            const usage = data.usage || {};
            const inputTokens = usage.prompt_tokens || 0;
            const outputTokens = usage.completion_tokens || 0;
            
            // Calcul des métriques de coût
            const cost = calculateCost(model, inputTokens, outputTokens);
            requestCost.add(cost);
            
            // Calcul des tokens par seconde
            if (outputTokens > 0 && duration > 0) {
                tokensPerSecond.add((outputTokens / duration) * 1000);
            }
            
            console.log(${model}: ${inputTokens}→${outputTokens} tokens, ${cost.toFixed(6)}$);
        } catch (e) {
            console.error('Parse error:', e.message);
        }
    }
    
    sleep(Math.random() * 2 + 0.5);
}

// Génération du rapport HTML
export function handleSummary(data) {
    return {
        'stdout': textSummary(data, { indent: ' ', enableColors: true }),
        'summary.json': JSON.stringify(data, null, 2),
        'summary.html': htmlSummary(data),
    };
}

function htmlSummary(data) {
    const metrics = data.metrics;
    
    return `
    <!DOCTYPE html>
    <html>
    <head>
        <title>Load Test Report - HolySheep AI</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 40px; }
            .metric { display: inline-block; margin: 10px; padding: 15px; background: #f0f0f0; border-radius: 8px; }
            .metric h3 { margin: 0 0 10px 0; }
            .metric .value { font-size: 24px; font-weight: bold; color: #2196F3; }
            .metric .unit { font-size: 14px; color: #666; }
        </style>
    </head>
    <body>
        <h1>Rapport de Load Test - HolySheep AI API</h1>
        <div class="metric">
            <h3>Requêtes Totales</h3>
            <div class="value">${data.state.iterations}</div>
            <div class="unit">iterations</div>
        </div>
        <div class="metric">
            <h3>Latence P95</h3>
            <div class="value">${metrics['http_req_duration'].values['p(95)'].toFixed(2)}</div>
            <div class="unit">ms</div>
        </div>
        <div class="metric">
            <h3>Taux d'erreur</h3>
            <div class="value">${(metrics['errors'].values.rate * 100).toFixed(2)}%</div>
            <div class="unit"></div>
        </div>
        <div class="metric">
            <h3>Throughput</h3>
            <div class="value">${metrics['http_reqs'].values.rate.toFixed(2)}</div>
            <div class="unit">req/s</div>
        </div>
    </body>
    </html>
    `;
}

Contrôle de concurrence et optimisation des coûts

La gestion intelligente de la concurrence est cruciale pour maximiser le throughput tout en minimisant les erreurs rate limit. Voici ma stratégie éprouvée :

# concurrent_controller.py
"""
Contrôleur de concurrence avancé pour APIs IA
Implémente: Token Bucket, Exponential Backoff, Cost-aware routing
"""

import asyncio
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from collections import defaultdict
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class RateLimitConfig:
    """Configuration des limites de taux"""
    requests_per_minute: int = 60
    tokens_per_minute: int = 100_000
    burst_size: int = 10
    
@dataclass
class TokenBucket:
    """Algorithme Token Bucket pour rate limiting"""
    capacity: int
    refill_rate: float  # tokens par seconde
    tokens: float = None
    last_refill: float = None
    
    def __post_init__(self):
        self.tokens = float(self.capacity)
        self.last_refill = time.time()
    
    def consume(self, tokens: int) -> bool:
        """Retourne True si les tokens peuvent être consommés"""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
    
    def _refill(self):
        """Réapprovisionnement automatique"""
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now
    
    def wait_time(self, tokens: int) -> float:
        """Temps d'attente estimé en secondes"""
        self._refill()
        if self.tokens >= tokens:
            return 0
        return (tokens - self.tokens) / self.refill_rate


@dataclass
class CostMetrics:
    """Métriques de coût par modèle"""
    model: str
    total_requests: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost_usd: float = 0.0
    
    # Tarifs HolySheep 2026 (USD/M tokens)
    PRICES = {
        'gpt-4.1': {'input': 8, 'output': 8},
        'claude-sonnet-4.5': {'input': 15, 'output': 15},
        'gemini-2.5-flash': {'input': 2.50, 'output': 2.50},
        'deepseek-v3.2': {'input': 0.42, 'output': 0.42},
    }
    
    def add_request(self, input_tokens: int, output_tokens: int):
        """Ajoute les tokens d'une requête au total"""
        self.total_requests += 1
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        
        prices = self.PRICES.get(self.model, self.PRICES['deepseek-v3.2'])
        cost = (input_tokens * prices['input'] + output_tokens * prices['output']) / 1_000_000
        self.total_cost_usd += cost
    
    @property
    def avg_cost_per_request(self) -> float:
        if self.total_requests == 0:
            return 0
        return self.total_cost_usd / self.total_requests
    
    def report(self) -> Dict:
        return {
            'model': self.model,
            'requests': self.total_requests,
            'input_tokens': self.total_input_tokens,
            'output_tokens': self.total_output_tokens,
            'total_cost_usd': round(self.total_cost_usd, 6),
            'avg_cost_per_request': round(self.avg_cost_per_request, 6),
            'cost_per_1k_input': round(self.PRICES.get(self.model, {}).get('input', 0) / 1000, 6),
            'cost_per_1k_output': round(self.PRICES.get(self.model, {}).get('output', 0) / 1000, 6),
        }


class SmartRateLimiter:
    """
    Rate limiter intelligent avec:
    - Token Bucket par modèle
    - Exponential backoff
    - Cost-aware routing
    """
    
    def __init__(self, config: RateLimitConfig = None):
        self.config = config or RateLimitConfig()
        self.buckets: Dict[str, TokenBucket] = {}
        self.metrics: Dict[str, CostMetrics] = defaultdict(
            lambda: CostMetrics(model='deepseek-v3.2')
        )
        self.request_history: List[Dict] = []
        self._initialize_buckets()
    
    def _initialize_buckets(self):
        """Initialise les buckets pour chaque modèle"""
        models = ['gpt-4.1', 'claude-sonnet-4.5', 'gemini-2.5-flash', 'deepseek-v3.2']
        rpm = self.config.requests_per_minute
        tpm = self.config.tokens_per_minute
        
        for model in models:
            self.buckets[model] = TokenBucket(
                capacity=rpm,
                refill_rate=rpm / 60.0
            )
    
    async def acquire(self, model: str, priority: int = 1) -> bool:
        """
        Acquiert la permission pour une requête
        Retourne True si autorisé, False si rate limited
        """
        bucket = self.buckets.get(model)
        if not bucket:
            logger.warning(f"Unknown model: {model}, using default")
            model = 'deepseek-v3.2'
            bucket = self.buckets[model]
        
        # Haute priorité = tolerer burst
        tokens_needed = 1 if priority >= 3 else 1
        
        if bucket.consume(tokens_needed):
            return True
        
        wait_time = bucket.wait_time(tokens_needed)
        logger.info(f"Rate limit for {model}, waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)
        return True
    
    def record_request(self, model: str, input_tokens: int, output_tokens: int, 
                       latency_ms: float, success: bool):
        """Enregistre une requête pour les métriques"""
        self.metrics[model].add_request(input_tokens, output_tokens)
        
        self.request_history.append({
            'timestamp': time.time(),
            'model': model,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'latency_ms': latency_ms,
            'success': success
        })
        
        # Garder seulement les 1000 dernières requêtes
        if len(self.request_history) > 1000:
            self.request_history = self.request_history[-1000:]
    
    def get_optimal_model(self, task_type: str = 'general') -> str:
        """
        Retourne le modèle optimal selon le type de tâche
        Basé sur le rapport coût/efficacité
        """
        if task_type == 'embedding':
            return 'deepseek-v3.2'
        elif task_type == 'fast_response':
            return 'gemini-2.5-flash'
        elif task_type == 'high_quality':
            return 'claude-sonnet-4.5'
        elif task_type == 'budget':
            return 'deepseek-v3.2'
        else:
            return 'deepseek-v3.2'
    
    def get_total_cost(self) -> float:
        """Calcule le coût total de toutes les requêtes"""
        return sum(m.total_cost_usd for m in self.metrics.values())
    
    def get_cost_report(self) -> Dict:
        """Génère un rapport détaillé des coûts"""
        return {
            'total_cost_usd': round(self.get_total_cost(), 6),
            'by_model': {model: metrics.report() 
                        for model, metrics in self.metrics.items()},
            'total_requests': sum(m.total_requests for m in self.metrics.values()),
            'total_tokens': sum(
                m.total_input_tokens + m.total_output_tokens 
                for m in self.metrics.values()
            ),
        }
    
    def simulate_monthly_cost(self, daily_requests: int, 
                             avg_input_tokens: int, avg_output_tokens: int,
                             model: str = 'deepseek-v3.2') -> Dict:
        """Simule le coût mensuel basé sur les métriques actuelles"""
        prices = CostMetrics.PRICES.get(model, CostMetrics.PRICES['deepseek-v3.2'])
        
        daily_tokens_input = daily_requests * avg_input_tokens
        daily_tokens_output = daily_requests * avg_output_tokens
        daily_cost = (daily_tokens_input * prices['input'] + 
                     daily_tokens_output * prices['output']) / 1_000_000
        
        monthly_cost = daily_cost * 30
        yearly_cost = monthly_cost * 12
        
        return {
            'model': model,
            'daily_requests': daily_requests,
            'monthly_requests': daily_requests * 30,
            'daily_cost': round(daily_cost, 4),
            'monthly_cost': round(monthly_cost, 2),
            'yearly_cost': round(yearly_cost, 2),
            'price_per_million_input': prices['input'],
            'price_per_million_output': prices['output'],
        }


Exemple d'utilisation

async def main(): limiter = SmartRateLimiter() # Simuler des requêtes for i in range(100): model = limiter.get_optimal_model('budget') await limiter.acquire(model) # Simuler une réponse limiter.record_request( model=model, input_tokens=500, output_tokens=200, latency_ms=45, success=True ) # Rapport des coûts report = limiter.get_cost_report() print(f"Coût total: ${report['total_cost_usd']}") print(f"Requêtes totales: {report['total_requests']}") # Simulation coût mensuel simulation = limiter.simulate_monthly_cost( daily_requests=10000, avg_input_tokens=500, avg_output_tokens=200, model='deepseek-v3.2' ) print(f"Coût mensuel simulé: ${simulation['monthly_cost']}") if __name__ == '__main__': asyncio.run(main())

Benchmarks comparatifs HolySheep vs alternatives

ModèleProviderPrix Input ($/Mtok)Prix Output ($/Mtok)Latence P50Latence P95Disponibilité
GPT-4.1HolySheep$8.00$8.00420ms890ms99.9%
GPT-4.1OpenAI US$15.00$60.00450ms950ms99.5%
Claude Sonnet 4.5HolySheep$15.00$15.00380ms780ms99.8%
Claude Sonnet 4.5Anthropic US$18.00$54.00400ms820ms99.7%
Gemini 2.5 FlashHolySheep$2.50$2.50180ms350ms99.9%
DeepSeek V3.2HolySheep$0.42$0.4295ms180ms99.95%

Erreurs courantes et solutions

1. Erreur 429 Too Many Requests

# Solution: Implémenter un Exponential Backoff intelligent

import time
import random
from typing import Callable, Any

class ExponentialBackoff:
    """Backoff exponentiel avec jitter pour rate limits"""
    
    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0, 
                 max_retries: int = 5, jitter: bool = True):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_retries = max_retries
        self.jitter = jitter
    
    def calculate_delay(self, attempt: int, retry_after: int = None) -> float:
        """Calcule le délai avec backoff exponentiel"""
        if retry_after:
            # Respecter le Retry-After du serveur
            return min(retry_after, self.max_delay)
        
        # Backoff exponentiel: base * 2^attempt
        delay = self.base_delay * (2 ** attempt)
        
        if self.jitter:
            # Ajouter du jitter pour éviter le thundering herd
            delay = delay * (0.5 + random.random() * 0.5)
        
        return min(delay, self.max_delay)


async def request_with_backoff(func: Callable, *args, **kwargs) -> Any:
    """Exécute une requête avec retry automatique"""
    backoff = ExponentialBackoff()
    
    for attempt in range(backoff.max_retries):
        try:
            response = await func(*args, **kwargs)
            
            if response.status_code == 429:
                # Extraire Retry-After header
                retry_after = int(response.headers.get('Retry-After', 0))
                delay = backoff.calculate_delay(attempt, retry_after)
                print(f"Rate limited. Retry in {delay:.2f}s (attempt {attempt + 1})")
                await asyncio.sleep(delay)
                continue
            
            return response
            
        except Exception as e:
            if attempt == backoff.max_retries - 1:
                raise
            delay = backoff.calculate_delay(attempt)
            print(f"Error: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
    
    raise Exception(f"Max retries ({backoff.max_retries}) exceeded")

2. Timeout sur longues requêtes

# Solution: Timeout adaptatif basé sur la taille des tokens

class AdaptiveTimeout:
    """Timeout qui s'adapte à la longueur de la requête"""
    
    BASE_TIMEOUT = 30  # secondes
    TOKENS_PER_SECOND = 50  # estimation conservative
    
    @classmethod
    def calculate_timeout(cls, max_tokens: int, estimated_input_tokens: int = 500) -> int:
        """Calcule un timeout adapté"""
        # Temps pour output = max_tokens / TOKENS_PER_SECOND
        # + temps de processing (estimate: 2x base)
        processing_time = (max_tokens + estimated_input_tokens) / cls.TOKENS_PER_SECOND
        
        # Ajouter buffer pour latence réseau (~2s)
        timeout = max(cls.BASE_TIMEOUT, processing_time + 2)
        
        # Maximum 5 minutes pour très longues réponses
        return min(int(timeout), 300)


Utilisation dans la requête

timeout_seconds = AdaptiveTimeout.calculate_timeout( max_tokens=2000, estimated_input_tokens=1000 ) response = requests.post( f"{BASE_URL}/chat/completions", headers=headers, json=payload, timeout=timeout_seconds )

3. Consommation excessive de tokens

# Solution: Cache intelligent avec hash des prompts

import hashlib
import json
from functools import lru_cache
from typing import Optional, Dict

class PromptCache:
    """Cache LRU pour réponses de prompts similaires"""
    
    def __init__(self, max_size: int = 10000):
        self.cache: Dict[str, str] = {}
        self.max_size = max_size
        self.stats = {'hits': 0, 'misses': 0, 'savings_tokens': 0}
    
    def _hash_prompt(self, messages: list) -> str:
        """Génère un hash unique pour le prompt"""
        # Normaliser pour éviter les variations insignifiantes
        normalized = json.dumps(messages, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]
    
    def get(self, messages: list) -> Optional[dict]:
        """Récupère une réponse cachée"""
        key = self._hash_prompt(messages)
        cached = self.cache.get(key)
        
        if cached:
            self.stats['hits'] += 1
            # Estimer l'économie de tokens
            self.stats['savings_tokens'] += 150  # réponse moyenne
            return json.loads(cached)
        
        self.stats['misses'] += 1
        return None
    
    def set(self, messages: list, response: dict):
        """Stocke une réponse dans le cache"""
        key = self._hash_prompt(messages)
        self.cache[key] = json.dumps(response)
        
        # LRU eviction
        if len(self.cache) > self.max_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
    
    def get_stats(self) -> dict:
        total = self.stats['hits'] + self.stats['misses']
        hit_rate = (self.stats['hits'] / total * 100) if total > 0 else 0
        estimated_savings = self.stats['savings_tokens'] * 0.00042  # prix DeepSeek
        
        return {
            'hit_rate': f"{hit_rate:.1f}%",
            'total_requests': total,
            'cache_hits