ในฐานะวิศวกรที่ทำงานกับ AI API มาหลายปี ผมเคยเจอปัญหาที่ทำให้นอนไม่หลับหลายครั้ง — นั่นคือการจัดการ Rate Limit ของ AI API ที่ไม่เคยเป็นใจ ไม่ว่าจะเป็นช่วง Peak Season ของร้านค้าอีคอมเมิร์ซ หรือตอนเปิดตัวระบบ RAG ให้พนักงานใช้งานพร้อมกันหลายร้อยคน

บทความนี้จะพาคุณเข้าใจหลักการ Rate Limiting และวิธีสร้างระบบ Request Scheduler ที่ทำงานได้อย่างมีประสิทธิภาพ โดยใช้ HolySheep AI เป็นตัวอย่าง ซึ่งมี Latency เฉลี่ยต่ำกว่า 50ms และราคาประหยัดกว่า 85% เมื่อเทียบกับผู้ให้บริการอื่น

ทำไมต้องควบคุม Concurrency?

เมื่อคุณส่ง Request ไปยัง AI API มากกว่าที่ระบบกำหนด สิ่งที่เกิดขึ้นคือ HTTP 429 (Too Many Requests) และนั่นหมายความว่า User Experience ของคุณจะพังทันที จากประสบการณ์ตรง ผมเคยเห็นระบบที่ใช้ AI ตอบลูกค้าเออเรอร์ทั้งร้านเพราะไม่ได้จัดการ Queue ถูกต้อง

กรณีศึกษา: ระบบ Chat ร้านอีคอมเมิร์ซขนาดใหญ่

ลองนึกภาพร้านค้าออนไลน์ที่มีลูกค้าเข้ามาแชทพร้อมกัน 500 คนในช่วง Flash Sale หากทุกคนส่ง Request ไป AI พร้อมกันโดยไม่มีการจัดการ ระบบจะล่มทันที แต่ถ้าเรามี Scheduler ที่ดี เราสามารถ:

รู้จัก Rate Limit และโครงสร้างข้อมูลพื้นฐาน

ก่อนเขียนโค้ด เราต้องเข้าใจโครงสร้างข้อมูลที่จำเป็น:

Implementation: Token Bucket พร้อม Priority Queue

import asyncio
import time
from dataclasses import dataclass, field
from typing import List, Optional
from heapq import heappush, heappop

@dataclass(order=True)
class QueuedRequest:
    priority: int
    timestamp: float = field(compare=False)
    request_id: str = field(compare=False)
    payload: dict = field(compare=False)
    future: asyncio.Future = field(default=None, compare=False)

class ConcurrencyController:
    """
    ระบบควบคุม Concurrent Request สำหรับ AI API
    ออกแบบมาเพื่อรับมือกับ Traffic Spike ได้อย่างมีประสิทธิภาพ
    """
    
    def __init__(
        self,
        rpm_limit: int = 60,        # Requests per minute
        tpm_limit: int = 30000,      # Tokens per minute
        max_concurrent: int = 10,
        base_url: str = "https://api.holysheep.ai/v1"
    ):
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        self.max_concurrent = max_concurrent
        
        # Token bucket state
        self.tokens = float(rpm_limit)
        self.last_refill = time.time()
        self.token_refill_rate = rpm_limit / 60.0  # tokens per second
        
        # Semaphore สำหรับจำกัด concurrent requests
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
        # Priority queue สำหรับ requests ที่รอ
        self.queue: List[QueuedRequest] = []
        
        # Tracking
        self.tokens_used_this_minute = 0
        self.minute_start = time.time()
        
        # HTTP Client
        self.base_url = base_url
        self._api_key = "YOUR_HOLYSHEEP_API_KEY"
    
    def _refill_tokens(self):
        """เติม tokens ตามเวลาที่ผ่านไป"""
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.rpm_limit,
            self.tokens + (elapsed * self.token_refill_rate)
        )
        self.last_refill = now
        
        # Reset TPM tracking ทุกนาที
        if now - self.minute_start >= 60:
            self.tokens_used_this_minute = 0
            self.minute_start = now
    
    def _can_proceed(self, estimated_tokens: int) -> bool:
        """ตรวจสอบว่าสามารถส่ง request ได้หรือไม่"""
        self._refill_tokens()
        
        # ตรวจสอบ RPM
        if self.tokens < 1:
            return False
        
        # ตรวจสอบ TPM
        if self.tokens_used_this_minute + estimated_tokens > self.tpm_limit:
            return False
        
        return True
    
    def _wait_time(self, estimated_tokens: int) -> float:
        """คำนวณเวลาที่ต้องรอ (วินาที)"""
        self._refill_tokens()
        
        # เวลารอเพราะ RPM
        if self.tokens < 1:
            rpm_wait = (1 - self.tokens) / self.token_refill_rate
        else:
            rpm_wait = 0
        
        # เวลารอเพราะ TPM
        available_tpm = self.tpm_limit - self.tokens_used_this_minute
        if available_tpm < estimated_tokens:
            tpm_wait = 60 - (time.time() - self.minute_start)
        else:
            tpm_wait = 0
        
        return max(rpm_wait, tpm_wait)
    
    async def enqueue(
        self,
        request_id: str,
        payload: dict,
        priority: int = 5,
        estimated_tokens: int = 500
    ) -> dict:
        """
        เพิ่ม request เข้าคิวและรอจนถึงคิว
        
        Args:
            request_id: รหัสประจำตัว request
            payload: ข้อมูลที่จะส่งไป AI API
            priority: ลำดับความสำคัญ (ต่ำกว่า = สำคัญกว่า)
            estimated_tokens: จำนวน tokens ที่ประมาณการว่าจะใช้
        
        Returns:
            Response จาก AI API
        """
        request = QueuedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id,
            payload=payload
        )
        
        # ถ้ายังไม่มี request รอในคิว ให้ลอง execute ทันที
        if not self.queue:
            wait_time = self._wait_time(estimated_tokens)
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            if self._can_proceed(estimated_tokens):
                return await self._execute_request(request, estimated_tokens)
        
        # เพิ่มเข้าคิวและรอ
        heappush(self.queue, request)
        
        while True:
            # ตรวจสอบว่า request นี้ถึงคิวหรือยัง
            if self.queue and self.queue[0] == request:
                wait_time = self._wait_time(estimated_tokens)
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                
                if self._can_proceed(estimated_tokens):
                    heappop(self.queue)
                    return await self._execute_request(request, estimated_tokens)
            
            await asyncio.sleep(0.1)  # ตรวจสอบทุก 100ms
    
    async def _execute_request(self, request: QueuedRequest, estimated_tokens: int) -> dict:
        """Execute request ไปยัง AI API"""
        async with self.semaphore:
            self.tokens -= 1
            self.tokens_used_this_minute += estimated_tokens
            
            try:
                # เรียกใช้ HolySheep AI API
                # สมัครได้ที่ https://www.holysheep.ai/register
                response = await self._call_api(request.payload)
                return response
            except Exception as e:
                # หากเกิด error ให้ retry
                for attempt in range(3):
                    try:
                        await asyncio.sleep(2 ** attempt)
                        response = await self._call_api(request.payload)
                        return response
                    except:
                        continue
                raise e
    
    async def _call_api(self, payload: dict) -> dict:
        """เรียก HolySheep AI API"""
        import aiohttp
        
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json"
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 429:
                    raise Exception("Rate limit exceeded")
                elif response.status != 200:
                    raise Exception(f"API error: {response.status}")
                
                return await response.json()

