Trong quá trình xây dựng hệ thống RAG cho doanh nghiệp thương mại điện tử quy mô lớn với hơn 50 triệu sản phẩm, tôi đã phải đối mặt với một thách thức nghiêm trọng: chi phí API tăng phi mã từ 2.000 USD/tháng lên 18.000 USD/tháng chỉ trong 3 tháng. Nguyên nhân chính? Không có cơ chế kiểm soát request hiệu quả. Bài viết này sẽ hướng dẫn bạn thiết kế và triển khai hệ thống quản lý rate limiting và quota từ A đến Z, tích hợp hoàn hảo với HolySheep AI — nền tảng với chi phí chỉ bằng 15% so với các provider khác nhờ tỷ giá ¥1=$1.

Tại Sao Cần Hệ Thống Quản Lý Rate Limit?

Khi làm việc với các API AI như GPT-4.1 ($8/MTok), Claude Sonnet 4.5 ($15/MTok), hoặc DeepSeek V3.2 ($0.42/MTok), việc không kiểm soát được lưu lượng sẽ dẫn đến:

Kiến Trúc Hệ Thống Tổng Quan


"""
AI API Rate Limiter & Quota Manager
Author: HolySheep AI Technical Team
Version: 2.0.0
"""

import time
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from collections import defaultdict
from enum import Enum
import threading

class RateLimitStrategy(Enum):
    TOKEN_BUCKET = "token_bucket"
    SLIDING_WINDOW = "sliding_window"
    FIXED_WINDOW = "fixed_window"
    ADAPTIVE = "adaptive"

@dataclass
class QuotaConfig:
    """Cấu hình quota cho mỗi tier người dùng"""
    requests_per_minute: int = 60
    requests_per_hour: int = 1000
    requests_per_day: int = 10000
    tokens_per_minute: int = 100000
    tokens_per_month: int = 10000000  # 10M tokens
    max_concurrent: int = 5
    
@dataclass
class QuotaUsage:
    """Theo dõi usage thời gian thực"""
    requests_this_minute: int = 0
    requests_this_hour: int = 0
    requests_today: int = 0
    tokens_this_minute: int = 0
    tokens_this_month: int = 0
    concurrent_requests: int = 0
    minute_reset_at: float = 0
    hour_reset_at: float = 0
    day_reset_at: float = 0

class TokenBucket:
    """Token Bucket Algorithm - mượt mà, không burst"""
    
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens/second
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()
    
    def consume(self, tokens: int) -> bool:
        """Kiểm tra và tiêu thụ tokens. Trả về True nếu thành công"""
        with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

