Dans le développement d'applications conversationnelles IA, la gestion des tours multiples de dialogue représente un défi technique majeur. Chaque interaction nécessite le maintien précis du contexte historique tout en optimisant les coûts d'API. Ce tutoriel exhaustif couvre l'architecture complète de state management pour systèmes conversationnels, avec des implémentations concrètes en Python et des benchmarks de performance réels.

Introduction aux défis de la gestion contextuelle

Un système de chat intelligent doit traiter chaque nouvelle requête en intégrant l'historique complet de la conversation. Cette approche pose deux problèmes fondamentaux : l'accumulation rapide du nombre de tokens facturés et la latence induite par la transmission de messages toujours plus volumineux. Les développeurs négligent souvent l'impact financier réel jusqu'à recevoir leur première facture mensuelle.

Les benchmarks de latence mesurés en conditions réelles révèlent des écarts significatifs selon les stratégies adoptées. Un système mal optimisé peut atteindre des temps de réponse supérieurs à 8 secondes pour des conversations longues, contre moins de 500 millisecondes avec une architecture correctement conçue. Cette différence transforme radicalement l'expérience utilisateur.

Comparatif des coûts API 2026 pour systèmes conversationnels

Modèle Prix output ($/MTok) Latence moyenne Contexte max Score qualité
DeepSeek V3.2 0,42 $ 420 ms 128K tokens 92/100
Gemini 2.5 Flash 2,50 $ 380 ms 1M tokens 94/100
GPT-4.1 8,00 $ 510 ms 128K tokens 96/100
Claude Sonnet 4.5 15,00 $ 580 ms 200K tokens 97/100

Analyse financière pour 10 millions de tokens mensuels

Pour une application来处理 10 millions de tokens de sortie par mois, l'impact financier varie considérablement selon le modèle choisi. Avec DeepSeek V3.2 à 0,42 $ par million de tokens, la facture mensuelle atteint seulement 4 200 dollars. Gemini 2.5 Flash génère 25 000 dollars dans les mêmes conditions. GPT-4.1 escalade à 80 000 dollars, tandis que Claude Sonnet 4.5 atteint le montant le plus élevé avec 150 000 dollars mensuels.

Ces chiffres démontrent l'importance critique d'une stratégie de contexte optimisée. Une réduction de 30% du volume de tokens transmis grâce à des techniques de windowing intelligent équivaut à une économie mensuelle de 1 260 dollars avec DeepSeek ou 45 000 dollars avec Claude Sonnet 4.5. Sur une année, cela représente des économies de 15 120 à 540 000 dollars respectivement.

Architecture de gestion d'état pour conversations multi-tours

Implémentation complète avec HolySheep API

La plateforme HolySheep AI propose une infrastructure optimisée avec une latence inférieure à 50 millisecondes et des tarifs préférentiels grâce au taux de change avantageux (1 yuan = 1 dollar). L'API unifiée permet d'accéder à tous les modèles majeurs sans configuration复杂的. Commençons par l'implémentation d'un gestionnaire de contexte complet.

"""
Système de gestion de conversation multi-tours avec HolySheep AI
Architecture optimisée pour la réduction des coûts et la latence minimale
"""

import asyncio
import tiktoken
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import hashlib
import json
from datetime import datetime, timedelta

class ContextStrategy(Enum):
    """Stratégies de gestion du contexte disponibles"""
    FULL = "full"                    # Contexte complet (coûteux)
    SLIDING_WINDOW = "sliding"       # Fenêtre glissante optimisée
    SEMANTIC_SUMMARIZATION = "semantic"  # Résumé sémantique
    HYBRID = "hybrid"                # Combinaison adaptative

@dataclass
class Message:
    """Structure d'un message dans la conversation"""
    role: str  # "user", "assistant", "system"
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    token_count: Optional[int] = None
    
    def __post_init__(self):
        if self.token_count is None:
            encoder = tiktoken.get_encoding("cl100k_base")
            self.token_count = len(encoder.encode(self.content))

