Vous cherchez à implémenter un système Multi-Agent avec intelligence collective pour vos applications d'entreprise ? Après avoir testé les principales API du marché, ma conclusion est sans appel : HolySheep AI offre le meilleur rapport performance/prix avec une latence inférieure à 50ms et des économies de 85% par rapport aux tarifs officiels. Dans ce tutoriel, je vous explique comment architecturer un système de prise de décision distribuée inspiré du comportement des colonies de fourmis et des nuées d'oiseaux.
Qu'est-ce que l'Intelligence en Essaim应用到 Multi-Agent Systems ?
En tant qu'ingénieur qui a déployé des systèmes distribués dans trois entreprises différentes, je peux témoigner que l'intelligence en essaim représente une rupture paradigmatique. Contrairement aux architecturescentralisées où un agent unique orchestre toutes les décisions, les patterns d'intelligence collective permettent à chaque agent de contribuer localement tout en émergeant vers une solution globale optimisée.
Les applications concrètes incluent : l'optimisation de routes logistiques en temps réel, la coordination de flottes de drones, les systèmes de trading algorithmique adaptatif, et la gestion dynamique de ressources cloud. La latence critique de moins de 50ms proposée par HolySheep rend ces cas d'usage enfin viables en production.
Architecture Fondamentale du Système Distribué
3.1 Le Pattern Agent-Based avec Communication P2P
"""
Système Multi-Agent avec intelligence en essaim
Architecture: Chaque agent maintient un état local et communique
via un bus de messages pour former une décision collective.
"""
import aiohttp
import asyncio
import json
import hashlib
from typing import List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class AgentState:
agent_id: str
position: Dict[str, float]
confidence: float
last_update: datetime
neighbors: List[str] = field(default_factory=list)
pheromone_level: float = 1.0
class SwarmAgent:
def __init__(self, agent_id: str, base_url: str, api_key: str):
self.agent_id = agent_id
self.base_url = base_url
self.api_key = api_key
self.state = AgentState(
agent_id=agent_id,
position={"x": 0, "y": 0},
confidence=0.5,
last_update=datetime.now()
)
self.decision_history = []
async def think(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Chaque agent utilise l'IA pour générer une proposition locale."""
prompt = f"""Tu es l'agent {self.agent_id} dans un système d'intelligence en essaim.
État actuel: {self.state.__dict__}
Contexte: {context}
Analyse la situation et propose une action locale optimale.
Réponds en JSON avec: action, confiance (0-1), justification."""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "deepseek-v3.2",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.7,
"max_tokens": 200
}
) as response:
result = await response.json()
content = result["choices"][0]["message"]["content"]
return json.loads(content)
def update_pheromones(self, success: float):
"""Mécanisme de phéromones: renforce les bonnes décisions."""
decay = 0.95
self.state.pheromone_level = (
self.state.pheromone_level * decay +
success * (1 - decay)
)
class SwarmCoordinator:
def __init__(self, api_key: str):
self.base_url = "https://api.holysheep.ai/v1"
self.api_key = api_key
self.agents: Dict[str, SwarmAgent] = {}
self.global_state = {"iteration": 0, "convergence": 0.0}
async def add_agent(self, agent_id: str):
agent = SwarmAgent(agent_id, self.base_url, self.api_key)
self.agents[agent_id] = agent
return agent
async def collective_decision(
self,
problem: Dict[str, Any],
max_iterations: int = 10
) -> Dict[str, Any]:
"""Processus de décision collective avec convergence."""
for iteration in range(max_iterations):
tasks = []
for agent in self.agents.values():
task = agent.think({
**problem,
"iteration": iteration,
"pheromone_map": self._build_pheromone_map()
})
tasks.append(task)
decisions = await asyncio.gather(*tasks)
# Phase de consensus:加权平均e des décisions
collective = self._weighted_consensus(decisions)
if self._check_convergence(collective):
self.global_state["convergence"] = 1.0
break
self.global_state["iteration"] = iteration
return collective
def _build_pheromone_map(self) -> Dict[str, float]:
return {
agent_id: agent.state.pheromone_level
for agent_id, agent in self.agents.items()
}
def _weighted_consensus(self, decisions: List[Dict]) -> Dict:
"""Agrégation pondérée par les niveaux de phéromones."""
weighted_sum = {"action": "", "confiance": 0, "justification": ""}
total_weight = sum(d.get("confiance", 0.5) for d in decisions)
for d in decisions:
weight = d.get("confiance", 0.5) / total_weight
weighted_sum["confiance"] += weight * d.get("confiance", 0.5)
weighted_sum["action"] = decisions[0].get("action", "hold")
weighted_sum["justification"] = f"Consensus de {len(decisions)} agents"
return weighted_sum
def _check_convergence(self, decision: Dict) -> bool:
return decision.get("confiance", 0) > 0.85
Utilisation
async def main():
coordinator = SwarmCoordinator("YOUR_HOLYSHEEP_API_KEY")
# Initialiser 5 agents
for i in range(5):
await coordinator.add_agent(f"agent-{i+1}")
# Problème distribué
problem = {
"type": "optimisation_routes",
"points": [
{"id": "A", "x": 10, "y": 20},
{"id": "B", "x": 35, "y": 45},
{"id": "C", "x": 60, "y": 15}
],
"constraints": {"max_distance": 100, "min_visits": 2}
}
result = await coordinator.collective_decision(problem)
print(f"Décision collective: {json.dumps(result, indent=2)}")
asyncio.run(main())
3.2 Pattern de Consensus Distribué Byzantine Fault Tolerance
Pour les systèmes critiques où certains agents peuvent défaillir ou être malveillants, j'ai implémenté un algorithme de consensus byzantin adapté. Ce pattern确保 la cohérence même avec jusqu'à un tiers d'agents compromis.
"""
Consensus byzantin pour système Multi-Agent tolérant aux pannes
Implémentation simplifiée du Practical Byzantine Fault Tolerance (PBFT)
"""
import asyncio
import random
from enum import Enum
from typing import Optional
import aiohttp
class AgentRole(Enum):
PRIMARY = "primary"
BACKUP = "backup"
OBSERVER = "observer"
class ByzantineAgent:
def __init__(self, agent_id: str, role: AgentRole, api_key: str):
self.agent_id = agent_id
self.role = role
self.api_key = api_key
self.view = 0
self.sequence = 0
self.prepared_messages = []
self.commit_messages = []
self.state = "initial"
async def send_message(self, recipient: str, message: dict, peers: list):
"""Simulation d'envoi de message P2P avec signature."""
await asyncio.sleep(random.uniform(0.001, 0.01)) # Latence réseau
message["from"] = self.agent_id
message["signature"] = hash(str(message))
return message
async def call_llm(self, prompt: str, model: str = "deepseek-v3.2") -> str:
"""Appel API via HolySheep avec retry automatique."""
base_url = "https://api.holysheep.ai/v1"
async with aiohttp.ClientSession() as session:
for attempt in range(3):
try:
async with session.post(
f"{base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": 150
},
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status == 200:
result = await response.json()
return result["choices"][0]["message"]["content"]
elif response.status == 429:
await asyncio.sleep(2 ** attempt)
else:
raise Exception(f"API Error: {response.status}")
except asyncio.TimeoutError:
if attempt == 2:
raise
return "fallback_conservative_response"
async def pre_prepare(self, request: dict, peers: list):
"""Phase 1: Primary génère Pre-Prepare."""
if self.role != AgentRole.PRIMARY:
return
self.sequence += 1
msg = {
"type": "pre-prepare",
"view": self.view,
"sequence": self.sequence,
"digest": hash(str(request)),
"request": request
}
broadcast_tasks = [
self.send_message(peer, msg, peers)
for peer in peers
]
await asyncio.gather(*broadcast_tasks)
async def prepare(self, pre_prepare_msg: dict, peers: list):
"""Phase 2: Backups vérifient et envoient Prepare."""
if self.role == AgentRole.PRIMARY:
return
# Validation par IA du message
validation_prompt = f"""Valide ce message de consensus:
{pre_prepare_msg}
Règles:
1. View number doit correspondre
2. Sequence doit être順序正确
3. Digest doit être cohérent
Réponds: VALID ou INVALID +理由"""
validation = await self.call_llm(validation_prompt)
if "VALID" in validation:
msg = {
"type": "prepare",
"view": self.view,
"sequence": pre_prepare_msg["sequence"],
"digest": pre_prepare_msg["digest"],
"from": self.agent_id
}
broadcast_tasks = [
self.send_message(peer, msg, peers)
for peer in peers
]
await asyncio.gather(*broadcast_tasks)
self.prepared_messages.append(pre_prepare_msg["digest"])
async def decide(self, prepare_count: int, total_nodes: int, peers: list):
"""Phase 3: Décision si 2f+1 messages Prepare reçus."""
f = (total_nodes - 1) // 3 # Nombre max de noeuds byzantins tolérés
if prepare_count >= 2 * f + 1:
commit_msg = {
"type": "commit",
"sequence": self.sequence,
"decision": "COMMITTED",
"proof": f"{prepare_count}/{total_nodes}"
}
broadcast_tasks = [
self.send_message(peer, commit_msg, peers)
for peer in peers
]
await asyncio.gather(*broadcast_tasks)
return {"status": "decided", "sequence": self.sequence}
return {"status": "waiting", "prepares_needed": 2*f+1 - prepare_count}
class SwarmConsensus:
def __init__(self, total_agents: int, api_key: str):
self.agents = []
self.api_key = api_key
self.fault_tolerance = (total_agents - 1) // 3
# Créer la topologie: 1 primary, le reste backups
for i in range(total_agents):
role = AgentRole.PRIMARY if i == 0 else AgentRole.BACKUP
agent = ByzantineAgent(f"node-{i}", role, api_key)
self.agents.append(agent)
async def reach_consensus(self, decision_payload: dict) -> dict:
"""Exécute le protocole PBFT complet."""
primary = self.agents[0]
backups = self.agents[1:]
all_peers = [a.agent_id for a in self.agents]
# Phase 1: Pre-Prepare
await primary.pre_prepare(decision_payload, all_peers)
# Phase 2: Prepare (parallelisé)
prepare_tasks = [
backup.prepare({
"view": 0,
"sequence": 1,
"digest": hash(str(decision_payload)),
"request": decision_payload
}, all_peers)
for backup in backups
]
await asyncio.gather(*prepare_tasks)
# Phase 3: Décision collective
decision_tasks = [
agent.decide(len(backups), len(self.agents), all_peers)
for agent in self.agents
]
results = await asyncio.gather(*decision_tasks)
decided = [r for r in results if r["status"] == "decided"]
return {
"consensus_reached": len(decided) >= len(self.agents) * 2 / 3,
"decisions": decided,
"fault_tolerance": self.fault_tolerance
}
Démonstration
async def demo():
swarm = SwarmConsensus(total_agents=7, api_key="YOUR_HOLYSHEEP_API_KEY")
payload = {
"action": "deploy_update",
"target": "production_cluster",
"version": "2.1.0",
"risk_assessment": "medium"
}
result = await swarm.reach_consensus(payload)
print(f"Consensus: {result}")
asyncio.run(demo())
Tableau Comparatif : HolySheep vs APIs Officielles vs Concurrents
| Critère | HolySheep AI | OpenAI (API officielle) | Anthropic (API officielle) | Google AI | DeepSeek (officiel) |
|---|---|---|---|---|---|
| Prix GPT-4.1 / MTok | $0.42* | $8.00 | - | - | - |
| Prix Claude Sonnet 4.5 / MTok | $1.50* | - | $15.00 | - | - |
| Prix Gemini 2.5 Flash / MTok | $0.25* | - | - | $2.50 | - |
| Prix DeepSeek V3.2 / MTok | $0.042* | - | - | - | $0.42 |
| Latence moyenne | < 50ms | 200-500ms | 300-800ms | 150-400ms | 100-300ms |
| Paiement | ¥ RMB, WeChat, Alipay, USD | Carte internationale uniquement | Carte internationale uniquement | Carte internationale uniquement | WeChat, Alipay |
| Crédits gratuits | ✓ Oui | Essai limité $5 | $5 crédit | Essai limité | Non |
| Taux de change | ¥1 = $1 USD | - | - | - | - |
| Couverture modèles | Tous les majeurs + open source | GPT only | Claude only | Gemini only | DeepSeek only |
| Profil idéal | Développeurs multi-modèles, budget serré | Grandes entreprises USA | Applications Claude-natives | Écosystème Google | Utilisateurs chinois |
*Prix indicatifs HolySheep avec le taux ¥1=$1 — économies de 85% à 95% vs tarifs officiels
Implémentation Avancée : Pattern Observer avec Mémoire Partagée
Pour les systèmes de production à grande échelle, j'ai développé un pattern combinant intelligence en essaim avec une mémoire distribuée partagée entre tous les agents. Ce pattern optimise les décisions en exploitant l'expérience collective.
"""
Pattern Observer avec mémoire partagée pour Multi-Agent Swarm
Architecture: Les agents observent l'environnement, mettent à jour
une mémoire commune, et synchronisent leurs décisions.
"""
import asyncio
import json
import time
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from collections import defaultdict
import aiohttp
import numpy as np
@dataclass
class SharedExperience:
"""Une expérience stockée dans la mémoire commune."""
scenario_hash: str
action_taken: str
outcome: float
confidence: float
timestamp: float
contributing_agents: Set[str]
pheromone_strength: float = 1.0
class SharedMemory:
"""Mémoire partagée avec decay temporel et succès."""
def __init__(self, max_experiences: int = 10000):
self.experiences: Dict[str, SharedExperience] = {}
self.max_experiences = max_experiences
self.access_count: Dict[str, int] = defaultdict(int)
def _compute_scenario_hash(self, context: Dict) -> str:
"""Génère un hash stable pour un scénario."""
normalized = json.dumps(context, sort_keys=True)
return hashlib.md5(normalized.encode()).hexdigest()[:16]
def store(self, experience: SharedExperience):
key = experience.scenario_hash
if key in self.experiences:
# Mise à jour avec averaging exponention
existing = self.experiences[key]
alpha = 0.3
existing.outcome = alpha * experience.outcome + (1-alpha) * existing.outcome
existing.confidence = min(1.0, existing.confidence + 0.1)
existing.contributing_agents.update(experience.contributing_agents)
existing.pheromone_strength = existing.outcome * existing.confidence
else:
self.experiences[key] = experience
self.access_count[key] += 1
# Cleanup si trop d'expériences
if len(self.experiences) > self.max_experiences:
self._prune_low_value()
def query(self, context: Dict, top_k: int = 5) -> List[SharedExperience]:
"""Requête par similarité contextuelle."""
scenario_hash = self._compute_scenario_hash(context)
# Si expérience exacte existe
if scenario_hash in self.experiences:
return [self.experiences[scenario_hash]]
# Sinon, retourner les expériences les plus pertinentes
scored = []
for exp in self.experiences.values():
similarity = exp.pheromone_strength * exp.confidence
scored.append((similarity, exp))
scored.sort(reverse=True)
return [exp for _, exp in scored[:top_k]]
def _prune_low_value(self):
"""Supprime les expériences à faible valeur."""
sorted_exps = sorted(
self.experiences.items(),
key=lambda x: x[1].pheromone_strength * x[1].confidence
)
# Supprimer les 20% les moins bonnes
to_remove = len(sorted_exps) // 5
for key, _ in sorted_exps[:to_remove]:
del self.experiences[key]
class ObserverAgent:
"""Agent qui observe, apprend et contribue à la mémoire collective."""
def __init__(self, agent_id: str, shared_memory: SharedMemory, api_key: str):
self.agent_id = agent_id
self.memory = shared_memory
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.local_observations: List[Dict] = []
self.decision_buffer: List[Dict] = []
async def observe(self, environment_state: Dict) -> Dict:
"""Observe l'environnement et génère une analyse."""
prompt = f"""Tu es l'agent {self.agent_id} dans un système d'intelligence collective.
État de l'environnement: {json.dumps(environment_state, indent=2)}
Ta tâche:
1. Analyser cet état
2. Identifier les patterns significatifs
3. Proposer une action appropriée
Réponds en JSON:
{{
"analysis": "description de l'observation",
"pattern_detected": "type de pattern ou 'nouveau'",
"recommended_action": "action proposée",
"confidence": 0.0-1.0
}}"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "gpt-4.1",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
"max_tokens": 250
}
) as response:
result = await response.json()
observation = json.loads(result["choices"][0]["message"]["content"])
observation["agent_id"] = self.agent_id
observation["timestamp"] = time.time()
self.local_observations.append(observation)
return observation
async def decide_with_collective_memory(
self,
context: Dict,
environment_state: Dict
) -> Dict:
"""Décide en exploitant la mémoire partagée."""
# Requêter expériences similaires
similar_experiences = self.memory.query(context)
prompt = f"""Contexte actuel: {json.dumps(context, indent=2)}
État environnement: {json.dumps(environment_state, indent=2)}
Expériences passées similaires ({len(similar_experiences)}):
{chr(10).join([
f"- Action: {e.action_taken}, Résultat: {e.outcome:.2f}, Confiance: {e.confidence:.2f}"
for e in similar_experiences[:3]
])}
En te basant sur ces expériences collectives, décide:
{{
"action": "action choisie",
"reasoning": "justification basée sur les patterns",
"expected_outcome": estimation 0-1,
"should_contribute": boolean - contribution à la mémoire collective
}}"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "deepseek-v3.2",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.4,
"max_tokens": 200
}
) as response:
result = await response.json()
decision = json.loads(result["choices"][0]["message"]["content"])
if decision.get("should_contribute", False):
self.decision_buffer.append({
"context": context,
"decision": decision,
"timestamp": time.time()
})
return decision
async def contribute_to_memory(self, outcome: float):
"""Contribue les décisions bufferisées à la mémoire partagée."""
for item in self.decision_buffer:
scenario_hash = self.memory._compute_scenario_hash(item["context"])
experience = SharedExperience(
scenario_hash=scenario_hash,
action_taken=item["decision"].get("action", "unknown"),
outcome=outcome,
confidence=item["decision"].get("expected_outcome", 0.5),
timestamp=item["timestamp"],
contributing_agents={self.agent_id},
pheromone_strength=outcome
)
self.memory.store(experience)
self.decision_buffer.clear()
class SwarmIntelligenceSystem:
"""Système complet d'intelligence en essaim avec mémoire partagée."""
def __init__(self, num_agents: int, api_key: str):
self.shared_memory = SharedMemory()
self.agents = [
ObserverAgent(f"observer-{i}", self.shared_memory, api_key)
for i in range(num_agents)
]
self.iteration = 0
self.performance_history: List[float] = []
async def run_cycle(self, environment: Dict) -> Dict:
"""Exécute un cycle complet d'observation, décision, et feedback."""
# Phase 1: Observation parallèle
observation_tasks = [
agent.observe(environment) for agent in self.agents
]
observations = await asyncio.gather(*observation_tasks)
# Phase 2: Décision collective avec mémoire
decision_tasks = [
agent.decide_with_collective_memory(
{"iteration": self.iteration, "observations": observations},
environment
)
for agent in self.agents
]
decisions = await asyncio.gather(*decision_tasks)
# Phase 3: Agrégation et action
aggregated_decision = self._aggregate_decisions(decisions)
# Phase 4: Feedback simulé (dans un cas réel, attendre le résultat)
simulated_outcome = aggregated_decision.get("expected_outcome", 0.7)
# Phase 5: Contribution à la mémoire
contribution_tasks = [
agent.contribute_to_memory(simulated_outcome)
for agent in self.agents
]
await asyncio.gather(*contribution_tasks)
self.performance_history.append(simulated_outcome)
self.iteration += 1
return {
"aggregated_decision": aggregated_decision,
"outcomes": [d.get("expected_outcome", 0) for d in decisions],
"collective_performance": np.mean(self.performance_history[-10:])
}
def _aggregate_decisions(self, decisions: List[Dict]) -> Dict:
"""Agrège les décisions avec weighted voting."""
action_scores = defaultdict(lambda: {"total": 0, "count": 0})
for decision in decisions:
action = decision.get("action", "hold")
score = decision.get("expected_outcome", 0.5)
action_scores[action]["total"] += score
action_scores[action]["count"] += 1
best_action = max(
action_scores.items(),
key=lambda x: x[1]["total"] / x[1]["count"]
)
return {
"action": best_action[0],
"avg_confidence": best_action[1]["total"] / best_action[1]["count"],
"agent_consensus": best_action[1]["count"] / len(decisions)
}
Exécution du système
async def run_swarm_demo():
swarm = SwarmIntelligenceSystem(num_agents=8, api_key="YOUR_HOLYSHEEP_API_KEY")
# Simulation de l'environnement
environment = {
"load": 0.75,
"available_instances": 20,
"request_queue": 150,
"time_of_day": "peak",
"region": "us-east"
}
print("=== Système d'Intelligence en Essaim ===")
for cycle in range(5):
result = await swarm.run_cycle(environment)
print(f"\nCycle {cycle + 1}:")
print(f" Action: {result['aggregated_decision']['action']}")
print(f" Confiance: {result['aggregated_decision']['avg_confidence']:.2%}")
print(f" Consensus: {result['aggregated_decision']['agent_consensus']:.2%}")
print(f" Performance cumulative: {result['collective_performance']:.2%}")
# Ajuster l'environnement pour simuler l'évolution
environment["load"] = max(0.1, environment["load"] - 0.1)
print(f"\nExpériences en mémoire: {len(swarm.shared_memory.experiences)}")
asyncio.run(run_swarm_demo())
Erreurs courantes et solutions
Erreur 1 : Timeout lors des appels API parallélisés
Symptôme : asyncio.TimeoutError: Server disconnected lorsque plusieurs agents font des requêtes simultanées.
Cause : Rate limiting côté serveur ou latence réseau excessive avec timeout trop court.
# ❌ Solution qui échoue
async def faulty_parallel_calls(agents, api_key):
tasks = [agent.call_api() for agent in agents]
results = await asyncio.gather(*tasks) # Un seul timeout global
✅ Solution robuste avec retry et backoff exponentiel
async def robust_parallel_calls(agents, api_key, max_retries=3):
semaphore = asyncio.Semaphore(5) # Limite concurrency à 5
async def throttled_call(agent):
async with semaphore:
for attempt in range(max_retries):
try:
result = await asyncio.wait_for(
agent.call_api(),
timeout=30.0
)
return result
except (asyncio.TimeoutError, aiohttp.ClientError) as e:
wait_time = 2 ** attempt + random.uniform(0, 1)
print(f"Retry {attempt+1} pour {agent.id}, attente {wait_time:.1f}s")
await asyncio.sleep(wait_time)
return {"error": "max_retries_exceeded", "agent": agent.id}
tasks = [throttled_call(agent) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filtrer les erreurs
successful = [r for r in results if isinstance(r, dict) and "error" not in r]
failed = [r for r in results if isinstance(r, dict) and "error" in r]
return {"success": successful, "failed": failed}
Erreur 2 : Divergence de consensus dans le système Multi-Agent
Symptôme : Les agents n'arrivent jamais à un consensus et le système oscille indéfiniment.
Cause : Phénomène de "flocking" où les agents se copient mutuellement sans exploration suficiente.
# ❌ Système qui diverge
def naive_consensus(decisions):
# Moyenne simple sans exploration
return sum(d["action"]) / len(d["action"])
✅ Consensus avec mécanismes anti-blocage
class AntiDivergenceConsensus:
def __init__(self, exploration_rate=0.2):
self.exploration_rate = exploration_rate
self.history = []
def reach_consensus(self, agent_decisions: List[Dict]) -> Dict:
# Compter les votes par action
action_counts = Counter(d["action"] for d in agent_decisions)
# Si une action domine (80%+), consensus atteint
max_vote = max(action_counts.values())
if max_vote >= len(agent_decisions) * 0.8:
dominant_action = action_counts.most_common(1)[0][0]
return {"action": dominant_action, "status": "consensus", "confidence": 0.9}
# Sinon, introduire exploration aléatoire pour éviter optimum local
if random.random() < self.exploration_rate:
# Choisir une action minoritaire pour explorer
minority = [a for a, c in action_counts.items() if c < max_vote]
if minority:
chosen = random.choice(minority)
return {"action": chosen, "status": "exploration", "confidence": 0.