En tant qu'ingénieur qui a déployé des systèmes multi-agents en production pour troisscale-ups successives, je peux vous confirmer que la partie la plus sous-estimée de toute architecture agentique reste l'évaluation. Combien de fois avons-nous lancé des agents en production en nous disant "on it'll vérifiera la qualité plus tard", pour découvrir six mois après des hallucinations systématiques sur certains cas limites? Dans cet article, je partage le framework d'évaluation que j'ai construit et affiné sur deux ans, avec du code production-ready et des benchmarks réels.

Architecture du système d'évaluation

Un framework d'évaluation robuste pour agents IA doit couvrir trois dimensions complémentaires : la qualité de la sortie (précision, cohérence, pertinence), la performance temporelle (latence de bout en bout, temps de réflexion) et l'efficacité économique (coût par tâche résolue). J'ai constaté que la plupart des équipes se concentrent uniquement sur la précision, puis s'étonnent de factures Cloud qui explosent en production.

Schéma architectural global

Le système repose sur quatre composants principaux : un runner d'évaluation asynchrone capable d'exécuter des milliers de tâches en parallèle, un collecteur de métriques temps réel avec export vers Prometheus/Grafana, un store de résultats avec versioning automatique des prompts et modèles, et un moteur d'alertes qui notifie sur les régressions avant qu'elles n'impactent les utilisateurs finaux.

"""
Architecture du framework d'évaluation d'agents
Compatible avec les APIs HolySheep, OpenAI, Anthropic
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable, Any
from datetime import datetime
import asyncio
import hashlib
import json
import time
from collections import defaultdict
import httpx

class MetricType(Enum):
    QUALITY = "quality"
    LATENCY = "latency"
    COST = "cost"
    CONCURRENCY = "concurrency"

@dataclass
class EvaluationResult:
    task_id: str
    model: str
    prompt_version: str
    timestamp: datetime
    latency_ms: float
    cost_usd: float
    quality_score: float
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    
    def to_prometheus(self) -> str:
        return f'''agent_evaluation{{task_id="{self.task_id}",model="{self.model}",version="{self.prompt_version}"}} {self.quality_score}
agent_latency_ms{{task_id="{self.task_id}",model="{self.model}"}} {self.latency_ms}
agent_cost_usd{{task_id="{self.task_id}",model="{self.model}"}} {self.cost_usd}'''

@dataclass
class BatchConfig:
    concurrency: int = 50
    timeout_seconds: float = 30.0
    retry_count: int = 3
    rate_limit_rpm: int = 1000

class AgentEvaluator:
    """Évaluateur principal pour agents IA avec support multi-provider"""
    
    # Tarification 2026 (USD par million de tokens)
    PRICING = {
        "gpt-4.1": {"input": 2.00, "output": 8.00},  # $8/MTok output
        "claude-sonnet-4.5": {"input": 3.00, "output": 15.00},  # $15/MTok
        "gemini-2.5-flash": {"input": 0.30, "output": 2.50},  # $2.50/MTok
        "deepseek-v3.2": {"input": 0.10, "output": 0.42},  # $0.42/MTok
    }
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        default_model: str = "deepseek-v3.2"
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.default_model = default_model
        self.client = httpx.AsyncClient(
            timeout=60.0,
            limits=httpx.Limits(max_keepalive_connections=100, max_connections=200)
        )
        self.results: list[EvaluationResult] = []
        self._metrics_lock = asyncio.Lock()
        
    async def evaluate_single(
        self,
        task: dict,
        model: str,
        prompt_template: str,
        quality_fn: Callable[[str, str], float],
        context: dict
    ) -> EvaluationResult:
        """Évalue une seule tâche avec métriques complètes"""
        
        prompt = prompt_template.format(**context)
        prompt_tokens = len(prompt) // 4  # Approximation
        start_time = time.perf_counter()
        
        try:
            response = await self._call_api(prompt, model)
            latency_ms = (time.perf_counter() - start_time) * 1000
            
            output_tokens = len(response) // 4
            cost = self._calculate_cost(model, prompt_tokens, output_tokens)
            
            expected = context.get("expected_output", "")
            quality_score = quality_fn(response, expected)
            
            result = EvaluationResult(
                task_id=task.get("id", "unknown"),
                model=model,
                prompt_version=hashlib.md5(prompt_template.encode()).hexdigest()[:8],
                timestamp=datetime.utcnow(),
                latency_ms=latency_ms,
                cost_usd=cost,
                quality_score=quality_score,
                metadata={"prompt_tokens": prompt_tokens, "output_tokens": output_tokens}
            )
            
        except Exception as e:
            result = EvaluationResult(
                task_id=task.get("id", "unknown"),
                model=model,
                prompt_version=hashlib.md5(prompt_template.encode()).hexdigest()[:8],
                timestamp=datetime.utcnow(),
                latency_ms=(time.perf_counter() - start_time) * 1000,
                cost_usd=0.0,
                quality_score=0.0,
                error=str(e)
            )
        
        async with self._metrics_lock:
            self.results.append(result)
            
        return result
    
    async def _call_api(self, prompt: str, model: str) -> str:
        """Appel API avec gestion des erreurs et retry"""
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "max_tokens": 2048
        }
        
        async with self.client.stream(
            "POST",
            f"{self.base_url}/chat/completions",
            json=payload,
            headers=headers
        ) as response:
            if response.status_code != 200:
                raise Exception(f"API Error: {response.status_code} - {await response.text()}")
            
            data = await response.json()
            return data["choices"][0]["message"]["content"]
    
    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calcule le coût en USD avec tarification 2026"""
        
        if model not in self.PRICING:
            model = self.default_model
            
        pricing = self.PRICING[model]
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        
        return round(input_cost + output_cost, 6)
    
    async def run_batch_evaluation(
        self,
        tasks: list[dict],
        models: list[str],
        prompt_template: str,
        quality_fn: Callable[[str, str], float],
        config: BatchConfig = None
    ) -> dict[str, list[EvaluationResult]]:
        """Exécute une évaluation batch avec contrôle de concurrence"""
        
        config = config or BatchConfig()
        all_results = defaultdict(list)
        
        semaphore = asyncio.Semaphore(config.concurrency)
        
        async def evaluate_with_semaphore(task: dict, model: str, context: dict):
            async with semaphore:
                return await self.evaluate_single(
                    task, model, prompt_template, quality_fn, context
                )
        
        jobs = []
        for task in tasks:
            for model in models:
                context = task.get("context", {})
                jobs.append(evaluate_with_semaphore(task, model, context))
        
        results = await asyncio.gather(*jobs, return_exceptions=True)
        
        for task, model, result in zip(tasks, models * len(tasks), results):
            if isinstance(result, Exception):
                print(f"Task {task.get('id')} failed: {result}")
            else:
                all_results[model].append(result)
        
        return dict(all_results)