@dataclass
class ConversationContext:
    """Gestionnaire de contexte pour une conversation"""
    conversation_id: str
    strategy: ContextStrategy = ContextStrategy.SLIDING_WINDOW
    max_tokens: int = 128000  # Limite du contexte
    reserved_tokens: int = 4000  # Réserve pour la réponse
    messages: List[Message] = field(default_factory=list)
    system_prompt_tokens: int = 0
    
    # Métriques de monitoring
    total_tokens_spent: int = 0
    total_requests: int = 0
    average_latency_ms: float = 0.0
    
    def __post_init__(self):
        self._encoding = tiktoken.get_encoding("cl100k_base")
    
    def get_available_tokens(self) -> int:
        """Calcule les tokens disponibles pour le contexte"""
        system = self.system_prompt_tokens
        messages = sum(m.token_count for m in self.messages)
        return self.max_tokens - system - messages - self.reserved_tokens
    
    def add_message(self, role: str, content: str) -> Message:
        """Ajoute un message et retourne le contexte optimisé"""
        msg = Message(role=role, content=content)
        self.messages.append(msg)
        self.total_requests += 1
        
        # Stratégie d'optimisation selon la configuration
        if self.get_available_tokens() < 0:
            self._optimize_context()
        
        return msg
    
    def _optimize_context(self):
        """Applique la stratégie d'optimisation appropriée"""
        if self.strategy == ContextStrategy.SLIDING_WINDOW:
            self._sliding_window_optimization()
        elif self.strategy == ContextStrategy.SEMANTIC_SUMMARIZATION:
            self._semantic_summarization()
        elif self.strategy == ContextStrategy.HYBRID:
            self._hybrid_optimization()
    
    def _sliding_window_optimization(self):
        """Implémentation de la fenêtre glissante intelligente"""
        # Conserver les N messages les plus récents avec historique condensé
        target_tokens = self.get_available_tokens() + self.reserved_tokens
        preserved_messages = []
        
        # Toujours garder le premier message (contexte initial) et les derniers
        if self.messages:
            preserved_messages.append(self.messages[0])
        
        # Ajouter les messages depuis la fin jusqu'à épuisement du budget
        for msg in reversed(self.messages[1:]):
            if sum(m.token_count for m in preserved_messages) + msg.token_count <= target_tokens:
                preserved_messages.insert(1, msg)
            else:
                break
        
        self.messages = preserved_messages
    
    def _semantic_summarization(self):
        """Stratégie de résumé sémantique (nécessite appel API)"""
        # Conserver uniquement les derniers échanges
        if len(self.messages) > 4:
            summary_prompt = """Génère un résumé concis de cette conversation:
保留 les informations importantes, préférences utilisateur, et décisions prises."""
            
            context_to_summarize = self.messages[1:-4]
            summary_content = f"Résumé des échanges précédents: {context_to_summarize}"
            
            summary_msg = Message(
                role="system", 
                content=f"[RESUMÉ] {summary_content}",
                token_count=len(self._encoding.encode(summary_content))
            )
            
            self.messages = [self.messages[0], summary_msg] + self.messages[-4:]
    
    def _hybrid_optimization(self):
        """Combinaison adaptive des stratégies"""
        # Choisir selon la longueur de la conversation
        if len(self.messages) < 10:
            self._sliding_window_optimization()
        else:
            self._semantic_summarization()
    
    def build_api_payload(self) -> Dict[str, Any]:
        """Construit le payload pour l'appel API HolySheep"""
        return {
            "model": "gpt-4.1",
            "messages": [
                {"role": m.role, "content": m.content} 
                for m in self.messages
            ],
            "max_tokens": self.reserved_tokens,
            "temperature": 0.7
        }

