En tant que développeur qui a intégré des dizaines de modèles LLM dans des applications de production, j'ai passé des heures interminables à déboguer des appels API silencieux, des latences inexpliquées et des coûts qui flambaient sans avertissement. Le Callback机制 de LangChain a transformé ma façon de superviser les intégrations IA. Aujourd'hui, je vous partage tout ce que j'ai appris après des mois de pratique intensive avec l'API HolySheep, qui offre des performances exceptionnelles à des tarifs défiant toute concurrence.

Tableau Comparatif : HolySheep vs API Officielle vs Services Relais

Critère HolySheep AI API OpenAI/Anthropic Officielle Services Relais Classiques
Latence moyenne moins de 50ms 150-300ms 100-250ms
Coût GPT-4.1 $8/1M tokens $8/1M tokens $10-12/1M tokens
Coût Claude Sonnet 4.5 $15/1M tokens $15/1M tokens $18-22/1M tokens
Coût DeepSeek V3.2 $0.42/1M tokens N/A $0.50-0.60/1M tokens
Économie via ¥1=$1 85%+ vs facturation USD Prix catalogue USD Frais supplémentaires
Paiement WeChat Pay, Alipay, USDT Carte internationale uniquement Limité
Crédits gratuits Oui, à l'inscription $5 pour nouveaux comptes Rarement
API compatible LangChain 100% compatible Natif Compatible mais lent

Pourquoi le Callback机制 Est Essentiel

Le système de Callbacks de LangChain fonctionne comme un système d'événements centralisé. Chaque opération — qu'il s'agisse d'un appel invoke(), batch() ou stream() — traverse une série de points d'ancrage où vous pouvez intercepter, mesurer et logger des informations cruciales.

Dans mon expérience, voici les problèmes que j'ai résolus grâce aux Callbacks :

Architecture du Système de Callbacks

Le CallbackHandler de LangChain définit plusieurs méthodes qui correspondent aux différentes phases du cycle de vie d'une requête :


Les méthodes principales du CallbackHandler

class BaseCallbackHandler: """Méthodes hookées par LangChain à chaque étape""" # === Gestion du run (cycle complet) === def on_llm_start(self, serialized, prompts, **kwargs): """Déclenché AVANT l'envoi de la requête au modèle""" pass def on_llm_new_token(self, token, **kwargs): """Déclenché pour chaque token en streaming""" pass def on_llm_end(self, response, **kwargs): """Déclenché APRÈS réception complète de la réponse""" pass def on_llm_error(self, error, **kwargs): """Déclenché en cas d'erreur pendant l'appel""" pass # === Pour les Chains === def on_chain_start(self, serialized, inputs, **kwargs): """Début d'exécution d'une chaîne""" pass def on_chain_end(self, outputs, **kwargs): """Fin réussie d'une chaîne""" pass def on_chain_error(self, error, **kwargs): """Erreur dans une chaîne""" pass # === Pour les Tools === def on_tool_start(self, serialized, input_str, **kwargs): """Appel d'un tool""" pass def on_tool_end(self, output, **kwargs): """Fin d'un tool""" pass def on_tool_error(self, error, **kwargs): """Erreur d'un tool""" pass

Implémentation Pratique avec HolySheep API

Passons aux choses sérieuses avec une implémentation complète. Le code suivant intègre l'API HolySheep avec un système de Callbacks complet pour la surveillance temps réel.


Installation requise: pip install langchain langchain-community langchain-openai

import os import time import json from datetime import datetime from typing import Optional, Any, Dict, List from dataclasses import dataclass, field from collections import defaultdict

Configuration HolySheep

