Bối Cảnh Thực Tế: Đỉnh Dịch Vụ Khách Hàng AI
Tôi vẫn nhớ rõ đêm đó — 23:47, hệ thống RAG của một trung tâm thương mại điện tử lớn tại Việt Nam bắt đầu trả về timeout. Đội ngũ ops gọi điện唤醒 tôi dậy. Vấn đề? 5 triệu sản phẩm mới được import vào lúc 22:00, dữ liệu nén gzip tổng cộng 47GB cần decompress và index vào vector database trước 6:00 sáng để kịp chạy chatbot trả lời khách hàng ngày Black Friday.
Dùng phương pháp truyền thống — đọc toàn bộ file vào memory rồi decompress — server 32GB RAM không đủ. Giải pháp?
Tardis gzip streaming decompression — đọc chunk by chunk từ file nén, xử lý real-time, memory footprint chỉ 8MB dù file 47GB.
Tardis Gzip Là Gì?
Tardis là thư viện Python cho phép streaming decompression với khả năng seek (di chuyển vị trí) trong file gzip — thứ mà gzip tiêu chuẩn Python không hỗ trợ. Với hệ thống RAG xử lý dữ liệu AI, đây là công cụ không thể thiếu khi:
- File nén lớn hơn RAM available
- Cần random access trong compressed data
- Pipeline xử lý real-time không thể đợi full decompression
- Tiết kiệm 60-70% I/O time so với sequential decompression
Cài Đặt Môi Trường
pip install tardis-python
tardis-python sử dụng Rust backend nên cần Rust compiler
Trên macOS:
brew install rust
Kiểm tra installation
python -c "from tardis import AsyncTarFile; print('Tardis ready!')"
Streaming Decompression Cơ Bản
import asyncio
from tardis import AsyncTarFile
import json
async def process_product_stream(tar_path: str, api_key: str):
"""
Streaming xử lý 5 triệu sản phẩm từ file gzip.tar.gz
Memory footprint: ~8MB constant thay vì 47GB
"""
base_url = "https://api.holysheep.ai/v1"
async with AsyncTarFile(tar_path, mode='r:gz') as tar:
entry_count = 0
batch = []
batch_size = 100
# Streaming qua từng entry trong tar.gz
async for entry in tar:
if entry.is_file() and entry.name.endswith('.json'):
# Đọc chunk-by-chunk, không load full file
content = await entry.read()
product = json.loads(content)
# Transform cho RAG embedding
doc = {
"id": product["sku"],
"text": f"{product['name']}. {product['description']}",
"metadata": {
"category": product["category"],
"price": product["price"],
"embedding_model": "text-embedding-3-small"
}
}
batch.append(doc)
# Batch insert khi đủ batch_size
if len(batch) >= batch_size:
await send_to_vector_db(batch, base_url, api_key)
entry_count += len(batch)
print(f"Processed {entry_count} products, memory: {get_memory_mb():.1f}MB")
batch = []
# Flush remaining
if batch:
await send_to_vector_db(batch, base_url, api_key)
entry_count += len(batch)
return entry_count
async def send_to_vector_db(batch: list, base_url: str, api_key: str):
"""Gửi batch embeddings sang vector database"""
# Sử dụng HolySheep API cho embeddings - $0.42/MTok cho DeepSeek
# So với OpenAI $0.0001/1K tokens = tiết kiệm 85%+
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {
"input": [doc["text"] for doc in batch],
"model": "deepseek-embed"
}
# Code gửi request thực tế...
print("Streaming RAG pipeline ready!")
Real-Time Processing Với Background Tasks
import asyncio
from tardis import AsyncTarFile
import aiohttp
import time
class ProductStreamProcessor:
def __init__(self, tar_path: str, api_key: str, max_concurrent: int = 10):
self.tar_path = tar_path
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.semaphore = asyncio.Semaphore(max_concurrent)
self.stats = {"processed": 0, "failed": 0, "start_time": None}
async def process_with_progress(self, progress_callback=None):
"""Xử lý streaming với progress tracking"""
self.stats["start_time"] = time.time()
async with AsyncTarFile(self.tar_path, mode='r:gz') as tar:
tasks = []
async for entry in tar:
if entry.is_file():
task = asyncio.create_task(
self._process_single_entry(entry, progress_callback)
)
tasks.append(task)
# Process với concurrency limit
await asyncio.gather(*tasks)
return self.stats
async def _process_single_entry(self, entry, callback):
async with self.semaphore:
try:
content = await entry.read()
# Transform data
data = self._transform(content)
# Gửi đi xử lý
await self._process_ai(data)
self.stats["processed"] += 1
if callback and self.stats["processed"] % 1000 == 0:
elapsed = time.time() - self.stats["start_time"]
rate = self.stats["processed"] / elapsed
callback(self.stats["processed"], rate)
except Exception as e:
self.stats["failed"] += 1
print(f"Failed: {entry.name} - {e}")
def _transform(self, content: bytes) -> dict:
"""Transform raw content thành format mong muốn"""
import json
return json.loads
Tài nguyên liên quan
Bài viết liên quan