class AIMultiProviderLimiter:
    """Rate Limiter đa provider với fallback thông minh"""
    
    def __init__(self):
        # Cấu hình quota theo tier (đơn vị: requests/phút)
        self.tier_configs: Dict[str, QuotaConfig] = {
            "free": QuotaConfig(requests_per_minute=10, requests_per_day=1000),
            "starter": QuotaConfig(requests_per_minute=60, requests_per_day=50000),
            "pro": QuotaConfig(requests_per_minute=300, requests_per_day=500000),
            "enterprise": QuotaConfig(requests_per_minute=1000, requests_per_hour=30000),
        }
        
        # Token bucket cho mỗi provider
        self.provider_buckets: Dict[str, TokenBucket] = {
            "holysheep_gpt": TokenBucket(rate=50, capacity=100),    # 50 req/s
            "holysheep_claude": TokenBucket(rate=30, capacity=60),  # 30 req/s
            "holysheep_deepseek": TokenBucket(rate=100, capacity=200),  # 100 req/s
        }
        
        # Theo dõi usage theo user
        self.user_usages: Dict[str, QuotaUsage] = defaultdict(QuotaUsage)
        self.user_tiers: Dict[str, str] = {}
        self._cleanup_interval = 3600  # 1 giờ
        
    def get_user_tier(self, user_id: str) -> str:
        """Xác định tier của user (cache hoặc query từ DB)"""
        if user_id not in self.user_tiers:
            # Mặc định free tier
            self.user_tiers[user_id] = "free"
        return self.user_tiers[user_id]
    
    def check_quota(self, user_id: str, estimated_tokens: int = 1000) -> tuple[bool, str]:
        """
        Kiểm tra quota trước khi gửi request
        Returns: (is_allowed, error_message)
        """
        tier = self.get_user_tier(user_id)
        config = self.tier_configs.get(tier, self.tier_configs["free"])
        usage = self.user_usages[user_id]
        now = time.time()
        
        # Reset counters nếu cần
        self._reset_if_needed(usage, now)
        
        # 1. Kiểm tra concurrent limit
        if usage.concurrent_requests >= config.max_concurrent:
            return False, f"MAX_CONCURRENT: Đang có {usage.concurrent_requests} request đang xử lý"
        
        # 2. Kiểm tra rate limit theo phút
        if usage.requests_this_minute >= config.requests_per_minute:
            wait_time = max(0, 60 - (now - usage.minute_reset_at))
            return False, f"RATE_LIMIT_MINUTE: Vui lòng chờ {wait_time:.1f}s"
        
        # 3. Kiểm tra hourly quota
        if usage.requests_this_hour >= config.requests_per_hour:
            return False, "QUOTA_HOUR_EXCEEDED: Đã vượt giới hạn giờ này"
        
        # 4. Kiểm tra daily quota
        if usage.requests_today >= config.requests_per_day:
            return False, "QUOTA_DAY_EXCEEDED: Đã vượt giới hạn ngày này"
        
        # 5. Kiểm tra token quota
        if usage.tokens_this_month + estimated_tokens > config.tokens_per_month:
            return False, "TOKEN_QUOTA_EXCEEDED: Sắp hết quota tháng"
        
        return True, "OK"
    
    def _reset_if_needed(self, usage: QuotaUsage, now: float):
        """Reset counters theo window thời gian"""
        if now >= usage.minute_reset_at:
            usage.requests_this_minute = 0
            usage.tokens_this_minute = 0
            usage.minute_reset_at = now + 60
        
        if now >= usage.hour_reset_at:
            usage.requests_this_hour = 0
            usage.hour_reset_at = now + 3600
            
        if now >= usage.day_reset_at:
            usage.requests_today = 0
            usage.day_reset_at = now + 86400
    
    async def acquire_and_execute(
        self, 
        user_id: str, 
        provider: str,
        execute_func,
        estimated_tokens: int = 1000
    ):
        """Acquire quota và execute request với automatic cleanup"""
        # 1. Check quota
        allowed, error = self.check_quota(user_id, estimated_tokens)
        if not allowed:
            raise QuotaExceededError(error)
        
        # 2. Check provider rate limit
        bucket = self.provider_buckets.get(provider)
        if bucket and not bucket.consume(1):
            raise ProviderRateLimitError(f"{provider} rate limit exceeded")
        
        # 3. Update usage
        usage = self.user_usages[user_id]
        usage.concurrent_requests += 1
        usage.requests_this_minute += 1
        usage.requests_this_hour += 1
        usage.requests_today += 1
        usage.tokens_this_minute += estimated_tokens
        usage.tokens_this_month += estimated_tokens
        
        try:
            # Execute với timeout
            result = await asyncio.wait_for(execute_func(), timeout=30)
            return result
        finally:
            usage.concurrent_requests -= 1

class QuotaExceededError(Exception):
    pass

class ProviderRateLimitError(Exception):
    pass

Tích Hợp HolySheep AI Vào Hệ Thống

HolySheep AI cung cấp API endpoint tương thích OpenAI với độ trễ trung bình dưới 50ms và hỗ trợ thanh toán qua WeChat/Alipay. Dưới đây là module tích hợp hoàn chỉnh:


"""
HolySheep AI API Client với built-in Rate Limiting
Base URL: https://api.holysheep.ai/v1
Pricing 2026: GPT-4.1 $8, Claude Sonnet 4.5 $15, DeepSeek V3.2 $0.42/MTok
"""

import aiohttp
import asyncio
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import json
import hashlib
from datetime import datetime

@dataclass
class HolySheepConfig:
    api_key: str = "YOUR_HOLYSHEEP_API_KEY"
    base_url: str = "https://api.holysheep.ai/v1"
    timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0
    default_model: str = "gpt-4.1"