os.environ["OPENAI_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY" os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1" from langchain_openai import ChatOpenAI from langchain.schema import HumanMessage, SystemMessage, LLMResult from langchain.callbacks.base import BaseCallbackHandler

============================================

MONITORING CALLBACK - Le cœur du système

============================================

@dataclass class RequestMetrics: """Métriques agrégées pour une requête""" start_time: float = field(default_factory=time.time) end_time: Optional[float] = None tokens_generated: int = 0 tokens_prompt: int = 0 cost_usd: float = 0.0 model: str = "" error: Optional[str] = None @property def latency_ms(self) -> float: if self.end_time: return (self.end_time - self.start_time) * 1000 return 0.0 @property def total_tokens(self) -> int: return self.tokens_prompt + self.tokens_generated class HolySheepMonitorCallback(BaseCallbackHandler): """Callback complet pour monitoring HolySheep API""" # Tarification HolySheep 2026 (en USD par million de tokens) PRICING = { "gpt-4.1": {"input": 2.00, "output": 8.00}, "gpt-4o-mini": {"input": 0.15, "output": 0.60}, "claude-sonnet-4.5": {"input": 3.00, "output": 15.00}, "claude-3-5-sonnet": {"input": 3.00, "output": 15.00}, "gemini-2.5-flash": {"input": 0.125, "output": 2.50}, "deepseek-v3.2": {"input": 0.14, "output": 0.42}, } def __init__(self): self.metrics: List[RequestMetrics] = [] self.current_metrics: Optional[RequestMetrics] = None self.total_cost = 0.0 self.request_count = 0 def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs ) -> None: """Capture le début d'une requête LLM""" self.current_metrics = RequestMetrics() model = serialized.get("name", "unknown") self.current_metrics.model = model self.current_metrics.start_time = time.time() # Estimation des tokens du prompt (approximatif: 1 token ≈ 4 caractères) self.current_metrics.tokens_prompt = sum( len(p) // 4 for p in prompts ) print(f"🔵 [LLM START] Modèle: {model}") print(f" Prompt tokens estimés: {self.current_metrics.tokens_prompt}") def on_llm_new_token(self, token: str, **kwargs) -> None: """Capture chaque nouveau token en streaming""" if self.current_metrics: self.current_metrics.tokens_generated += 1 def on_llm_end(self, response: LLMResult, **kwargs) -> None: """Calcule les métriques finales après succès""" if not self.current_metrics: return self.current_metrics.end_time = time.time() # Extraction des métriques réelles depuis la réponse generation_info = response.generations[0][0].generation_info if generation_info: self.current_metrics.tokens_generated = generation_info.get( "token_usage", {} ).get("completion_tokens", self.current_metrics.tokens_generated) self.current_metrics.tokens_prompt = generation_info.get( "token_usage", {} ).get("prompt_tokens", self.current_metrics.tokens_prompt) # Calcul du coût basé sur le modèle model = self.current_metrics.model pricing = self.PRICING.get(model, {"input": 1.0, "output": 2.0}) cost = ( self.current_metrics.tokens_prompt * pricing["input"] / 1_000_000 + self.current_metrics.tokens_generated * pricing["output"] / 1_000_000 ) self.current_metrics.cost_usd = cost self.total_cost += cost self.request_count += 1 self.metrics.append(self.current_metrics) print(f"🟢 [LLM END] Latence: {self.current_metrics.latency_ms:.2f}ms") print(f" Tokens: {self.current_metrics.tokens_prompt} (prompt) + " f"{self.current_metrics.tokens_generated} (completion)") print(f" Coût: ${cost:.6f}") self.current_metrics = None def on_llm_error(self, error: Exception, **kwargs) -> None: """Log les erreurs pour debugging""" if self.current_metrics: self.current_metrics.error = str(error) self.current_metrics.end_time = time.time() self.metrics.append(self.current_metrics) print(f"🔴 [LLM ERROR] {error.__class__.__name__}: {error}") def get_summary(self) -> Dict[str, Any]: """Génère un résumé des métriques""" if not self.metrics: return {} latencies = [m.latency_ms for m in self.metrics] return { "total_requests": self.request_count, "total_cost_usd": self.total_cost, "total_tokens": sum(m.total_tokens for m in self.metrics), "avg_latency_ms": sum(latencies) / len(latencies), "min_latency_ms": min(latencies), "max_latency_ms": max(latencies), "error_count": sum(1 for m in self.metrics if m.error), }

============================================

UTILISATION PRATIQUE

============================================

def demo_holy_sheep_callback(): """Démonstration complète du système de callbacks""" # Initialisation du monitor monitor = HolySheepMonitorCallback() # Configuration du modèle avec HolySheep llm = ChatOpenAI( model="deepseek-v3.2", # Modèle économique: $0.42/1M tokens output temperature=0.7, callbacks=[monitor] # ← Intégration du callback ) print("=" * 60) print("DÉMO: Monitoring HolySheep API avec LangChain Callbacks") print("=" * 60) # Test avec un prompt simple response = llm.invoke( "Explique en 3 phrases ce qu'est un callback en programmation." ) print(f"\n📝 Réponse: {response.content}\n") # Test avec un prompt plus complexe messages = [ SystemMessage(content="Tu es un assistant technique concis."), HumanMessage(content="Qu'est-ce que le pattern Observer?") ] response2 = llm.invoke(messages) print(f"📝 Réponse 2: {response2.content}\n") # Affichage du résumé summary = monitor.get_summary() print("=" * 60) print("📊 RÉSUMÉ DES MÉTRIQUES") print("=" * 60) print(f" Requêtes totales: {summary['total_requests']}") print(f" Coût total: ${summary['total_cost_usd']:.6f}") print(f" Latence moyenne: {summary['avg_latency_ms']:.2f}ms") print(f" Latence min/max: {summary['min_latency_ms']:.2f}ms / " f"{summary['max_latency_ms']:.2f}ms") print(f" Erreurs: {summary['error_count']}") if __name__ == "__main__": demo_holy_sheep_callback()

Logs Structurés et Export vers Fichiers

Dans un contexte de production, vous voudrez persister vos logs. Voici une version avancée qui écrit dans des fichiers JSON structurés, idéal pour ingestion dans des systèmes comme ELK Stack ou Datadog.


import json
import logging
from pathlib import Path
from datetime import datetime
from threading import Lock

class ProductionCallbackHandler(BaseCallbackHandler):
    """Callback production-ready avec logs fichiers et console"""
    
    def __init__(
        self,
        log_dir: str = "./logs",
        log_level: int = logging.INFO,
        max_file_size_mb: int = 10
    ):
        super().__init__()
        
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        
        # Logger principal avec rotation
        self.logger = logging.getLogger("holy_sheep_monitor")
        self.logger.setLevel(log_level)
        
        # Handler fichier avec JSON Lines
        log_file = self.log_dir / f"llm_calls_{datetime.now():%Y%m%d}.jsonl"
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(file_handler)
        
        # Handler console
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter('[%(levelname)s] %(message)s')
        )
        self.logger.addHandler(console_handler)
        
        self._lock = Lock()
        self._current_run_id = None
        self._run_data = {}
        
    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs
    ) -> None:
        """Enregistre le début avec timestamp précis"""
        run_id = kwargs.get("run_id", str(uuid.uuid4()))
        
        with self._lock:
            self._current_run_id = run_id
            self._run_data = {
                "event": "llm_start",
                "run_id": run_id,
                "timestamp": datetime.now().isoformat(),
                "model": serialized.get("name", "unknown"),
                "prompt_preview": prompts[0][:200] if prompts else "",
                "prompt_length": sum(len(p) for p in prompts),
                "env": "production"
            }
            
        self.logger.info(json.dumps(self._run_data))
        
    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        """Log la complétion avec métriques finales"""
        with self._lock:
            if self._current_run_id:
                end_data = {
                    "event": "llm_end",
                    "run_id": self._current_run_id,
                    "timestamp": datetime.now().isoformat(),
                    "latency_ms": (
                        datetime.now() - datetime.fromisoformat(
                            self._run_data["timestamp"]
                        )
                    ).total_seconds() * 1000,
                    "response_preview": (
                        response.generations[0][0].text[:200] 
                        if response.generations else ""
                    ),
                }
                
                # Ajout des token counts si disponibles
                if hasattr(response, 'llm_output') and response.llm_output:
                    usage = response.llm_output.get('token_usage', {})
                    end_data.update({
                        "prompt_tokens": usage.get('prompt_tokens', 0),
                        "completion_tokens": usage.get('completion_tokens', 0),
                        "total_tokens": usage.get('total_tokens', 0),
                    })
                
                self.logger.info(json.dumps(end_data))
                self._current_run_id = None
                self._run_data = {}
                
    def on_llm_error(self, error: Exception, **kwargs) -> None:
        """Log les erreurs pour alerting"""
        error_data = {
            "event": "llm_error",
            "run_id": self._current_run_id,
            "timestamp": datetime.now().isoformat(),
            "error_type": error.__class__.__name__,
            "error_message": str(error),
        }
        
        self.logger.error(json.dumps(error_data))
        
        # Réinitialisation de l'état
        with self._lock:
            self._current_run_id = None
            self._run_data = {}

