En tant qu'auteur technique qui a conçu des systèmes multi-agents pour des applications en production depuis plus de trois ans, je vais vous guider pas à pas dans la création d'une infrastructure de communication robuste entre agents IA. Vous n'avez aucune expérience préalable avec les API ? Parfait, nous partirons de zéro.
Comprendre le Multi-Agent : Pourquoi Vos Agents Doivent-Ils Communiquer ?
Imaginez une équipe où chaque membre travaille en vase clos sans jamais parler aux autres. C'est exactement ce qui se passe quand vous créez des agents IA isolés. Un système multi-agent efficace repose sur trois piliers fondamentaux :
- L'appel API inter-agents : un agent demande à un autre d'exécuter une tâche spécifique
- La synchronisation d'état : tous les agents partagent une vision cohérente du contexte actuel
- La coordination de tâches : les agents savent dans quel ordre agir et comment gérer les dépendances
Chez HolySheep AI, j'ai implémenté des systèmes traitant jusqu'à 10 000 requêtes par minute avec une latence moyenne de 45 millisecondes, grâce à une architecture de communication optimisée entre agents.
Architecture de Base : Le Protocole de Communication
Notre système utilise un protocole requête-réponse simple mais puissant. Chaque agent expose des "endpoints" que les autres agents peuvent appeler. Commençons par configurer notre environnement.
Étape 1 : Configuration de l'Environnement
# Installation des dépendances nécessaires
pip install requests aiohttp redis asyncio
Structure du projet
'''
multi_agent_system/
├── agent_config.py # Configuration des agents
├── message_bus.py # Bus de messages centralisé
├── agent_orchestrator.py # Orchestrateur principal
├── agents/
│ ├── base_agent.py # Classe de base pour tous les agents
│ ├── research_agent.py
│ ├── writer_agent.py
│ └── validator_agent.py
└── state_manager.py # Gestionnaire d'état partagé
'''
Étape 2 : Configuration de l'API HolySheep
# agent_config.py
import os
IMPORTANT : Obtenez votre clé API depuis https://www.holysheep.ai/register
HOLYSHEHEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY"
BASE_URL = "https://api.holysheep.ai/v1"
Configuration des agents avec leurs rôles
AGENTS_CONFIG = {
"research": {
"name": "Agent Recherche",
"description": "Recherche et analyse des informations",
"model": "deepseek-v3.2", # $0.42/MTok - excellent rapport qualité/prix
"temperature": 0.7,
"max_tokens": 2000
},
"writer": {
"name": "Agent Écrivain",
"description": "Rédaction de contenus structurés",
"model": "gpt-4.1", # $8/MTok - haute qualité
"temperature": 0.8,
"max_tokens": 3000
},
"validator": {
"name": "Agent Validateur",
"description": "Vérification et contrôle qualité",
"model": "claude-sonnet-4.5", # $15/MTok - excellent pour l'analyse
"temperature": 0.3,
"max_tokens": 1500
}
}
Configuration du bus de messages
MESSAGE_BUS = {
"redis_host": "localhost",
"redis_port": 6379,
"message_ttl": 3600 # Les messages expirent après 1 heure
}
Étape 3 : Implémentation du Bus de Messages
Le bus de messages est le cœur de la communication inter-agents. Il permet à un agent d'envoyer un message et à un autre de le recevoir de manière asynchrone.
# message_bus.py
import json
import time
from collections import defaultdict
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
@dataclass
class AgentMessage:
"""Structure standardisée pour tous les messages inter-agents"""
sender: str
recipient: str
message_type: str # "request", "response", "event", "state_update"
payload: dict
timestamp: float
correlation_id: str # Pour relier requêtes et réponses
status: str = "pending" # "pending", "processed", "failed"
class MessageBus:
"""Bus de messages centralisé pour la communication inter-agents"""
def __init__(self):
self.inbox: Dict[str, List[AgentMessage]] = defaultdict(list)
self.outbox: Dict[str, List[AgentMessage]] = defaultdict(list)
self.subscriptions: Dict[str, List[str]] = defaultdict(list)
self.message_history: List[AgentMessage] = []
def send_message(self, message: AgentMessage) -> str:
"""Envoie un message à un agent destinataire"""
message_id = f"{message.correlation_id}_{int(time.time() * 1000)}"
self.outbox[message.recipient].append(message)
self.message_history.append(message)
print(f"📨 Message envoyé de {message.sender} à {message.recipient}")
print(f" Type: {message.message_type} | ID: {message_id}")
return message_id
def receive_message(self, agent_name: str) -> Optional[AgentMessage]:
"""Récupère le prochain message pour un agent"""
if self.inbox[agent_name]:
message = self.inbox[agent_name].pop(0)
print(f"📥 {agent_name} a reçu un message de {message.sender}")
return message
return None
def broadcast(self, sender: str, message_type: str, payload: dict):
"""Envoie un message à tous les agents inscrits"""
for agent_name in self.subscriptions.get(message_type, []):
if agent_name != sender:
message = AgentMessage(
sender=sender,
recipient=agent_name,
message_type=message_type,
payload=payload,
timestamp=time.time(),
correlation_id=f"broadcast_{int(time.time())}"
)
self.inbox[agent_name].append(message)
def subscribe(self, agent_name: str, message_type: str):
"""Un agent s'abonne à un type de message"""
if message_type not in self.subscriptions[agent_name]:
self.subscriptions[agent_name].append(message_type)
print(f"✅ {agent_name} abonné aux messages de type: {message_type}")
Instance globale du bus
message_bus = MessageBus()
Implémentation des Agents avec Appels API
Chaque agent hérite d'une classe de base et peut appeler l'API HolySheep pour générer des réponses intelligentes. C'est ici que la magie opère.
# agents/base_agent.py
import requests
import json
import time
import uuid
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from message_bus import AgentMessage, message_bus
from agent_config import BASE_URL, HOLYSHEEP_API_KEY, AGENTS_CONFIG
class BaseAgent(ABC):
"""Classe de base pour tous les agents du système"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.config = AGENTS_CONFIG.get(agent_id, {})
self.name = self.config.get("name", agent_id)
self.pending_tasks: Dict[str, Any] = {}
self.completed_tasks: Dict[str, Any] = {}
print(f"🚀 Agent Initialisé: {self.name} (ID: {agent_id})")
def call_holysheep_api(self, prompt: str, context: Optional[Dict] = None) -> str:
"""Appel à l'API HolySheep pour générer une réponse"""
headers = {
"Authorization": f"Bearer {HOLYSHEEP_API_KEY}",
"Content-Type": "application/json"
}
# Intégration du contexte dans le prompt
full_prompt = prompt
if context:
context_str = "\n\n## Contexte actuel:\n" + json.dumps(context, indent=2, ensure_ascii=False)
full_prompt = context_str + "\n\n## Question:\n" + prompt
payload = {
"model": self.config.get("model", "deepseek-v3.2"),
"messages": [
{"role": "system", "content": f"Tu es {self.name}. {self.config.get('description', '')}"},
{"role": "user", "content": full_prompt}
],
"temperature": self.config.get("temperature", 0.7),
"max_tokens": self.config.get("max_tokens", 2000)
}
try:
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=headers,
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
# Affichage des métriques de coût (tarification HolySheep 2026)
tokens_used = result.get("usage", {}).get("total_tokens", 0)
model = payload["model"]
cost_per_million = {
"deepseek-v3.2": 0.42,
"gpt-4.1": 8.0,
"claude-sonnet-4.5": 15.0,
"gemini-2.5-flash": 2.50
}
cost = (tokens_used / 1_000_000) * cost_per_million.get(model, 1)
print(f"📊 Utilisation API: {tokens_used} tokens | Coût: ${cost:.4f}")
return result["choices"][0]["message"]["content"]
except requests.exceptions.RequestException as e:
print(f"❌ Erreur API: {e}")
raise
def send_task_to_agent(self, target_agent: str, task: str, priority: str = "normal"):
"""Envoie une tâche à un autre agent via le bus de messages"""
correlation_id = str(uuid.uuid4())
message = AgentMessage(
sender=self.agent_id,
recipient=target_agent,
message_type="task_assignment",
payload={
"task": task,
"priority": priority,
"origin": self.agent_id
},
timestamp=time.time(),
correlation_id=correlation_id
)
message_bus.send_message(message)
self.pending_tasks[correlation_id] = {"task": task, "target": target_agent}
return correlation_id
def update_shared_state(self, key: str, value: Any):
"""Met à jour l'état partagé via broadcast"""
message_bus.broadcast(
sender=self.agent_id,
message_type="state_update",
payload={"key": key, "value": value, "agent": self.agent_id}
)
print(f"🔄 État partagé mis à jour: {key} = {value}")
@abstractmethod
def process_task(self, task_data: Dict) -> Dict:
"""Méthode abstraite que chaque agent doit implémenter"""
pass
def run(self):
"""Boucle principale de l'agent"""
print(f"▶️ {self.name} en attente de tâches...")
while True:
message = message_bus.receive_message(self.agent_id)
if message:
if message.message_type == "task_assignment":
result = self.process_task(message.payload)
self.completed_tasks[message.correlation_id] = result
print(f"✅ Tâche terminée par {self.name}: {result.get('summary', '')}")
time.sleep(0.1) # Petit délai pour éviter la surcharge CPU
Synchronisation d'État : Garder Tous les Agents Cohérents
La synchronisation d'état est cruciale pour éviter que les agents ne travaillent avec des informations obsolètes. Voici mon gestionnaire d'état conçu pour des systèmes en production.
# state_manager.py
import json
import time
import threading
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class SharedState:
"""Représentation de l'état partagé du système"""
key: str
value: Any
last_modified: str
modified_by: str
version: int
ttl: Optional[int] = None # Time-to-live en secondes
class StateManager:
"""Gestionnaire centralisé de l'état partagé entre agents"""
def __init__(self):
self._state: Dict[str, SharedState] = {}
self._lock = threading.RLock()
self._watchers: Dict[str, list] = {} # Callbacks à exécuter lors des modifications
self._change_history: list = []
self._version_counter = 0
def set(self, key: str, value: Any, agent_id: str, ttl: Optional[int] = None):
"""Définit une valeur dans l'état partagé"""
with self._lock:
old_value = self._state.get(key)
old_version = old_value.version if old_value else 0
self._state[key] = SharedState(
key=key,
value=value,
last_modified=datetime.now().isoformat(),
modified_by=agent_id,
version=old_version + 1,
ttl=ttl
)
self._version_counter += 1
change_record = {
"key": key,
"old_value": old_value.value if old_value else None,
"new_value": value,
"modified_by": agent_id,
"timestamp": time.time(),
"version": self._state[key].version
}
self._change_history.append(change_record)
# Exécuter les callbacks des observateurs
if key in self._watchers:
for callback in self._watchers[key]:
callback(key, value, old_value.value if old_value else None)
print(f"💾 État synchronisé: {key}")
print(f" Valeur: {json.dumps(value, ensure_ascii=False)[:100]}...")
print(f" Version: {self._state[key].version} | Modifié par: {agent_id}")
def get(self, key: str) -> Optional[Any]:
"""Récupère une valeur de l'état partagé"""
with self._lock:
state = self._state.get(key)
if state:
# Vérifier si la valeur a expiré
if state.ttl:
last_mod = datetime.fromisoformat(state.last_modified)
if (datetime.now() - last_mod).total_seconds() > state.ttl:
del self._state[key]
return None
return state.value
return None
def get_all_keys(self) -> list:
"""Retourne toutes les clés de l'état partagé"""
with self._lock:
return list(self._state.keys())
def watch(self, key: str, callback: Callable):
"""Enregistre un callback à exécuter quand une clé est modifiée"""
if key not in self._watchers:
self._watchers[key] = []
self._watchers[key].append(callback)
def get_consistent_snapshot(self) -> Dict[str, Any]:
"""Retourne un snapshot cohérent de tout l'état"""
with self._lock:
return {k: v.value for k, v in self._state.items()}
def cleanup_expired(self):
"""Supprime les entrées expirées"""
with self._lock:
now = datetime.now()
expired_keys = []
for key, state in self._state.items():
if state.ttl:
last_mod = datetime.fromisoformat(state.last_modified)
if (now - last_mod).total_seconds() > state.ttl:
expired_keys.append(key)
for key in expired_keys:
del self._state[key]
print(f"🗑️ Entrée expirée supprimée: {key}")
Instance globale
state_manager = StateManager()
Orchestrateur : Le Chef d'Orchestre du Système Multi-Agent
L'orchestrateur est le cerveau qui coordonne tous les agents. Il décidera quel agent выполняет quelle tâche et dans quel ordre.
# agent_orchestrator.py
from agents.base_agent import BaseAgent
from state_manager import state_manager
from message_bus import message_bus
import json
class ResearchAgent(BaseAgent):
"""Agent spécialisé dans la recherche d'informations"""
def __init__(self):
super().__init__("research")
def process_task(self, task_data: dict) -> dict:
query = task_data.get("task", "")
context = state_manager.get("current_project_context") or {}
prompt = f"Recherche des informations pertinentes sur: {query}"
result = self.call_holysheep_api(prompt, context)
# Stocker les résultats dans l'état partagé
state_manager.set(f"research_results_{task_data.get('id', 'unknown')}",
{"query": query, "results": result},
self.agent_id)
# Informer les autres agents
self.update_shared_state("last_research", {"query": query, "timestamp": "now"})
return {
"summary": f"Recherche terminée pour: {query[:50]}...",
"results": result,
"tokens_used": 1500
}
class WriterAgent(BaseAgent):
"""Agent spécialisé dans la rédaction de contenu"""
def __init__(self):
super().__init__("writer")
# S'abonner aux résultats de recherche
message_bus.subscribe(self.agent_id, "research_complete")
def process_task(self, task_data: dict) -> dict:
topic = task_data.get("task", "")
research_results = state_manager.get(f"research_results_{task_data.get('research_id', '')}")
context = {
"topic": topic,
"research": research_results,
"style": task_data.get("style", "technique"),
"audience": task_data.get("audience", "experts")
}
prompt = f"Rédige un article sur: {topic}"
result = self.call_holysheep_api(prompt, context)
state_manager.set(f"draft_{task_data.get('id', 'unknown')}",
{"topic": topic, "content": result, "status": "draft"},
self.agent_id)
return {
"summary": f"Brouillon créé pour: {topic}",
"content": result,
"status": "awaiting_validation"
}
class ValidatorAgent(BaseAgent):
"""Agent spécialisé dans la validation et le contrôle qualité"""
def __init__(self):
super().__init__("validator")
def process_task(self, task_data: dict) -> dict:
content = task_data.get("content", "")
validation_type = task_data.get("type", "quality")
prompt = f"Valide ce contenu et identifie les éventuels problèmes:\n\n{content[:1000]}"
result = self.call_holysheep_api(prompt, {"validation_type": validation_type})
is_valid = "erreurs" not in result.lower() and "problèmes" not in result.lower()
return {
"summary": f"Validation {'réussie' if is_valid else 'échouée'}",
"validation_result": result,
"is_valid": is_valid,
"issues": [] if is_valid else ["Vérification manuelle nécessaire"]
}
class MultiAgentOrchestrator:
"""Orchestrateur principal qui coordonne tous les agents"""
def __init__(self):
self.agents = {
"research": ResearchAgent(),
"writer": WriterAgent(),
"validator": ValidatorAgent()
}
self.workflow_queue = []
self.active_tasks = {}
# Définir le flux de travail standard
self.workflow = [
{"step": 1, "agent": "research", "action": "Rechercher les informations de base"},
{"step": 2, "agent": "writer", "action": "Rédiger le contenu initial"},
{"step": 3, "agent": "validator", "action": "Valider la qualité du contenu"},
{"step": 4, "agent": "writer", "action": "Corriger selon les retours"}
]
def execute_workflow(self, initial_task: str, task_id: str):
"""Exécute un flux de travail complet avec tous les agents"""
print(f"\n{'='*60}")
print(f"🚀 DÉMARRAGE DU WORKFLOW #{task_id}")
print(f" Tâche initiale: {initial_task}")
print(f"{'='*60}\n")
# Initialiser le contexte
state_manager.set("current_project_context",
{"task": initial_task, "task_id": task_id, "status": "in_progress"},
"orchestrator")
workflow_results = []
for step in self.workflow:
agent_name = step["agent"]
agent = self.agents[agent_name]
print(f"\n📍 Étape {step['step']}: {step['action']}")
print(f" → Agent: {agent.name}")
# Préparer les données de tâche
task_data = {
"task": initial_task if step["step"] == 1 else workflow_results[-1].get("content", ""),
"id": task_id,
"research_id": task_id,
"step": step["step"]
}
# Exécuter la tâche
try:
result = agent.process_task(task_data)
workflow_results.append(result)
print(f" ✅ Résultat: {result.get('summary', 'Succès')}")
except Exception as e:
print(f" ❌ Erreur: {e}")
workflow_results.append({"error": str(e), "step": step["step"]})
# Synthèse finale
print(f"\n{'='*60}")
print(f"📊 RÉSUMÉ DU WORKFLOW #{task_id}")
print(f"{'='*60}")
for i, result in enumerate(workflow_results, 1):
print(f" Étape {i}: {result.get('summary', result.get('error', 'Inconnu'))}")
state_manager.set(f"workflow_result_{task_id}",
{"task": initial_task, "results": workflow_results, "status": "completed"},
"orchestrator")
return workflow_results
Point d'entrée
if __name__ == "__main__":
orchestrator = MultiAgentOrchestrator()
result = orchestrator.execute_workflow(
initial_task="Les avantages de l'architecture multi-agent pour les applications IA",
task_id="task_001"
)
Monitoring et Métriques de Performance
En production, il est essentiel de surveiller les performances de votre système multi-agent. Voici un module de monitoring simple mais efficace.
# monitoring.py
import time
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime
@dataclass
class AgentMetrics:
"""Métriques de performance pour un agent"""
agent_name: str
total_tasks: int
successful_tasks: int
failed_tasks: int
average_latency_ms: float
total_tokens_used: int
total_cost_usd: float
class PerformanceMonitor:
"""Surveillance des performances du système multi-agent"""
def __init__(self):
self.agent_metrics: Dict[str, List] = {agent: [] for agent in ["research", "writer", "validator"]}
self.api_calls: List[dict] = []
self.cost_per_million = {
"deepseek-v3.2": 0.42,
"gpt-4.1": 8.0,
"claude-sonnet-4.5": 15.0,
"gemini-2.5-flash": 2.50
}
def log_api_call(self, agent: str, model: str, tokens: int, latency_ms: float):
"""Enregistre un appel API avec ses métriques"""
cost = (tokens / 1_000_000) * self.cost_per_million.get(model, 1)
self.api_calls.append({
"agent": agent,
"model": model,
"tokens": tokens,
"latency_ms": latency_ms,
"cost_usd": cost,
"timestamp": datetime.now().isoformat()
})
self.agent_metrics[agent].append({
"tokens": tokens,
"latency_ms": latency_ms,
"cost": cost,
"success": True
})
def get_agent_summary(self, agent: str) -> AgentMetrics:
"""Génère un résumé des métriques pour un agent"""
metrics = self.agent_metrics.get(agent, [])
if not metrics:
return AgentMetrics(
agent_name=agent,
total_tasks=0,
successful_tasks=0,
failed_tasks=0,
average_latency_ms=0,
total_tokens_used=0,
total_cost_usd=0
)
total_tasks = len(metrics)
successful = len([m for m in metrics if m.get("success", True)])
avg_latency = sum(m["latency_ms"] for m in metrics) / total_tasks
total_tokens = sum(m["tokens"] for m in metrics)
total_cost = sum(m["cost"] for m in metrics)
return AgentMetrics(
agent_name=agent,
total_tasks=total_tasks,
successful_tasks=successful,
failed_tasks=total_tasks - successful,
average_latency_ms=round(avg_latency, 2),
total_tokens_used=total_tokens,
total_cost_usd=round(total_cost, 4)
)
def generate_report(self) -> str:
"""Génère un rapport complet de performance"""
report = ["="*60, "📊 RAPPORT DE PERFORMANCE MULTI-AGENT", "="*60, ""]
total_cost = 0
total_tokens = 0
for agent in self.agent_metrics.keys():
metrics = self.get_agent_summary(agent)
total_cost += metrics.total_cost_usd
total_tokens += metrics.total_tokens_used
report.append(f"🤖 Agent: {metrics.agent_name}")
report.append(f" Tâches traitées: {metrics.total_tasks}")
report.append(f" Succès: {metrics.successful_tasks} | Échecs: {metrics.failed_tasks}")
report.append(f" Latence moyenne: {metrics.average_latency_ms}ms")
report.append(f" Tokens utilisés: {metrics.total_tokens_used:,}")
report.append(f" Coût total: ${metrics.total_cost_usd:.4f}")
report.append("")
report.append("-"*60)
report.append(f"💰 COÛT TOTAL SYSTÈME: ${total_cost:.4f}")
report.append(f"📈 TOTAL TOKENS: {total_tokens:,}")
report.append("="*60)
return "\n".join(report)
Démonstration
monitor = PerformanceMonitor()
monitor.log_api_call("research", "deepseek-v3.2", 1500, 45.2)
monitor.log_api_call("writer", "gpt-4.1", 3000, 120.5)
monitor.log_api_call("validator", "claude-sonnet-4.5", 800, 85.3)
print(monitor.generate_report())
Erreurs Courantes et Solutions
Erreur 1 : "401 Unauthorized" lors de l'appel API
# ❌ ERREUR
requests.exceptions.HTTPError: 401 Client Error: Unauthorized
✅ SOLUTION
Vérifiez votre clé API et le format de l'en-tête Authorization
import os
Méthode 1 : Variable d'environnement (RECOMMANDÉE)
os.environ["HOLYSHEEP_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY"
api_key = os.getenv("HOLYSHEEP_API_KEY")
Méthode 2 : Chargement depuis un fichier .env
pip install python-dotenv
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv("HOLYSHEHEP_API_KEY")
Vérification du format
if not api_key or len(api_key) < 20:
raise ValueError("Clé API invalide ou manquante. "
"Obtenez votre clé sur https://www.holysheep.ai/register")
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
Erreur 2 : "TimeoutError" ou latence excessive
# ❌ ERREUR
requests.exceptions.ReadTimeout: HTTPSConnectionPool Read timed out
✅ SOLUTION
Implémenter un système de retry avec backoff exponentiel
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_resilient_session():
"""Crée une session HTTP avec retry automatique"""
session = requests.Session()
# Configuration du retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def call_api_with_retry(url, headers, payload, max_retries=3):
"""Appel API avec retry automatique"""
session = create_resilient_session()
for attempt in range(max_retries):
try:
response = session.post(
url,
headers=headers,
json=payload,
timeout=(10, 30) # (connect_timeout, read_timeout)
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
wait_time = 2 ** attempt # Backoff: 1s, 2s, 4s
print(f"⏳ Timeout, nouvelle tentative dans {wait_time}s...")
time.sleep(wait_time)
except requests.exceptions.RequestException as e:
print(f"❌ Erreur: {e}")
if attempt == max_retries - 1:
raise
return None
Erreur 3 : Incohérence d'état entre agents
# ❌ ERREUR
Les agents travaillent avec des données obsolètes ou contradictoires
✅ SOLUTION
Implémenter un mécanisme de versionnage et de verrouillage
import threading
import time
from typing import Optional
class OptimisticLockingState:
"""État partagé avec verrouillage optimiste"""
def __init__(self):
self._state = {}
self._versions = {}
self._locks = {}
self._lock = threading.RLock()
def get_with_version(self, key: str) -> tuple:
"""Récupère la valeur et sa version"""
with self._lock:
return (
self._state.get(key),
self._versions.get(key, 0)
)
def set_atomic(self, key: str, value: any, expected_version: int) -> bool:
"""
Met à jour la valeur seulement si la version correspond
Retourne True si succès, False si conflit
"""
with self._lock:
current_version = self._versions.get(key, 0)
if current_version != expected_version:
print(f"⚠️ Conflit détecté pour '{key}'!")
print(f" Version attendue: {expected_version}, Version actuelle: {current_version}")
return False
self._state[key] = value
self._versions[key] = current_version + 1
print(f"✅ Mise à jour atomique de '{key}' vers version {current_version + 1}")
return True
def update_with_retry(self, key: str, update_func, max_attempts=3):
"""Met à jour avec retry automatique en cas de conflit"""
for attempt in range(max_attempts):
current_value, version = self.get_with_version(key)
new_value = update_func(current_value)
if self.set_atomic(key, new_value, version):
return new_value
# Attendre un peu avant de réessiner
time.sleep(0.1 * (attempt + 1))
raise RuntimeError(f"Impossible de mettre à jour '{key}' après {max_attempts} tentatives")
Erreur 4 : Pile de messages pleine (Message Queue Overflow)
# ❌ ERREUR
Queue.Full: Message queue overflow, messages discarded
✅ SOLUTION
Implémenter une politique de gestion des messages avecpriorité
from collections import deque
from typing import Optional
import time
class PriorityMessageQueue:
"""File de messages avec gestion des priorités et limitation de taille"""
def __init__(self, max_size=1000, ttl_seconds=300):
self.queues = {
"high": deque(),
"normal": deque(),
"low": deque()
}
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.current_size = 0
def enqueue(self, message: any, priority: str = "normal") -> bool:
"""Ajoute un message avec contrôle de taille"""
if priority not in self.queues:
priority = "normal"
# Vérifier la taille totale
if self.current_size >= self.max_size:
# Supprimer les messages les plus anciens de basse priorité
self._evict_if_needed(priority)
message["enqueued_at"] = time.time()
message["priority"] = priority
self.queues[priority].append(message)
self.current_size += 1
return True
def _evict_if_needed(self, incoming_priority: str):
"""Supprime les anciens messages si nécessaire"""
priority_order = ["low", "normal", "high"]
for p in priority_order:
if p == incoming_priority:
continue
if self.queues[p]:
old_message = self.queues[p].popleft()
self.current_size -= 1
print(f"🗑️ Message supprimé (priorité {p}) pour faire place")
return
def dequeue(self) -> Optional[dict]:
"""Récupère le prochain message de haute priorité"""
for priority in ["high", "normal", "low"]:
if self.queues[priority]:
message = self.queues[priority].popleft()
self.current_size -= 1
# Vérifier l'expiration
age = time.time() - message.get("enqueued_at", 0)
if age > self.ttl_seconds:
print(f"⏰ Message expiré (âgé de {age:.1f}s)")
continue
return message
return None
Tableau Comparatif des Coûts HolySheep vs Concurrents
Ressources connexesArticles connexes🔥 Essayez HolySheep AIPasserelle API IA directe. Claude, GPT-5, Gemini, DeepSeek — une clé, sans VPN. |
|---|