class HolySheepAIClient:
    """HolySheep AI Client với exponential backoff retry"""
    
    def __init__(self, config: HolySheepConfig, rate_limiter: AIMultiProviderLimiter):
        self.config = config
        self.rate_limiter = rate_limiter
        self._session: Optional[aiohttp.ClientSession] = None
        self._request_count = 0
        self._total_tokens = 0
        
    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            },
            timeout=aiohttp.ClientTimeout(total=self.config.timeout)
        )
        return self
    
    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
    
    async def chat_completions(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4.1",
        temperature: float = 0.7,
        max_tokens: int = 2048,
        user_id: str = "default"
    ) -> Dict[str, Any]:
        """
        Gửi chat completion request với quota check và retry logic
        """
        # Ước tính tokens cho quota check (rough estimate)
        estimated_input_tokens = sum(len(m.get("content", "")) // 4 for m in messages)
        estimated_output_tokens = max_tokens
        
        async def _execute():
            payload = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            
            # Retry với exponential backoff
            last_error = None
            for attempt in range(self.config.max_retries):
                try:
                    async with self._session.post(
                        f"{self.config.base_url}/chat/completions",
                        json=payload
                    ) as response:
                        if response.status == 429:
                            # Rate limited - wait và retry
                            retry_after = int(response.headers.get("Retry-After", 5))
                            await asyncio.sleep(retry_after)
                            continue
                        
                        if response.status == 402:
                            raise PaymentRequiredError("Quota thanh toán đã hết")
                        
                        if response.status != 200:
                            error_body = await response.text()
                            raise APIError(f"HTTP {response.status}: {error_body}")
                        
                        result = await response.json()
                        
                        # Track usage
                        self._request_count += 1
                        if "usage" in result:
                            self._total_tokens += result["usage"].get("total_tokens", 0)
                        
                        return result
                        
                except aiohttp.ClientError as e:
                    last_error = e
                    if attempt < self.config.max_retries - 1:
                        await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                        
            raise last_error or APIError("Max retries exceeded")
        
        # Sử dụng rate limiter để kiểm soát
        return await self.rate_limiter.acquire_and_execute(
            user_id=user_id,
            provider="holysheep_gpt",
            execute_func=_execute,
            estimated_tokens=estimated_input_tokens + estimated_output_tokens
        )
    
    def get_usage_report(self) -> Dict[str, Any]:
        """Báo cáo usage chi tiết"""
        return {
            "total_requests": self._request_count,
            "total_tokens": self._total_tokens,
            "estimated_cost": self._calculate_cost(),
            "timestamp": datetime.now().isoformat()
        }
    
    def _calculate_cost(self) -> float:
        """Tính chi phí ước tính theo bảng giá HolySheep"""
        # Rough estimate - sử dụng tỷ lệ 1:1 input:output
        input_tokens = self._total_tokens // 2
        output_tokens = self._total_tokens - input_tokens
        
        # Bảng giá HolySheep AI 2026 (USD/MTok)
        pricing = {
            "gpt-4.1": 8.0,
            "gpt-4.1-turbo": 4.0,
            "claude-sonnet-4.5": 15.0,
            "gemini-2.5-flash": 2.5,
            "deepseek-v3.2": 0.42
        }
        
        avg_price = sum(pricing.values()) / len(pricing)
        return (self._total_tokens / 1_000_000) * avg_price

class APIError(Exception):
    pass

class PaymentRequiredError(Exception):
    pass

============ VÍ DỤ SỬ DỤNG ============

async def demo_ecommerce_rag(): """Demo: E-commerce RAG system với multi-user quota control""" limiter = AIMultiProviderLimiter() # Cấu hình tier cho từng loại user limiter.user_tiers["vip_customer_1"] = "pro" limiter.user_tiers["trial_user_1"] = "free" config = HolySheepConfig( api_key="YOUR_HOLYSHEEP_API_KEY", default_model="gpt-4.1" ) async with HolySheepAIClient(config, limiter) as client: # Query cho VIP customer messages = [ {"role": "system", "content": "Bạn là trợ lý tìm kiếm sản phẩm"}, {"role": "user", "content": "Tìm điện thoại Samsung giá dưới 10 triệu"} ] try: response = await client.chat_completions( messages=messages, model="gpt-4.1", user_id="vip_customer_1" ) print(f"Response: {response['choices'][0]['message']['content']}") except QuotaExceededError as e: print(f"Quota exceeded: {e}") # Fallback sang DeepSeek V3.2 ($0.42/MTok - rẻ hơn 95%) # Implement fallback logic ở đây

Chạy demo

if __name__ == "__main__": asyncio.run(demo_ecommerce_rag())

Cấu Hình Nginx Làm Reverse Proxy Rate Limiter

Để giảm tải cho application layer, chúng ta nên đặt rate limiting ở Nginx level:


/etc/nginx/conf.d/ai-api-rate-limit.conf

Zone cho rate limiting theo IP

limit_req_zone $binary_remote_addr zone=ip_limit:10m rate=30r/s;

Zone cho rate limiting theo API key

limit_req_zone $http_authorization zone=api_key_limit:10m rate=100r/s;

Zone cho bandwidth limit

limit_conn_zone $binary_remote_addr zone=conn_limit:10m; upstream holysheep_api { server api.holysheep.ai:443; keepalive 32; } server { listen 8443 ssl http2; server_name your-api-gateway.com; ssl_certificate /path/to/cert.pem; ssl_certificate_key /path/to/key.pem; # Giới hạn kết nối đồng thời limit_conn conn_limit 10; # Rate limiting - burst 20 requests, delay 10 limit_req zone=ip_limit burst=20 nodelay; limit_req zone=api_key_limit burst=100 delay=10; # Timeout settings proxy_connect_timeout 5s; proxy_send_timeout 30s; proxy_read_timeout 30s; # Buffer size proxy_buffer_size 128k; proxy_buffers 4 256k; proxy_busy_buffers_size 256k; location /v1/chat/completions { # Proxy sang HolySheep AI proxy_pass https://api.holysheep.ai/v1/chat/completions; # Headers forwarding proxy_set_header Host api.holysheep.ai; proxy_set_header Authorization $http_authorization; proxy_set_header Content-Type application/json; proxy_http_version 1.1; # Connection reuse proxy_set_header Connection ""; # Retry on upstream failure proxy_next_upstream error timeout http_502 http_503; proxy_next_upstream_tries 3; proxy_next_upstream_timeout 10s; # Cache 200 responses (optional) proxy_cache_valid 200 60s; add_header X-Cache-Status $upstream_cache_status; } # Health check endpoint location /health { access_log off; return 200 "healthy\n"; add_header Content-Type text/plain; } # Rate limit status location /rate_limit_status { # Trả về trạng thái rate limit (implement bằng Lua/OpenResty) content_by_lua_block { ngx.say("Rate limit OK") } } }

Custom error pages

error_page 429 = @rate_limit_exceeded; location @rate_limit_exceeded { default_type application/json; return 429 '{"error": "Too Many Requests", "message": "Rate limit exceeded", "retry_after": 60}'; }

Monitoring và Alerting Dashboard


"""
Prometheus Metrics Exporter cho Rate Limiter
Collect và expose metrics cho Grafana dashboard
"""

from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
from flask import Flask, jsonify

app = Flask(__name__)

Counters

requests_total = Counter( 'ai_api_requests_total', 'Total AI API requests', ['user_id', 'model', 'status'] ) quota_exceeded_total = Counter( 'ai_api_quota_exceeded_total', 'Total quota exceeded events', ['user_id', 'quota_type'] )

Gauges

active_requests = Gauge( 'ai_api_active_requests', 'Currently active requests', ['user_id'] ) user_quota_remaining = Gauge( 'ai_api_quota_remaining', 'Remaining quota for user', ['user_id', 'quota_type'] )

Histograms

request_duration = Histogram( 'ai_api_request_duration_seconds', 'Request duration in seconds', ['model', 'endpoint'], buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] ) token_usage = Histogram( 'ai_api_token_usage', 'Token usage per request', ['model'], buckets=[100, 500, 1000, 2000, 5000, 10000, 50000] )

Cost tracking

daily_cost = Gauge( 'ai_api_daily_cost_usd', 'Estimated daily cost in USD', ['provider'] ) class MetricsCollector: """Thu thập và export metrics""" def __init__(self, rate_limiter: AIMultiProviderLimiter): self.limiter = rate_limiter self.start_time = time.time() def record_request(self, user_id: str, model: str, status: str, duration: float, tokens: int): """Record một request thành công""" requests_total.labels(user_id=user_id, model=model, status=status).inc() request_duration.labels(model=model, endpoint='chat').observe(duration) token_usage.labels(model=model).observe(tokens) def record_quota_exceeded(self, user_id: str, quota_type: str): """Record quota exceeded event""" quota_exceeded_total.labels(user_id=user_id, quota_type=quota_type).inc() def update_quota_gauges(self): """Update quota remaining gauges""" for user_id, usage in self.limiter.user_usages.items(): tier = self.limiter.get_user_tier(user_id) config = self.limiter.tier_configs[tier] user_quota_remaining.labels( user_id=user_id, quota_type="minute" ).set(config.requests_per_minute - usage.requests_this_minute) user_quota_remaining.labels( user_id=user_id, quota_type="day" ).set(config.requests_per_day - usage.requests_today) active_requests.labels(user_id=user_id).set(usage.concurrent_requests) def calculate_daily_cost(self) -> dict: """Tính chi phí hàng ngày dựa trên usage""" costs = { "holysheep_gpt": 0.0, "holysheep_claude": 0.0, "holysheep_deepseek": 0.0 } pricing_per_mtok = { "holysheep_gpt": 8.0, # GPT-4.1: $8/MTok "holysheep_claude": 15.0, # Claude Sonnet 4.5: $15/MTok "holysheep_deepseek": 0.42 # DeepSeek V3.2: $0.42/MTok } # Tính tổng tokens đã sử dụng total_tokens = sum(u.tokens_this_month for u in self.limiter.user_usages.values()) # Rough split: 60% GPT, 25% Claude, 15% DeepSeek costs["holysheep_gpt"] = (total_tokens * 0.6 / 1_000_000) * pricing_per_mtok["holysheep_gpt"] costs["holysheep_claude"] = (total_tokens * 0.25 / 1_000_000) * pricing_per_mtok["holysheep_claude"] costs["holysheep_deepseek"] = (total_tokens * 0.15 / 1_000_000) * pricing_per_mtok["holysheep_deepseek"] for provider, cost in costs.items(): daily_cost.labels(provider=provider).set(cost) return costs @app.route('/metrics') def metrics(): """Prometheus scrape endpoint""" from prometheus_client import generate_latest, CONTENT_TYPE_LATEST return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST} @app.route('/usage/') def user_usage(user_id: str): """API endpoint để query usage của user""" if user_id not in limiter.user_usages: return jsonify({"error": "User not found"}), 404 usage = limiter.user_usages[user_id] tier = limiter.get_user_tier(user_id) config = limiter.tier_configs[tier] return jsonify({ "user_id": user_id, "tier": tier, "usage": { "requests_this_minute": usage.requests_this_minute, "requests_this_hour": usage.requests_this_hour, "requests_today": usage.requests_today, "tokens_this_month": usage.tokens_this_month, "concurrent_requests": usage.concurrent_requests }, "limits": { "requests_per_minute": config.requests_per_minute, "requests_per_hour": config.requests_per_hour, "requests_per_day": config.requests_per_day, "tokens_per_month": config.tokens_per_month }, "remaining": { "minute": config.requests_per_minute - usage.requests_this_minute, "hour": config.requests_per_hour - usage.requests_this_hour, "day": config.requests_per_day - usage.requests_today, "tokens": config.tokens_per_month - usage.tokens_this_month } }) if __name__ == "__main__": # Start Prometheus metrics server on port 9090 start_http_server(9090) # Start Flask app on port 5000 app.run(host='0.0.0.0', port=5000)

Lỗi thường gặp và cách khắc phục

1. Lỗi 429 Too Many Requests - Token Bucket tràn


❌ SAI: Retry ngay lập tức không có backoff

async def bad_retry(): for _ in range(10): response = await session.post(url, json=data) if response.status != 429: return response raise RateLimitError("Max retries")

✅ ĐÚNG: Exponential backoff với jitter

async def good_retry_with_backoff(): max_retries = 5 base_delay = 1.0 for attempt in range(max_retries): response = await session.post(url, json=data) if response.status != 429: return response # Parse Retry-After header retry_after = int(response.headers.get("Retry-After", base_delay)) # Exponential backoff: 1, 2, 4, 8, 16 seconds delay = min(retry_after, base_delay * (2 ** attempt)) # Thêm jitter ±25% để tránh thundering herd import random jitter = delay * 0.25 * (2 * random.random() - 1) delay = delay + jitter print(f"Rate limited. Waiting {delay:.2f}s before retry {attempt + 1}") await asyncio.sleep(delay) raise RateLimitError(f"Failed after {max_retries} retries")

2. Lỗi Concurrent Request quá nhiều - Memory leak


❌ NGUY HIỂM: Không có semaphore giới hạn concurrent

async def dangerous_unbounded_requests(urls: List[str]): tasks = [fetch(url) for url in urls] # 10,000 tasks cùng chạy! return await asyncio.gather(*tasks)

✅ AN TOÀN: Bounded semaphore

async def safe_bounded_requests(urls: List[str], max_concurrent: int = 50): semaphore = asyncio.Semaphore(max_concurrent) async def bounded_fetch(url: str): async with semaphore: return await fetch(url) # Sử dụng chunk để tránh tạo quá nhiều tasks cùng lúc results = [] chunk_size = 100 for i in range(0, len(urls), chunk_size): chunk = urls[i:i + chunk_size] chunk_results = await asyncio.gather( *[bounded_fetch(url) for url in chunk], return_exceptions=True ) results.extend(chunk_results) # Rate limit giữa các chunks await asyncio.sleep(0.1) return results

3. Lỗi Race Condition khi update counters


❌ SAI: Non-atomic read-modify-write

class UnsafeCounter: def __init__(self): self.count = 0 self.lock = threading.Lock() # Lock nhưng vẫn có race condition! def increment(self): # Vẫn có race condition vì đọc và ghi không atomic with self.lock: current = self.count # Đọc # ... có thể có context switch ở đây ... with self.lock: self.count = current + 1 # Ghi

✅ ĐÚNG: Atomic increment với Lock đúng cách

class SafeCounter: def __init__(self): self._count = 0 self._lock = threading.Lock() def increment(self) -> int: with self._lock: self._count += 1 return self._count def get_and_reset(self) -> int: with self._lock: old = self._count self._count = 0 return old

✅ TỐT NHẤT: Sử dụng atomic operations

import asyncio from asyncio import Lock class AsyncSafeCounter: def __init__(self): self._count = 0 self._lock = Lock() async def increment(self) -> int: async with self._lock: self._count += 1 return self._count async def get_count(self) -> int: async with self._lock: return self._count

4. Lỗi Quota Reset không chính xác múi giờ


❌ SAI: Reset theo server time không consider timezone

class TimezoneAwareQuota: def __init__(self): self.window_start = time.time() # Server UTC time self.limit = 1000 def should_reset(self) -> bool: return time.time() - self.window_start >= 86400 # 24 giờ

✅ ĐÚNG: Reset theo ngày local timezone (Asia/Ho_Chi_Minh = UTC+7)

from datetime import datetime, timezone, timedelta class AsiaHoChiMinhQuota: TZ = timezone(timedelta(hours=7)) # UTC+7 def __init__(self): self.daily_limit = 1000 self._reset_at = self._get_next_midnight() def _get_next_midnight(self) -> datetime: """Lấy thời điểm midnight tiếp theo theo giờ Việt Nam""" now = datetime.now(self.TZ) tomorrow = now.date() + timedelta(days=1) return datetime.combine(tomorrow, datetime.min.time(), tzinfo=self.TZ) def should_reset(self) -> bool: return datetime.now(self.TZ) >= self._reset_at def reset(self): """Reset counter và cập nhật next midnight""" self.usage = 0 self._reset_at = self._get_next_midnight() print(f"Quota reset at {datetime.now(self.TZ).isoformat()}") print(f"Next reset at {self._reset_at.isoformat()}")

Kết Quả Đạt Được

Sau khi triển khai hệ thống này cho dự án e-commerce RAG với 50 triệu sản phẩm: