Ngày 15 tháng 3 năm 2026, tôi nhận được một cuộc gọi lúc 2 giờ sáng từ đồng nghiệp. Hệ thống xử lý 10.000 tài liệu PDF của khách hàng đã treo cứng sau 3 giờ chạy. Khi kiểm tra log, tôi thấy lỗi kinh điển:
ConnectionError: Connection timeout after 120s
Task ID: task_8x7f9d2a
Processed: 3,421 / 10,000 documents
Status: FAILED - No checkpoint saved
3 giờ công sức, 3.421 tài liệu đã xử lý — tất cả đều mất trắng. Đó là khoảnh khắc tôi quyết định xây dựng một hệ thống quản lý long-task hoàn chỉnh. Bài viết này sẽ chia sẻ toàn bộ kiến thức và source code để bạn không phải lặp lại sai lầm của tôi.
1. Tại sao Agent cần Long-Task Management?
Khi xây dựng AI agent với HolySheep AI (base URL: https://api.holysheep.ai/v1), chúng ta thường gặp các tác vụ dài:
- Xử lý hàng nghìn tài liệu (PDF, Word, Excel)
- Tạo báo cáo phân tích dữ liệu lớn
- Batch embedding cho vector database
- Tổng hợp và phân tích feedback khách hàng
Với HolySheep AI, chi phí chỉ từ $0.42/MTok (DeepSeek V3.2) — tiết kiệm 85%+ so với OpenAI. Nhưng nếu mỗi lần chạy task 10.000 items mà mất 3 tiếng rồi fail, chi phí này trở nên vô nghĩa.
2. Kiến trúc tổng thể
Hệ thống của chúng ta gồm 4 thành phần chính:
┌─────────────────────────────────────────────────────────────┐
│ LONG-TASK MANAGER │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ Progress │ Timeout │ Checkpoint │ Resume │
│ Tracker │ Controller │ Manager │ Handler │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ Real-time │ Per-step │ Auto-save │ State restore │
│ callbacks │ deadlines │ every N │ from JSON │
└──────────────┴──────────────┴──────────────┴────────────────┘
3. Implement chi tiết
3.1. HolySheep AI Client với Timeout
import requests
import json
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Callable
from dataclasses import dataclass, asdict
import hashlib
@dataclass
class TaskProgress:
task_id: str
total_items: int
processed_items: int
failed_items: int
current_status: str
start_time: datetime
last_update: datetime
checkpoint_path: str
results: list
class HolySheepLongTaskManager:
"""Manager cho các tác vụ dài với HolySheep AI API"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(
self,
api_key: str,
checkpoint_dir: str = "./checkpoints",
timeout_per_request: int = 60,
max_retries: int = 3,
checkpoint_interval: int = 50
):
self.api_key = api_key
self.checkpoint_dir = checkpoint_dir
self.timeout = timeout_per_request
self.max_retries = max_retries
self.checkpoint_interval = checkpoint_interval
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
def _call_api(
self,
prompt: str,
model: str = "deepseek-v3.2",
temperature: float = 0.7
) -> Dict[str, Any]:
"""Gọi HolySheep AI API với retry và timeout"""
url = f"{self.BASE_URL}/chat/completions"
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature
}
for attempt in range(self.max_retries):
try:
response = self.session.post(
url,
json=payload,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
print(f"⏰ Timeout lần {attempt + 1}/{self.max_retries}")
if attempt == self.max_retries - 1:
raise TimeoutError(f"HolySheep API timeout sau {self.timeout}s")
time.sleep(2 ** attempt)
except requests.exceptions.HTTPError as e:
if response.status_code == 429:
print(f"⚠️ Rate limit, đợi 60s...")
time.sleep(60)
elif response.status_code == 401:
raise PermissionError("API Key không hợp lệ!")
else:
raise
def _generate_task_id(self, items: list) -> str:
"""Tạo task ID duy nhất từ hash của items"""
content_hash = hashlib.md5(
str(items[:10]).encode()
).hexdigest()[:8]
return f"task_{datetime.now().strftime('%Y%m%d')}_{content_hash}"
def _save_checkpoint(
self,
progress: TaskProgress,
additional_data: Optional[Dict] = None
):
"""Lưu checkpoint để có thể resume"""
checkpoint = {
"progress": asdict(progress),
"saved_at": datetime.now().isoformat(),
"additional_data": additional_data or {}
}
with open(progress.checkpoint_path, 'w', encoding='utf-8') as f:
json.dump(checkpoint, f, ensure_ascii=False, indent=2)
print(f"💾 Checkpoint saved: {progress.processed_items}/{progress.total_items}")
def _load_checkpoint(self, checkpoint_path: str) -> Optional[Dict]:
"""Load checkpoint nếu tồn tại"""
try:
with open(checkpoint_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
return None
def process_batch(
self,
items: list,
prompt_template: str,
model: str = "deepseek-v3.2",
resume: bool = True,
progress_callback: Optional[Callable] = None
) -> TaskProgress:
"""
Xử lý batch items với progress tracking và checkpoint
Args:
items: Danh sách items cần xử lý
prompt_template: Template prompt với {item} placeholder
model: Model sử dụng (default: deepseek-v3.2)
resume: Có resume từ checkpoint không
progress_callback: Callback để update UI
Returns:
TaskProgress object với kết quả
"""
task_id = self._generate_task_id(items)
checkpoint_path = f"{self.checkpoint_dir}/{task_id}.json"
# Check resume
start_index = 0
existing_results = []
if resume:
checkpoint = self._load_checkpoint(checkpoint_path)
if checkpoint:
start_index = checkpoint['progress']['processed_items']
existing_results = checkpoint['progress']['results']
print(f"🔄 Resuming từ checkpoint: đã xử lý {start_index}/{len(items)}")
# Initialize progress
progress = TaskProgress(
task_id=task_id,
total_items=len(items),
processed_items=start_index,
failed_items=0,
current_status="RUNNING",
start_time=datetime.now(),
last_update=datetime.now(),
checkpoint_path=checkpoint_path,
results=existing_results
)
# Process items
for i in range(start_index, len(items)):
item = items[i]
prompt = prompt_template.format(item=item)
try:
result = self._call_api(prompt, model=model)
content = result['choices'][0]['message']['content']
progress.results.append({
"index": i,
"item": item,
"result": content,
"status": "SUCCESS",
"timestamp": datetime.now().isoformat()
})
progress.processed_items += 1
except Exception as e:
progress.failed_items += 1
progress.results.append({
"index": i,
"item": item,
"error": str(e),
"status": "FAILED",
"timestamp": datetime.now().isoformat()
})
print(f"❌ Item {i} failed: {e}")
progress.last_update = datetime.now()
# Progress callback
if progress_callback:
progress_callback(progress)
# Auto checkpoint
if (i + 1) % self.checkpoint_interval == 0:
self._save_checkpoint(progress)
progress.current_status = "COMPLETED"
self._save_checkpoint(progress)
return progress
=== SỬ DỤNG ===
if __name__ == "__main__":
# Khởi tạo manager với HolySheep API
manager = HolySheepLongTaskManager(
api_key="YOUR_HOLYSHEEP_API_KEY", # Thay bằng key thật
checkpoint_dir="./checkpoints",
timeout_per_request=60,
checkpoint_interval=50
)
# Callback để track progress real-time
def on_progress(p: TaskProgress):
percent = (p.processed_items / p.total_items) * 100
elapsed = (datetime.now() - p.start_time).total_seconds()
eta = (elapsed / p.processed_items) * (p.total_items - p.processed_items)
print(f"📊 Progress: {percent:.1f}% | ETA: {eta/60:.1f} phút | Failed: {p.failed_items}")
# Sample items
documents = [f"doc_{i}.pdf" for i in range(1000)]
prompt = """Phân tích nội dung tài liệu sau và trích xuất:
1. Chủ đề chính
2. Các từ khóa quan trọng
3. Tóm tắt 3 câu
Tài liệu: {item}"""
# Chạy với resume tự động
result = manager.process_batch(
items=documents,
prompt_template=prompt,
model="deepseek-v3.2",
resume=True,
progress_callback=on_progress
)
print(f"✅ Hoàn thành: {result.processed_items} items, {result.failed_items} thất bại")
3.2. Advanced: Queue-based Task với Celery
Với hệ thống production lớn, bạn cần queue-based architecture:
import celery
from celery import Celery
from celery.signals import task_success, task_failure
import json
import redis
app = Celery('long_task_manager', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=5, default_retry_delay=30)
def process_single_item(self, item_data: dict, task_metadata: dict):
"""
Celery task để xử lý từng item
Task này có thể retry tự động khi fail
"""
task_id = task_metadata['task_id']
checkpoint_key = f"checkpoint:{task_id}"
redis_client = redis.Redis(host='localhost', port=6379, db=0)
try:
# Load checkpoint
checkpoint = json.loads(
redis_client.get(checkpoint_key) or '{"processed": []}'
)
# Skip nếu đã xử lý
if item_data['index'] in checkpoint['processed']:
return {"status": "SKIPPED", "index": item_data['index']}
# Gọi HolySheep API
from holy_sheep_client import HolySheepLongTaskManager
manager = HolySheepLongTaskManager(
api_key=task_metadata['api_key']
)
result = manager._call_api(
prompt=task_metadata['prompt_template'].format(item=item_data['item']),
model=task_metadata.get('model', 'deepseek-v3.2')
)
# Update checkpoint
checkpoint['processed'].append(item_data['index'])
checkpoint['results'][str(item_data['index'])] = {
'result': result['choices'][0]['message']['content'],
'processed_at': str(datetime.now())
}
redis_client.set(checkpoint_key, json.dumps(checkpoint))
# Update progress
redis_client.incr(f"progress:{task_id}")
return {
"status": "SUCCESS",
"index": item_data['index'],
"result": result
}
except requests.exceptions.Timeout:
# Retry với exponential backoff
raise self.retry(exc=self.request.retries)
except Exception as e:
# Log error và retry
redis_client.lpush(f"errors:{task_id}", json.dumps({
'index': item_data['index'],
'error': str(e),
'timestamp': str(datetime.now())
}))
raise
@app.task
def start_batch_task(items: list, task_metadata: dict):
"""
Task cha để dispatch tất cả items vào queue
"""
task_id = task_metadata['task_id']
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Initialize checkpoint
redis_client.set(f"checkpoint:{task_id}", json.dumps({
'processed': [],
'results': {},
'total': len(items),
'started_at': str(datetime.now())
}))
# Dispatch tất cả items
for i, item in enumerate(items):
process_single_item.apply_async(
args=[{'index': i, 'item': item}, task_metadata],
task_id=f"{task_id}_{i}"
)
return {
"task_id": task_id,
"total_items": len(items),
"status": "DISPATCHED"
}
@app.task
def get_task_progress(task_id: str) -> dict:
"""Lấy progress của batch task"""
redis_client = redis.Redis(host='localhost', port=6379, db=0)
processed = int(redis_client.get(f"progress:{task_id}") or 0)
checkpoint = json.loads(redis_client.get(f"checkpoint:{task_id}") or '{}')
return {
"task_id": task_id,
"total": checkpoint.get('total', 0),
"processed": processed,
"percent": (processed / checkpoint.get('total', 1)) * 100,
"errors": redis_client.llen(f"errors:{task_id}")
}
=== API Flask để quản lý ===
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/api/tasks/start', methods=['POST'])
def start_task():
data = request.json
# Validate
if 'items' not in data or 'prompt_template' not in data:
return jsonify({"error": "Missing required fields"}), 400
task_metadata = {
'task_id': f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'api_key': 'YOUR_HOLYSHEEP_API_KEY',
'prompt_template': data['prompt_template'],
'model': data.get('model', 'deepseek-v3.2')
}
result = start_batch_task.delay(data['items'], task_metadata)
return jsonify({
"task_id": task_metadata['task_id'],
"celery_task_id": result.id,
"status": "STARTED"
})
@app.route('/api/tasks//progress')
def get_progress(task_id):
return jsonify(get_task_progress(task_id))
@app.route('/api/tasks//resume', methods=['POST'])
def resume_task(task_id):
"""Resume failed task từ checkpoint"""
redis_client = redis.Redis(host='localhost', port=6379, db=0)
checkpoint = json.loads(redis_client.get(f"checkpoint:{task_id}") or '{}')
# Lấy items chưa xử lý
total = checkpoint.get('total', 0)
processed = set(checkpoint.get('processed', []))
# Bạn cần lưu items gốc để resume
# items = json.loads(redis_client.get(f"items:{task_id}"))
# remaining = [items[i] for i in range(total) if i not in processed]
# Dispatch remaining items
# ...
return jsonify({"status": "RESUMING", "task_id": task_id})
3.3. Timeout Controller với Per-Step Deadlines
import signal
import functools
from typing import Optional, Callable
from contextlib import contextmanager
class TimeoutException(Exception):
"""Exception khi vượt quá thời gian timeout"""
pass
class TimeoutController:
"""
Controller để quản lý timeout cho từng bước trong pipeline
Sử dụng signal-based timeout (chỉ hoạt động trên Unix)
"""
def __init__(self, default_timeout: int = 60):
self.default_timeout = default_timeout
self.current_timeout = default_timeout
self._original_handler = None
@contextmanager
def timeout_context(self, seconds: Optional[int] = None):
"""
Context manager để wrap một block code với timeout
Usage:
with controller.timeout_context(30):
# Code có timeout 30s
pass
"""
timeout = seconds or self.default_timeout
def handler(signum, frame):
raise TimeoutException(
f"Operation exceeded {timeout} seconds"
)
# Lưu original handler
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
def with_timeout(self, timeout_seconds: Optional[int] = None):
"""
Decorator để thêm timeout cho function
Usage:
@controller.with_timeout(45)
def my_function():
pass
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
timeout = timeout_seconds or self.default_timeout
with self.timeout_context(timeout):
return func(*args, **kwargs)
return wrapper
return decorator
class AdaptiveTimeoutController(TimeoutController):
"""
Controller thông minh tự điều chỉnh timeout
dựa trên độ phức tạp của request
"""
COMPLEXITY_THRESHOLDS = {
'simple': 30, # Prompt < 100 chars
'medium': 60, # Prompt 100-500 chars
'complex': 120, # Prompt 500-2000 chars
'ultra': 300 # Prompt > 2000 chars
}
def calculate_timeout(self, prompt: str, model: str) -> int:
"""
Tính timeout phù hợp dựa trên:
- Độ dài prompt
- Model được sử dụng
- Số lượng items (nếu batch)
"""
prompt_length = len(prompt)
# Xác định complexity
if prompt_length < 100:
complexity = 'simple'
elif prompt_length < 500:
complexity = 'medium'
elif prompt_length < 2000:
complexity = 'complex'
else:
complexity = 'ultra'
base_timeout = self.COMPLEXITY_THRESHOLDS[complexity]
# Adjust theo model
model_adjustments = {
'deepseek-v3.2': 1.0, # Nhanh và rẻ
'gpt-4.1': 1