ในฐานะวิศวกรที่ดูแลระบบ RAG ขององค์กรขนาดใหญ่มากว่า 3 ปี ผมเคยเจอสถานการณ์ที่ API response time พุ่งจาก 80ms ไปถึง 2.3 วินาทีในช่วง prime time ทำให้ระบบ search ล่มทั้งระบบ วันนี้ผมจะมาแบ่งปันวิธีแก้ปัญหาด้วย Dynamic Routing ที่พิสูจน์แล้วว่าใช้งานได้จริงใน production

ทำไมต้อง Dynamic Routing?

เมื่อเรา deploy ระบบ RAG สำหรับองค์กรที่มีเอกสารหลายล้านฉบับ การใช้งาน API จาก provider เดียวอย่าง HolySheep AI ที่มีราคาประหยัดถึง 85%+ (¥1=$1) และ latency ต่ำกว่า 50ms อาจไม่เพียงพอในช่วง peak hours

ปัญหาหลักที่พบ:

หลักการทำงานของ Weighted Round Robin + Latency-based Health Check

ระบบ dynamic routing ที่เราจะสร้างจะทำงานโดย:

Implementation ด้วย Python + Redis

ด้านล่างคือ implementation ที่ใช้งานจริงใน production ของผม:

# config.py - กำหนดค่า endpoints และ weights
import os
from dataclasses import dataclass

@dataclass
class APIEndpoint:
    name: str
    base_url: str
    api_key: str
    model: str
    weight: float = 1.0
    avg_latency: float = 0.0
    failure_count: int = 0
    last_success: float = 0.0

ใช้ HolySheep AI เป็น endpoint หลัก