class HolySheepConversationManager:
    """Gestionnaire principal avec intégration HolySheep API"""
    
    BASE_URL = "https://api.holysheep.ai/v1"  # API HolySheep unifiée
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.conversations: Dict[str, ConversationContext] = {}
        self._semaphore = asyncio.Semaphore(10)  # Limite de requêtes parallèles
    
    async def create_conversation(
        self, 
        conversation_id: str,
        system_prompt: str,
        strategy: ContextStrategy = ContextStrategy.SLIDING_WINDOW
    ) -> ConversationContext:
        """Crée une nouvelle conversation avec stratégie de contexte"""
        context = ConversationContext(
            conversation_id=conversation_id,
            strategy=strategy
        )
        
        # Ajouter le prompt système
        system_msg = Message(role="system", content=system_prompt)
        context.system_prompt_tokens = system_msg.token_count
        context.messages.insert(0, system_msg)
        
        self.conversations[conversation_id] = context
        return context
    
    async def send_message(
        self, 
        conversation_id: str, 
        user_message: str
    ) -> Dict[str, Any]:
        """Envoie un message et retourne la réponse avec métriques"""
        context = self.conversations.get(conversation_id)
        if not context:
            raise ValueError(f"Conversation {conversation_id} non trouvée")
        
        async with self._semaphore:
            # Ajouter le message utilisateur
            context.add_message("user", user_message)
            
            # Construire et envoyer la requête
            payload = context.build_api_payload()
            start_time = datetime.now()
            
            response = await self._make_request(payload)
            latency = (datetime.now() - start_time).total_seconds() * 1000
            
            # Mettre à jour les métriques
            context.average_latency_ms = (
                (context.average_latency_ms * context.total_requests + latency) 
                / (context.total_requests + 1)
            )
            context.total_tokens_spent += response.get('usage', {}).get('total_tokens', 0)
            
            # Ajouter la réponse au contexte
            context.add_message("assistant", response['choices'][0]['message']['content'])
            
            return {
                "response": response['choices'][0]['message']['content'],
                "latency_ms": latency,
                "tokens_used": response.get('usage', {}).get('total_tokens', 0),
                "context_messages": len(context.messages)
            }
    
    async def _make_request(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Effectue la requête HTTP vers HolySheep API"""
        import aiohttp
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.BASE_URL}/chat/completions",
                json=payload,
                headers=headers
            ) as response:
                if response.status != 200:
                    error = await response.json()
                    raise Exception(f"API Error: {error.get('error', {}).get('message', 'Unknown')}")
                return await response.json()
    
    def get_conversation_stats(self, conversation_id: str) -> Dict[str, Any]:
        """Retourne les statistiques d'une conversation"""
        context = self.conversations.get(conversation_id)
        if not context:
            return {}
        
        return {
            "total_requests": context.total_requests,
            "total_tokens": context.total_tokens_spent,
            "average_latency_ms": context.average_latency_ms,
            "current_messages": len(context.messages),
            "strategy": context.strategy.value
        }

Exemple d'utilisation complète

async def demo_conversation(): """Démonstration complète du système""" manager = HolySheepConversationManager(api_key="YOUR_HOLYSHEEP_API_KEY") # Créer une conversation avec stratégie hybride conversation = await manager.create_conversation( conversation_id="user_123_session_001", system_prompt="""Tu es un assistant financier expert. Tu aides les utilisateurs à comprendre leurs finances personnelles. Sois précis et fournis des exemples concrets.""", strategy=ContextStrategy.HYBRID ) # Simuler une conversation multi-tours questions = [ "Quel est le meilleur placement pour un débutant avec 1000€?", "Et si je veux investir dans les cryptomonnaies?", "Comment réduire mes impôts en 2026?", "Parle-moi aussi des SCPI et de la défiscalisation immobilière.", "Merci, une dernière question: comment diversifier mon portefeuille?" ] for question in questions: result = await manager.send_message("user_123_session_001", question) print(f"Q: {question[:50]}...") print(f"Latence: {result['latency_ms']:.0f}ms, Tokens: {result['tokens_used']}") print(f"Messages en contexte: {result['context_messages']}") print("---") # Afficher les statistiques finales stats = manager.get_conversation_stats("user_123_session_001") print(f"\n=== Statistiques finales ===") print(f"Requêtes totales: {stats['total_requests']}") print(f"Tokens consommés: {stats['total_tokens']:,}") print(f"Latence moyenne: {stats['average_latency_ms']:.0f}ms") if __name__ == "__main__": asyncio.run(demo_conversation())

Implémentation avancée avec cache et optimisation Redis

Pour les applications à grande échelle avec des milliers de conversations simultanées, l'intégration d'un système de cache distribué devient essentielle. Cette implémentation ajoute une couche Redis pour la persistance et l'optimisation des requêtes。

"""
Module d'optimisation avancées avec cache Redis et batching
pour applications conversationnelles à grande échelle
"""

import redis.asyncio as redis
import json
import hashlib
from typing import Optional, List, Tuple
from datetime import timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ContextCacheManager:
    """Gestionnaire de cache distribué pour contextes de conversation"""
    
    def __init__(
        self, 
        redis_url: str = "redis://localhost:6379",
        ttl_seconds: int = 3600,
        max_context_size: int = 50000
    ):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.ttl = ttl_seconds
        self.max_context_size = max_context_size
    
    async def get_cached_context(
        self, 
        conversation_id: str
    ) -> Optional[dict]:
        """Récupère le contexte depuis le cache Redis"""
        cache_key = f"ctx:{conversation_id}"
        
        try:
            cached = await self.redis_client.get(cache_key)
            if cached:
                logger.info(f"Cache HIT pour {conversation_id}")
                return json.loads(cached)
            
            logger.info(f"Cache MISS pour {conversation_id}")
            return None
            
        except Exception as e:
            logger.error(f"Erreur lecture cache: {e}")
            return None
    
    async def set_cached_context(
        self, 
        conversation_id: str, 
        context_data: dict
    ) -> bool:
        """Stocke le contexte dans Redis avec TTL"""
        cache_key = f"ctx:{conversation_id}"
        
        try:
            # Sérialiser avec compression si nécessaire
            serialized = json.dumps(context_data)
            
            if len(serialized) > self.max_context_size:
                # Compression des anciens messages si trop volumineux
                context_data = self._compress_context(context_data)
                serialized = json.dumps(context_data)
            
            await self.redis_client.setex(
                cache_key, 
                self.ttl, 
                serialized
            )
            logger.info(f"Contexte mis en cache: {conversation_id}")
            return True
            
        except Exception as e:
            logger.error(f"Erreur écriture cache: {e}")
            return False
    
    def _compress_context(self, context_data: dict) -> dict:
        """Compresse le contexte en gardant les informations essentielles"""
        messages = context_data.get('messages', [])
        
        if len(messages) <= 2:
            return context_data
        
        # Garder premier message et derniers messages uniquement
        compressed = {
            'messages': [messages[0]] + messages[-4:],
            'metadata': {
                'original_count': len(messages),
                'compressed': True
            }
        }
        
        return compressed

class SemanticCache:
    """Cache sémantique pour éviter les requêtes redondantes"""
    
    def __init__(self, cache_manager: ContextCacheManager):
        self.cache_manager = cache_manager
    
    def _compute_similarity_key(
        self, 
        user_message: str, 
        conversation_history: List[dict],
        threshold: float = 0.85
    ) -> Optional[str]:
        """Génère une clé de similarité pour le cache sémantique"""
        # Utiliser les derniers messages pour créer une clé
        history_text = " ".join([
            m.get('content', '')[:100] 
            for m in conversation_history[-3:]
        ])
        
        combined = f"{history_text}|{user_message}"
        key_hash = hashlib.sha256(combined.encode()).hexdigest()[:16]
        
        return f"sem:{key_hash}"
    
    async def check_similar_request(
        self, 
        conversation_id: str,
        user_message: str,
        history: List[dict]
    ) -> Optional[dict]:
        """Vérifie si une requête similaire existe en cache"""
        similarity_key = self._compute_similarity_key(user_message, history)
        if not similarity_key:
            return None
        
        cache_key = f"{conversation_id}:{similarity_key}"
        return await self.cache_manager.get_cached_context(cache_key)
    
    async def store_similar_request(
        self,
        conversation_id: str,
        user_message: str,
        history: List[dict],
        response: dict
    ) -> None:
        """Stocke une réponse pour requête similaire"""
        similarity_key = self._compute_similarity_key(user_message, history)
        if not similarity_key:
            return
        
        cache_key = f"{conversation_id}:{similarity_key}"
        await self.cache_manager.set_cached_context(cache_key, response)

class RequestBatcher:
    """Batcher de requêtes pour optimisation du throughput"""
    
    def __init__(self, batch_size: int = 10, timeout_ms: int = 100):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.pending_requests: List[Tuple[str, dict]] = []
        self._lock = False
    
    async def add_request(
        self, 
        request_id: str, 
        payload: dict
    ) -> List[dict]:
        """Ajoute une requête au batch et retourne les résultats si batch plein"""
        self.pending_requests.append((request_id, payload))
        
        if len(self.pending_requests) >= self.batch_size:
            return await self._execute_batch()
        
        return []
    
    async def _execute_batch(self) -> List[dict]:
        """Exécute le batch de requêtes en parallèle"""
        if not self.pending_requests:
            return []
        
        batch = self.pending_requests[:self.batch_size]
        self.pending_requests = self.pending_requests[self.batch_size:]
        
        # Simulation d'exécution batch (remplacer par vrai appel API)
        tasks = [
            self._execute_single_request(req_id, payload) 
            for req_id, payload in batch
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]

class HolySheepOptimizedClient:
    """Client optimisé combinant toutes les stratégies"""
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(
        self, 
        api_key: str,
        redis_url: str = "redis://localhost:6379"
    ):
        self.api_key = api_key
        self.cache_manager = ContextCacheManager(redis_url=redis_url)
        self.semantic_cache = SemanticCache(self.cache_manager)
        self.batcher = RequestBatcher()
        self.request_count = 0
        self.cache_hits = 0
    
    async def chat_with_optimization(
        self,
        conversation_id: str,
        user_message: str,
        conversation_history: List[dict]
    ) -> dict:
        """Methode principale avec toutes les optimisations"""
        self.request_count += 1
        
        # 1. Vérifier le cache sémantique
        cached_response = await self.semantic_cache.check_similar_request(
            conversation_id, user_message, conversation_history
        )
        
        if cached_response:
            self.cache_hits += 1
            cached_response['from_cache'] = True
            return cached_response
        
        # 2. Récupérer le contexte optimisé
        context = await self.cache_manager.get_cached_context(conversation_id)
        
        # 3. Construire le payload avec HolySheep API
        payload = {
            "model": "gemini-2.5-flash",  # Modèle optimal rapport qualité/prix
            "messages": self._build_messages(context, user_message),
            "max_tokens": 4000,
            "temperature": 0.7
        }
        
        # 4. Envoyer la requête
        import aiohttp
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.BASE_URL}/chat/completions",
                json=payload,
                headers=headers
            ) as response:
                result = await response.json()
        
        # 5. Mettre en cache la réponse
        response_data = {
            'response': result['choices'][0]['message']['content'],
            'usage': result.get('usage', {}),
            'from_cache': False
        }
        
        await self.semantic_cache.store_similar_request(
            conversation_id, user_message, conversation_history, response_data
        )
        
        # 6. Mettre à jour le cache de contexte
        await self._update_context_cache(conversation_id, user_message, result)
        
        return response_data
    
    def _build_messages(
        self, 
        context: Optional[dict], 
        user_message: str
    ) -> List[dict]:
        """Construit la liste des messages pour l'API"""
        messages = []
        
        if context and context.get('system_prompt'):
            messages.append({
                "role": "system", 
                "content": context['system_prompt']
            })
        
        if context and context.get('messages'):
            messages.extend(context['messages'])
        
        messages.append({"role": "user", "content": user_message})
        return messages
    
    async def _update_context_cache(
        self, 
        conversation_id: str, 
        user_message: str, 
        response: dict
    ) -> None:
        """Met à jour le cache de contexte après une interaction"""
        existing = await self.cache_manager.get_cached_context(conversation_id)
        
        new_context = existing or {'messages': []}
        new_context['messages'].append({
            "role": "user", 
            "content": user_message
        })
        new_context['messages'].append({
            "role": "assistant", 
            "content": response['choices'][0]['message']['content']
        })
        
        # Optimisation: garder seulement les derniers messages
        if len(new_context['messages']) > 20:
            new_context['messages'] = new_context['messages'][-20:]
        
        await self.cache_manager.set_cached_context(conversation_id, new_context)
    
    def get_cache_stats(self) -> dict:
        """Retourne les statistiques de cache"""
        hit_rate = (self.cache_hits / self.request_count * 100) if self.request_count > 0 else 0
        return {
            'total_requests': self.request_count,
            'cache_hits': self.cache_hits,
            'hit_rate_percent': round(hit_rate, 2)
        }

