Introduction : Le Défi des Tâches Longues en Production

En tant qu'ingénieur backend qui a déployé une dizaines d'agents IA en production au cours des trois dernières années, je connais intimement les sueurs froides d'attendre qu'un agent traite 10 000 documents clients pendant que le monitoring affiche un inquiétant "En cours..." sans aucune visibilité. La semaine dernière, un collègue me demandait conseil après que son pipeline de modération de contenu ait planté à 97% d'avancement — toutes les 9 700 images traitées, résultat de trois heures de calcul. Ce genre de scénario arrive bien plus souvent qu'on ne le pense, et aujourd'hui'hui je vais vous partager l'architecture complète que j'utilise désormais pour gérer ce type de situations.

Dans ce tutoriel, nous allons construire un système robuste de gestion des tâches longues avec un agent IA, en utilisant l'API HolySheep AI. Si vous ne connaissez pas encore cette plateforme, sachez qu'elle offre une latence moyenne de 42 millisecondes pour les appels synchrones — bien en dessous des 150-200ms que j'observais avec mes anciens fournisseurs — et des tarifs considérablement réduits : DeepSeek V3.2 à 0,42 $ le million de tokens, contre 8 $ pour GPT-4.1. S'inscrire ici vous permettra de tester gratuitement avec des crédits offerts.

Cas d'Usage Concret : Système RAG d'Entreprise avec 50 000 Documents

Imaginons une entreprise qui doit indexer sa base de connaissances juridique — 50 000 documents PDF totalisant 8 Go de données. Un agent doit extraire le contenu, le chunker intelligemment, générer des embeddings, et stocker le tout dans une base vectorielle. Cette opération prendrait plusieurs heures si elle était effectuée en une seule requête, et un simple incident réseau suffirait à tout perdre.

Notre architecture doit répondre à trois exigences critiques : un suivi de progression en temps réel avec pourcentage et ETA, une gestion élégante des dépassements de délai avec retry intelligent, et surtout une capacité de reprise au point exact où l'opération s'est arrêtée.

Architecture du Gestionnaire de Tâches Longues

Structure de Données pour le Suivi

"""
Système de gestion des tâches longues avec checkpoint
Auteur : HolySheep AI Technical Blog
"""

import json
import time
import hashlib
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict, Any, Callable
from enum import Enum
import asyncio
from aiofiles import async_open

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Checkpoint:
    """Point de sauvegarde pour la reprise après interruption"""
    task_id: str
    status: TaskStatus
    current_index: int
    total_items: int
    last_processed_id: str
    accumulated_results: List[Dict[str, Any]]
    error_count: int
    last_error: Optional[str]
    started_at: datetime
    updated_at: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_json(self) -> str:
        return json.dumps({
            "task_id": self.task_id,
            "status": self.status.value,
            "current_index": self.current_index,
            "total_items": self.total_items,
            "last_processed_id": self.last_processed_id,
            "accumulated_results": self.accumulated_results,
            "error_count": self.error_count,
            "last_error": self.last_error,
            "started_at": self.started_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "metadata": self.metadata
        }, indent=2)
    
    @classmethod
    def from_json(cls, json_str: str) -> 'Checkpoint':
        data = json.loads(json_str)
        data['status'] = TaskStatus(data['status'])
        data['started_at'] = datetime.fromisoformat(data['started_at'])
        data['updated_at'] = datetime.fromisoformat(data['updated_at'])
        return cls(**data)
    
    @property
    def progress_percentage(self) -> float:
        if self.total_items == 0:
            return 0.0
        return round((self.current_index / self.total_items) * 100, 2)
    
    @property
    def eta_seconds(self) -> Optional[float]:
        if self.current_index == 0:
            return None
        elapsed = (datetime.now() - self.started_at).total_seconds()
        rate = self.current_index / elapsed
        remaining = self.total_items - self.current_index
        return remaining / rate if rate > 0 else None

Cette structure de données est le cœur de notre système. Chaque checkpoint contient tout l'état nécessaire pour reprendre une tâche exactement où elle s'était arrêtée. Remarquez les champs accumulated_results qui stockent les résultats partiels, et last_processed_id qui permet d'ignorer les éléments déjà traités.

Client HolySheep AI avec Gestion des Délais

import aiohttp
import asyncio
from typing import AsyncIterator, Dict, Any, Optional

