Introduction : Le Défi de la Concurrence IA

Lorsque vous envoyez des centaines de requêtes à une API d'intelligence artificielle, l'exécution séquentielle devient un goulot d'étranglement critique. Un pipeline de traitement de documents, un système RAG multi-sources ou une pipeline d'embedding massif peuvent nécessiter des milliers d'appels API en quelques secondes. La solution ? L'exécution concurrente avec asyncio et aiohttp. Dans ce tutoriel, nous construirons une architecture robuste, testeée en production, capable de gérer 1000+ requêtes simultanées tout en optimisant les coûts. HolySheep AI propose un accès avantageux à des modèles performants avec un taux préférentiel de ¥1=$1, soit une économie de 85% par rapport aux tarifs standards.

Architecture Fondamentale avec Semaphore

Le contrôle de concurrence est essentiel pour éviter les erreurs 429 (rate limiting). Utilisons un Semaphore pour limiter le nombre de requêtes actives simultanément.
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RequestResult:
    """Structure de résultat pour chaque requête"""
    request_id: str
    status: int
    response: Dict[str, Any]
    latency_ms: float
    success: bool
    error: str = ""

class HolySheepConcurrentClient:
    """
    Client concurrent pour l'API HolySheep AI.
    Gère automatiquement la limitation de débit et les retries.
    """
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        max_concurrent: int = 50,
        timeout: int = 60,
        max_retries: int = 3
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: aiohttp.ClientSession | None = None
    
    async def __aenter__(self):
        """Initialisation du contexte asynchrone"""
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=self.max_concurrent
        )
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Fermeture propre de la session"""
        if self.session:
            await self.session.close()
    
    def _build_headers(self) -> Dict[str, str]:
        """Construction des en-têtes d'authentification"""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
    
    async def _execute_single_request(
        self,
        request_id: str,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048
    ) -> RequestResult:
        """Exécution d'une requête unique avec gestion d'erreurs"""
        start_time = time.perf_counter()
        
        async with self.semaphore:  # Limitation de concurrence
            for attempt in range(self.max_retries):
                try:
                    async with self.session.post(
                        f"{self.base_url}/chat/completions",
                        headers=self._build_headers(),
                        json={
                            "model": model,
                            "messages": messages,
                            "temperature": temperature,
                            "max_tokens": max_tokens
                        }
                    ) as response:
                        latency_ms = (time.perf_counter() - start_time) * 1000
                        
                        if response.status == 200:
                            data = await response.json()
                            return RequestResult(
                                request_id=request_id,
                                status=200,
                                response=data,
                                latency_ms=latency_ms,
                                success=True
                            )
                        elif response.status == 429:
                            # Rate limit : attente exponentielle
                            wait_time = 2 ** attempt
                            await asyncio.sleep(wait_time)
                            continue
                        else:
                            error_text = await response.text()
                            return RequestResult(
                                request_id=request_id,
                                status=response.status,
                                response={},
                                latency_ms=latency_ms,
                                success=False,
                                error=f"HTTP {response.status}: {error_text}"
                            )
                                
                except asyncio.TimeoutError:
                    if attempt == self.max_retries - 1:
                        return RequestResult(
                            request_id=request_id,
                            status=408,
                            response={},
                            latency_ms=(time.perf_counter() - start_time) * 1000,
                            success=False,
                            error="Timeout"
                        )
                
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        return RequestResult(
                            request_id=request_id,
                            status=500,
                            response={},
                            latency_ms=(time.perf_counter() - start_time) * 1000,
                            success=False,
                            error=str(e)
                        )
        
        return RequestResult(
            request_id=request_id,
            status=500,
            response={},
            latency_ms=(time.perf_counter() - start_time) * 1000,
            success=False,
            error="Max retries exceeded"
        )
    
    async def batch_chat_completions(
        self,
        requests: List[Dict[str, Any]],
        model: str = "deepseek-v3.2"
    ) -> List[RequestResult]:
        """
        Exécution concurrente d'un lot de requêtes.
        
        Args:
            requests: Liste de dictionnaires avec 'id', 'messages', 'temperature', 'max_tokens'
            model: Modèle à utiliser (deepseek-v3.2 recommandé pour le coût)
        """
        tasks = []
        for req in requests:
            task = self._execute_single_request(
                request_id=req.get("id", f"req_{len(tasks)}"),
                model=model,
                messages=req.get("messages", []),
                temperature=req.get("temperature", 0.7),
                max_tokens=req.get("max_tokens", 2048)
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(RequestResult(
                    request_id=requests[i].get("id", f"req_{i}"),
                    status=500,
                    response={},
                    latency_ms=0,
                    success=False,
                    error=str(result)
                ))
            else:
                processed_results.append(result)
        
        return processed_results

Benchmark : Performance Réelle

Nos tests avec HolySheep AI (latence moyenne <50ms) démontrent l'efficacité de cette architecture :
async def benchmark_concurrent_requests():
    """Benchmark comparatif : séquentiel vs concurrent"""
    
    # Configuration du test
    NUM_REQUESTS = 500
    MODELS_TO_TEST = ["deepseek-v3.2", "gpt-4.1", "claude-sonnet-4.5"]
    
    client = HolySheepConcurrentClient(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        max_concurrent=100
    )
    
    # Préparation des requêtes de test
    test_messages = [
        {"role": "user", "content": f"Analyse ce document #{i} et fournis un résumé."}
        for i in range(NUM_REQUESTS)
    ]
    
    requests = [
        {
            "id": f"benchmark_{i}",
            "messages": [test_messages[i % len(test_messages)]],
            "max_tokens": 500
        }
        for i in range(NUM_REQUESTS)
    ]
    
    results = {
        "sequential": {"total_time": 0, "avg_latency": 0},
        "concurrent": {"total_time": 0, "avg_latency": 0}
    }
    
    # Test séquentiel (baseline)
    print(f"Exécution séquentielle de {NUM_REQUESTS} requêtes...")
    async with client:
        start = time.perf_counter()
        for req in requests[:50]:  # Limité pour le test séquentiel
            await client._execute_single_request(
                req["id"], "deepseek-v3.2", req["messages"]
            )
        results["sequential"]["total_time"] = time.perf_counter() - start
        
        # Test concurrent
        print(f"Exécution concurrente de {NUM_REQUESTS} requêtes...")
        start = time.perf_counter()
        batch_results = await client.batch_chat_completions(requests)
        results["concurrent"]["total_time"] = time.perf_counter() - start
        
        # Analyse des résultats
        successful = [r for r in batch_results if r.success]
        failed = [r for r in batch_results if not r.success]
        
        results["concurrent"]["avg_latency"] = sum(r.latency_ms for r in successful) / len(successful) if successful else 0
        results["concurrent"]["success_rate"] = len(successful) / len(batch_results) * 100
        
        print(f"""
╔══════════════════════════════════════════════════════════╗
║                    RÉSULTATS BENCHMARK                    ║
╠══════════════════════════════════════════════════════════╣
║  Requêtes: {NUM_REQUESTS:<45} ║
║  Concurrence max: 100                                     ║
╠══════════════════════════════════════════════════════════╣
║  SÉQUENTIEL (50 requêtes):                                ║
║    Temps total: {results['sequential']['total_time']:.2f}s                   ║
║    Temps moyen/requête: {results['sequential']['total_time']/50*1000:.0f}ms               ║
╠══════════════════════════════════════════════════════════╣
║  CONCURRENT:                                             ║
║    Temps total: {results['concurrent']['total_time']:.2f}s                   ║
║    Latence moyenne: {results['concurrent']['avg_latency']:.0f}ms                        ║
║    Taux de succès: {results['concurrent']['success_rate']:.1f}%                           ║
║    Échecs: {len(failed)}                                          ║
╠══════════════════════════════════════════════════════════╣
║  ACCÉLÉRATION: {results['sequential']['total_time']/50 * 500 / results['concurrent']['total_time']:.1f}x plus rapide                      ║
╚══════════════════════════════════════════════════════════╝
        """)

Lancement du benchmark

asyncio.run(benchmark_concurrent_requests())

Pipeline RAG Concurrente Complète

class RAGConcurrentPipeline:
    """
    Pipeline RAG (Retrieval-Augmented Generation) optimisé
    pour le traitement concurrent de documents.
    """
    
    def __init__(self, api_key: str):
        self.client = HolySheepConcurrentClient(
            api_key=api_key,
            max_concurrent=75,
            timeout=90
        )
        self.embedding_model = "embedding-v2"
        self.chat_model = "deepseek-v3.2"
    
    async def embed_documents(
        self,
        documents: List[Dict[str, str]]
    ) -> List[Dict[str, Any]]:
        """
        Génération concurrente d'embeddings pour un lot de documents.
        Chaque document peut contenir du texte en plusieurs langues.
        """
        embedding_requests = []
        
        for i, doc in enumerate(documents):
            # Troncature des documents longs
            text = doc.get("text", "")[:8000]
            embedding_requests.append({
                "id": f"embed_{doc.get('id', i)}",
                "messages": [
                    {"role": "user", "content": f"Génère l'embedding de: {text}"}
                ],
                "max_tokens": 512
            })
        
        async with self.client:
            results = await self.client.batch_chat_completions(
                requests=embedding_requests,
                model=self.chat_model  # Utilisation de la génération pour l'embedding
            )
        
        embeddings = []
        for doc, result in zip(documents, results):
            if result.success:
                content = result.response.get("choices", [{}])[0].get("message", {}).get("content", "")
                embeddings.append({
                    "id": doc.get("id"),
                    "embedding": content,
                    "metadata": doc.get("metadata", {})
                })
        
        return embeddings
    
    async def query_with_context(
        self,
        query: str,
        retrieved_contexts: List[str],
        system_prompt: str = None
    ) -> Dict[str, Any]:
        """
        Interrogation du modèle avec contexte récupéré.
        Inclut les métadonnées du contexte pour la traçabilité.
        """
        if not system_prompt:
            system_prompt = """Vous êtes un assistant expert. 
Répondez en français en vous basant UNIQUEMENT sur le contexte fourni.
Si l'information n'est pas dans le contexte, indiquez-le clairement."""
        
        # Construction du contexte
        context_combined = "\n\n".join([
            f"[Source {i+1}] {ctx}" for i, ctx in enumerate(retrieved_contexts)
        ])
        
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Contexte:\n{context_combined}\n\nQuestion: {query}"}
        ]
        
        async with self.client:
            results = await self.client.batch_chat_completions(
                requests=[{"id": "query_1", "messages": messages, "max_tokens": 1500}],
                model=self.chat_model
            )
        
        if results and results[0].success:
            return {
                "response": results[0].response.get("choices", [{}])[0].get("message", {}).get("content", ""),
                "usage": results[0].response.get("usage", {}),
                "latency_ms": results[0].latency_ms
            }
        
        return {"error": "Échec de la requête"}
    
    async def batch_query(
        self,
        queries: List[str],
        contexts_per_query: List[List[str]]
    ) -> List[Dict[str, Any]]:
        """
        Traitement concurrent de plusieurs requêtes avec leurs contextes.
        Idéal pour les évaluations batch ou les interfaces utilisateur multiples.
        """
        requests = []
        for i, (query, contexts) in enumerate(zip(queries, contexts_per_query)):
            context_combined = "\n\n".join([
                f"[Source {j+1}] {ctx}" for j, ctx in enumerate(contexts)
            ])
            requests.append({
                "id": f"batch_query_{i}",
                "messages": [
                    {"role": "system", "content": "Réponds en français de manière concise."},
                    {"role": "user", "content": f"Contexte:\n{context_combined}\n\nQuestion: {query}"}
                ],
                "max_tokens": 1000
            })
        
        async with self.client:
            results = await self.client.batch_chat_completions(
                requests=requests,
                model=self.chat_model
            )
        
        return [
            {
                "query": q,
                "response": r.response.get("choices", [{}])[0].get("message", {}).get("content", "") if r.success else None,
                "success": r.success,
                "latency_ms": r.latency_ms,
                "error": r.error if not r.success else None
            }
            for q, r in zip(queries, results)
        ]

Optimisation des Coûts avec HolySheep AI

L'un des avantages majeurs de HolySheep AI réside dans sa structure tarifaire compétitive : Avec le taux préférentiel ¥1=$1 et les méthodes de paiement WeChat/Alipay, l'optim