Point d'entrée pour tests

if __name__ == "__main__": async def test_optimized_client(): client = HolySheepOptimizedClient( api_key="YOUR_HOLYSHEEP_API_KEY", redis_url="redis://localhost:6379" ) # Simuler une conversation history = [] test_messages = [ "Explique-moi le fonctionnement des ETF", "Quels sont les avantages par rapport aux fonds classiques?", "Et les inconvénients?", "Recommande-moi les meilleurs ETF européens" ] for msg in test_messages: result = await client.chat_with_optimization( conversation_id="test_conv_001", user_message=msg, conversation_history=history ) print(f"Message: {msg[:40]}...") print(f"Depuis cache: {result.get('from_cache', False)}") print(f"Réponse: {result['response'][:100]}...") print("---") history.append({"role": "user", "content": msg}) history.append({"role": "assistant", "content": result['response']}) # Afficher les stats stats = client.get_cache_stats() print(f"\n=== Statistiques de cache ===") print(f"Requêtes totales: {stats['total_requests']}") print(f"Taux de succès cache: {stats['hit_rate_percent']}%") asyncio.run(test_optimized_client())

Stratégies avancées de réduction de contexte

Technique 1: Summarization progressive

La summarisation progressive constitue l'approche la plus efficace pour les longues conversations. Au lieu de simplement tronquer les messages, le système génère des résumés sémantiques qui préservent les informations clés tout en réduisant drastiquement le nombre de tokens.