Export du résumé CSV pour analyse

def export_metrics_csv(metrics: List[RequestMetrics], output_path: str): """Exporte les métriques en CSV pour analyse Excel/Python""" import csv with open(output_path, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=[ 'timestamp', 'model', 'latency_ms', 'prompt_tokens', 'completion_tokens', 'total_tokens', 'cost_usd', 'error' ]) writer.writeheader() for m in metrics: writer.writerow({ 'timestamp': datetime.fromtimestamp(m.start_time).isoformat(), 'model': m.model, 'latency_ms': round(m.latency_ms, 2), 'prompt_tokens': m.tokens_prompt, 'completion_tokens': m.tokens_generated, 'total_tokens': m.total_tokens, 'cost_usd': round(m.cost_usd, 6), 'error': m.error or '' }) print(f"✅ Métriques exportées vers {output_path}")

Callback pour Chaines et Tools

Les Callbacks ne se limitent pas aux appels LLM simples. Ils fonctionnent également pour les Chains LangChain et les Tools. Voici comment je surveille une chaîne de traitement complexe.


from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import AgentFinish, AgentAction

class ChainMonitorCallback(BaseCallbackHandler):
    """Surveillance complète d'une chaîne d'agents"""
    
    def __init__(self):
        self.chain_timeline = []
        self.tool_calls = defaultdict(int)
        
    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs
    ) -> None:
        """Capture le début d'une chaîne"""
        chain_name = serialized.get("name", serialized.get("id", ["unknown"])[-1])
        self.chain_timeline.append({
            "type": "chain_start",
            "name": chain_name,
            "time": time.time()
        })
        print(f"🔗 [CHAIN START] {chain_name}")
        
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None:
        """Capture la fin d'une chaîne"""
        chain_name = self.chain_timeline[-1]["name"] if self.chain_timeline else "unknown"
        duration = time.time() - self.chain_timeline[-1]["time"]
        
        self.chain_timeline.append({
            "type": "chain_end",
            "name": chain_name,
            "time": time.time(),
            "duration_ms": duration * 1000
        })
        print(f"🔗 [CHAIN END] {chain_name} ({duration*1000:.2f}ms)")
        
    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs
    ) -> None:
        """Surveille l'appel des tools"""
        tool_name = serialized.get("name", "unknown")
        self.tool_calls[tool_name] += 1
        
        self.chain_timeline.append({
            "type": "tool_start",
            "name": tool_name,
            "time": time.time(),
            "input_preview": input_str[:100]
        })
        print(f"🔧 [TOOL START] {tool_name}")
        
    def on_tool_end(self, output: str, **kwargs) -> None:
        """Surveille la fin des tools"""
        tool_name = self.chain_timeline[-1]["name"]
        duration = time.time() - self.chain_timeline[-1]["time"]
        
        self.chain_timeline.append({
            "type": "tool_end",
            "name": tool_name,
            "time": time.time(),
            "duration_ms": duration * 1000,
            "output_preview": output[:100]
        })
        print(f"🔧 [TOOL END] {tool_name} ({duration*1000:.2f}ms)")
        
    def on_tool_error(self, error: Exception, **kwargs) -> None:
        """Log les erreurs de tools"""
        print(f"🔴 [TOOL ERROR] {error}")