Advanced: Adaptive Rate Limiting สำหรับ Multi-tenant System

สำหรับระบบองค์กรที่มีผู้ใช้หลายระดับ (Free, Pro, Enterprise) เราต้องมีการจัดการ Rate Limit ที่แตกต่างกัน ด้านล่างคือ Implementation ที่ผมใช้ในโปรเจกต์จริง:

import asyncio
from enum import IntEnum
from dataclasses import dataclass
from typing import Dict, Optional
import time

class UserTier(IntEnum):
    FREE = 1
    PRO = 2
    ENTERPRISE = 3

@dataclass
class TierConfig:
    rpm: int
    tpm: int
    max_concurrent: int
    queue_limit: int  # จำนวน request สูงสุดในคิว

class MultiTenantRateLimiter:
    """
    ระบบ Rate Limiting สำหรับ Multi-tenant
    รองรับ Free, Pro, Enterprise tiers
    """
    
    TIER_CONFIGS = {
        UserTier.FREE: TierConfig(rpm=30, tpm=10000, max_concurrent=2, queue_limit=10),
        UserTier.PRO: TierConfig(rpm=120, tpm=100000, max_concurrent=10, queue_limit=50),
        UserTier.ENTERPRISE: TierConfig(rpm=500, tpm=500000, max_concurrent=50, queue_limit=200),
    }
    
    def __init__(self):
        # ติดตาม rate limit ต่อ tenant
        self.tenant_controllers: Dict[str, 'TenantController'] = {}
        self._lock = asyncio.Lock()
    
    async def get_controller(self, tenant_id: str, tier: UserTier) -> 'TenantController':
        """Get or create controller สำหรับ tenant นี้"""
        async with self._lock:
            if tenant_id not in self.tenant_controllers:
                config = self.TIER_CONFIGS[tier]
                self.tenant_controllers[tenant_id] = TenantController(
                    tenant_id=tenant_id,
                    config=config
                )
            return self.tenant_controllers[tenant_id]
    
    async def process_request(
        self,
        tenant_id: str,
        tier: UserTier,
        payload: dict,
        priority: Optional[int] = None
    ) -> dict:
        """
        ประมวลผล request โดยคำนึงถึง tier ของ tenant
        
        Enterprise users จะได้สิทธิ์ใช้งานก่อนเสมอ
        """
        controller = await self.get_controller(tenant_id, tier)
        
        # Priority ขึ้นอยู่กับ tier (ต่ำกว่า = สำคัญกว่า)
        effective_priority = priority if priority is not None else (10 - tier * 2)
        
        return await controller.enqueue(
            payload=payload,
            priority=effective_priority
        )

class TenantController:
    """Controller สำหรับแต่ละ tenant"""
    
    def __init__(self, tenant_id: str, config: TierConfig):
        self.tenant_id = tenant_id
        self.config = config
        
        # Rate tracking
        self.tokens = float(config.rpm)
        self.last_refill = time.time()
        self.refill_rate = config.rpm / 60.0
        
        # Queue
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue(
            maxsize=config.queue_limit
        )
        self.active_requests = 0
        self._semaphore = asyncio.Semaphore(config.max_concurrent)
    
    async def enqueue(self, payload: dict, priority: int) -> dict:
        """เพิ่ม request เข้าคิวและรอ execute"""
        try:
            self.queue.put_nowait((priority, payload))
        except asyncio.QueueFull:
            raise Exception(
                f"Queue full for tenant {self.tenant_id}. "
                f"Max queue size: {self