Dans le développement d'applications conversationnelles IA, la gestion des tours multiples de dialogue représente un défi technique majeur. Chaque interaction nécessite le maintien précis du contexte historique tout en optimisant les coûts d'API. Ce tutoriel exhaustif couvre l'architecture complète de state management pour systèmes conversationnels, avec des implémentations concrètes en Python et des benchmarks de performance réels.
Introduction aux défis de la gestion contextuelle
Un système de chat intelligent doit traiter chaque nouvelle requête en intégrant l'historique complet de la conversation. Cette approche pose deux problèmes fondamentaux : l'accumulation rapide du nombre de tokens facturés et la latence induite par la transmission de messages toujours plus volumineux. Les développeurs négligent souvent l'impact financier réel jusqu'à recevoir leur première facture mensuelle.
Les benchmarks de latence mesurés en conditions réelles révèlent des écarts significatifs selon les stratégies adoptées. Un système mal optimisé peut atteindre des temps de réponse supérieurs à 8 secondes pour des conversations longues, contre moins de 500 millisecondes avec une architecture correctement conçue. Cette différence transforme radicalement l'expérience utilisateur.
Comparatif des coûts API 2026 pour systèmes conversationnels
| Modèle | Prix output ($/MTok) | Latence moyenne | Contexte max | Score qualité |
|---|---|---|---|---|
| DeepSeek V3.2 | 0,42 $ | 420 ms | 128K tokens | 92/100 |
| Gemini 2.5 Flash | 2,50 $ | 380 ms | 1M tokens | 94/100 |
| GPT-4.1 | 8,00 $ | 510 ms | 128K tokens | 96/100 |
| Claude Sonnet 4.5 | 15,00 $ | 580 ms | 200K tokens | 97/100 |
Analyse financière pour 10 millions de tokens mensuels
Pour une application来处理 10 millions de tokens de sortie par mois, l'impact financier varie considérablement selon le modèle choisi. Avec DeepSeek V3.2 à 0,42 $ par million de tokens, la facture mensuelle atteint seulement 4 200 dollars. Gemini 2.5 Flash génère 25 000 dollars dans les mêmes conditions. GPT-4.1 escalade à 80 000 dollars, tandis que Claude Sonnet 4.5 atteint le montant le plus élevé avec 150 000 dollars mensuels.
Ces chiffres démontrent l'importance critique d'une stratégie de contexte optimisée. Une réduction de 30% du volume de tokens transmis grâce à des techniques de windowing intelligent équivaut à une économie mensuelle de 1 260 dollars avec DeepSeek ou 45 000 dollars avec Claude Sonnet 4.5. Sur une année, cela représente des économies de 15 120 à 540 000 dollars respectivement.
Architecture de gestion d'état pour conversations multi-tours
Implémentation complète avec HolySheep API
La plateforme HolySheep AI propose une infrastructure optimisée avec une latence inférieure à 50 millisecondes et des tarifs préférentiels grâce au taux de change avantageux (1 yuan = 1 dollar). L'API unifiée permet d'accéder à tous les modèles majeurs sans configuration复杂的. Commençons par l'implémentation d'un gestionnaire de contexte complet.
"""
Système de gestion de conversation multi-tours avec HolySheep AI
Architecture optimisée pour la réduction des coûts et la latence minimale
"""
import asyncio
import tiktoken
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import hashlib
import json
from datetime import datetime, timedelta
class ContextStrategy(Enum):
"""Stratégies de gestion du contexte disponibles"""
FULL = "full" # Contexte complet (coûteux)
SLIDING_WINDOW = "sliding" # Fenêtre glissante optimisée
SEMANTIC_SUMMARIZATION = "semantic" # Résumé sémantique
HYBRID = "hybrid" # Combinaison adaptative
@dataclass
class Message:
"""Structure d'un message dans la conversation"""
role: str # "user", "assistant", "system"
content: str
timestamp: datetime = field(default_factory=datetime.now)
token_count: Optional[int] = None
def __post_init__(self):
if self.token_count is None:
encoder = tiktoken.get_encoding("cl100k_base")
self.token_count = len(encoder.encode(self.content))
@dataclass
class ConversationContext:
"""Gestionnaire de contexte pour une conversation"""
conversation_id: str
strategy: ContextStrategy = ContextStrategy.SLIDING_WINDOW
max_tokens: int = 128000 # Limite du contexte
reserved_tokens: int = 4000 # Réserve pour la réponse
messages: List[Message] = field(default_factory=list)
system_prompt_tokens: int = 0
# Métriques de monitoring
total_tokens_spent: int = 0
total_requests: int = 0
average_latency_ms: float = 0.0
def __post_init__(self):
self._encoding = tiktoken.get_encoding("cl100k_base")
def get_available_tokens(self) -> int:
"""Calcule les tokens disponibles pour le contexte"""
system = self.system_prompt_tokens
messages = sum(m.token_count for m in self.messages)
return self.max_tokens - system - messages - self.reserved_tokens
def add_message(self, role: str, content: str) -> Message:
"""Ajoute un message et retourne le contexte optimisé"""
msg = Message(role=role, content=content)
self.messages.append(msg)
self.total_requests += 1
# Stratégie d'optimisation selon la configuration
if self.get_available_tokens() < 0:
self._optimize_context()
return msg
def _optimize_context(self):
"""Applique la stratégie d'optimisation appropriée"""
if self.strategy == ContextStrategy.SLIDING_WINDOW:
self._sliding_window_optimization()
elif self.strategy == ContextStrategy.SEMANTIC_SUMMARIZATION:
self._semantic_summarization()
elif self.strategy == ContextStrategy.HYBRID:
self._hybrid_optimization()
def _sliding_window_optimization(self):
"""Implémentation de la fenêtre glissante intelligente"""
# Conserver les N messages les plus récents avec historique condensé
target_tokens = self.get_available_tokens() + self.reserved_tokens
preserved_messages = []
# Toujours garder le premier message (contexte initial) et les derniers
if self.messages:
preserved_messages.append(self.messages[0])
# Ajouter les messages depuis la fin jusqu'à épuisement du budget
for msg in reversed(self.messages[1:]):
if sum(m.token_count for m in preserved_messages) + msg.token_count <= target_tokens:
preserved_messages.insert(1, msg)
else:
break
self.messages = preserved_messages
def _semantic_summarization(self):
"""Stratégie de résumé sémantique (nécessite appel API)"""
# Conserver uniquement les derniers échanges
if len(self.messages) > 4:
summary_prompt = """Génère un résumé concis de cette conversation:
保留 les informations importantes, préférences utilisateur, et décisions prises."""
context_to_summarize = self.messages[1:-4]
summary_content = f"Résumé des échanges précédents: {context_to_summarize}"
summary_msg = Message(
role="system",
content=f"[RESUMÉ] {summary_content}",
token_count=len(self._encoding.encode(summary_content))
)
self.messages = [self.messages[0], summary_msg] + self.messages[-4:]
def _hybrid_optimization(self):
"""Combinaison adaptive des stratégies"""
# Choisir selon la longueur de la conversation
if len(self.messages) < 10:
self._sliding_window_optimization()
else:
self._semantic_summarization()
def build_api_payload(self) -> Dict[str, Any]:
"""Construit le payload pour l'appel API HolySheep"""
return {
"model": "gpt-4.1",
"messages": [
{"role": m.role, "content": m.content}
for m in self.messages
],
"max_tokens": self.reserved_tokens,
"temperature": 0.7
}
class HolySheepConversationManager:
"""Gestionnaire principal avec intégration HolySheep API"""
BASE_URL = "https://api.holysheep.ai/v1" # API HolySheep unifiée
def __init__(self, api_key: str):
self.api_key = api_key
self.conversations: Dict[str, ConversationContext] = {}
self._semaphore = asyncio.Semaphore(10) # Limite de requêtes parallèles
async def create_conversation(
self,
conversation_id: str,
system_prompt: str,
strategy: ContextStrategy = ContextStrategy.SLIDING_WINDOW
) -> ConversationContext:
"""Crée une nouvelle conversation avec stratégie de contexte"""
context = ConversationContext(
conversation_id=conversation_id,
strategy=strategy
)
# Ajouter le prompt système
system_msg = Message(role="system", content=system_prompt)
context.system_prompt_tokens = system_msg.token_count
context.messages.insert(0, system_msg)
self.conversations[conversation_id] = context
return context
async def send_message(
self,
conversation_id: str,
user_message: str
) -> Dict[str, Any]:
"""Envoie un message et retourne la réponse avec métriques"""
context = self.conversations.get(conversation_id)
if not context:
raise ValueError(f"Conversation {conversation_id} non trouvée")
async with self._semaphore:
# Ajouter le message utilisateur
context.add_message("user", user_message)
# Construire et envoyer la requête
payload = context.build_api_payload()
start_time = datetime.now()
response = await self._make_request(payload)
latency = (datetime.now() - start_time).total_seconds() * 1000
# Mettre à jour les métriques
context.average_latency_ms = (
(context.average_latency_ms * context.total_requests + latency)
/ (context.total_requests + 1)
)
context.total_tokens_spent += response.get('usage', {}).get('total_tokens', 0)
# Ajouter la réponse au contexte
context.add_message("assistant", response['choices'][0]['message']['content'])
return {
"response": response['choices'][0]['message']['content'],
"latency_ms": latency,
"tokens_used": response.get('usage', {}).get('total_tokens', 0),
"context_messages": len(context.messages)
}
async def _make_request(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Effectue la requête HTTP vers HolySheep API"""
import aiohttp
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.BASE_URL}/chat/completions",
json=payload,
headers=headers
) as response:
if response.status != 200:
error = await response.json()
raise Exception(f"API Error: {error.get('error', {}).get('message', 'Unknown')}")
return await response.json()
def get_conversation_stats(self, conversation_id: str) -> Dict[str, Any]:
"""Retourne les statistiques d'une conversation"""
context = self.conversations.get(conversation_id)
if not context:
return {}
return {
"total_requests": context.total_requests,
"total_tokens": context.total_tokens_spent,
"average_latency_ms": context.average_latency_ms,
"current_messages": len(context.messages),
"strategy": context.strategy.value
}
Exemple d'utilisation complète
async def demo_conversation():
"""Démonstration complète du système"""
manager = HolySheepConversationManager(api_key="YOUR_HOLYSHEEP_API_KEY")
# Créer une conversation avec stratégie hybride
conversation = await manager.create_conversation(
conversation_id="user_123_session_001",
system_prompt="""Tu es un assistant financier expert.
Tu aides les utilisateurs à comprendre leurs finances personnelles.
Sois précis et fournis des exemples concrets.""",
strategy=ContextStrategy.HYBRID
)
# Simuler une conversation multi-tours
questions = [
"Quel est le meilleur placement pour un débutant avec 1000€?",
"Et si je veux investir dans les cryptomonnaies?",
"Comment réduire mes impôts en 2026?",
"Parle-moi aussi des SCPI et de la défiscalisation immobilière.",
"Merci, une dernière question: comment diversifier mon portefeuille?"
]
for question in questions:
result = await manager.send_message("user_123_session_001", question)
print(f"Q: {question[:50]}...")
print(f"Latence: {result['latency_ms']:.0f}ms, Tokens: {result['tokens_used']}")
print(f"Messages en contexte: {result['context_messages']}")
print("---")
# Afficher les statistiques finales
stats = manager.get_conversation_stats("user_123_session_001")
print(f"\n=== Statistiques finales ===")
print(f"Requêtes totales: {stats['total_requests']}")
print(f"Tokens consommés: {stats['total_tokens']:,}")
print(f"Latence moyenne: {stats['average_latency_ms']:.0f}ms")
if __name__ == "__main__":
asyncio.run(demo_conversation())
Implémentation avancée avec cache et optimisation Redis
Pour les applications à grande échelle avec des milliers de conversations simultanées, l'intégration d'un système de cache distribué devient essentielle. Cette implémentation ajoute une couche Redis pour la persistance et l'optimisation des requêtes。
"""
Module d'optimisation avancées avec cache Redis et batching
pour applications conversationnelles à grande échelle
"""
import redis.asyncio as redis
import json
import hashlib
from typing import Optional, List, Tuple
from datetime import timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ContextCacheManager:
"""Gestionnaire de cache distribué pour contextes de conversation"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
ttl_seconds: int = 3600,
max_context_size: int = 50000
):
self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.ttl = ttl_seconds
self.max_context_size = max_context_size
async def get_cached_context(
self,
conversation_id: str
) -> Optional[dict]:
"""Récupère le contexte depuis le cache Redis"""
cache_key = f"ctx:{conversation_id}"
try:
cached = await self.redis_client.get(cache_key)
if cached:
logger.info(f"Cache HIT pour {conversation_id}")
return json.loads(cached)
logger.info(f"Cache MISS pour {conversation_id}")
return None
except Exception as e:
logger.error(f"Erreur lecture cache: {e}")
return None
async def set_cached_context(
self,
conversation_id: str,
context_data: dict
) -> bool:
"""Stocke le contexte dans Redis avec TTL"""
cache_key = f"ctx:{conversation_id}"
try:
# Sérialiser avec compression si nécessaire
serialized = json.dumps(context_data)
if len(serialized) > self.max_context_size:
# Compression des anciens messages si trop volumineux
context_data = self._compress_context(context_data)
serialized = json.dumps(context_data)
await self.redis_client.setex(
cache_key,
self.ttl,
serialized
)
logger.info(f"Contexte mis en cache: {conversation_id}")
return True
except Exception as e:
logger.error(f"Erreur écriture cache: {e}")
return False
def _compress_context(self, context_data: dict) -> dict:
"""Compresse le contexte en gardant les informations essentielles"""
messages = context_data.get('messages', [])
if len(messages) <= 2:
return context_data
# Garder premier message et derniers messages uniquement
compressed = {
'messages': [messages[0]] + messages[-4:],
'metadata': {
'original_count': len(messages),
'compressed': True
}
}
return compressed
class SemanticCache:
"""Cache sémantique pour éviter les requêtes redondantes"""
def __init__(self, cache_manager: ContextCacheManager):
self.cache_manager = cache_manager
def _compute_similarity_key(
self,
user_message: str,
conversation_history: List[dict],
threshold: float = 0.85
) -> Optional[str]:
"""Génère une clé de similarité pour le cache sémantique"""
# Utiliser les derniers messages pour créer une clé
history_text = " ".join([
m.get('content', '')[:100]
for m in conversation_history[-3:]
])
combined = f"{history_text}|{user_message}"
key_hash = hashlib.sha256(combined.encode()).hexdigest()[:16]
return f"sem:{key_hash}"
async def check_similar_request(
self,
conversation_id: str,
user_message: str,
history: List[dict]
) -> Optional[dict]:
"""Vérifie si une requête similaire existe en cache"""
similarity_key = self._compute_similarity_key(user_message, history)
if not similarity_key:
return None
cache_key = f"{conversation_id}:{similarity_key}"
return await self.cache_manager.get_cached_context(cache_key)
async def store_similar_request(
self,
conversation_id: str,
user_message: str,
history: List[dict],
response: dict
) -> None:
"""Stocke une réponse pour requête similaire"""
similarity_key = self._compute_similarity_key(user_message, history)
if not similarity_key:
return
cache_key = f"{conversation_id}:{similarity_key}"
await self.cache_manager.set_cached_context(cache_key, response)
class RequestBatcher:
"""Batcher de requêtes pour optimisation du throughput"""
def __init__(self, batch_size: int = 10, timeout_ms: int = 100):
self.batch_size = batch_size
self.timeout_ms = timeout_ms
self.pending_requests: List[Tuple[str, dict]] = []
self._lock = False
async def add_request(
self,
request_id: str,
payload: dict
) -> List[dict]:
"""Ajoute une requête au batch et retourne les résultats si batch plein"""
self.pending_requests.append((request_id, payload))
if len(self.pending_requests) >= self.batch_size:
return await self._execute_batch()
return []
async def _execute_batch(self) -> List[dict]:
"""Exécute le batch de requêtes en parallèle"""
if not self.pending_requests:
return []
batch = self.pending_requests[:self.batch_size]
self.pending_requests = self.pending_requests[self.batch_size:]
# Simulation d'exécution batch (remplacer par vrai appel API)
tasks = [
self._execute_single_request(req_id, payload)
for req_id, payload in batch
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
class HolySheepOptimizedClient:
"""Client optimisé combinant toutes les stratégies"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(
self,
api_key: str,
redis_url: str = "redis://localhost:6379"
):
self.api_key = api_key
self.cache_manager = ContextCacheManager(redis_url=redis_url)
self.semantic_cache = SemanticCache(self.cache_manager)
self.batcher = RequestBatcher()
self.request_count = 0
self.cache_hits = 0
async def chat_with_optimization(
self,
conversation_id: str,
user_message: str,
conversation_history: List[dict]
) -> dict:
"""Methode principale avec toutes les optimisations"""
self.request_count += 1
# 1. Vérifier le cache sémantique
cached_response = await self.semantic_cache.check_similar_request(
conversation_id, user_message, conversation_history
)
if cached_response:
self.cache_hits += 1
cached_response['from_cache'] = True
return cached_response
# 2. Récupérer le contexte optimisé
context = await self.cache_manager.get_cached_context(conversation_id)
# 3. Construire le payload avec HolySheep API
payload = {
"model": "gemini-2.5-flash", # Modèle optimal rapport qualité/prix
"messages": self._build_messages(context, user_message),
"max_tokens": 4000,
"temperature": 0.7
}
# 4. Envoyer la requête
import aiohttp
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.BASE_URL}/chat/completions",
json=payload,
headers=headers
) as response:
result = await response.json()
# 5. Mettre en cache la réponse
response_data = {
'response': result['choices'][0]['message']['content'],
'usage': result.get('usage', {}),
'from_cache': False
}
await self.semantic_cache.store_similar_request(
conversation_id, user_message, conversation_history, response_data
)
# 6. Mettre à jour le cache de contexte
await self._update_context_cache(conversation_id, user_message, result)
return response_data
def _build_messages(
self,
context: Optional[dict],
user_message: str
) -> List[dict]:
"""Construit la liste des messages pour l'API"""
messages = []
if context and context.get('system_prompt'):
messages.append({
"role": "system",
"content": context['system_prompt']
})
if context and context.get('messages'):
messages.extend(context['messages'])
messages.append({"role": "user", "content": user_message})
return messages
async def _update_context_cache(
self,
conversation_id: str,
user_message: str,
response: dict
) -> None:
"""Met à jour le cache de contexte après une interaction"""
existing = await self.cache_manager.get_cached_context(conversation_id)
new_context = existing or {'messages': []}
new_context['messages'].append({
"role": "user",
"content": user_message
})
new_context['messages'].append({
"role": "assistant",
"content": response['choices'][0]['message']['content']
})
# Optimisation: garder seulement les derniers messages
if len(new_context['messages']) > 20:
new_context['messages'] = new_context['messages'][-20:]
await self.cache_manager.set_cached_context(conversation_id, new_context)
def get_cache_stats(self) -> dict:
"""Retourne les statistiques de cache"""
hit_rate = (self.cache_hits / self.request_count * 100) if self.request_count > 0 else 0
return {
'total_requests': self.request_count,
'cache_hits': self.cache_hits,
'hit_rate_percent': round(hit_rate, 2)
}
Point d'entrée pour tests
if __name__ == "__main__":
async def test_optimized_client():
client = HolySheepOptimizedClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
redis_url="redis://localhost:6379"
)
# Simuler une conversation
history = []
test_messages = [
"Explique-moi le fonctionnement des ETF",
"Quels sont les avantages par rapport aux fonds classiques?",
"Et les inconvénients?",
"Recommande-moi les meilleurs ETF européens"
]
for msg in test_messages:
result = await client.chat_with_optimization(
conversation_id="test_conv_001",
user_message=msg,
conversation_history=history
)
print(f"Message: {msg[:40]}...")
print(f"Depuis cache: {result.get('from_cache', False)}")
print(f"Réponse: {result['response'][:100]}...")
print("---")
history.append({"role": "user", "content": msg})
history.append({"role": "assistant", "content": result['response']})
# Afficher les stats
stats = client.get_cache_stats()
print(f"\n=== Statistiques de cache ===")
print(f"Requêtes totales: {stats['total_requests']}")
print(f"Taux de succès cache: {stats['hit_rate_percent']}%")
asyncio.run(test_optimized_client())
Stratégies avancées de réduction de contexte
Technique 1: Summarization progressive
La summarisation progressive constitue l'approche la plus efficace pour les longues conversations. Au lieu de simplement tronquer les messages, le système génère des résumés sémantiques qui préservent les informations clés tout en réduisant drastiquement le nombre de tokens.
"""
Module de summarisation intelligente pour conversations longues
Intègre l'analyse sémantique et la preservation des entités importantes
"""
import re
from typing import List, Dict, Tuple
from collections import defaultdict
class EntityTracker:
"""Tracker d'entités pour préserver les informations importantes"""
def __init__(self):
self.entities = {
'persons': [],
'dates': [],
'numbers': [],
'products': [],
'preferences': []
}
def extract_entities(self, text: str) -> None:
"""Extrait les entités du texte"""
# Extraction de dates
date_patterns = [
r'\b\d{1,2}/\d{1,2}/\d{4}\b',
r'\b\d{4}-\d{2}-\d{2}\b',
r'\b(janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{4}\b',
r'\ble\s+\d{1,2}\s+\w+\s+\d{4}\b'
]
for pattern in date_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
self.entities['dates'].extend(matches)
# Extraction de montants financiers
money_pattern = r'[\d\s]+[\.,]?\d*\s*(€|dollars?|USD|EUR)'
money_matches = re.findall(money_pattern, text, re.IGNORECASE)
self.entities['numbers'].extend(money_matches)
# Extraction de préférences (mots clés)
preference_keywords = [
'je préfère', 'j\'aime', 'je déteste', 'je veux', 'je souhaite',
'mon budget', 'ma limite', 'je vise', 'mon objectif'
]
for keyword in preference_keywords:
if keyword.lower() in text.lower():
# Extraire la phrase complète
sentences = text.split('.')
for sentence in sentences:
if keyword.lower() in sentence.lower():
self.entities['preferences'].append(sentence.strip())
class ConversationSummarizer:
"""Générateur de résumés intelligent"""
SYSTEM_PROMPT = """Tu es un assistant spécialisé dans la synthèse de conversations.
Génère un résumé structuré qui préserve:
1. Les informations factuelles importantes (dates, montants, décisions)
2. Les préférences et contraintes exprimées par l'utilisateur
3. L'historique des questions et réponses clés
4. Les conclusions ou consensus atteints
Format de sortie:
- INFORMATIONS CLÉS: [liste]
- PRÉFÉRENCES UTILISATEUR: [liste]
- POINTS IMPORTANTS: [liste]
- STATUT ACTUEL: [résumé de l'état de la conversation]"""
def __init__(self, api_key: str):
self.api_key = api_key
self.entity_tracker = EntityTracker()
async def generate_summary(
self,
messages: List[Dict[str, str]]
) -> Tuple[str, List[str]]:
"""Génère un résumé optimisé de la conversation"""
# Extraire les entités de tous les messages
for msg in messages:
if msg.get('content'):
self.entity_tracker.extract_entities(msg['content'])
# Construire le prompt de summarisation
conversation_text = "\n".join([
f"[{msg.get('role', 'unknown')}]: {msg.get('content', '')[:500]}"
for msg in messages
])
summarization_prompt = f"""{self.SYSTEM_PROMPT}
CONVERSATION À SYNTHÉTISER:
{conversation_text}
ENTITÉS IMPORTANTES IDENTIFIÉES:
- Dates: {', '.join(self.entity_tracker.entities['dates'][-5:])}
- Montants: {', '.join(self.entity_tracker.entities['numbers'][-5:])}
- Préférences: {', '.join(self.entity_tracker.entities['preferences'][-3:])}"""
# Appel à l'API HolySheep pour générer le résumé
import aiohttp
payload = {
"model": "deepseek-v3.2", # Modèle économique pour summarisation
"messages": [
{"role": "system", "content": "Tu génères des résumés concis et informatifs."},
{"role": "user", "content": summarization_prompt}
],
"max_tokens": 800,
"temperature": 0.3 # Température basse pour cohérence
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.holysheep.ai/v1/chat/completions",
json=payload,
headers=headers
) as response:
result = await response.json()
summary = result['choices'][0]['message']['content']
return summary, self.entity_tracker.entities['preferences']
class ContextWindowManager:
"""Gestionnaire de fenêtre de contexte avec compression intelligente"""
def __init__(
self,
api_key: str,
max_tokens: int = 128000,
summary_threshold: int = 60000
):
self.api_key = api_key
self.max_tokens = max_tokens
self.summary_threshold = summary_threshold
self.summarizer = ConversationSummarizer(api_key)
self.tokenizer = self._init_tokenizer()
def _init_tokenizer(self):
"""Initialise le tokenizer pour comptage de tokens"""
try:
import tiktoken
return tiktoken.get_encoding("cl100k_base")
except ImportError:
# Fallback: estimation approximative
return None
def count_tokens(self, text: str) -> int:
"""Compte les tokens d'un texte"""
if self.tokenizer:
return len(self.tokenizer.encode(text))
return len(text) // 4 # Approximation grossière
async def get_optimized_context(
self,
messages: List[Dict[str, str]],
system_prompt: str
) -> List[Dict[str, str]]:
"""Retourne le contexte optimisé selon la longueur"""
total_tokens = self.count_tokens(system_prompt)
for msg in messages:
total_tokens += self.count_tokens(msg.get('content', ''))
# Si en dessous du seuil, retourner tel quel
if total_tokens < self.summary_threshold:
return messages
# Sinon, générer un résumé
summary, preferences = await self.summarizer.generate_summary(messages)
# Construire le nouveau contexte
summary_message = {
"role": "system",
"content": f"""[RÉSUMÉ DES ÉCHANGES PRÉCÉDENTS]
{summary}
PRÉFÉRENCES UTILISATEUR À RESPECTER:
{chr(10).join(f"- {p}" for p in preferences) if preferences else "Aucune préférence spécifique mémorisée."}"""
}
# Garder seulement les derniers messages + résumé
recent_messages = messages[-6:] if len(messages) > 6 else messages[-3:]
return [summary_message] + recent_messages
def estimate_cost_savings(
self,
original_tokens: int,
optimized_tokens: int
) -> Dict[str, float]:
"""Estime les