============================================

EXEMPLE COMPLET: AGENT AVEC MONITORING

============================================

def create_monitored_agent(): """Crée un agent avec monitoring complet""" # Définition des tools def calculate(expression: str) -> str: """Évalue une expression mathématique simple""" try: result = eval(expression) return f"Résultat: {result}" except Exception as e: return f"Erreur: {e}" def search_docs(query: str) -> str: """Simule une recherche dans la documentation""" docs = { "callback": "Un callback est une fonction passée comme argument...", "langchain": "LangChain est un framework pour construire des apps IA...", } return docs.get(query.lower(), "Documentation non trouvée.") tools = [ Tool(name="Calculator", func=calculate, description="Pour calculs mathématiques"), Tool(name="SearchDocs", func=search_docs, description="Recherche dans la docs"), ] # Prompt de l'agent prompt = ChatPromptTemplate.from_messages([ ("system", "Tu es un assistant technique qui utilise des outils."), ("human", "{input}"), MessagesPlaceholder(variable_name="agent_scratchpad"), ]) # Initialisation du monitor monitor = ChainMonitorCallback() # Création de l'agent avec HolySheep llm = ChatOpenAI( model="gpt-4o-mini", # Modèle économique: $0.60/1M output callbacks=[monitor] ) agent = create_openai_functions_agent(llm, tools, prompt) executor = AgentExecutor( agent=agent, tools=tools, callbacks=[monitor], # Surveillance des tools aussi verbose=True ) return executor, monitor

