Ngày 15 tháng 3 năm 2026, tôi nhận được một cuộc gọi lúc 2 giờ sáng từ đồng nghiệp. Hệ thống xử lý 10.000 tài liệu PDF của khách hàng đã treo cứng sau 3 giờ chạy. Khi kiểm tra log, tôi thấy lỗi kinh điển:

ConnectionError: Connection timeout after 120s
Task ID: task_8x7f9d2a
Processed: 3,421 / 10,000 documents
Status: FAILED - No checkpoint saved

3 giờ công sức, 3.421 tài liệu đã xử lý — tất cả đều mất trắng. Đó là khoảnh khắc tôi quyết định xây dựng một hệ thống quản lý long-task hoàn chỉnh. Bài viết này sẽ chia sẻ toàn bộ kiến thức và source code để bạn không phải lặp lại sai lầm của tôi.

1. Tại sao Agent cần Long-Task Management?

Khi xây dựng AI agent với HolySheep AI (base URL: https://api.holysheep.ai/v1), chúng ta thường gặp các tác vụ dài:

Với HolySheep AI, chi phí chỉ từ $0.42/MTok (DeepSeek V3.2) — tiết kiệm 85%+ so với OpenAI. Nhưng nếu mỗi lần chạy task 10.000 items mà mất 3 tiếng rồi fail, chi phí này trở nên vô nghĩa.

2. Kiến trúc tổng thể

Hệ thống của chúng ta gồm 4 thành phần chính:

┌─────────────────────────────────────────────────────────────┐
│                    LONG-TASK MANAGER                        │
├──────────────┬──────────────┬──────────────┬────────────────┤
│   Progress   │    Timeout   │   Checkpoint │     Resume     │
│   Tracker    │   Controller │    Manager   │    Handler     │
├──────────────┼──────────────┼──────────────┼────────────────┤
│  Real-time   │  Per-step    │  Auto-save   │  State restore │
│  callbacks   │  deadlines   │  every N     │  from JSON     │
└──────────────┴──────────────┴──────────────┴────────────────┘

3. Implement chi tiết

3.1. HolySheep AI Client với Timeout

import requests
import json
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Callable
from dataclasses import dataclass, asdict
import hashlib

@dataclass
class TaskProgress:
    task_id: str
    total_items: int
    processed_items: int
    failed_items: int
    current_status: str
    start_time: datetime
    last_update: datetime
    checkpoint_path: str
    results: list

class HolySheepLongTaskManager:
    """Manager cho các tác vụ dài với HolySheep AI API"""
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(
        self,
        api_key: str,
        checkpoint_dir: str = "./checkpoints",
        timeout_per_request: int = 60,
        max_retries: int = 3,
        checkpoint_interval: int = 50
    ):
        self.api_key = api_key
        self.checkpoint_dir = checkpoint_dir
        self.timeout = timeout_per_request
        self.max_retries = max_retries
        self.checkpoint_interval = checkpoint_interval
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })
    
    def _call_api(
        self,
        prompt: str,
        model: str = "deepseek-v3.2",
        temperature: float = 0.7
    ) -> Dict[str, Any]:
        """Gọi HolySheep AI API với retry và timeout"""
        url = f"{self.BASE_URL}/chat/completions"
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature
        }
        
        for attempt in range(self.max_retries):
            try:
                response = self.session.post(
                    url,
                    json=payload,
                    timeout=self.timeout
                )
                response.raise_for_status()
                return response.json()
            
            except requests.exceptions.Timeout:
                print(f"⏰ Timeout lần {attempt + 1}/{self.max_retries}")
                if attempt == self.max_retries - 1:
                    raise TimeoutError(f"HolySheep API timeout sau {self.timeout}s")
                time.sleep(2 ** attempt)
            
            except requests.exceptions.HTTPError as e:
                if response.status_code == 429:
                    print(f"⚠️ Rate limit, đợi 60s...")
                    time.sleep(60)
                elif response.status_code == 401:
                    raise PermissionError("API Key không hợp lệ!")
                else:
                    raise
    
    def _generate_task_id(self, items: list) -> str:
        """Tạo task ID duy nhất từ hash của items"""
        content_hash = hashlib.md5(
            str(items[:10]).encode()
        ).hexdigest()[:8]
        return f"task_{datetime.now().strftime('%Y%m%d')}_{content_hash}"
    
    def _save_checkpoint(
        self,
        progress: TaskProgress,
        additional_data: Optional[Dict] = None
    ):
        """Lưu checkpoint để có thể resume"""
        checkpoint = {
            "progress": asdict(progress),
            "saved_at": datetime.now().isoformat(),
            "additional_data": additional_data or {}
        }
        
        with open(progress.checkpoint_path, 'w', encoding='utf-8') as f:
            json.dump(checkpoint, f, ensure_ascii=False, indent=2)
        
        print(f"💾 Checkpoint saved: {progress.processed_items}/{progress.total_items}")
    
    def _load_checkpoint(self, checkpoint_path: str) -> Optional[Dict]:
        """Load checkpoint nếu tồn tại"""
        try:
            with open(checkpoint_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None
    
    def process_batch(
        self,
        items: list,
        prompt_template: str,
        model: str = "deepseek-v3.2",
        resume: bool = True,
        progress_callback: Optional[Callable] = None
    ) -> TaskProgress:
        """
        Xử lý batch items với progress tracking và checkpoint
        
        Args:
            items: Danh sách items cần xử lý
            prompt_template: Template prompt với {item} placeholder
            model: Model sử dụng (default: deepseek-v3.2)
            resume: Có resume từ checkpoint không
            progress_callback: Callback để update UI
        
        Returns:
            TaskProgress object với kết quả
        """
        task_id = self._generate_task_id(items)
        checkpoint_path = f"{self.checkpoint_dir}/{task_id}.json"
        
        # Check resume
        start_index = 0
        existing_results = []
        
        if resume:
            checkpoint = self._load_checkpoint(checkpoint_path)
            if checkpoint:
                start_index = checkpoint['progress']['processed_items']
                existing_results = checkpoint['progress']['results']
                print(f"🔄 Resuming từ checkpoint: đã xử lý {start_index}/{len(items)}")
        
        # Initialize progress
        progress = TaskProgress(
            task_id=task_id,
            total_items=len(items),
            processed_items=start_index,
            failed_items=0,
            current_status="RUNNING",
            start_time=datetime.now(),
            last_update=datetime.now(),
            checkpoint_path=checkpoint_path,
            results=existing_results
        )
        
        # Process items
        for i in range(start_index, len(items)):
            item = items[i]
            prompt = prompt_template.format(item=item)
            
            try:
                result = self._call_api(prompt, model=model)
                content = result['choices'][0]['message']['content']
                
                progress.results.append({
                    "index": i,
                    "item": item,
                    "result": content,
                    "status": "SUCCESS",
                    "timestamp": datetime.now().isoformat()
                })
                progress.processed_items += 1
                
            except Exception as e:
                progress.failed_items += 1
                progress.results.append({
                    "index": i,
                    "item": item,
                    "error": str(e),
                    "status": "FAILED",
                    "timestamp": datetime.now().isoformat()
                })
                print(f"❌ Item {i} failed: {e}")
            
            progress.last_update = datetime.now()
            
            # Progress callback
            if progress_callback:
                progress_callback(progress)
            
            # Auto checkpoint
            if (i + 1) % self.checkpoint_interval == 0:
                self._save_checkpoint(progress)
        
        progress.current_status = "COMPLETED"
        self._save_checkpoint(progress)
        
        return progress


=== SỬ DỤNG ===

if __name__ == "__main__": # Khởi tạo manager với HolySheep API manager = HolySheepLongTaskManager( api_key="YOUR_HOLYSHEEP_API_KEY", # Thay bằng key thật checkpoint_dir="./checkpoints", timeout_per_request=60, checkpoint_interval=50 ) # Callback để track progress real-time def on_progress(p: TaskProgress): percent = (p.processed_items / p.total_items) * 100 elapsed = (datetime.now() - p.start_time).total_seconds() eta = (elapsed / p.processed_items) * (p.total_items - p.processed_items) print(f"📊 Progress: {percent:.1f}% | ETA: {eta/60:.1f} phút | Failed: {p.failed_items}") # Sample items documents = [f"doc_{i}.pdf" for i in range(1000)] prompt = """Phân tích nội dung tài liệu sau và trích xuất: 1. Chủ đề chính 2. Các từ khóa quan trọng 3. Tóm tắt 3 câu Tài liệu: {item}""" # Chạy với resume tự động result = manager.process_batch( items=documents, prompt_template=prompt, model="deepseek-v3.2", resume=True, progress_callback=on_progress ) print(f"✅ Hoàn thành: {result.processed_items} items, {result.failed_items} thất bại")

3.2. Advanced: Queue-based Task với Celery

Với hệ thống production lớn, bạn cần queue-based architecture:

import celery
from celery import Celery
from celery.signals import task_success, task_failure
import json
import redis

app = Celery('long_task_manager', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=5, default_retry_delay=30)
def process_single_item(self, item_data: dict, task_metadata: dict):
    """
    Celery task để xử lý từng item
    Task này có thể retry tự động khi fail
    """
    task_id = task_metadata['task_id']
    checkpoint_key = f"checkpoint:{task_id}"
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    try:
        # Load checkpoint
        checkpoint = json.loads(
            redis_client.get(checkpoint_key) or '{"processed": []}'
        )
        
        # Skip nếu đã xử lý
        if item_data['index'] in checkpoint['processed']:
            return {"status": "SKIPPED", "index": item_data['index']}
        
        # Gọi HolySheep API
        from holy_sheep_client import HolySheepLongTaskManager
        manager = HolySheepLongTaskManager(
            api_key=task_metadata['api_key']
        )
        
        result = manager._call_api(
            prompt=task_metadata['prompt_template'].format(item=item_data['item']),
            model=task_metadata.get('model', 'deepseek-v3.2')
        )
        
        # Update checkpoint
        checkpoint['processed'].append(item_data['index'])
        checkpoint['results'][str(item_data['index'])] = {
            'result': result['choices'][0]['message']['content'],
            'processed_at': str(datetime.now())
        }
        redis_client.set(checkpoint_key, json.dumps(checkpoint))
        
        # Update progress
        redis_client.incr(f"progress:{task_id}")
        
        return {
            "status": "SUCCESS",
            "index": item_data['index'],
            "result": result
        }
    
    except requests.exceptions.Timeout:
        # Retry với exponential backoff
        raise self.retry(exc=self.request.retries)
    
    except Exception as e:
        # Log error và retry
        redis_client.lpush(f"errors:{task_id}", json.dumps({
            'index': item_data['index'],
            'error': str(e),
            'timestamp': str(datetime.now())
        }))
        raise


@app.task
def start_batch_task(items: list, task_metadata: dict):
    """
    Task cha để dispatch tất cả items vào queue
    """
    task_id = task_metadata['task_id']
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    # Initialize checkpoint
    redis_client.set(f"checkpoint:{task_id}", json.dumps({
        'processed': [],
        'results': {},
        'total': len(items),
        'started_at': str(datetime.now())
    }))
    
    # Dispatch tất cả items
    for i, item in enumerate(items):
        process_single_item.apply_async(
            args=[{'index': i, 'item': item}, task_metadata],
            task_id=f"{task_id}_{i}"
        )
    
    return {
        "task_id": task_id,
        "total_items": len(items),
        "status": "DISPATCHED"
    }


@app.task
def get_task_progress(task_id: str) -> dict:
    """Lấy progress của batch task"""
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    processed = int(redis_client.get(f"progress:{task_id}") or 0)
    checkpoint = json.loads(redis_client.get(f"checkpoint:{task_id}") or '{}')
    
    return {
        "task_id": task_id,
        "total": checkpoint.get('total', 0),
        "processed": processed,
        "percent": (processed / checkpoint.get('total', 1)) * 100,
        "errors": redis_client.llen(f"errors:{task_id}")
    }


=== API Flask để quản lý ===

from flask import Flask, request, jsonify app = Flask(__name__) @app.route('/api/tasks/start', methods=['POST']) def start_task(): data = request.json # Validate if 'items' not in data or 'prompt_template' not in data: return jsonify({"error": "Missing required fields"}), 400 task_metadata = { 'task_id': f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}", 'api_key': 'YOUR_HOLYSHEEP_API_KEY', 'prompt_template': data['prompt_template'], 'model': data.get('model', 'deepseek-v3.2') } result = start_batch_task.delay(data['items'], task_metadata) return jsonify({ "task_id": task_metadata['task_id'], "celery_task_id": result.id, "status": "STARTED" }) @app.route('/api/tasks//progress') def get_progress(task_id): return jsonify(get_task_progress(task_id)) @app.route('/api/tasks//resume', methods=['POST']) def resume_task(task_id): """Resume failed task từ checkpoint""" redis_client = redis.Redis(host='localhost', port=6379, db=0) checkpoint = json.loads(redis_client.get(f"checkpoint:{task_id}") or '{}') # Lấy items chưa xử lý total = checkpoint.get('total', 0) processed = set(checkpoint.get('processed', [])) # Bạn cần lưu items gốc để resume # items = json.loads(redis_client.get(f"items:{task_id}")) # remaining = [items[i] for i in range(total) if i not in processed] # Dispatch remaining items # ... return jsonify({"status": "RESUMING", "task_id": task_id})

3.3. Timeout Controller với Per-Step Deadlines

import signal
import functools
from typing import Optional, Callable
from contextlib import contextmanager

class TimeoutException(Exception):
    """Exception khi vượt quá thời gian timeout"""
    pass


class TimeoutController:
    """
    Controller để quản lý timeout cho từng bước trong pipeline
    Sử dụng signal-based timeout (chỉ hoạt động trên Unix)
    """
    
    def __init__(self, default_timeout: int = 60):
        self.default_timeout = default_timeout
        self.current_timeout = default_timeout
        self._original_handler = None
    
    @contextmanager
    def timeout_context(self, seconds: Optional[int] = None):
        """
        Context manager để wrap một block code với timeout
        
        Usage:
            with controller.timeout_context(30):
                # Code có timeout 30s
                pass
        """
        timeout = seconds or self.default_timeout
        
        def handler(signum, frame):
            raise TimeoutException(
                f"Operation exceeded {timeout} seconds"
            )
        
        # Lưu original handler
        old_handler = signal.signal(signal.SIGALRM, handler)
        signal.alarm(timeout)
        
        try:
            yield
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
    
    def with_timeout(self, timeout_seconds: Optional[int] = None):
        """
        Decorator để thêm timeout cho function
        
        Usage:
            @controller.with_timeout(45)
            def my_function():
                pass
        """
        def decorator(func: Callable):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                timeout = timeout_seconds or self.default_timeout
                with self.timeout_context(timeout):
                    return func(*args, **kwargs)
            return wrapper
        return decorator


class AdaptiveTimeoutController(TimeoutController):
    """
    Controller thông minh tự điều chỉnh timeout
    dựa trên độ phức tạp của request
    """
    
    COMPLEXITY_THRESHOLDS = {
        'simple': 30,      # Prompt < 100 chars
        'medium': 60,      # Prompt 100-500 chars
        'complex': 120,    # Prompt 500-2000 chars
        'ultra': 300       # Prompt > 2000 chars
    }
    
    def calculate_timeout(self, prompt: str, model: str) -> int:
        """
        Tính timeout phù hợp dựa trên:
        - Độ dài prompt
        - Model được sử dụng
        - Số lượng items (nếu batch)
        """
        prompt_length = len(prompt)
        
        # Xác định complexity
        if prompt_length < 100:
            complexity = 'simple'
        elif prompt_length < 500:
            complexity = 'medium'
        elif prompt_length < 2000:
            complexity = 'complex'
        else:
            complexity = 'ultra'
        
        base_timeout = self.COMPLEXITY_THRESHOLDS[complexity]
        
        # Adjust theo model
        model_adjustments = {
            'deepseek-v3.2': 1.0,    # Nhanh và rẻ
            'gpt-4.1': 1