Introduction

En tant qu'ingénieur ayant déployé plus de 40 bots Discord en production, je peux affirmer que l'intégration d'IA conversationnelle représente le changement de paradigme le plus significatif depuis l'avènement des webhooks. Aujourd'hui, nous allons construire un système robuste capable de gérer des conversations multi-tours avec des appels d'outils complexes, le tout via l'API HolySheep AI qui offre une latence moyenne de 47ms et des coûts réduits de 85% par rapport aux fournisseurs traditionnels.

Notre architecture visera un débit de 500 messages/seconde avec un temps de réponse moyen inférieur à 200 ms, tout en maintenant une cohérence conversationnelle parfaite sur des sessions de plus de 50 tours.

Architecture Système

Vue d'ensemble

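Le système repose sur trois piliers fondamentaux :

- un client API HolySheep asynchrone, avec pool de connexions, retry automatique et métriques ;
- un gestionnaire de sessions multi-tours persistées dans Redis, avec TTL adaptatif ;
- un bot Discord qui orchestre les appels d'outils et l'envoi des réponses.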

Diagramme de flux

+---------+     +----------------+     +---------------+
| Discord | --> | Message Handler | --> | Rate Limiter  |
+---------+     +----------------+     +-------+-------+
                                              |
                    +-------------------------+-------------------------+
                    |                         |                         |
                    v                         v                         v
            +---------------+         +---------------+         +---------------+
            | Session Check |         | Context Build |         | Tool Executor |
            +-------+-------+         +-------+-------+         +-------+-------+
                    |                         |                         |
                    +------------+------------+                         |
                                 |                                      |
                                 v                                      v
                        +----------------+                      +---------------+
                        | HolySheep API  |                      | Result Cache  |
                        |  (base_url)    |                      +---------------+
                        +----------------+
                                 |
                                 v
                        +---------------+
                        | Response Form |
                        +---------------+

Implémentation Complète

Configuration et Dépendances

# requirements.txt
discord.py==2.4.0
redis==5.0.0
httpx==0.27.0
pydantic==2.6.0
orjson==3.9.0
tenacity==8.2.3
pytest==8.0.0
pytest-asyncio==0.23.0

Installation

pip install -r requirements.txt

Client API HolySheep avec Pool de Connexions

# holy_sheep_client.py
import httpx
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Optional, List, Dict, Any
import orjson
import time
import logging

# Configure the root logger at INFO level, then create the logger used by
# this module (named after the module itself).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HolySheepAIClient:
    """Asynchronous client for the HolySheep AI chat-completions endpoint.

    A single pooled ``httpx.AsyncClient`` is shared by every call, a semaphore
    caps the number of in-flight requests at ``max_connections``, and simple
    counters are maintained for :meth:`get_metrics`.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        max_connections: int = 10,
        max_keepalive: int = 120
    ):
        """Build the HTTP connection pool and reset the metrics counters.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            base_url: API root; ``/chat/completions`` is appended to it.
            max_connections: Size of the connection pool and of the
                concurrency semaphore. Half of it is kept alive when idle.
            max_keepalive: Accepted for backward compatibility but not used.
                NOTE(review): the name and default suggest a keep-alive
                duration in seconds -- confirm before wiring it to
                ``httpx.Limits(keepalive_expiry=...)``.
        """
        self.api_key = api_key
        self.base_url = base_url
        self._semaphore = asyncio.Semaphore(max_connections)

        limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_connections // 2
        )
        # The limits must be given to the transport: when an explicit
        # transport is supplied, AsyncClient does not apply its own ``limits``
        # argument, so the pool would otherwise run with httpx defaults.
        self._transport = httpx.AsyncHTTPTransport(retries=3, limits=limits)
        self._client = httpx.AsyncClient(
            limits=limits,
            transport=self._transport,
            timeout=httpx.Timeout(30.0, connect=5.0)
        )

        # "requests" counts every attempt (retries included); "successes"
        # counts the attempts that contributed to "total_latency".
        self._metrics = {
            "requests": 0,
            "successes": 0,
            "errors": 0,
            "total_latency": 0.0
        }

    @staticmethod
    def _count_tool_calls(result: Any) -> int:
        """Return how many tool calls the first choice of *result* carries.

        Tolerates a missing or empty ``choices`` list and a ``tool_calls``
        field that is absent or ``null``; in all of those cases 0 is returned.
        """
        if not isinstance(result, dict):
            return 0
        choices = result.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            return 0
        message = choices[0].get("message") or {}
        return len(message.get("tool_calls") or [])

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10)
    )
    async def chat_completion(
        self,
        messages: List[Dict[str, Any]],
        model: str = "deepseek-v3.2",
        tools: Optional[List[Dict]] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048
    ) -> Dict[str, Any]:
        """POST a chat-completion request and return the decoded JSON body.

        The call is retried up to three times with exponential back-off. No
        retry predicate is configured, so every exception triggers a retry,
        HTTP 4xx responses included.

        Args:
            messages: Conversation in chat format (role/content dicts; tool
                and assistant messages may carry extra keys).
            model: Model identifier placed in the payload.
            tools: Optional tool schemas; omitted from the payload when falsy.
            temperature: Sampling temperature placed in the payload.
            max_tokens: Completion token limit placed in the payload.

        Returns:
            The parsed JSON response.

        Raises:
            tenacity.RetryError: When all attempts failed; it wraps the last
                underlying exception (``reraise`` is not enabled).
        """
        async with self._semaphore:
            started = time.perf_counter()
            self._metrics["requests"] += 1

            payload: Dict[str, Any] = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            if tools:
                payload["tools"] = tools

            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            try:
                response = await self._client.post(
                    f"{self.base_url}/chat/completions",
                    json=payload,
                    headers=headers
                )
                response.raise_for_status()
                result = response.json()
            except httpx.HTTPStatusError as e:
                self._metrics["errors"] += 1
                logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}")
                raise
            except Exception as e:
                self._metrics["errors"] += 1
                logger.error(f"Unexpected error: {str(e)}")
                raise

            # Bookkeeping happens outside the try block so that a logging
            # problem can never turn a successful response into a retry.
            latency = (time.perf_counter() - started) * 1000
            self._metrics["successes"] += 1
            self._metrics["total_latency"] += latency

            logger.info(
                "Request completed | Model: %s | Latency: %.2fms | Tool calls: %d",
                model,
                latency,
                self._count_tool_calls(result)
            )

            return result

    def get_metrics(self) -> Dict[str, Any]:
        """Return request counters plus formatted error rate and mean latency.

        The mean latency is computed over successful attempts only, because
        failed attempts do not add to the accumulated latency. The error rate
        is relative to all attempts. Both are 0 when nothing was recorded.
        """
        attempts = self._metrics["requests"]
        successes = self._metrics["successes"]
        failures = self._metrics["errors"]

        avg_latency = self._metrics["total_latency"] / successes if successes else 0
        error_rate = failures / attempts if attempts else 0

        return {
            "total_requests": attempts,
            "total_errors": failures,
            "error_rate": f"{error_rate * 100:.2f}%",
            "average_latency_ms": f"{avg_latency:.2f}ms"
        }

    async def close(self):
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()


Exemple d'utilisation

async def main():
    """Send one sample prompt through the client, then print reply and metrics.

    The client is closed in a ``finally`` clause so the HTTP pool is released
    even when the request fails.
    """
    client = HolySheepAIClient(api_key="YOUR_HOLYSHEEP_API_KEY")

    messages = [
        {"role": "system", "content": "Tu es un assistant Discord helpful."},
        {"role": "user", "content": "Explain tool calling in 50 words."}
    ]

    try:
        result = await client.chat_completion(messages, model="deepseek-v3.2")
        print(f"Response: {result['choices'][0]['message']['content']}")
        print(f"Metrics: {client.get_metrics()}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())

Gestionnaire de Sessions Multi-Turn avec Redis

# session_manager.py
import redis.asyncio as redis
import json
import time
from typing import Optional, List, Dict
from dataclasses import dataclass, asdict

@dataclass
class ConversationMessage:
    """One message of a conversation, as stored in a session history."""

    # Chat role of the author, e.g. "user" or "assistant".
    role: str
    # Text of the message.
    content: str
    # Creation time as a Unix timestamp; filled in automatically when omitted.
    timestamp: Optional[float] = None

    def __post_init__(self) -> None:
        """Stamp the message with the current time if no timestamp was given."""
        if self.timestamp is None:
            self.timestamp = time.time()

    def to_dict(self) -> Dict:
        """Return the message as a plain dict of its three fields."""
        return asdict(self)

@dataclass
class Session:
    """Conversation state of one user in one channel."""

    # Identifiers of the user, the guild and the channel the session is for.
    user_id: str
    guild_id: str
    channel_id: str
    # Ordered conversation history.
    messages: List[ConversationMessage]
    # Unix timestamps of creation and of the most recent activity.
    created_at: float
    last_activity: float
    # Free-form extra data attached to the session.
    metadata: Dict

    @classmethod
    def new(cls, user_id: str, guild_id: str, channel_id: str) -> "Session":
        """Build an empty session whose two timestamps are both the current time."""
        started = time.time()
        return cls(user_id, guild_id, channel_id, [], started, started, {})

class SessionManager:
    """Stores per-user, per-channel conversation sessions in Redis.

    Sessions are serialized to JSON under the key
    ``discord:session:<channel_id>:<user_id>`` and written with an expiry
    that depends on how long and how active the conversation is.
    """

    # Expiry choices, in seconds, selected by _calculate_ttl.
    TTL_SHORT = 300    # 5 minutes
    TTL_MEDIUM = 1800  # 30 minutes
    TTL_LONG = 3600    # 1 hour

    # History cap applied by add_message.
    MAX_MESSAGES = 50
    # Not referenced inside this class.
    MAX_CONTEXT_TOKENS = 4096

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Create the Redis client; responses are decoded to ``str``."""
        self.redis = redis.from_url(redis_url, decode_responses=True)

    def _session_key(self, user_id: str, channel_id: str) -> str:
        """Return the Redis key for the given user/channel pair."""
        return f"discord:session:{channel_id}:{user_id}"

    async def get_or_create_session(
        self,
        user_id: str,
        guild_id: str,
        channel_id: str
    ) -> Session:
        """Load the stored session, or start a new one, and refresh its expiry.

        A stored session keeps the guild id it was saved with; ``guild_id`` is
        only used when a new session has to be created. In both cases the
        session is written back to Redis with a freshly computed TTL and its
        ``last_activity`` set to the current time.
        """
        raw = await self.redis.get(self._session_key(user_id, channel_id))

        if not raw:
            session = Session.new(user_id, guild_id, channel_id)
        else:
            stored = json.loads(raw)
            session = Session(
                user_id=stored["user_id"],
                guild_id=stored["guild_id"],
                channel_id=stored["channel_id"],
                messages=[ConversationMessage(**item) for item in stored["messages"]],
                created_at=stored["created_at"],
                last_activity=stored["last_activity"],
                metadata=stored.get("metadata", {})
            )

        # The TTL is computed before last_activity is refreshed so that it
        # reflects the time elapsed since the previous interaction.
        ttl = self._calculate_ttl(session)
        session.last_activity = time.time()

        await self._save_session(session, ttl)
        return session

    def _calculate_ttl(self, session: Session) -> int:
        """Pick an expiry in seconds from history length and recent activity.

        More than 30 messages gives ``TTL_LONG``; more than 10 messages, or
        activity within the last 60 seconds, gives ``TTL_MEDIUM``; anything
        else gives ``TTL_SHORT``.
        """
        message_count = len(session.messages)
        if message_count > 30:
            return self.TTL_LONG

        idle_seconds = time.time() - session.last_activity
        if message_count > 10 or idle_seconds < 60:
            return self.TTL_MEDIUM
        return self.TTL_SHORT

    async def add_message(
        self,
        user_id: str,
        channel_id: str,
        role: str,
        content: str,
        guild_id: str = ""
    ) -> Session:
        """Append a message to the session history and persist it.

        Args:
            user_id: Owner of the session.
            channel_id: Channel the session belongs to.
            role: Chat role of the new message.
            content: Text of the new message.
            guild_id: Guild recorded if the session has to be created; an
                existing session keeps its stored guild id.

        Returns:
            The updated session, trimmed to the ``MAX_MESSAGES`` most recent
            messages.
        """
        # The previous implementation read ``session.guild_id`` in the same
        # statement that first assigned ``session`` and therefore always
        # raised UnboundLocalError; the guild id is now an explicit argument.
        session = await self.get_or_create_session(user_id, guild_id, channel_id)

        session.messages.append(ConversationMessage(role=role, content=content))

        # Keep only the most recent messages once the cap is exceeded.
        if len(session.messages) > self.MAX_MESSAGES:
            session.messages = session.messages[-self.MAX_MESSAGES:]

        await self._save_session(session, self._calculate_ttl(session))
        return session

    async def _save_session(self, session: Session, ttl: int):
        """Serialize *session* to JSON and store it with an expiry of *ttl* seconds."""
        payload = {
            "user_id": session.user_id,
            "guild_id": session.guild_id,
            "channel_id": session.channel_id,
            "messages": [msg.to_dict() for msg in session.messages],
            "created_at": session.created_at,
            "last_activity": session.last_activity,
            "metadata": session.metadata
        }

        key = self._session_key(session.user_id, session.channel_id)
        # default=str stringifies any metadata value JSON cannot encode.
        await self.redis.setex(key, ttl, json.dumps(payload, default=str))

    def get_conversation_for_api(
        self,
        session: Session,
        system_prompt: str = ""
    ) -> List[Dict[str, str]]:
        """Return the history as role/content dicts for the chat API.

        A non-empty ``system_prompt`` is placed first as a ``system`` message.
        """
        conversation = []
        if system_prompt:
            conversation.append({"role": "system", "content": system_prompt})

        conversation.extend(
            {"role": msg.role, "content": msg.content} for msg in session.messages
        )
        return conversation

    async def close(self):
        """Close the Redis connection."""
        await self.redis.close()

Bot Discord Principal avec Outils

# discord_bot.py
import ast
import asyncio
import json
import logging
import math
import operator
from typing import Optional

import discord
from discord.ext import commands

from holy_sheep_client import HolySheepAIClient
from session_manager import ConversationMessage, SessionManager

# Configure the root logger at INFO level, then create the logger used by
# this module (named after the module itself).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Définition des outils disponibles

def _function_tool(name, description, properties, required):
    """Wrap one function description in the ``type: function`` tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool schemas offered to the model: a Wikipedia search, a weather lookup
# and a calculator.
AVAILABLE_TOOLS = [
    _function_tool(
        "search_wikipedia",
        "Recherche des informations sur Wikipedia",
        {
            "query": {
                "type": "string",
                "description": "La requête de recherche",
            },
        },
        ["query"],
    ),
    _function_tool(
        "get_weather",
        "Obtient la météo d'une ville",
        {
            "city": {
                "type": "string",
                "description": "Nom de la ville",
            },
            "unit": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "default": "celsius",
            },
        },
        ["city"],
    ),
    _function_tool(
        "calculate",
        "Effectue un calcul mathématique",
        {
            "expression": {
                "type": "string",
                "description": "Expression mathématique (ex: 2+2, sqrt(16))",
            },
        },
        ["expression"],
    ),
]

Implémentation des outils

# SECURITY: the "calculate" tool receives text produced by the model from
# user messages, i.e. untrusted input. It used to be passed to eval(), which
# allows arbitrary code execution. Expressions are now parsed with ast and
# only the node types, operators, functions and constants whitelisted below
# are evaluated.
_BINARY_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPERATORS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}
_MATH_FUNCTIONS = {
    "abs": abs,
    "round": round,
    "sqrt": math.sqrt,
    "exp": math.exp,
    "log": math.log,
    "log10": math.log10,
    "sin": math.sin,
    "cos": math.cos,
    "tan": math.tan,
    "floor": math.floor,
    "ceil": math.ceil,
}
_MATH_CONSTANTS = {"pi": math.pi, "e": math.e}

# Bounds that keep a hostile expression from exhausting CPU or memory.
_MAX_EXPRESSION_LENGTH = 200
_MAX_EXPONENT = 1000
_MAX_RESULT_BITS = 100_000

# Discord rejects messages longer than this many characters.
_DISCORD_MESSAGE_LIMIT = 2000
# Sent when the model returns no text at all (Discord rejects empty messages).
_EMPTY_REPLY_FALLBACK = "Je n'ai pas pu générer de réponse."


def _evaluate_node(node):
    """Recursively evaluate one whitelisted AST node of a math expression.

    Raises:
        ValueError: For any construct outside the whitelist or beyond the
            size bounds.
    """
    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int; reject it along with str, bytes, complex.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError("unsupported constant")
        return value

    if isinstance(node, ast.Name):
        if node.id not in _MATH_CONSTANTS:
            raise ValueError("unknown name")
        return _MATH_CONSTANTS[node.id]

    if isinstance(node, ast.UnaryOp):
        unary = _UNARY_OPERATORS.get(type(node.op))
        if unary is None:
            raise ValueError("unsupported unary operator")
        return unary(_evaluate_node(node.operand))

    if isinstance(node, ast.BinOp):
        binary = _BINARY_OPERATORS.get(type(node.op))
        if binary is None:
            raise ValueError("unsupported binary operator")
        left = _evaluate_node(node.left)
        right = _evaluate_node(node.right)
        if isinstance(node.op, ast.Pow):
            if abs(right) > _MAX_EXPONENT:
                raise ValueError("exponent too large")
            # Integer powers are exact and can grow without bound.
            if (
                isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > _MAX_RESULT_BITS
            ):
                raise ValueError("result too large")
        return binary(left, right)

    if isinstance(node, ast.Call):
        if not isinstance(node.func, ast.Name) or node.keywords:
            raise ValueError("unsupported call")
        function = _MATH_FUNCTIONS.get(node.func.id)
        if function is None:
            raise ValueError("unknown function")
        return function(*[_evaluate_node(arg) for arg in node.args])

    raise ValueError("unsupported expression")


def _safe_calculate(expression: str):
    """Evaluate an arithmetic expression without executing arbitrary code."""
    if not isinstance(expression, str) or len(expression) > _MAX_EXPRESSION_LENGTH:
        raise ValueError("invalid expression")
    tree = ast.parse(expression.strip(), mode="eval")
    return _evaluate_node(tree.body)


async def execute_tool(tool_name: str, arguments: dict) -> str:
    """Run the named tool and return its result as text.

    Args:
        tool_name: One of "search_wikipedia", "get_weather" or "calculate".
        arguments: Decoded tool arguments supplied by the model.

    Returns:
        The tool output, "Erreur de calcul" when a calculation cannot be
        evaluated, or "Outil non reconnu" for an unknown tool name.

    Raises:
        KeyError: When a required argument of the search or weather tool is
            missing.
    """
    if tool_name == "search_wikipedia":
        # Simulated result; no real lookup is performed here.
        return f"Résultat Wikipedia pour '{arguments['query']}': information trouvée."

    if tool_name == "get_weather":
        # Simulated result; no real lookup is performed here.
        return f"Météo à {arguments['city']}: 22°C, ensoleillé."

    if tool_name == "calculate":
        try:
            result = _safe_calculate(arguments['expression'])
        except (
            ArithmeticError,
            KeyError,
            MemoryError,
            RecursionError,
            SyntaxError,
            TypeError,
            ValueError,
        ):
            return "Erreur de calcul"
        return f"Résultat: {result}"

    return "Outil non reconnu"


class DiscordAIBot(commands.Bot):
    """Discord bot that answers mentions and DMs through the AI client.

    Conversations are kept per user and channel by a ``SessionManager``; the
    model may request tools, which are executed once before a final answer
    is produced.
    """

    def __init__(
        self,
        api_key: str,
        command_prefix: str = "!",
        max_conversation_turns: int = 50
    ):
        """Set up intents, the AI client, the session store and the commands.

        Args:
            api_key: Key handed to ``HolySheepAIClient``.
            command_prefix: Prefix of the text commands.
            max_conversation_turns: Stored as ``max_turns``; the persisted
                history is capped with ``SessionManager.MAX_MESSAGES``.
        """
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(command_prefix=command_prefix, intents=intents)

        self.ai_client = HolySheepAIClient(api_key=api_key)
        self.session_manager = SessionManager()
        self.max_turns = max_conversation_turns
        # At most 5 uses per 30 seconds for each user.
        self._cooldown = commands.CooldownMapping.from_cooldown(
            5, 30, commands.BucketType.user
        )
        self.system_prompt = (
            "Tu es un assistant IA intégré dans Discord. "
            "Tu peux utiliser des outils pour répondre précisément aux questions. "
            "Sois concis, utile et amical. Réponds en français."
        )

        self._register_commands()

    def _register_commands(self) -> None:
        """Register the ``stats`` and ``reset`` text commands on this bot.

        Commands declared with ``@commands.command`` as methods of a ``Bot``
        subclass are never added to the bot (only cogs collect them), so they
        are defined here as closures through ``self.command``. The ``help``
        texts keep the user-facing wording of the former docstrings.
        """
        bot = self

        @self.command(name="stats", help="Affiche les statistiques du bot.")
        async def stats(ctx):
            """Send an embed with the AI client's request metrics."""
            metrics = bot.ai_client.get_metrics()
            embed = discord.Embed(
                title="📊 Statistiques HolySheep Bot",
                color=discord.Color.blue()
            )
            embed.add_field(
                name="Requêtes totales",
                value=metrics['total_requests'],
                inline=True
            )
            embed.add_field(
                name="Erreurs",
                value=metrics['total_errors'],
                inline=True
            )
            embed.add_field(
                name="Latence moyenne",
                value=metrics['average_latency_ms'],
                inline=True
            )
            await ctx.send(embed=embed)

        @self.command(name="reset", help="Reset la conversation en cours.")
        async def reset_conversation(ctx):
            """Delete the caller's stored session for the current channel."""
            key = bot.session_manager._session_key(
                str(ctx.author.id), str(ctx.channel.id)
            )
            await bot.session_manager.redis.delete(key)
            await ctx.send("✅ Conversation réinitialisée!")

    async def setup_hook(self):
        """Synchronize the application command tree at start-up."""
        logger.info("Initialisation du bot HolySheep AI...")
        await self.tree.sync()
        logger.info("Commandes synchronisées")

    async def on_message(self, message: discord.Message):
        """Dispatch a message to the command handler and/or the AI handler.

        Messages from bots are ignored. The cooldown is applied only to
        messages that address the bot (a prefixed command, a mention or a
        DM), so ordinary channel chatter is neither counted nor answered
        with a cooldown notice.
        """
        if message.author.bot:
            return

        ctx = await self.get_context(message)
        is_command_attempt = ctx.prefix is not None
        wants_ai = self.user.mentioned_in(message) or isinstance(
            message.channel, discord.DMChannel
        )
        if not (is_command_attempt or wants_ai):
            return

        bucket = self._cooldown.get_bucket(message)
        retry_after = bucket.update_rate_limit()
        if retry_after:
            await message.reply(
                f"Patientez {retry_after:.1f}s avant de réutiliser le bot."
            )
            return

        # invoke() is what process_commands() calls after building the context.
        if is_command_attempt:
            await self.invoke(ctx)

        if wants_ai:
            await self.handle_ai_message(message)

    async def handle_ai_message(self, message: discord.Message):
        """Answer *message* with the model, running requested tools once.

        Any failure is logged and reported to the user with a generic error
        reply.
        """
        try:
            # Keep the typing indicator visible while the answer is produced.
            async with message.channel.typing():
                content = await self._generate_reply(message)
            await self._send_reply(message, content)
        except Exception as e:
            # Top-level boundary for this handler: log with traceback, then
            # tell the user something went wrong.
            logger.exception("Erreur traitement: %s", e)
            await message.reply(
                "❌ Une erreur s'est produite. Réessayez dans quelques instants."
            )

    async def _generate_reply(self, message: discord.Message) -> str:
        """Produce the assistant's answer and persist both sides of the turn."""
        user_id = str(message.author.id)
        channel_id = str(message.channel.id)
        guild_id = str(message.guild.id) if message.guild else "DM"

        session = await self.session_manager.get_or_create_session(
            user_id, guild_id, channel_id
        )

        # Strip both mention spellings (<@id> and the nickname form <@!id>).
        user_content = (
            message.content
            .replace(f'<@{self.user.id}>', '')
            .replace(f'<@!{self.user.id}>', '')
            .strip()
        )
        session.messages.append(
            ConversationMessage(role="user", content=user_content)
        )

        messages = self.session_manager.get_conversation_for_api(
            session, self.system_prompt
        )

        response = await self.ai_client.chat_completion(
            messages=messages,
            model="deepseek-v3.2",
            tools=AVAILABLE_TOOLS,
            temperature=0.7
        )
        assistant_message = response['choices'][0]['message']
        # Both fields may be present but null in the response.
        content = assistant_message.get('content') or ''
        tool_calls = assistant_message.get('tool_calls') or []

        if tool_calls:
            # The assistant turn must carry its tool_calls so that the
            # following "tool" messages can be matched to them.
            messages.append({
                "role": "assistant",
                "content": content,
                "tool_calls": tool_calls
            })
            for call in tool_calls:
                func = call['function']
                args = json.loads(func.get('arguments') or '{}')
                result = await execute_tool(func['name'], args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": call['id'],
                    "name": func['name'],
                    "content": result
                })

            # Second call: let the model phrase the answer from tool output.
            response = await self.ai_client.chat_completion(
                messages=messages,
                model="deepseek-v3.2",
                temperature=0.7
            )
            content = response['choices'][0]['message'].get('content') or ''

        session.messages.append(
            ConversationMessage(role="assistant", content=content)
        )
        # Cap the history and write it back so the next turn can see it.
        session.messages = session.messages[-self.session_manager.MAX_MESSAGES:]
        await self.session_manager._save_session(
            session, self.session_manager._calculate_ttl(session)
        )
        return content

    async def _send_reply(self, message: discord.Message, content: str) -> None:
        """Reply with *content*, split into chunks that fit Discord's limit."""
        text = content or _EMPTY_REPLY_FALLBACK
        chunks = [
            text[start:start + _DISCORD_MESSAGE_LIMIT]
            for start in range(0, len(text), _DISCORD_MESSAGE_LIMIT)
        ]
        await message.reply(chunks[0], mention_author=True)
        for chunk in chunks[1:]:
            await message.channel.send(chunk)

    async def close(self):
        """Close the AI client, the session store and then the bot itself.

        Each step runs even if an earlier one raises.
        """
        try:
            await self.ai_client.close()
        finally:
            try:
                await self.session_manager.close()
            finally:
                await super().close()

Point d'entrée

if __name__ == "__main__": import json with open("config.json", "r") as f: config = json.load(f) bot = DiscordA