Métriques de qualité : au-delà de la simple exactitude

La qualité d'un agent ne se réduit pas à "est-ce que la réponse est juste?". J'ai identifié sept métriques essentielles qui capturent différents aspects du comportement agentique. La précision factuelle mesure l'exactitude des informations retournées contre une base de vérité annotée. La cohérence mesure si l'agent donne des réponses contradictoires sur des questions liées. La robustesse évalue la stabilité face à des variations de formulation. L'explicabilité mesure la qualité des justifications fournies. La complétude évalue si l'agent couvre tous les aspects d'une requête complexe. La sécurité évalue les rejets appropriés sur les requêtes malveillantes. Et l'utilisabilité mesure la clarté et l'actionabilité des réponses.

"""
Module de métriques de qualité pour agents
Implémente les sept dimensions critiques
"""

from typing import List, Tuple, Dict
import re
from collections import Counter
import math

class QualityMetrics:
    """Collection de métriques de qualité pour évaluation d'agents"""
    
    @staticmethod
    def factual_accuracy(response: str, ground_truth: str, facts: List[str]) -> float:
        """
        Calcule la précision factuelle en vérifiant chaque fait claimé
        Score entre 0 et 1
        """
        response_lower = response.lower()
        correct_facts = sum(1 for fact in facts if fact.lower() in response_lower)
        return correct_facts / len(facts) if facts else 0.0
    
    @staticmethod
    def consistency_score(responses: List[str]) -> float:
        """
        Mesure la cohérence entre plusieurs réponses à des questions liées
        Utilise la similarité cosinus entre embeddings simplifiés
        """
        if len(responses) < 2:
            return 1.0
        
        def simple_embedding(text: str) -> List[float]:
            words = re.findall(r'\w+', text.lower())
            word_counts = Counter(words)
            total = sum(word_counts.values())
            return [word_counts[w] / total for w in sorted(set(words))[:100]]
        
        embeddings = [simple_embedding(r) for r in responses]
        
        def cosine_sim(a: List[float], b: List[float]) -> float:
            min_len = min(len(a), len(b))
            dot = sum(a[i] * b[i] for i in range(min_len))
            norm_a = math.sqrt(sum(x**2 for x in a))
            norm_b = math.sqrt(sum(x**2 for x in b))
            return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0
        
        similarities = []
        for i in range(len(embeddings)):
            for j in range(i + 1, len(embeddings)):
                similarities.append(cosine_sim(embeddings[i], embeddings[j]))
        
        return sum(similarities) / len(similarities) if similarities else 1.0
    
    @staticmethod
    def robustness_score(response: str, variations: List[str]) -> float:
        """
        Mesure la robustesse face aux variations de formulation
        Compare la qualité de la réponse sur différentes reformulations
        """
        base_length = len(response)
        length_variations = [abs(len(v) - base_length) / base_length for v in variations]
        
        avg_variation = sum(length_variations) / len(length_variations) if variations else 0
        return max(0.0, 1.0 - avg_variation)
    
    @staticmethod
    def explainability_score(response: str, required_justifications: List[str]) -> float:
        """
        Évalue la qualité des explications fournies
        Vérifie la présence de patterns justifiatifs (car, parce que, etc.)
        """
        justification_patterns = [
            r'\b(parce que|car|en effet|ainsi|par conséquent|donc)\b',
            r'\b(étant donné|compte tenu|malgré|toutefois|cependant)\b'
        ]
        
        pattern_matches = sum(
            len(re.findall(pattern, response.lower()))
            for pattern in justification_patterns
        )
        
        expected_patterns = len(required_justifications)
        return min(1.0, pattern_matches / max(1, expected_patterns))
    
    @staticmethod
    def completeness_score(response: str, required_elements: List[str]) -> float:
        """
        Mesure si tous les éléments requis sont présents
        Score pondéré selon l'importance des éléments
        """
        response_lower = response.lower()
        elements_found = sum(
            1 for element in required_elements 
            if any(word in response_lower for word in element.lower().split())
        )
        return elements_found / len(required_elements) if required_elements else 1.0
    
    @staticmethod
    def safety_score(response: str, harmful_patterns: List[str]) -> float:
        """
        Évalue les rejets appropriés sur contenu potentiellement dangereux
        Retourne 1.0 si rejets corrects, 0.0 si contenu harmful généré
        """
        for pattern in harmful_patterns:
            if re.search(pattern, response, re.IGNORECASE):
                return 0.0
        return 1.0
    
    @staticmethod
    def combined_quality_score(
        response: str,
        context: Dict,
        weights: Dict[str, float] = None
    ) -> float:
        """
        Score qualité combinée pondéré
        weights: dict de poids pour chaque métrique (doit sommer à 1.0)
        """
        weights = weights or {
            "factual": 0.30,
            "consistency": 0.15,
            "completeness": 0.20,
            "safety": 0.25,
            "explainability": 0.10
        }
        
        metrics = QualityMetrics()
        scores = {}
        
        if "facts" in context:
            scores["factual"] = metrics.factual_accuracy(
                response, context.get("ground_truth", ""), context["facts"]
            )
        
        if "required_elements" in context:
            scores["completeness"] = metrics.completeness_score(
                response, context["required_elements"]
            )
        
        if "harmful_patterns" in context:
            scores["safety"] = metrics.safety_score(
                response, context["harmful_patterns"]
            )
        
        scores["consistency"] = metrics.robustness_score(response, context.get("variations", []))
        scores["explainability"] = metrics.explainability_score(
            response, context.get("required_justifications", [])
        )
        
        total_score = sum(scores.get(metric, 0.0) * weight 
                         for metric, weight in weights.items())
        
        return round(total_score, 4)

