Bối Cảnh Thực Tế: Đỉnh Dịch Vụ Khách Hàng AI

Tôi vẫn nhớ rõ đêm đó — 23:47, hệ thống RAG của một trung tâm thương mại điện tử lớn tại Việt Nam bắt đầu trả về timeout. Đội ngũ ops gọi điện唤醒 tôi dậy. Vấn đề? 5 triệu sản phẩm mới được import vào lúc 22:00, dữ liệu nén gzip tổng cộng 47GB cần decompress và index vào vector database trước 6:00 sáng để kịp chạy chatbot trả lời khách hàng ngày Black Friday. Dùng phương pháp truyền thống — đọc toàn bộ file vào memory rồi decompress — server 32GB RAM không đủ. Giải pháp? Tardis gzip streaming decompression — đọc chunk by chunk từ file nén, xử lý real-time, memory footprint chỉ 8MB dù file 47GB.

Tardis Gzip Là Gì?

Tardis là thư viện Python cho phép streaming decompression với khả năng seek (di chuyển vị trí) trong file gzip — thứ mà gzip tiêu chuẩn Python không hỗ trợ. Với hệ thống RAG xử lý dữ liệu AI, đây là công cụ không thể thiếu khi:

Cài Đặt Môi Trường

pip install tardis-python

tardis-python sử dụng Rust backend nên cần Rust compiler

Trên macOS:

brew install rust

Kiểm tra installation

python -c "from tardis import AsyncTarFile; print('Tardis ready!')"

Streaming Decompression Cơ Bản

import asyncio
from tardis import AsyncTarFile
import json

async def process_product_stream(tar_path: str, api_key: str):
    """
    Streaming xử lý 5 triệu sản phẩm từ file gzip.tar.gz
    Memory footprint: ~8MB constant thay vì 47GB
    """
    base_url = "https://api.holysheep.ai/v1"
    
    async with AsyncTarFile(tar_path, mode='r:gz') as tar:
        entry_count = 0
        batch = []
        batch_size = 100
        
        # Streaming qua từng entry trong tar.gz
        async for entry in tar:
            if entry.is_file() and entry.name.endswith('.json'):
                # Đọc chunk-by-chunk, không load full file
                content = await entry.read()
                product = json.loads(content)
                
                # Transform cho RAG embedding
                doc = {
                    "id": product["sku"],
                    "text": f"{product['name']}. {product['description']}",
                    "metadata": {
                        "category": product["category"],
                        "price": product["price"],
                        "embedding_model": "text-embedding-3-small"
                    }
                }
                batch.append(doc)
                
                # Batch insert khi đủ batch_size
                if len(batch) >= batch_size:
                    await send_to_vector_db(batch, base_url, api_key)
                    entry_count += len(batch)
                    print(f"Processed {entry_count} products, memory: {get_memory_mb():.1f}MB")
                    batch = []
        
        # Flush remaining
        if batch:
            await send_to_vector_db(batch, base_url, api_key)
            entry_count += len(batch)
    
    return entry_count

async def send_to_vector_db(batch: list, base_url: str, api_key: str):
    """Gửi batch embeddings sang vector database"""
    # Sử dụng HolySheep API cho embeddings - $0.42/MTok cho DeepSeek
    # So với OpenAI $0.0001/1K tokens = tiết kiệm 85%+
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "input": [doc["text"] for doc in batch],
        "model": "deepseek-embed"
    }
    # Code gửi request thực tế...

print("Streaming RAG pipeline ready!")

Real-Time Processing Với Background Tasks

import asyncio
from tardis import AsyncTarFile
import aiohttp
import time

class ProductStreamProcessor:
    def __init__(self, tar_path: str, api_key: str, max_concurrent: int = 10):
        self.tar_path = tar_path
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.stats = {"processed": 0, "failed": 0, "start_time": None}
    
    async def process_with_progress(self, progress_callback=None):
        """Xử lý streaming với progress tracking"""
        self.stats["start_time"] = time.time()
        
        async with AsyncTarFile(self.tar_path, mode='r:gz') as tar:
            tasks = []
            
            async for entry in tar:
                if entry.is_file():
                    task = asyncio.create_task(
                        self._process_single_entry(entry, progress_callback)
                    )
                    tasks.append(task)
            
            # Process với concurrency limit
            await asyncio.gather(*tasks)
        
        return self.stats
    
    async def _process_single_entry(self, entry, callback):
        async with self.semaphore:
            try:
                content = await entry.read()
                
                # Transform data
                data = self._transform(content)
                
                # Gửi đi xử lý
                await self._process_ai(data)
                
                self.stats["processed"] += 1
                
                if callback and self.stats["processed"] % 1000 == 0:
                    elapsed = time.time() - self.stats["start_time"]
                    rate = self.stats["processed"] / elapsed
                    callback(self.stats["processed"], rate)
                    
            except Exception as e:
                self.stats["failed"] += 1
                print(f"Failed: {entry.name} - {e}")
    
    def _transform(self, content: bytes) -> dict:
        """Transform raw content thành format mong muốn"""
        import json
        return json.loads