Exécution de démonstration

if __name__ == "__main__": agent, monitor = create_monitored_agent() result = agent.invoke({ "input": "Calcule 15 * 23 + 45, puis cherche des infos sur 'callback'" }) print("\n" + "=" * 50) print("TIMELINE DES OPÉRATIONS") print("=" * 50) for event in monitor.chain_timeline: print(f" {event}") print(f"\n📊 Stats: {dict(monitor.tool_calls)}")

Intégration avec AsyncIO pour Haute Performance

Pour les applications nécessitant des performances maximales, l'intégration avec AsyncIO permet de gérer des centaines de requêtes simultanées sans bloquer le thread principal.


import asyncio
from typing import Optional
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from datetime import datetime

class AsyncStreamingCallback(BaseCallbackHandler):
    """Callback async avec streaming et buffer de tokens"""
    
    def __init__(self):
        self.tokens_buffer = []
        self.start_time: Optional[float] = None
        self.first_token_time: Optional[float] = None
        
    async def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs
    ) -> None:
        self.start_time = time.time()
        self.tokens_buffer = []
        print(f"⏱️ Requête démarrée à {datetime.now():%H:%M:%S.%f}")
        
    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        if self.first_token_time is None:
            self.first_token_time = time.time()
            ttft = (self.first_token_time - self.start_time) * 1000
            print(f"⚡ Time To First Token (TTFT): {ttft:.2f}ms")
            
        self.tokens_buffer.append(token)
        
    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        end_time = time.time()
        
        total_time = (end_time - self.start_time) * 1000
        tokens_count = len(self.tokens_buffer)
        
        if tokens_count > 0:
            throughput = tokens_count / ((end_time - self.first_token_time) * 1000)
            print(f"✅ Requête terminée en {total_time:.2f}ms")
            print(f"   Tokens générés: {tokens_count}")
            print(f"   Throughput: {throughput:.2f} tokens/ms")

async def parallel_requests_demo():
    """Lance plusieurs requêtes en parallèle avec monitoring"""
    
    monitor = AsyncStreamingCallback()
    
    llm = ChatOpenAI(
        model="deepseek-v3.2",
        streaming=True,
        callbacks=[monitor]
    )
    
    prompts = [
        "Explique la programmation asynchrone en Python.",
        "Qu'est-ce que FastAPI et ses avantages?",
        "Comment implémenter un callback en JavaScript?",
        "Décris le pattern Observer en programmation.",
        "Explique la différence entre async/await et promises.",
    ]
    
    print("🚀 Lancement de 5 requêtes en parallèle...")
    start = time.time()
    
    # Exécution parallèle
    tasks = [llm.apredict(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    total_time = time.time() - start
    
    print(f"\n📊 Temps total (parallèle): {total_time*1000:.2f}ms")
    print(f"   Temps moyen par requête: {total_time*1000/5:.2f}ms")
    
    # Affichage des résultats
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"   Requête {i+1}: ❌ Erreur - {result}")
        else:
            print(f"   Requête {i+1}: ✅ {len(result)} caractères")

Exécution

if __name__ == "__main__": asyncio.run(parallel_requests_demo())

Erreurs Courantes et Solutions

Après des mois d'utilisation intensive, j'ai rencontré et résolu de nombreux problèmes. Voici les 5 erreurs les plus fréquentes que vous rencontrerez probablement.

Erreur 1 : "AuthenticationError - Invalid API Key"

Symptôme : L'erreur AuthenticationError apparaît immédiatement après l'appel à l'API, même avec une clé aparentemente valide.


❌ CODE QUI PROVOQUE L'ERREUR

import os from langchain_openai import ChatOpenAI

Erreur fréquente: espace ou caractère invisible dans la clé

os.environ["OPENAI_API_KEY"] = "sk-holysheep-xxxxx " # Espace final! os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1" llm = ChatOpenAI(model="gpt-4o-mini") response = llm.invoke("Test") # AuthenticationError!

✅ SOLUTION CORRIGÉE

import os import re from langchain_openai import ChatOpenAI def sanitize_api_key(key: str) -> str: """Nettoie la clé API de tout caractère superflu""" if not key: raise ValueError("La clé API ne peut pas être vide") # Supprime les espaces, sauts de ligne, tabulations cleaned = key.strip() # Vérifie le format HolySheep if not cleaned.startswith("sk-"): raise ValueError(f"Format de clé invalide: {cleaned[:10]}...") return cleaned

Configuration propre

API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "") os.environ["OPENAI_API_KEY"] = sanitize_api_key(API_KEY) os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"

