Dans l'écosystème des agents conversationnels modernes, la capacité à orchestrer des appels d'outils représente une compétence architecturale fondamentale. Cet article explore en profondeur l'implémentation d'agents LangChain avec une couche proxy centralisée, en optimisant la latence, le contrôle de concurrence et la rentabilité. Nous utiliserons HolySheep AI comme fournisseur API de référence, offrant un taux de change ¥1=$1 avec une latence moyenne inférieure à 50ms.
Architecture Fondamentale d'un Agent LangChain
Un agent LangChain se compose de trois piliers architecturaux : le PromptTemplate qui structure le comportement, le Tool qui encapsule les capacités d'action, et le AgentExecutor qui orchestre le cycle reasoning-action-observation. La configuration proxy permet de centraliser la gestion des credentials, le rate limiting et la journalisation centralisée.
Configuration du Client Proxy HolySheep
La configuration initiale nécessite l'installation des dépendances et l'initialisation du client avec les paramètres de production.
Installation des dépendances requises
pip install langchain langchain-community langchain-openai
import os
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, Semaphore
import asyncio
import time
Configuration HolySheep API
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
@dataclass
class ProxyConfig:
"""Configuration centralisée du proxy API."""
base_url: str = HOLYSHEEP_BASE_URL
api_key: str = HOLYSHEEP_API_KEY
max_concurrent_requests: int = 50
request_timeout: int = 120
max_retries: int = 3
retry_delay: float = 1.0
model: str = "gpt-4.1"
# Configuration du rate limiting
requests_per_minute: int = 500
tokens_per_minute: int = 150_000
class HolySheepLLM:
"""Client LLM optimisé pour HolySheep avec gestion de concurrence."""
def __init__(self, config: Optional[ProxyConfig] = None):
self.config = config or ProxyConfig()
self._semaphore = Semaphore(self.config.max_concurrent_requests)
self._client = ChatOpenAI(
api_key=self.config.api_key,
base_url=self.config.base_url,
model=self.config.model,
timeout=self.config.request_timeout,
max_retries=self.config.max_retries
)
self._request_times: List[float] = []
def invoke(self, messages: List[Dict], **kwargs) -> Any:
"""Appel synchrone avec contrôle de concurrence."""
start_time = time.time()
with self._semaphore:
response = self._client.invoke(messages, **kwargs)
elapsed = time.time() - start_time
self._request_times.append(elapsed)
return response
async def ainvoke(self, messages: List[Dict], **kwargs) -> Any:
"""Appel asynchrone pour performance maximale."""
start_time = time.time()
async with self._semaphore:
response = await self._client.ainvoke(messages, **kwargs)
elapsed = time.time() - start_time
self._request_times.append(elapsed)
return response
def get_stats(self) -> Dict[str, float]:
"""Retourne les statistiques de performance."""
if not self._request_times:
return {"avg_latency": 0, "max_latency": 0, "total_requests": 0}
return {
"avg_latency": sum(self._request_times) / len(self._request_times),
"max_latency": max(self._request_times),
"min_latency": min(self._request_times),
"total_requests": len(self._request_times)
}
Initialisation du client
llm_client = HolySheepLLM()
print(f"Client initialisé : {llm_client.config.base_url}")
print(f"Modèle : {llm_client.config.model}")
Définition et Registre d'Outils
La définition rigoureuse des outils avec validation de schéma et gestion d'erreurs constitue le cœur fonctionnel de l'agent. Chaque outil doit exposer un schéma JSON清晰的 pour permettre au modèle de comprendre ses capacités.
from langchain_core.tools import tool
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
from typing import Optional, Type
import json
Schémas de validation pour les entrées d'outils
class SearchInput(BaseModel):
query: str = Field(description="Requête de recherche web")
max_results: int = Field(default=5, ge=1, le=20)
class DatabaseQueryInput(BaseModel):
sql: str = Field(description="Requête SQL sécurisée")
params: Optional[dict] = Field(default=None)
class APICallInput(BaseModel):
endpoint: str = Field(description="Endpoint API à appeler")
method: str = Field(default="GET")
payload: Optional[dict] = Field(default=None)
Registre centralisé des outils
TOOL_REGISTRY: Dict[str, tuple[Type[BaseModel], Any]] = {}
@tool(args_schema=SearchInput, return_direct=False)
def web_search(query: str, max_results: int = 5) -> dict:
"""
Effectue une recherche web et retourne les résultats structurés.
Utilisé pour trouver des informations récentes ou vérifier des faits.
"""
# Implémentation simulée - remplacer par une vraie API
results = [
{"title": f"Résultat {i+1} pour '{query}'", "url": f"https://example.com/{i}"}
for i in range(min(max_results, 10))
]
return {"query": query, "results": results, "count": len(results)}
@tool(args_schema=DatabaseQueryInput, return_direct=False)
def execute_sql_query(sql: str, params: Optional[dict] = None) -> dict:
"""
Exécute une requête SQL sur la base de données configurée.
Nécessite une validation préalable des permissions.
"""
# Validation de sécurité - список blanc de patterns autorisés
allowed_patterns = ["SELECT", "INSERT", "UPDATE", "DELETE"]
sql_upper = sql.upper().strip()
if not any(sql_upper.startswith(p) for p in allowed_patterns):
raise ValueError(f"Type de requête non autorisé: {sql[:50]}")
# Simulation d'exécution
return {
"status": "success",
"rows_affected": 0,
"execution_time_ms": 45,
"data": []
}
@tool(args_schema=APICallInput, return_direct=False)
def call_external_api(endpoint: str, method: str = "GET",
payload: Optional[dict] = None) -> dict:
"""
Appelle un endpoint API externe avec gestion des erreurs.
Support GET, POST, PUT, DELETE.
"""
import httpx
valid_methods = ["GET", "POST", "PUT", "DELETE", "PATCH"]
if method.upper() not in valid_methods:
raise ValueError(f"Méthode non valide: {method}")
try:
with httpx.Client(timeout=30.0) as client:
response = client.request(
method=method.upper(),
url=endpoint,
json=payload
)
return {
"status_code": response.status_code,
"body": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
"headers": dict(response.headers)
}
except httpx.TimeoutException:
return {"error": "Timeout - l'API n'a pas répondu dans les temps"}
except Exception as e:
return {"error": str(e)}
Enregistrement des outils
TOOL_REGISTRY = {
"web_search": (SearchInput, web_search),
"execute_sql_query": (DatabaseQueryInput, execute_sql_query),
"call_external_api": (APICallInput, call_external_api)
}
Liste des outils pour l'agent
TOOLS = [web_search, execute_sql_query, call_external_api]
print(f"Registre initialisé avec {len(TOOLS)} outils")
Construction de l'Agent avec Gestion Avancée
L'agent combine le LLM configuré avec les outils et implémente une logique de conversation permettant le reasoning multi-étapes avec mémorisation du contexte.
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from typing import List, Dict, Any
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionAgent:
"""
Agent de production avec :
- Gestion de la mémoire conversationnelle
- Contrôle de concurrence
- Récupération d'erreurs
- Optimisation des coûts
"""
SYSTEM_PROMPT = """Tu es un assistant IA expert avec accès à des outils puissants.
Règles de fonctionnement :
1. Analyse la question et détermine si un outil est nécessaire
2. Si un outil est nécessaire, appelle-le avec les bons paramètres
3. Interprète le résultat de l'outil et réponds de manière précise
4. Si une erreur se produit, essaie une approche alternative
Outils disponibles :
- web_search : Recherche d'informations sur le web
- execute_sql_query : Interrogation de la base de données
- call_external_api : Appels à des APIs externes
Réponds toujours en français de manière claire et concise."""
def __init__(self, llm_client: HolySheepLLM, tools: List[Any]):
self.llm = llm_client
self.tools = tools
# Construction du prompt
prompt = ChatPromptTemplate.from_messages([
("system", self.SYSTEM_PROMPT),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# Création de l'agent
agent = create_openai_functions_agent(
llm=self.llm._client,
tools=tools,
prompt=prompt
)
# Configuration de l'executor avec gestion d'erreurs
self.executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=10,
max_execution_time=120,
early_stopping_method="force",
handle_parsing_errors=True,
return_intermediate_steps=True
)
self._chat_history: List[Dict] = []
def invoke(self, user_input: str, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
"""Exécution synchrone avec historique."""
start_time = time.time()
try:
result = self.executor.invoke(
{
"input": user_input,
"chat_history": self._chat_history
},
config=config or RunnableConfig()
)
# Mise à jour de l'historique
self._chat_history.append(("human", user_input))
self._chat_history.append(("ai", result["output"]))
elapsed = time.time() - start_time
logger.info(f"Agent exécuté en {elapsed:.2f}s")
return {
"output": result["output"],
"intermediate_steps": result.get("intermediate_steps", []),
"execution_time": elapsed,
"tokens_used": result.get("token_usage", {})
}
except Exception as e:
logger.error(f"Erreur d'exécution: {str(e)}")
return {
"output": f"Erreur: {str(e)}",
"intermediate_steps": [],
"execution_time": time.time() - start_time,
"error": True
}
async def ainvoke(self, user_input: str) -> Dict[str, Any]:
"""Exécution asynchrone optimisée pour haute performance."""
start_time = time.time()
try:
result = await self.executor.ainvoke(
{
"input": user_input,
"chat_history": self._chat_history
}
)
self._chat_history.append(("human", user_input))
self._chat_history.append(("ai", result["output"]))
return {
"output": result["output"],
"execution_time": time.time() - start_time
}
except Exception as e:
return {"output": f"Erreur: {str(e)}", "error": True}
def reset_history(self):
"""Réinitialise l'historique de conversation."""
self._chat_history = []
Instanciation de l'agent de production
agent = ProductionAgent(llm_client, TOOLS)
print("Agent de production initialisé avec succès")
Contrôle de Concurrence et Rate Limiting
Pour les environnements de production à haute charge, le contrôle de concurrence devient critique. Nous implémentons un système de rate limiting intelligent avec.token bucket et burst handling.
import asyncio
import time
from collections import deque
from threading import Lock
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class TokenBucketRateLimiter:
"""
Implémentation Token Bucket pour le rate limiting.
- tokens : nombre actuel de tokens disponibles
- max_tokens : capacité maximale du bucket
- refill_rate : tokens ajoutés par seconde
"""
def __init__(self, requests_per_minute: int, burst_size: Optional[int] = None):
self.max_tokens = requests_per_minute
self.tokens = requests_per_minute
self.refill_rate = requests_per_minute / 60.0 # par seconde
self.burst_size = burst_size or requests_per_minute
self.last_refill = time.time()
self._lock = Lock()
def _refill(self):
"""Rajoute des tokens selon le temps écoulé."""
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = elapsed * self.refill_rate
self.tokens = min(self.max_tokens, self.tokens + tokens_to_add)
self.last_refill = now
def acquire(self, tokens_needed: int = 1, timeout: float = 60.0) -> bool:
"""
Acquiert des tokens avec attente optionnelle.
Retourne True si l'acquisition réussit, False sinon.
"""
start_wait = time.time()
while True:
with self._lock:
self._refill()
if self.tokens >= tokens_needed:
self.tokens -= tokens_needed
return True
if time.time() - start_wait >= timeout:
return False
# Backoff exponentiel
time.sleep(min(0.1 * (1 + start_wait - time.time()), 1.0))
async def aacquire(self, tokens_needed: int = 1, timeout: float = 60.0) -> bool:
"""Version asynchrone de acquire."""
start_wait = time.time()
while True:
with self._lock:
self._refill()
if self.tokens >= tokens_needed:
self.tokens -= tokens_needed
return True
if time.time() - start_wait >= timeout:
return False
await asyncio.sleep(0.05)
def get_status(self) -> dict:
"""Retourne l'état actuel du rate limiter."""
with self._lock:
self._refill()
return {
"available_tokens": self.tokens,
"max_tokens": self.max_tokens,
"utilization": 1 - (self.tokens / self.max_tokens)
}
class ConcurrencyController:
"""
Contrôleur de concurrence multi-niveaux :
- Niveau global : nombre total de requêtes simultanées
- Niveau par endpoint : limitation spécifique
- Niveau par utilisateur : quotas individuels
"""
def __init__(self, global_limit: int = 100):
self.global_semaphore = Semaphore(global
Ressources connexes
Articles connexes