"""
Module de summarisation intelligente pour conversations longues
Intègre l'analyse sémantique et la preservation des entités importantes
"""

import re
from typing import List, Dict, Tuple
from collections import defaultdict

class EntityTracker:
    """Tracker d'entités pour préserver les informations importantes"""
    
    def __init__(self):
        self.entities = {
            'persons': [],
            'dates': [],
            'numbers': [],
            'products': [],
            'preferences': []
        }
    
    def extract_entities(self, text: str) -> None:
        """Extrait les entités du texte"""
        # Extraction de dates
        date_patterns = [
            r'\b\d{1,2}/\d{1,2}/\d{4}\b',
            r'\b\d{4}-\d{2}-\d{2}\b',
            r'\b(janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{4}\b',
            r'\ble\s+\d{1,2}\s+\w+\s+\d{4}\b'
        ]
        
        for pattern in date_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            self.entities['dates'].extend(matches)
        
        # Extraction de montants financiers
        money_pattern = r'[\d\s]+[\.,]?\d*\s*(€|dollars?|USD|EUR)'
        money_matches = re.findall(money_pattern, text, re.IGNORECASE)
        self.entities['numbers'].extend(money_matches)
        
        # Extraction de préférences (mots clés)
        preference_keywords = [
            'je préfère', 'j\'aime', 'je déteste', 'je veux', 'je souhaite',
            'mon budget', 'ma limite', 'je vise', 'mon objectif'
        ]
        
        for keyword in preference_keywords:
            if keyword.lower() in text.lower():
                # Extraire la phrase complète
                sentences = text.split('.')
                for sentence in sentences:
                    if keyword.lower() in sentence.lower():
                        self.entities['preferences'].append(sentence.strip())

