การสร้างระบบที่รองรับคำขอพร้อมกันมากกว่า 1,000 คำขอต่อวินาที ไม่ใช่เรื่องยากอีกต่อไป ในบทความนี้ผมจะพาทุกคนเข้าใจหลักการออกแบบระบบ Load Balancing และการทำ Failover อย่างง่าย ๆ พร้อมโค้ดตัวอย่างที่นำไปใช้ได้จริง โดยเราจะใช้ HolySheep AI เป็น API Provider หลักในตัวอย่าง
Load Balancing คืออะไร — อธิบายแบบเข้าใจง่าย
ลองนึกภาพว่าคุณมีร้านกาแฟ 1 คน ถ้าลูกค้ามา 10 คนพร้อมกัน คนเดียวต้องรับทั้งหมดจนลูกค้าต้องรอนาน แต่ถ้าคุณจ้างคนเพิ่มอีก 3 คน และมีพนักงานแจกจ่ายลูกค้าให้คนละ 2-3 คน ทุกคนก็จะได้รับบริการเร็วขึ้น นี่คือหลักการเดียวกันกับ Load Balancing
- Load Balancer คือ "พนักงานแจกจ่าย" ที่ทำหน้าที่ส่งคำขอไปยัง Server ที่ว่างอยู่
- Backend Server คือ "พนักงานชงกาแฟ" ที่ทำงานจริง
- Health Check คือการตรวจสอบว่าพนักงานคนไหนยังทำงานได้ดี
ทำไมต้องมี Failover?
สมมติว่าร้านกาแฟ 1 ใน 4 คนป่วยกะทันหัน ถ้าไม่มีระบบ Failover ลูกค้าที่ถูกส่งไปร้านนั้นจะต้องรอจนกว่าจะมีคนมาบอกว่าร้านปิด Failover จะช่วยตรวจจับปัญหานี้และส่งลูกค้าไปร้านอื่นที่เปิดอยู่ทันที
โครงสร้างพื้นฐานของระบบ
ก่อนจะเข้าสู่โค้ด มาดูโครงสร้างระบบที่เราจะสร้างกัน
+------------------+ +------------------+ +------------------+
| Client/User | --> | Load Balancer | --> | API Server 1 |
+------------------+ | (หรือ Library) | +------------------+
+------------------+ +------------------+
| Health Check | --> | API Server 2 |
+------------------+ +------------------+
| Failover Logic | +------------------+
+------------------+ --> | API Server 3 |
+------------------+
วิธีที่ 1: ใช้ Python Library "requests-futures" สำหรับ QPS ต่ำกว่า 100
สำหรับระบบที่ต้องการรองรับไม่เกิน 100 QPS สามารถใช้วิธีง่าย ๆ ด้วย Library ช่วยจัดการ Connection Pool
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
ตั้งค่า API Key ของคุณ
API_KEY = os.environ.get("YOUR_HOLYSHEEP_API_KEY", "sk-your-key-here")
BASE_URL = "https://api.holysheep.ai/v1"
สร้าง Session ที่มี Retry Strategy ในตัว
class HolySheepClient:
def __init__(self, api_key):
self.api_key = api_key
self.base_url = BASE_URL
self.session = self._create_session()
def _create_session(self):
session = requests.Session()
# ตั้งค่า Retry Logic อัตโนมัติ
retry_strategy = Retry(
total=3, # ลองใหม่สูงสุด 3 ครั้ง
backoff_factor=0.5, # รอ 0.5, 1, 2 วินาที ระหว่างลองใหม่
status_forcelist=[500, 502, 503, 504], # ลองใหม่เมื่อ Server มีปัญหา
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # จำนวน Connection ที่เปิดค้าง
pool_maxsize=20 # ขนาด Connection Pool สูงสุด
)
session.mount("https://", adapter)
return session
def chat(self, message, model="gpt-4o"):
"""ส่งข้อความไปยัง AI API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [{"role": "user", "content": message}]
}
response = self.session.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=30 # รอสูงสุด 30 วินาที
)
response.raise_for_status()
return response.json()
วิธีใช้งาน
if __name__ == "__main__":
client = HolySheepClient(API_KEY)
result = client.chat("สวัสดีครับ")
print(result["choices"][0]["message"]["content"])
วิธีที่ 2: Load Balancing ด้วย Round Robin สำหรับ QPS 100-500
เมื่อต้องการรองรับโหลดมากขึ้น เราต้องกระจายคำขอไปยังหลาย Connection หรือหลาย Endpoint
import random
import time
from threading import Lock
class LoadBalancer:
"""ระบบ Load Balancer แบบ Round Robin พื้นฐาน"""
def __init__(self, api_keys):
"""
api_keys: รายการ API Keys หลายตัว (สำหรับกระจายโหลด)
"""
self.api_keys = api_keys
self.current_index = 0
self.lock = Lock()
self.error_count = {} # นับจำนวนครั้งที่ Key ทำงานผิดพลาด
self.last_used = {} # เวลาที่ใช้งานล่าสุด
# ตั้งค่าเริ่มต้น
for key in api_keys:
self.error_count[key] = 0
self.last_used[key] = 0
def get_next_key(self):
"""เลือก API Key ถัดไปแบบ Round Robin"""
with self.lock:
# กรองเอาเฉพาะ Key ที่ยังทำงานได้ดี
available_keys = [
k for k in self.api_keys
if self.error_count[k] < 5 # ถ้าผิดพลาดเกิน 5 ครั้ง ให้หยุดใช้ชั่วคราว
]
if not available_keys:
# ถ้าทุก Key มีปัญหา ให้ Reset แล้วลองใหม่
print("⚠️ ทุก API Key มีปัญหา กำลัง Reset...")
for key in self.api_keys:
self.error_count[key] = 0
available_keys = self.api_keys
# Round Robin: เลือก Key ถัดไปในลำดับ
key = available_keys[self.current_index % len(available_keys)]
self.current_index += 1
self.last_used[key] = time.time()
return key
def mark_error(self, api_key):
"""บันทึกว่า API Key นี้ทำงานผิดพลาด"""
with self.lock:
self.error_count[api_key] += 1
print(f"❌ API Key มีปัญหา: {api_key[:10]}... ({self.error_count[api_key]} ครั้ง)")
def mark_success(self, api_key):
"""บันทึกว่า API Key นี้ทำงานสำเร็จ"""
with self.lock:
if self.error_count[api_key] > 0:
self.error_count[api_key] -= 1 # ค่อย ๆ ลดความน่าเชื่อถือกลับมา
วิธีใช้งาน
if __name__ == "__main__":
# สมมติว่าคุณมีหลาย API Keys
my_keys = [
"sk-key-001-xxxx",
"sk-key-002-xxxx",
"sk-key-003-xxxx"
]
balancer = LoadBalancer(my_keys)
# จำลองการใช้งาน 10 ครั้ง
for i in range(10):
selected_key = balancer.get_next_key()
print(f"ครั้งที่ {i+1}: ใช้ Key {selected_key[:12]}...")
วิธีที่ 3: ระบบ Failover แบบเต็มรูปแบบสำหรับ QPS 1000+
นี่คือโค้ดเต็ม ๆ ที่ใช้งานจริงใน Production รองรับการตรวจสอบสถานะอัตโนมัติและทำงานแทนเมื่อ Server หลักมีปัญหา
import requests
import time
import asyncio
import aiohttp
from typing import Optional, List, Dict
from dataclasses import dataclass
from threading import Thread, Lock
@dataclass
class ServerStatus:
"""เก็บสถานะของแต่ละ API Server"""
name: str
url: str
is_healthy: bool = True
last_check: float = 0
consecutive_failures: int = 0
avg_response_time: float = 0
class HolySheepLoadBalancer:
"""
ระบบ Load Balancer + Failover สำหรับ HolySheep AI API
รองรับ QPS 1000+ ด้วย Connection Pooling และ Health Check
"""
def __init__(self, api_keys: List[str]):
self.api_keys = api_keys
self.current_key_index = 0
self.lock = Lock()
# สถานะของ API Key แต่ละตัว
self.key_status: Dict[str, ServerStatus] = {}
for key in api_keys:
self.key_status[key] = ServerStatus(
name=f"key_{key[:8]}",
url="https://api.holysheep.ai/v1",
is_healthy=True
)
# เริ่ม Health Check Thread
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def _health_check_loop(self):
"""ทำ Health Check ทุก 30 วินาที"""
while True:
time.sleep(30)
self._perform_health_check()
def _perform_health_check(self):
"""ตรวจสอบสถานะ API ทั้งหมด"""
for key, status in self.key_status.items():
try:
start = time.time()
response = requests.get(
"https://api.holysheep.ai/v1/models",
headers={"Authorization": f"Bearer {key}"},
timeout=5
)
elapsed = (time.time() - start) * 1000 # แปลงเป็น ms
status.is_healthy = response.status_code == 200
status.last_check = time.time()
status.avg_response_time = (status.avg_response_time * 0.7 + elapsed * 0.3)
if status.consecutive_failures > 0:
print(f"✅ {status.name} กลับมาทำงานได้ (Latency: {elapsed:.0f}ms)")
except Exception as e:
status.consecutive_failures += 1
status.is_healthy = False
if status.consecutive_failures == 1:
print(f"⚠️ {status.name} มีปัญหา: {str(e)[:50]}")
def get_healthy_key(self) -> Optional[str]:
"""เลือก API Key ที่สุขภาพดีและมี Response Time ต่ำที่สุด"""
with self.lock:
healthy_keys = [
(key, status) for key, status in self.key_status.items()
if status.is_healthy and status.consecutive_failures < 3
]
if not healthy_keys:
# Fallback: ใช้ Key แรกสุด
return self.api_keys[0] if self.api_keys else None
# เลือก Key ที่มี Response Time ต่ำที่สุด
healthy_keys.sort(key=lambda x: x[1].avg_response_time)
return healthy_keys[0][0]
async def chat_async(self, message: str, model: str = "gpt-4o") -> dict:
"""ส่งคำขอแบบ Async เพื่อรองรับ QPS สูง"""
api_key = self.get_healthy_key()
if not api_key:
raise Exception("ไม่มี API Key ที่พร้อมใช้งาน")
url = "https://api.holysheep.ai/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [{"role": "user", "content": message}]
}
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 200:
self.key_status[api_key].consecutive_failures = 0
return await response.json()
else:
self.key_status[api_key].consecutive_failures += 1
error_text = await response.text()
raise Exception(f"HTTP {response.status}: {error_text}")
except Exception as e:
self.key_status[api_key].consecutive_failures += 1
print(f"❌ คำขอล้มเหลว: {str(e)[:80]}")
raise
def get_status_report(self) -> str:
"""สร้างรายงานสถานะระบบ"""
report = ["=== สถานะระบบ Load Balancer ==="]
for key, status in self.key_status.items():
health_icon = "🟢