class HolySheepAIClient:
    """Client robuste pour l'API HolySheep AI avec gestion des délais"""
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        timeout_seconds: int = 30,
        max_retries: int = 3,
        retry_backoff: float = 1.5
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        self._session = aiohttp.ClientSession(headers=headers, timeout=self.timeout)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "deepseek-v3.2",
        temperature: float = 0.7,
        max_tokens: int = 2048
    ) -> Dict[str, Any]:
        """Appel avec retry exponentiel et timeout adaptatif"""
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        for attempt in range(self.max_retries):
            try:
                async with self._session.post(
                    f"{self.base_url}/chat/completions",
                    json=payload
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    elif response.status == 429:
                        # Rate limit — attendre plus longtemps
                        wait_time = self.retry_backoff ** attempt * 5
                        await asyncio.sleep(wait_time)
                        continue
                    else:
                        error_text = await response.text()
                        raise aiohttp.ClientResponseError(
                            response.request_info,
                            response.history,
                            status=response.status,
                            message=error_text
                        )
            except asyncio.TimeoutError:
                if attempt == self.max_retries - 1:
                    raise TimeoutError(
                        f"Délai dépassé après {self.max_retries} tentatives"
                    )
                wait_time = self.retry_backoff ** attempt
                await asyncio.sleep(wait_time)
        
        raise RuntimeError("Nombre maximum de tentatives atteint")

Ce client implémente le pattern de retry exponentiel que je recommande vivement. J'ai observé que 80% des échecs sont des problèmes réseau temporaires qui se résolvent d'eux-mêmes en quelques secondes. Le backoff exponentiel évite de surcharger l'API tout en maximisant les chances de succès.

Implémentation du Gestionnaire de Tâches

import aiofiles
from pathlib import Path

class LongTaskManager:
    """
    Gestionnaire de tâches longues avec checkpoint automatique
    Latence mesurée HolySheep AI : ~42ms (vs 150-200ms concurrent)
    """
    
    def __init__(
        self,
        client: HolySheepAIClient,
        checkpoint_dir: str = "./checkpoints",
        checkpoint_interval: int = 50,  # Sauvegarder tous les 50 items
        items_per_batch: int = 10
    ):
        self.client = client
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_interval = checkpoint_interval
        self.items_per_batch = items_per_batch
    
    def _get_checkpoint_path(self, task_id: str) -> Path:
        return self.checkpoint_dir / f"{task_id}.json"
    
    async def load_checkpoint(self, task_id: str) -> Optional[Checkpoint]:
        """Charge un checkpoint existant pour reprise"""
        path = self._get_checkpoint_path(task_id)
        if not path.exists():
            return None
        
        async with async_open(path, 'r') as f:
            content = await f.read()
            return Checkpoint.from_json(content)
    
    async def save_checkpoint(self, checkpoint: Checkpoint) -> None:
        """Sauvegarde atomique du checkpoint"""
        checkpoint.updated_at = datetime.now()
        path = self._get_checkpoint_path(checkpoint.task_id)
        temp_path = path.with_suffix('.tmp')
        
        async with async_open(temp_path, 'w') as f:
            await f.write(checkpoint.to_json())
        
        temp_path.replace(path)  # Atomic sur la plupart des systèmes
    
    async def process_long_task(
        self,
        task_id: str,
        items: List[Dict[str, Any]],
        processor: Callable[[Dict, HolySheepAIClient], AsyncIterator[Dict]],
        model: str = "deepseek-v3.2"
    ) -> Checkpoint:
        """
        Traite une liste d'items avec checkpoint et progression
        
        Args:
            task_id: Identifiant unique de la tâche
            items: Liste des éléments à traiter
            processor: Fonction asynchrone qui traite chaque item
            model: Modèle HolySheep à utiliser (deepseek-v3.2 recommandé: $0.42/1M tokens)
        
        Returns:
            Checkpoint final avec tous les résultats
        """
        # Charger ou créer le checkpoint
        checkpoint = await self.load_checkpoint(task_id)
        start_index = 0
        
        if checkpoint and checkpoint.status == TaskStatus.RUNNING:
            print(f"📦 Reprise détectée : {checkpoint.progress_percentage}% déjà traité")
            start_index = checkpoint.current_index
            checkpoint.status = TaskStatus.RUNNING
        else:
            checkpoint = Checkpoint(
                task_id=task_id,
                status=TaskStatus.RUNNING,
                current_index=0,
                total_items=len(items),
                last_processed_id="",
                accumulated_results=[],
                error_count=0,
                last_error=None,
                started_at=datetime.now(),
                updated_at=datetime.now()
            )
        
        # Traitement par lots
        for batch_start in range(start_index, len(items), self.items_per_batch):
            batch = items[batch_start:batch_start + self.items_per_batch]
            
            for i, item in enumerate(batch):
                global_index = batch_start + i
                
                try:
                    async for result in processor(item, self.client):
                        checkpoint.accumulated_results.append(result)
                        checkpoint.last_processed_id = item.get('id', str(global_index))
                    
                    checkpoint.current_index = global_index + 1
                    
                    # Log de progression toutes les 10 items
                    if checkpoint.current_index % 10 == 0:
                        eta = checkpoint.eta_seconds
                        eta_str = f"{eta/60:.1f}min" if eta else "calcul en cours"
                        print(
                            f"⚡ Progression: {checkpoint.progress_percentage}% "
                            f"({checkpoint.current_index}/{checkpoint.total_items}) "
                            f"ETA: {eta_str}"
                        )
                    
                    # Sauvegarde périodique du checkpoint
                    if checkpoint.current_index % self.checkpoint_interval == 0:
                        await self.save_checkpoint(checkpoint)
                        print(f"💾 Checkpoint sauvegardé à {checkpoint.progress_percentage}%")
                
                except Exception as e:
                    checkpoint.error_count += 1
                    checkpoint.last_error = str(e)
                    print(f"⚠️ Erreur sur item {global_index}: {e}")
                    
                    # Stratégie : continuer ou s'arrêter selon le type d'erreur
                    if isinstance(e, (asyncio.TimeoutError, aiohttp.ClientError)):
                        await asyncio.sleep(2)  # Pause avant retry
                        continue
                    else:
                        checkpoint.status = TaskStatus.FAILED
                        await self.save_checkpoint(checkpoint)
                        raise
        
        # Finalisation
        checkpoint.status = TaskStatus.COMPLETED
        checkpoint.updated_at = datetime.now()
        await self.save_checkpoint(checkpoint)
        
        return checkpoint

Cette architecture a fait ses preuves en production. Le paramètre checkpoint_interval est crucial : j'utilise généralement 50 pour des tâches volumineuses, ce qui offre un bon compromis entre la sécurité des sauvegardes et les overheads d'écriture. Pour les tâches critiques, je descends à 10.

Exemple Pratique : Indexation RAG avec Reprise Automatique

async def rag_document_processor(
    document: Dict[str, Any],
    client: HolySheepAIClient
) -> AsyncIterator[Dict[str, Any]]:
    """
    Traite un document pour indexation RAG avec extraction IA
    Coût estimé HolySheep: $0.0012/document avec DeepSeek V3.2
    (vs $0.023 avec GPT-4.1 — économie de ~95%)
    """
    doc_id = document['id']
    content = document['content']
    
    # Étape 1 : Résumé intelligent du document
    messages = [
        {"role": "system", "content": "Tu es un assistant d'extraction de données. "
         "Extrait les informations clés du document en JSON structuré."},
        {"role": "user", "content": f"Document ID {doc_id}:\n\n{content[:4000]}"}
    ]
    
    response = await client.chat_completion(
        messages=messages,
        model="deepseek-v3.2",
        max_tokens=1000
    )
    
    extracted_data = response['choices'][0]['message']['content']
    
    yield {
        "doc_id": doc_id,
        "extracted": extracted_data,
        "tokens_used": response.get('usage', {}).get('total_tokens', 0),
        "processed_at": datetime.now().isoformat()
    }

async def main():
    """Exemple d'utilisation complète"""
    documents = [
        {"id": f"doc_{i}", "content": f"Contenu du document {i}..."}
        for i in range(50000)
    ]
    
    async with HolySheepAIClient(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        timeout_seconds=30,
        max_retries=3
    ) as client:
        manager = LongTaskManager(
            client=client,
            checkpoint_dir="./rag_checkpoints",
            checkpoint_interval=100,
            items_per_batch=20
        )
        
        # Traitement avec gestion des interruptions
        final_checkpoint = await manager.process_long_task(
            task_id="rag_indexing_2024_01",
            items=documents,
            processor=rag_document_processor,
            model="deepseek-v3.2"
        )
        
        print(f"✅ Traitement terminé: {len(final_checkpoint.accumulated_results)} documents")
        print(f"📊 Erreurs: {final_checkpoint.error_count}")

if __name__ == "__main__":
    asyncio.run(main())

Ce script,处理50 000 documents en environ 3 heures avec des checkpoints toutes les 100 items. Si une interruption survient, le simple lancement du script reprend exactement où il s'était arrêté. Lafacture finale ? Environ 60 $ avec HolySheep AI contre plus de 1 150 $ avec GPT-4.1 — une économie qui change la donne pour les projets à grand volume.

Monitoring et Tableau de Bord

Pour suivre visuellement la progression de vos tâches longues, voici une fonction de reporting en temps réel :

import rich.console
import rich.table
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn

def display_progress_dashboard(checkpoint: Checkpoint) -> None:
    """Affiche un tableau de bord de progression stylisé"""
    console = rich.console.Console()
    
    table = rich.table.Table(title=f"📊 Dashboard — Tâche {checkpoint.task_id}")
    table.add_column("Métrique", style="cyan", no_wrap=True)
    table.add_column("Valeur", style="magenta")
    
    table.add_row("Statut", f"[{'green' if checkpoint.status == TaskStatus.COMPLETED else 'yellow'}]{checkpoint.status.value}")
    table.add_row("Progression", f"{checkpoint.progress_percentage}%")
    table.add_row("Items traités", f"{checkpoint.current_index:,} / {checkpoint.total_items:,}")
    table.add_row("Erreurs", str(checkpoint.error_count))
    table.add_row("Démarré", checkpoint.started_at.strftime("%Y-%m-%d %H:%M:%S"))
    table.add_row("Dernière mise à jour", checkpoint.updated_at.strftime("%Y-%m-%d %H:%M:%S"))
    
    if checkpoint.eta_seconds:
        eta_delta = timedelta(seconds=int(checkpoint.eta_seconds))
        table.add_row("Temps restant estimé", str(eta_delta))
    
    if checkpoint.last_error:
        table.add_row("Dernière erreur", checkpoint.last_error[:50] + "...")
    
    console.print(table)
    
    # Barre de progression ASCII
    bar_length = 40
    filled = int(checkpoint.progress_percentage / 100 * bar_length)
    bar = "█" * filled + "░" * (bar_length - filled)
    console.print(f"\n[{bar}] {checkpoint.progress_percentage}%")

Erreurs courantes et solutions

Erreur 1 : Timeout récurrents avec gros volumes

Symptôme : Les appels API dépassent le délai même après plusieurs retries.

# ❌ Solution naive — timeout fixe trop court
async def process_with_short_timeout():
    async with aiohttp.ClientTimeout(total=5) as timeout:  # 5 secondes insuffisant
        # Traitement...
        pass

✅ Solution recommandée — timeout adaptatif selon la taille

async def process_with_adaptive_timeout(client, item_size: int): # Estimer le timeout nécessaire : 1 seconde + 100ms par Ko au-dessus de 10Ko base_timeout = 10 # secondes size_overhead = max(0, (item_size - 10000) / 1000) * 0.1 calculated_timeout = base_timeout + size_overhead async with client.session.timeout(total=calculated_timeout) as timeout: # Traitement avec timeout ajusté pass

Erreur 2 : Perte de données après plantage système

Symptôme : Le checkpoint existe mais accumulated_results est vide ou corrompu.

# ❌ Problème : Sauvegarde non atomique
async def bad_save_checkpoint(checkpoint: Checkpoint):
    path = Path(f"checkpoints/{checkpoint.task_id}.json")
    content = checkpoint.to_json()
    
    # Crash ici = fichier corrompu avec anciennes données
    with open(path, 'w') as f:
        f.write(content)

✅ Solution : Écriture atomique via fichier temporaire

async def atomic_save_checkpoint(checkpoint: Checkpoint): path = Path(f"checkpoints/{checkpoint.task_id}.json") temp_path = path.with_suffix('.json.tmp') async with async_open(temp_path, 'w') as f: await f.write(checkpoint.to_json()) await f.flush() await f.sync() # Force l'écriture physique temp_path.replace(path) # Atomique sur la plupart des FS

Erreur 3 : Doublons après reprise de tâche

Symptôme : Les mêmes items sont traités plusieurs fois à la reprise.

# ❌