class ConversationSummarizer:
    """Générateur de résumés intelligent"""
    
    SYSTEM_PROMPT = """Tu es un assistant spécialisé dans la synthèse de conversations.
Génère un résumé structuré qui préserve:
1. Les informations factuelles importantes (dates, montants, décisions)
2. Les préférences et contraintes exprimées par l'utilisateur
3. L'historique des questions et réponses clés
4. Les conclusions ou consensus atteints

Format de sortie:
- INFORMATIONS CLÉS: [liste]
- PRÉFÉRENCES UTILISATEUR: [liste]  
- POINTS IMPORTANTS: [liste]
- STATUT ACTUEL: [résumé de l'état de la conversation]"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.entity_tracker = EntityTracker()
    
    async def generate_summary(
        self, 
        messages: List[Dict[str, str]]
    ) -> Tuple[str, List[str]]:
        """Génère un résumé optimisé de la conversation"""
        
        # Extraire les entités de tous les messages
        for msg in messages:
            if msg.get('content'):
                self.entity_tracker.extract_entities(msg['content'])
        
        # Construire le prompt de summarisation
        conversation_text = "\n".join([
            f"[{msg.get('role', 'unknown')}]: {msg.get('content', '')[:500]}"
            for msg in messages
        ])
        
        summarization_prompt = f"""{self.SYSTEM_PROMPT}