Vérification de la connexion

try: llm = ChatOpenAI(model="gpt-4o-mini") response = llm.invoke("Ping") # Test rapide print("✅ Connexion HolySheep réussie!") except Exception as e: print(f"🔴 Erreur: {e}") # Actions de dépannage...

Erreur 2 : "RateLimitError - Too Many Requests"

Symptôme : L'erreur RateLimitError survient même avec un petit nombre de requêtes, particulièrement lors de tests intensifs.


❌ CODE QUI PROVOQUE L'ERREUR - Pas de gestion de rate limit

from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o-mini")

Boucle qui sature le rate limit

for i in range(100): response = llm.invoke(f"Requête {i}") # RateLimitError inévitable!

✅ SOLUTION CORRIGÉE AVEC RETRY ET BACKOFF

import time import asyncio from functools import wraps from langchain_openai import ChatOpenAI from langchain.callbacks.base import BaseCallbackHandler class RateLimitHandler(BaseCallbackHandler): """Gestion intelligente du rate limit avec retry automatique""" def __init__(self, max_retries: int = 3, base_delay: float = 1.0): self.max_retries = max_retries self.base_delay = base_delay self.request_count = 0 self.last_request_time = 0 def on_llm_start(self, serialized, prompts, **kwargs): current_time = time.time() elapsed = current_time - self.last_request_time # HolySheep: limite ~100 req/min pour les plans gratuits # On garde 80 req/min pour marge de sécurité min_interval = 60 / 80 if elapsed < min_interval: sleep_time = min_interval - elapsed print(f"⏳ Rate limiting: pause de {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request_time = time.time() self.request_count += 1 def on_llm_error(self, error, **kwargs): error_str = str(error).lower() if "rate limit" in error_str or "429" in error_str: print("⚠️ Rate limit atteint - implémentation du backoff") return True # Signal pour retry return False # Erreur non récupérable def with_retry(max_retries: int = 3, backoff_factor: float = 2.0): """Décorateur pour retry avec exponential backoff""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): for attempt in range(max_retries): try: return await func(*args, **kwargs) except Exception as e: if attempt == max_retries - 1: raise delay = backoff_factor ** attempt print(f"🔄 Retry {attempt+1}/{max_retries} dans {delay:.1f}s") await asyncio.sleep(delay) @wraps(func) def sync_wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: if attempt == max_retries - 1: raise delay = backoff_factor ** attempt print(f"🔄 Retry {attempt+1}/{max_retries} dans {delay:.1f}s") time.sleep(delay) # Retourne le bon wrapper selon le type de fonction import asyncio if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator

Utilisation

@with_retry(max_retries=3) def call_llm(prompt: str) -> str: llm = ChatOpenAI(model="deepseek-v3.2") response = llm.invoke(prompt) return response.content

Exemple d'appel sécurisé

for i in range(100): try: result = call_llm(f"Requête {i}") print(f"✅ {i}: {result[:50]}...") except Exception as e: print(f"❌ {i}: {e}") break

Erreur 3 : "TimeoutError - Request Exceeded"

Symptôme : Les requêtes timeout après 30-60 secondes sans réponse, même pour des prompts simples.


❌ CODE QUI PROVOQUE L'ERREUR - Timeout par défaut trop court

from langchain_openai import ChatOpen