HOLYSHEEP_ENDPOINT = APIEndpoint( name="holysheep-primary", base_url="https://api.holysheep.ai/v1", api_key=os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY"), model="gpt-4.1", weight=1.0 )

Fallback endpoints

FALLBACK_ENDPOINTS = [ APIEndpoint( name="holysheep-gpt4", base_url="https://api.holysheep.ai/v1", api_key=os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY"), model="gpt-4.1", weight=1.0 ), APIEndpoint( name="holysheep-sonnet", base_url="https://api.holysheep.ai/v1", api_key=os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY"), model="claude-sonnet-4.5", weight=0.8 ), ]

กำหนดค่า latency threshold

MAX_LATENCY_MS = 500 # ถ้าเกินนี้ถือว่า slow CIRCUIT_BREAKER_THRESHOLD = 5 # fail 5 ครั้งติด = open circuit
# latency_tracker.py - ติดตามและคำนวณ latency
import time
import redis
import json
from collections import deque
from typing import Optional

class LatencyTracker:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.window_size = 100  # เก็บ latency 100 ครั้งล่าสุด
        self.decay_factor = 0.95  # weight ล่าสุดมีผลมากกว่า
        
    def record_latency(self, endpoint: str, latency_ms: float, success: bool):
        """บันทึก latency ของแต่ละ endpoint"""
        key = f"latency:{endpoint}"
        
        # ใช้ sorted set เก็บ timestamp:latency
        timestamp = time.time()
        member = f"{timestamp}:{latency_ms}"
        
        pipe = self.redis.pipeline()
        pipe.zadd(key, {member: timestamp})
        pipe.zremrangebyscore(key, 0, timestamp - 300)  # เก็บแค่ 5 นาที
        pipe.execute()
        
        # อัพเดต failure count
        if not success:
            self.redis.hincrby("failures", endpoint, 1)
        else:
            self.redis.hset("failures", endpoint, 0)
    
    def get_avg_latency(self, endpoint: str) -> float:
        """คำนวณ latency เฉลี่ยแบบ weighted"""
        key = f"latency:{endpoint}"
        scores = self.redis.zrange(key, 0, -1, withscores=True)
        
        if not scores:
            return float('inf')
        
        total_weighted = 0.0
        total_weight = 0.0
        weights = [self.decay_factor ** i for i in range(len(scores))]
        weights.reverse()
        
        for (member, timestamp), weight in zip(scores, weights):
            latency = float(member.split(':')[1])
            total_weighted += latency * weight
            total_weight += weight
        
        return total_weighted / total_weight if total_weight > 0 else float('inf')
    
    def get_weight(self, endpoint: str) -> float:
        """คำนวณ weight ตาม latency"""
        avg_latency = self.get_avg_latency(endpoint)
        
        if avg_latency == float('inf'):
            return 0.1  # ไม่มีข้อมูล = weight ต่ำ
        
        if avg_latency > MAX_LATENCY_MS:
            return 0.0  # เกิน threshold = ไม่ใช้ endpoint นี้
        
        # ยิ่ง latency ต่ำ = weight สูง
        # ใช้ exponential decay
        return max(0.1, 2.0 ** (-avg_latency / 200))

latency_tracker = LatencyTracker()
# dynamic_router.py - Router หลัก
import httpx
import asyncio
import random
from typing import List, Optional
from config import HOLYSHEEP_ENDPOINT, FALLBACK_ENDPOINTS, MAX_LATENCY_MS
from latency_tracker import latency_tracker

class DynamicRouter:
    def __init__(self):
        self.endpoints = [HOLYSHEEP_ENDPOINT] + FALLBACK_ENDPOINTS
        self.client = httpx.AsyncClient(timeout=30.0)
        
    async def route_request(self, prompt: str, context: dict) -> dict:
        """เลือก endpoint ที่เหมาะสมที่สุดและส่ง request"""
        # คำนวณ weight ของทุก endpoint
        weights = []
        available_endpoints = []
        
        for ep in self.endpoints:
            weight = latency_tracker.get_weight(ep.name)
            failure_count = latency_tracker.redis.hget("failures", ep.name) or 0
            
            # Circuit breaker: ถ้า fail เกิน threshold ไม่ใช้ endpoint นี้
            if int(failure_count) >= CIRCUIT_BREAKER_THRESHOLD:
                continue
                
            weights.append(weight)
            available_endpoints.append(ep)
        
        if not available_endpoints:
            raise Exception("No available endpoints")
        
        # Weighted random selection
        selected = random.choices(available_endpoints, weights=weights, k=1)[0]
        
        # วัด latency ของ request นี้
        start_time = time.time()
        success = False
        
        try:
            result = await self._call_api(selected, prompt, context)
            success = True
            return result
        finally:
            latency_ms = (time.time() - start_time) * 1000
            latency_tracker.record_latency(selected.name, latency_ms, success)
    
    async def _call_api(self, endpoint: APIEndpoint, prompt: str, context: dict) -> dict:
        """เรียก API จริง"""
        headers = {
            "Authorization": f"Bearer {endpoint.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": endpoint.model,
            "messages": [
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.7,
            "max_tokens": 1000
        }
        
        response = await self.client.post(
            f"{endpoint.base_url}/chat/completions",
            headers=headers,
            json=payload
        )
        response.raise_for_status()
        return response.json()
    
    async def health_check_loop(self):
        """ตรวจสอบ health ของทุก endpoint เป็นระยะ"""
        while True:
            for ep in self.endpoints:
                start = time.time()
                try:
                    await self._ping(ep)
                    latency = (time.time() - start) * 1000
                    latency_tracker.record_latency(ep.name, latency, True)
                except:
                    latency_tracker.record_latency(ep.name, 99999, False)
            
            await asyncio.sleep(10)  # ตรวจสอบทุก 10 วินาที
    
    async def _ping(self, endpoint: APIEndpoint):
        """Simple ping เพื่อตรวจสอบ endpoint ว่ายังทำงานได้"""
        headers = {"Authorization": f"Bearer {endpoint.api_key}"}
        payload = {"model": endpoint.model, "messages": [{"role": "user", "content": "ping"}], "max_tokens": 1}
        
        await self.client.post(
            f"{endpoint.base_url}/chat/completions",
            headers=headers,
            json=payload
        )

import time  # เพิ่ม import นี้

router = DynamicRouter()

การตั้งค่า Deployment สำหรับ Kubernetes

สำหรับ deployment จริงบน Kubernetes ผมแนะนำให้ใช้ service mesh เช่น Istio ร่วมกับ custom implementation:

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-dynamic-router
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-router
  template:
    metadata:
      labels:
        app: rag-router
    spec:
      containers:
      - name: router
        image: your-registry/rag-router:latest
        ports:
        - containerPort: 8000
        env:
        - name: HOLYSHEEP_API_KEY
          valueFrom:
            secretKeyRef:
              name: holysheep-credentials
              key: api-key
        - name: REDIS_URL
          value: "redis://redis-cluster:6379"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 3
---
apiVersion: v1
kind: Service
metadata:
  name: rag-router-service
spec:
  selector:
    app: rag-router
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-router-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-dynamic-router
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

ผลลัพธ์หลังจาก Implement

หลังจาก deploy ระบบนี้ใน production เราได้ผลลัพธ์ดังนี้:

ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข

จากประสบการณ์ที่ผมเจอปัญหาหลายครั้งใน production นี่คือ 3 กรณีที่พบบ่อยที่สุดพร้อมวิธีแก้:

1. Connection Pool Exhaustion

อาการ: ข้อผิดพลาด ConnectionPoolTimeoutException: Timeout waiting for connection from pool

สาเหตุ: httpx client ใช้ connection pool default ที่ 100 connections ถ้า latency สูง request queue จะค้างจน pool เต็ม

# วิธีแก้: เพิ่ม limits และใช้ connection pool ที่เหมาะสม
from httpx import Limits, Timeout

class DynamicRouter:
    def __init__(self):
        # เพิ่ม connections และ timeouts ให้เหมาะสม
        self.client = httpx.AsyncClient(
            timeout=Timeout(30.0, connect=5.0),
            limits=Limits(
                max_connections=500,      # เพิ่มจาก default 100
                max_keepalive_connections=100,
                keepalive_expiry=30.0
            )
        )
        
    async def route_request(self, prompt: str, context: dict) -> dict:
        # เพิ่ม semaphore เ