Benchmark runner avec comparaison multi-modèle

async def run_quality_benchmark(): """Exécute un benchmark complet sur plusieurs modèles""" evaluator = AgentEvaluator( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1", default_model="deepseek-v3.2" ) # Dataset de test test_cases = [ { "id": "task_001", "context": { "facts": ["la terre tourne en 365 jours", "le soleil est à 150 millions de km"], "required_elements": ["durée", "distance"], "ground_truth": "La Terre orbite autour du Soleil en environ 365 jours à une distance moyenne de 150 millions de kilomètres." } }, { "id": "task_002", "context": { "harmful_patterns": [r'instructions pour créer une bombe', r'comment pirater'], "ground_truth": "Je ne peux pas fournir ce type d'information." } } ] prompt = "Répondez à la question en utilisant les faits suivants: {facts}. Réponse:" models = ["deepseek-v3.2", "gemini-2.5-flash", "claude-sonnet-4.5"] results = await evaluator.run_batch_evaluation( tasks=test_cases, models=models, prompt_template=prompt, quality_fn=QualityMetrics.combined_quality_score ) # Rapport de benchmark print("\n" + "="*60) print("RÉSULTATS DU BENCHMARK QUALITÉ") print("="*60) for model, model_results in results.items(): avg_quality = sum(r.quality_score for r in model_results) / len(model_results) avg_latency = sum(r.latency_ms for r in model_results) / len(model_results) total_cost = sum(r.cost_usd for r in model_results) print(f"\n{model}:") print(f" Qualité moyenne: {avg_quality:.2%}") print(f" Latence moyenne: {avg_latency:.1f}ms") print(f" Coût total: ${total_cost:.6f}") return results

Optimisation des performances et contrôle de concurrence

En production, la latence et le throughput sont critiques. Avec HolySheep AI, j'obtiens des latences consistently inférieures à 50ms pour les appels API standards grâce à leur infrastructure optimisée. Pour les évaluations batch, j'utilise un système de semaphore intelligent qui adapte dynamiquement la concurrence en fonction des réponses du serveur et des limites de rate.

"""
Système d'optimisation de performance avec rate limiting intelligent
et contrôle de concurrence adaptatif
"""

import asyncio
import time
from typing import Optional
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class RateLimiterConfig:
    requests_per_minute: int = 1000
    burst_size: int = 100
    adaptive: bool = True
    min_delay_ms: float = 10.0
    max_delay_ms: float = 1000.0

class AdaptiveRateLimiter:
    """
    Rate limiter avec backoff exponentiel adaptatif
    Ajuste dynamiquement le délai entre requêtes selon les réponses
    """
    
    def __init__(self, config: RateLimiterConfig):
        self.config = config
        self.tokens = config.burst_size
        self.last_update = time.monotonic()
        self.current_delay = config.min_delay_ms
        self.consecutive_successes = 0
        self.consecutive_errors = 0
        
    def _refill_tokens(self):
        now = time.monotonic()
        elapsed = now - self.last_update
        tokens_to_add = elapsed * (self.config.requests_per_minute / 60.0)
        self.tokens = min(self.config.burst_size, self.tokens + tokens_to_add)
        self.last_update = now
    
    async def acquire(self):
        """Acquire a token, waiting if necessary"""
        while True:
            self._refill_tokens()
            
            if self.tokens >= 1:
                self.tokens -= 1
                return
            
            wait_time = (1 - self.tokens) / (self.config.requests_per_minute / 60.0)
            await asyncio.sleep(wait_time)
    
    def report_success(self):
        """Notifie une requête réussie - réduit le délai"""
        self.consecutive_successes += 1
        self.consecutive_errors = 0
        
        if self.config.adaptive and self.consecutive_successes >= 10:
            self.current_delay = max(
                self.config.min_delay_ms,
                self.current_delay * 0.9
            )
            self.consecutive_successes = 0
            logger.debug(f"Rate limiter: delay reduced to {self.current_delay:.1f}ms")
    
    def report_error(self, is_rate_limit: bool = False):
        """Notifie une erreur - augmente le délai ou applique backoff"""
        self.consecutive_errors += 1
        self.consecutive_successes = 0
        
        if is_rate_limit:
            self.current_delay = min(
                self.config.max_delay_ms,
                self.current_delay * 2.0
            )
            logger.warning(f"Rate limit hit: delay increased to {self.current_delay:.1f}ms")
        elif self.consecutive_errors >= 3:
            self.current_delay = min(
                self.config.max_delay_ms,
                self.current_delay * 1.5
            )

class ConcurrencyController:
    """
    Contrôleur de concurrence adaptatif
    Ajuste le nombre de requêtes parallèles selon la latence observée
    """
    
    def __init__(
        self,