En tant que développeur qui a intégré des dizaines de modèles LLM dans des applications de production, j'ai passé des heures interminables à déboguer des appels API silencieux, des latences inexpliquées et des coûts qui flambaient sans avertissement. Le Callback机制 de LangChain a transformé ma façon de superviser les intégrations IA. Aujourd'hui, je vous partage tout ce que j'ai appris après des mois de pratique intensive avec l'API HolySheep, qui offre des performances exceptionnelles à des tarifs défiant toute concurrence.
Tableau Comparatif : HolySheep vs API Officielle vs Services Relais
| Critère | HolySheep AI | API OpenAI/Anthropic Officielle | Services Relais Classiques |
|---|---|---|---|
| Latence moyenne | moins de 50ms | 150-300ms | 100-250ms |
| Coût GPT-4.1 | $8/1M tokens | $8/1M tokens | $10-12/1M tokens |
| Coût Claude Sonnet 4.5 | $15/1M tokens | $15/1M tokens | $18-22/1M tokens |
| Coût DeepSeek V3.2 | $0.42/1M tokens | N/A | $0.50-0.60/1M tokens |
| Économie via ¥1=$1 | 85%+ vs facturation USD | Prix catalogue USD | Frais supplémentaires |
| Paiement | WeChat Pay, Alipay, USDT | Carte internationale uniquement | Limité |
| Crédits gratuits | Oui, à l'inscription | $5 pour nouveaux comptes | Rarement |
| API compatible LangChain | 100% compatible | Natif | Compatible mais lent |
Pourquoi le Callback机制 Est Essentiel
Le système de Callbacks de LangChain fonctionne comme un système d'événements centralisé. Chaque opération — qu'il s'agisse d'un appel invoke(), batch() ou stream() — traverse une série de points d'ancrage où vous pouvez intercepter, mesurer et logger des informations cruciales.
Dans mon expérience, voici les problèmes que j'ai résolus grâce aux Callbacks :
- Identification des requêtes lentes (au-delà de 200ms sur HolySheep, c'est anormal)
- Détection de tokens gaspillés par des prompts mal optimisés
- Suivi des coûts en temps réel pour éviter les surprises en fin de mois
- Récupération gracieuse après les erreurs réseau temporaires
- Debuggage des chaines LLM complexes avec des dizaines de maillons
Architecture du Système de Callbacks
Le CallbackHandler de LangChain définit plusieurs méthodes qui correspondent aux différentes phases du cycle de vie d'une requête :
Les méthodes principales du CallbackHandler
class BaseCallbackHandler:
"""Méthodes hookées par LangChain à chaque étape"""
# === Gestion du run (cycle complet) ===
def on_llm_start(self, serialized, prompts, **kwargs):
"""Déclenché AVANT l'envoi de la requête au modèle"""
pass
def on_llm_new_token(self, token, **kwargs):
"""Déclenché pour chaque token en streaming"""
pass
def on_llm_end(self, response, **kwargs):
"""Déclenché APRÈS réception complète de la réponse"""
pass
def on_llm_error(self, error, **kwargs):
"""Déclenché en cas d'erreur pendant l'appel"""
pass
# === Pour les Chains ===
def on_chain_start(self, serialized, inputs, **kwargs):
"""Début d'exécution d'une chaîne"""
pass
def on_chain_end(self, outputs, **kwargs):
"""Fin réussie d'une chaîne"""
pass
def on_chain_error(self, error, **kwargs):
"""Erreur dans une chaîne"""
pass
# === Pour les Tools ===
def on_tool_start(self, serialized, input_str, **kwargs):
"""Appel d'un tool"""
pass
def on_tool_end(self, output, **kwargs):
"""Fin d'un tool"""
pass
def on_tool_error(self, error, **kwargs):
"""Erreur d'un tool"""
pass
Implémentation Pratique avec HolySheep API
Passons aux choses sérieuses avec une implémentation complète. Le code suivant intègre l'API HolySheep avec un système de Callbacks complet pour la surveillance temps réel.
Installation requise: pip install langchain langchain-community langchain-openai
import os
import time
import json
from datetime import datetime
from typing import Optional, Any, Dict, List
from dataclasses import dataclass, field
from collections import defaultdict
Configuration HolySheep
os.environ["OPENAI_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY"
os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage, LLMResult
from langchain.callbacks.base import BaseCallbackHandler
============================================
MONITORING CALLBACK - Le cœur du système
============================================
@dataclass
class RequestMetrics:
"""Métriques agrégées pour une requête"""
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
tokens_generated: int = 0
tokens_prompt: int = 0
cost_usd: float = 0.0
model: str = ""
error: Optional[str] = None
@property
def latency_ms(self) -> float:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return 0.0
@property
def total_tokens(self) -> int:
return self.tokens_prompt + self.tokens_generated
class HolySheepMonitorCallback(BaseCallbackHandler):
"""Callback complet pour monitoring HolySheep API"""
# Tarification HolySheep 2026 (en USD par million de tokens)
PRICING = {
"gpt-4.1": {"input": 2.00, "output": 8.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
"gemini-2.5-flash": {"input": 0.125, "output": 2.50},
"deepseek-v3.2": {"input": 0.14, "output": 0.42},
}
def __init__(self):
self.metrics: List[RequestMetrics] = []
self.current_metrics: Optional[RequestMetrics] = None
self.total_cost = 0.0
self.request_count = 0
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs
) -> None:
"""Capture le début d'une requête LLM"""
self.current_metrics = RequestMetrics()
model = serialized.get("name", "unknown")
self.current_metrics.model = model
self.current_metrics.start_time = time.time()
# Estimation des tokens du prompt (approximatif: 1 token ≈ 4 caractères)
self.current_metrics.tokens_prompt = sum(
len(p) // 4 for p in prompts
)
print(f"🔵 [LLM START] Modèle: {model}")
print(f" Prompt tokens estimés: {self.current_metrics.tokens_prompt}")
def on_llm_new_token(self, token: str, **kwargs) -> None:
"""Capture chaque nouveau token en streaming"""
if self.current_metrics:
self.current_metrics.tokens_generated += 1
def on_llm_end(self, response: LLMResult, **kwargs) -> None:
"""Calcule les métriques finales après succès"""
if not self.current_metrics:
return
self.current_metrics.end_time = time.time()
# Extraction des métriques réelles depuis la réponse
generation_info = response.generations[0][0].generation_info
if generation_info:
self.current_metrics.tokens_generated = generation_info.get(
"token_usage", {}
).get("completion_tokens", self.current_metrics.tokens_generated)
self.current_metrics.tokens_prompt = generation_info.get(
"token_usage", {}
).get("prompt_tokens", self.current_metrics.tokens_prompt)
# Calcul du coût basé sur le modèle
model = self.current_metrics.model
pricing = self.PRICING.get(model, {"input": 1.0, "output": 2.0})
cost = (
self.current_metrics.tokens_prompt * pricing["input"] / 1_000_000 +
self.current_metrics.tokens_generated * pricing["output"] / 1_000_000
)
self.current_metrics.cost_usd = cost
self.total_cost += cost
self.request_count += 1
self.metrics.append(self.current_metrics)
print(f"🟢 [LLM END] Latence: {self.current_metrics.latency_ms:.2f}ms")
print(f" Tokens: {self.current_metrics.tokens_prompt} (prompt) + "
f"{self.current_metrics.tokens_generated} (completion)")
print(f" Coût: ${cost:.6f}")
self.current_metrics = None
def on_llm_error(self, error: Exception, **kwargs) -> None:
"""Log les erreurs pour debugging"""
if self.current_metrics:
self.current_metrics.error = str(error)
self.current_metrics.end_time = time.time()
self.metrics.append(self.current_metrics)
print(f"🔴 [LLM ERROR] {error.__class__.__name__}: {error}")
def get_summary(self) -> Dict[str, Any]:
"""Génère un résumé des métriques"""
if not self.metrics:
return {}
latencies = [m.latency_ms for m in self.metrics]
return {
"total_requests": self.request_count,
"total_cost_usd": self.total_cost,
"total_tokens": sum(m.total_tokens for m in self.metrics),
"avg_latency_ms": sum(latencies) / len(latencies),
"min_latency_ms": min(latencies),
"max_latency_ms": max(latencies),
"error_count": sum(1 for m in self.metrics if m.error),
}
============================================
UTILISATION PRATIQUE
============================================
def demo_holy_sheep_callback():
"""Démonstration complète du système de callbacks"""
# Initialisation du monitor
monitor = HolySheepMonitorCallback()
# Configuration du modèle avec HolySheep
llm = ChatOpenAI(
model="deepseek-v3.2", # Modèle économique: $0.42/1M tokens output
temperature=0.7,
callbacks=[monitor] # ← Intégration du callback
)
print("=" * 60)
print("DÉMO: Monitoring HolySheep API avec LangChain Callbacks")
print("=" * 60)
# Test avec un prompt simple
response = llm.invoke(
"Explique en 3 phrases ce qu'est un callback en programmation."
)
print(f"\n📝 Réponse: {response.content}\n")
# Test avec un prompt plus complexe
messages = [
SystemMessage(content="Tu es un assistant technique concis."),
HumanMessage(content="Qu'est-ce que le pattern Observer?")
]
response2 = llm.invoke(messages)
print(f"📝 Réponse 2: {response2.content}\n")
# Affichage du résumé
summary = monitor.get_summary()
print("=" * 60)
print("📊 RÉSUMÉ DES MÉTRIQUES")
print("=" * 60)
print(f" Requêtes totales: {summary['total_requests']}")
print(f" Coût total: ${summary['total_cost_usd']:.6f}")
print(f" Latence moyenne: {summary['avg_latency_ms']:.2f}ms")
print(f" Latence min/max: {summary['min_latency_ms']:.2f}ms / "
f"{summary['max_latency_ms']:.2f}ms")
print(f" Erreurs: {summary['error_count']}")
if __name__ == "__main__":
demo_holy_sheep_callback()
Logs Structurés et Export vers Fichiers
Dans un contexte de production, vous voudrez persister vos logs. Voici une version avancée qui écrit dans des fichiers JSON structurés, idéal pour ingestion dans des systèmes comme ELK Stack ou Datadog.
import json
import logging
from pathlib import Path
from datetime import datetime
from threading import Lock
class ProductionCallbackHandler(BaseCallbackHandler):
"""Callback production-ready avec logs fichiers et console"""
def __init__(
self,
log_dir: str = "./logs",
log_level: int = logging.INFO,
max_file_size_mb: int = 10
):
super().__init__()
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
# Logger principal avec rotation
self.logger = logging.getLogger("holy_sheep_monitor")
self.logger.setLevel(log_level)
# Handler fichier avec JSON Lines
log_file = self.log_dir / f"llm_calls_{datetime.now():%Y%m%d}.jsonl"
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(file_handler)
# Handler console
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter('[%(levelname)s] %(message)s')
)
self.logger.addHandler(console_handler)
self._lock = Lock()
self._current_run_id = None
self._run_data = {}
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs
) -> None:
"""Enregistre le début avec timestamp précis"""
run_id = kwargs.get("run_id", str(uuid.uuid4()))
with self._lock:
self._current_run_id = run_id
self._run_data = {
"event": "llm_start",
"run_id": run_id,
"timestamp": datetime.now().isoformat(),
"model": serialized.get("name", "unknown"),
"prompt_preview": prompts[0][:200] if prompts else "",
"prompt_length": sum(len(p) for p in prompts),
"env": "production"
}
self.logger.info(json.dumps(self._run_data))
def on_llm_end(self, response: LLMResult, **kwargs) -> None:
"""Log la complétion avec métriques finales"""
with self._lock:
if self._current_run_id:
end_data = {
"event": "llm_end",
"run_id": self._current_run_id,
"timestamp": datetime.now().isoformat(),
"latency_ms": (
datetime.now() - datetime.fromisoformat(
self._run_data["timestamp"]
)
).total_seconds() * 1000,
"response_preview": (
response.generations[0][0].text[:200]
if response.generations else ""
),
}
# Ajout des token counts si disponibles
if hasattr(response, 'llm_output') and response.llm_output:
usage = response.llm_output.get('token_usage', {})
end_data.update({
"prompt_tokens": usage.get('prompt_tokens', 0),
"completion_tokens": usage.get('completion_tokens', 0),
"total_tokens": usage.get('total_tokens', 0),
})
self.logger.info(json.dumps(end_data))
self._current_run_id = None
self._run_data = {}
def on_llm_error(self, error: Exception, **kwargs) -> None:
"""Log les erreurs pour alerting"""
error_data = {
"event": "llm_error",
"run_id": self._current_run_id,
"timestamp": datetime.now().isoformat(),
"error_type": error.__class__.__name__,
"error_message": str(error),
}
self.logger.error(json.dumps(error_data))
# Réinitialisation de l'état
with self._lock:
self._current_run_id = None
self._run_data = {}
Export du résumé CSV pour analyse
def export_metrics_csv(metrics: List[RequestMetrics], output_path: str):
"""Exporte les métriques en CSV pour analyse Excel/Python"""
import csv
with open(output_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=[
'timestamp', 'model', 'latency_ms', 'prompt_tokens',
'completion_tokens', 'total_tokens', 'cost_usd', 'error'
])
writer.writeheader()
for m in metrics:
writer.writerow({
'timestamp': datetime.fromtimestamp(m.start_time).isoformat(),
'model': m.model,
'latency_ms': round(m.latency_ms, 2),
'prompt_tokens': m.tokens_prompt,
'completion_tokens': m.tokens_generated,
'total_tokens': m.total_tokens,
'cost_usd': round(m.cost_usd, 6),
'error': m.error or ''
})
print(f"✅ Métriques exportées vers {output_path}")
Callback pour Chaines et Tools
Les Callbacks ne se limitent pas aux appels LLM simples. Ils fonctionnent également pour les Chains LangChain et les Tools. Voici comment je surveille une chaîne de traitement complexe.
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import AgentFinish, AgentAction
class ChainMonitorCallback(BaseCallbackHandler):
"""Surveillance complète d'une chaîne d'agents"""
def __init__(self):
self.chain_timeline = []
self.tool_calls = defaultdict(int)
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs
) -> None:
"""Capture le début d'une chaîne"""
chain_name = serialized.get("name", serialized.get("id", ["unknown"])[-1])
self.chain_timeline.append({
"type": "chain_start",
"name": chain_name,
"time": time.time()
})
print(f"🔗 [CHAIN START] {chain_name}")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None:
"""Capture la fin d'une chaîne"""
chain_name = self.chain_timeline[-1]["name"] if self.chain_timeline else "unknown"
duration = time.time() - self.chain_timeline[-1]["time"]
self.chain_timeline.append({
"type": "chain_end",
"name": chain_name,
"time": time.time(),
"duration_ms": duration * 1000
})
print(f"🔗 [CHAIN END] {chain_name} ({duration*1000:.2f}ms)")
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs
) -> None:
"""Surveille l'appel des tools"""
tool_name = serialized.get("name", "unknown")
self.tool_calls[tool_name] += 1
self.chain_timeline.append({
"type": "tool_start",
"name": tool_name,
"time": time.time(),
"input_preview": input_str[:100]
})
print(f"🔧 [TOOL START] {tool_name}")
def on_tool_end(self, output: str, **kwargs) -> None:
"""Surveille la fin des tools"""
tool_name = self.chain_timeline[-1]["name"]
duration = time.time() - self.chain_timeline[-1]["time"]
self.chain_timeline.append({
"type": "tool_end",
"name": tool_name,
"time": time.time(),
"duration_ms": duration * 1000,
"output_preview": output[:100]
})
print(f"🔧 [TOOL END] {tool_name} ({duration*1000:.2f}ms)")
def on_tool_error(self, error: Exception, **kwargs) -> None:
"""Log les erreurs de tools"""
print(f"🔴 [TOOL ERROR] {error}")
============================================
EXEMPLE COMPLET: AGENT AVEC MONITORING
============================================
def create_monitored_agent():
"""Crée un agent avec monitoring complet"""
# Définition des tools
def calculate(expression: str) -> str:
"""Évalue une expression mathématique simple"""
try:
result = eval(expression)
return f"Résultat: {result}"
except Exception as e:
return f"Erreur: {e}"
def search_docs(query: str) -> str:
"""Simule une recherche dans la documentation"""
docs = {
"callback": "Un callback est une fonction passée comme argument...",
"langchain": "LangChain est un framework pour construire des apps IA...",
}
return docs.get(query.lower(), "Documentation non trouvée.")
tools = [
Tool(name="Calculator", func=calculate, description="Pour calculs mathématiques"),
Tool(name="SearchDocs", func=search_docs, description="Recherche dans la docs"),
]
# Prompt de l'agent
prompt = ChatPromptTemplate.from_messages([
("system", "Tu es un assistant technique qui utilise des outils."),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# Initialisation du monitor
monitor = ChainMonitorCallback()
# Création de l'agent avec HolySheep
llm = ChatOpenAI(
model="gpt-4o-mini", # Modèle économique: $0.60/1M output
callbacks=[monitor]
)
agent = create_openai_functions_agent(llm, tools, prompt)
executor = AgentExecutor(
agent=agent,
tools=tools,
callbacks=[monitor], # Surveillance des tools aussi
verbose=True
)
return executor, monitor
Exécution de démonstration
if __name__ == "__main__":
agent, monitor = create_monitored_agent()
result = agent.invoke({
"input": "Calcule 15 * 23 + 45, puis cherche des infos sur 'callback'"
})
print("\n" + "=" * 50)
print("TIMELINE DES OPÉRATIONS")
print("=" * 50)
for event in monitor.chain_timeline:
print(f" {event}")
print(f"\n📊 Stats: {dict(monitor.tool_calls)}")
Intégration avec AsyncIO pour Haute Performance
Pour les applications nécessitant des performances maximales, l'intégration avec AsyncIO permet de gérer des centaines de requêtes simultanées sans bloquer le thread principal.
import asyncio
from typing import Optional
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from datetime import datetime
class AsyncStreamingCallback(BaseCallbackHandler):
"""Callback async avec streaming et buffer de tokens"""
def __init__(self):
self.tokens_buffer = []
self.start_time: Optional[float] = None
self.first_token_time: Optional[float] = None
async def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs
) -> None:
self.start_time = time.time()
self.tokens_buffer = []
print(f"⏱️ Requête démarrée à {datetime.now():%H:%M:%S.%f}")
async def on_llm_new_token(self, token: str, **kwargs) -> None:
if self.first_token_time is None:
self.first_token_time = time.time()
ttft = (self.first_token_time - self.start_time) * 1000
print(f"⚡ Time To First Token (TTFT): {ttft:.2f}ms")
self.tokens_buffer.append(token)
async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
end_time = time.time()
total_time = (end_time - self.start_time) * 1000
tokens_count = len(self.tokens_buffer)
if tokens_count > 0:
throughput = tokens_count / ((end_time - self.first_token_time) * 1000)
print(f"✅ Requête terminée en {total_time:.2f}ms")
print(f" Tokens générés: {tokens_count}")
print(f" Throughput: {throughput:.2f} tokens/ms")
async def parallel_requests_demo():
"""Lance plusieurs requêtes en parallèle avec monitoring"""
monitor = AsyncStreamingCallback()
llm = ChatOpenAI(
model="deepseek-v3.2",
streaming=True,
callbacks=[monitor]
)
prompts = [
"Explique la programmation asynchrone en Python.",
"Qu'est-ce que FastAPI et ses avantages?",
"Comment implémenter un callback en JavaScript?",
"Décris le pattern Observer en programmation.",
"Explique la différence entre async/await et promises.",
]
print("🚀 Lancement de 5 requêtes en parallèle...")
start = time.time()
# Exécution parallèle
tasks = [llm.apredict(prompt) for prompt in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start
print(f"\n📊 Temps total (parallèle): {total_time*1000:.2f}ms")
print(f" Temps moyen par requête: {total_time*1000/5:.2f}ms")
# Affichage des résultats
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f" Requête {i+1}: ❌ Erreur - {result}")
else:
print(f" Requête {i+1}: ✅ {len(result)} caractères")
Exécution
if __name__ == "__main__":
asyncio.run(parallel_requests_demo())
Erreurs Courantes et Solutions
Après des mois d'utilisation intensive, j'ai rencontré et résolu de nombreux problèmes. Voici les 5 erreurs les plus fréquentes que vous rencontrerez probablement.
Erreur 1 : "AuthenticationError - Invalid API Key"
Symptôme : L'erreur AuthenticationError apparaît immédiatement après l'appel à l'API, même avec une clé aparentemente valide.
❌ CODE QUI PROVOQUE L'ERREUR
import os
from langchain_openai import ChatOpenAI
Erreur fréquente: espace ou caractère invisible dans la clé
os.environ["OPENAI_API_KEY"] = "sk-holysheep-xxxxx " # Espace final!
os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"
llm = ChatOpenAI(model="gpt-4o-mini")
response = llm.invoke("Test") # AuthenticationError!
✅ SOLUTION CORRIGÉE
import os
import re
from langchain_openai import ChatOpenAI
def sanitize_api_key(key: str) -> str:
"""Nettoie la clé API de tout caractère superflu"""
if not key:
raise ValueError("La clé API ne peut pas être vide")
# Supprime les espaces, sauts de ligne, tabulations
cleaned = key.strip()
# Vérifie le format HolySheep
if not cleaned.startswith("sk-"):
raise ValueError(f"Format de clé invalide: {cleaned[:10]}...")
return cleaned
Configuration propre
API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "")
os.environ["OPENAI_API_KEY"] = sanitize_api_key(API_KEY)
os.environ["OPENAI_API_BASE"] = "https://api.holysheep.ai/v1"
Vérification de la connexion
try:
llm = ChatOpenAI(model="gpt-4o-mini")
response = llm.invoke("Ping") # Test rapide
print("✅ Connexion HolySheep réussie!")
except Exception as e:
print(f"🔴 Erreur: {e}")
# Actions de dépannage...
Erreur 2 : "RateLimitError - Too Many Requests"
Symptôme : L'erreur RateLimitError survient même avec un petit nombre de requêtes, particulièrement lors de tests intensifs.
❌ CODE QUI PROVOQUE L'ERREUR - Pas de gestion de rate limit
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
Boucle qui sature le rate limit
for i in range(100):
response = llm.invoke(f"Requête {i}") # RateLimitError inévitable!
✅ SOLUTION CORRIGÉE AVEC RETRY ET BACKOFF
import time
import asyncio
from functools import wraps
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
class RateLimitHandler(BaseCallbackHandler):
"""Gestion intelligente du rate limit avec retry automatique"""
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.request_count = 0
self.last_request_time = 0
def on_llm_start(self, serialized, prompts, **kwargs):
current_time = time.time()
elapsed = current_time - self.last_request_time
# HolySheep: limite ~100 req/min pour les plans gratuits
# On garde 80 req/min pour marge de sécurité
min_interval = 60 / 80
if elapsed < min_interval:
sleep_time = min_interval - elapsed
print(f"⏳ Rate limiting: pause de {sleep_time:.2f}s")
time.sleep(sleep_time)
self.last_request_time = time.time()
self.request_count += 1
def on_llm_error(self, error, **kwargs):
error_str = str(error).lower()
if "rate limit" in error_str or "429" in error_str:
print("⚠️ Rate limit atteint - implémentation du backoff")
return True # Signal pour retry
return False # Erreur non récupérable
def with_retry(max_retries: int = 3, backoff_factor: float = 2.0):
"""Décorateur pour retry avec exponential backoff"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = backoff_factor ** attempt
print(f"🔄 Retry {attempt+1}/{max_retries} dans {delay:.1f}s")
await asyncio.sleep(delay)
@wraps(func)
def sync_wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = backoff_factor ** attempt
print(f"🔄 Retry {attempt+1}/{max_retries} dans {delay:.1f}s")
time.sleep(delay)
# Retourne le bon wrapper selon le type de fonction
import asyncio
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
Utilisation
@with_retry(max_retries=3)
def call_llm(prompt: str) -> str:
llm = ChatOpenAI(model="deepseek-v3.2")
response = llm.invoke(prompt)
return response.content
Exemple d'appel sécurisé
for i in range(100):
try:
result = call_llm(f"Requête {i}")
print(f"✅ {i}: {result[:50]}...")
except Exception as e:
print(f"❌ {i}: {e}")
break
Erreur 3 : "TimeoutError - Request Exceeded"
Symptôme : Les requêtes timeout après 30-60 secondes sans réponse, même pour des prompts simples.
❌ CODE QUI PROVOQUE L'ERREUR - Timeout par défaut trop court
from langchain_openai import ChatOpen