CONVERSATION À SYNTHÉTISER:
{conversation_text}

ENTITÉS IMPORTANTES IDENTIFIÉES:
- Dates: {', '.join(self.entity_tracker.entities['dates'][-5:])}
- Montants: {', '.join(self.entity_tracker.entities['numbers'][-5:])}
- Préférences: {', '.join(self.entity_tracker.entities['preferences'][-3:])}"""
        
        # Appel à l'API HolySheep pour générer le résumé
        import aiohttp
        
        payload = {
            "model": "deepseek-v3.2",  # Modèle économique pour summarisation
            "messages": [
                {"role": "system", "content": "Tu génères des résumés concis et informatifs."},
                {"role": "user", "content": summarization_prompt}
            ],
            "max_tokens": 800,
            "temperature": 0.3  # Température basse pour cohérence
        }
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.holysheep.ai/v1/chat/completions",
                json=payload,
                headers=headers
            ) as response:
                result = await response.json()
        
        summary = result['choices'][0]['message']['content']
        return summary, self.entity_tracker.entities['preferences']

class ContextWindowManager:
    """Gestionnaire de fenêtre de contexte avec compression intelligente"""
    
    def __init__(
        self, 
        api_key: str,
        max_tokens: int = 128000,
        summary_threshold: int = 60000
    ):
        self.api_key = api_key
        self.max_tokens = max_tokens
        self.summary_threshold = summary_threshold
        self.summarizer = ConversationSummarizer(api_key)
        self.tokenizer = self._init_tokenizer()
    
    def _init_tokenizer(self):
        """Initialise le tokenizer pour comptage de tokens"""
        try:
            import tiktoken
            return tiktoken.get_encoding("cl100k_base")
        except ImportError:
            # Fallback: estimation approximative
            return None
    
    def count_tokens(self, text: str) -> int:
        """Compte les tokens d'un texte"""
        if self.tokenizer:
            return len(self.tokenizer.encode(text))
        return len(text) // 4  # Approximation grossière
    
    async def get_optimized_context(
        self,
        messages: List[Dict[str, str]],
        system_prompt: str
    ) -> List[Dict[str, str]]:
        """Retourne le contexte optimisé selon la longueur"""
        
        total_tokens = self.count_tokens(system_prompt)
        for msg in messages:
            total_tokens += self.count_tokens(msg.get('content', ''))
        
        # Si en dessous du seuil, retourner tel quel
        if total_tokens < self.summary_threshold:
            return messages
        
        # Sinon, générer un résumé
        summary, preferences = await self.summarizer.generate_summary(messages)
        
        # Construire le nouveau contexte
        summary_message = {
            "role": "system",
            "content": f"""[RÉSUMÉ DES ÉCHANGES PRÉCÉDENTS]
{summary}

PRÉFÉRENCES UTILISATEUR À RESPECTER:
{chr(10).join(f"- {p}" for p in preferences) if preferences else "Aucune préférence spécifique mémorisée."}"""
        }
        
        # Garder seulement les derniers messages + résumé
        recent_messages = messages[-6:] if len(messages) > 6 else messages[-3:]
        
        return [summary_message] + recent_messages
    
    def estimate_cost_savings(
        self,
        original_tokens: int,
        optimized_tokens: int
    ) -> Dict[str, float]:
        """Estime les