สถานการณ์ข้อผิดพลาดจริง: เมื่อ Bot เทรดของคุณ "หลับใน" กลางคัน
เช้าวันศุกร์ที่ผ่านมา ผมนั่งมองจอมอนิเตอร์พร้อมกาแฟแก้วโปรด ระบบ Trading Bot ที่พัฒนามา 3 เดือนกำลังทำงานได้ดีมาก กระทั่ง 09:15:32 น. — หน้าจอขึ้น ConnectionError: timeout after 30000ms ตามมาด้วย 401 Unauthorized หลังจาก restart เอง สิ่งที่เห็นคือ Order Book ที่มีข้อมูลล้าสมัยเกือบ 2 นาที สร้างความเสียหาย $4,230 ในไม่กี่วินาที
บทความนี้จะสอนวิธีสร้างระบบดึง Order Book ที่เสถียร พร้อมโค้ด Python ที่รันได้จริง ระบบ WebSocket reconnect อัตโนมัติ และวิธีจัดการ Error ทุกรูปแบบ
Order Book API คืออะไร และทำไมต้อง Real-time
Order Book คือรายการคำสั่งซื้อ-ขายที่รอดำเนินการในตลาด แสดงความลึกของราคา (Price Depth) และสภาพคล่องของตลาด
ข้อมูลที่ได้จาก Order Book:
- ราคา Bid (คำสั่งซื้อ) และ Ask (คำสั่งขาย)
- ปริมาณคำสั่ง (Volume) ที่แต่ละระดับราคา
- จำนวน Orders ในแต่ละ Price Level
- Spread ระหว่างราคาซื้อ-ขาย
โครงสร้าง API พื้นฐานสำหรับ Order Book
ก่อนเข้าสู่โค้ด Real-time เรามาดูโครงสร้าง REST API สำหรับดึง Snapshot ของ Order Book ก่อน
import requests
import time
from datetime import datetime
class CryptoOrderBookAPI:
"""คลาสสำหรับเชื่อมต่อ Order Book API พื้นฐาน"""
def __init__(self, api_key, base_url="https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": "OrderBook-Client/1.0"
})
def get_orderbook_snapshot(self, symbol="BTC-USDT", limit=20):
"""
ดึง Order Book Snapshot
:param symbol: คู่เทรด เช่น BTC-USDT, ETH-USDT
:param limit: จำนวน Price Levels ที่ต้องการ (max 100)
:return: Dictionary ที่มี bids, asks, timestamp
"""
endpoint = f"{self.base_url}/orderbook"
params = {
"symbol": symbol,
"limit": limit
}
try:
response = self.session.get(endpoint, params=params, timeout=10)
response.raise_for_status()
data = response.json()
return {
"symbol": symbol,
"bids": data.get("bids", []), # [[price, volume], ...]
"asks": data.get("asks", []),
"last_update": datetime.now().isoformat(),
"exchange_timestamp": data.get("timestamp")
}
except requests.exceptions.Timeout:
raise ConnectionError(f"Request timeout หลัง 10 วินาทีสำหรับ {symbol}")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise ConnectionError("API Key ไม่ถูกต้อง หรือหมดอายุ")
raise ConnectionError(f"HTTP Error: {e}")
except requests.exceptions.RequestException as e:
raise ConnectionError(f"Connection Error: {e}")
วิธีใช้งาน
api_client = CryptoOrderBookAPI(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1"
)
try:
orderbook = api_client.get_orderbook_snapshot("BTC-USDT", limit=50)
print(f"ดึงข้อมูลสำเร็จ: {len(orderbook['bids'])} bids, {len(orderbook['asks'])} asks")
print(f"Bid สูงสุด: {orderbook['bids'][0][0]} @ {orderbook['bids'][0][1]} BTC")
print(f"Ask ต่ำสุด: {orderbook['asks'][0][0]} @ {orderbook['asks'][0][1]} BTC")
except ConnectionError as e:
print(f"เกิดข้อผิดพลาด: {e}")
ระบบ WebSocket แบบ Real-time พร้อม Auto-Reconnect
การใช้ REST API อย่างเดียวไม่เพียงพอสำหรับ Trading Bot เพราะมี Latency สูง เราต้องใช้ WebSocket เพื่อรับข้อมูลอัพเดททันที
import websocket
import json
import threading
import time
import logging
from collections import deque
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RealTimeOrderBookClient:
"""
WebSocket Client สำหรับ Order Book Real-time
รองรับ Auto-reconnect, Heartbeat, Message Buffer
"""
def __init__(self, api_key, symbol="BTC-USDT"):
self.api_key = api_key
self.symbol = symbol
self.ws = None
self.is_running = False
self.reconnect_attempts = 0
self.max_reconnect = 5
self.reconnect_delay = 2 # วินาที
# เก็บ Order Book ล่าสุด
self.orderbook_bids = deque(maxlen=100) # ราคาซื้อ
self.orderbook_asks = deque(maxlen=100) # ราคาขาย
self.last_heartbeat = time.time()
self.last_message_time = time.time()
# Lock สำหรับ Thread Safety
self.data_lock = threading.Lock()
def get_websocket_url(self):
"""สร้าง WebSocket URL พร้อม Authentication"""
return f"wss://stream.holysheep.ai/v1/orderbook?symbol={self.symbol}&token={self.api_key}"
def on_message(self, ws, message):
"""รับ Message จาก WebSocket"""
try:
data = json.loads(message)
self.last_message_time = time.time()
# ตรวจสอบประเภท Message
msg_type = data.get("type")
if msg_type == "snapshot":
# ข้อมูล Snapshot เริ่มต้น
with self.data_lock:
self.orderbook_bids = deque(data.get("bids", []), maxlen=100)
self.orderbook_asks = deque(data.get("asks", []), maxlen=100)
logger.info(f"ได้รับ Snapshot: {len(data.get('bids', []))} bids")
elif msg_type == "update":
# อัพเดท增量 (Delta Updates)
updates = data.get("updates", [])
with self.data_lock:
for update in updates:
side = update.get("side") # "bid" หรือ "ask"
price = float(update.get("price"))
volume = float(update.get("volume"))
if side == "bid":
self._update_price_level(self.orderbook_bids, price, volume)
else:
self._update_price_level(self.orderbook_asks, price, volume)
elif msg_type == "heartbeat":
self.last_heartbeat = time.time()
# ส่ง Heartbeat Response กลับ
ws.send(json.dumps({"type": "pong"}))
elif msg_type == "error":
error_msg = data.get("message", "Unknown error")
logger.error(f"Server Error: {error_msg}")
except json.JSONDecodeError:
logger.warning(f"ไม่สามารถ Decode JSON: {message[:100]}")
except Exception as e:
logger.error(f"Error ใน on_message: {e}")
def _update_price_level(self, price_levels, price, volume):
"""อัพเดท Price Level ใน Order Book"""
# หา Position ของ Price นี้
for i, (p, v) in enumerate(price_levels):
if float(p) == price:
if volume == 0:
# ลบ Order นี้ (Volume = 0 หมายถึงลบ)
del price_levels[i]
else:
# อัพเดท Volume
price_levels[i] = (price, volume)
return
# ถ้าไม่เจอ และ Volume > 0 ให้เพิ่มใหม่
if volume > 0:
price_levels.append((price, volume))
def on_error(self, ws, error):
"""จัดการ Error"""
logger.error(f"WebSocket Error: {error}")
if "401" in str(error) or "Unauthorized" in str(error):
logger.critical("API Key ไม่ถูกต้อง! กรุณาตรวจสอบ")
self.is_running = False
def on_close(self, ws, close_status_code, close_msg):
"""เรียกเมื่อ Connection ปิด"""
logger.warning(f"Connection ปิด: {close_status_code} - {close_msg}")
self.is_running = False
# ลอง Reconnect อัตโนมัติ
if self.reconnect_attempts < self.max_reconnect:
self.reconnect_attempts += 1
delay = self.reconnect_delay * self.reconnect_attempts
logger.info(f"จะพยายาม Reconnect ใน {delay} วินาที (ครั้งที่ {self.reconnect_attempts})")
time.sleep(delay)
self.connect()
else:
logger.error("เลยจำนวนครั้ง Reconnect สูงสุดแล้ว")
def on_open(self, ws):
"""เรียกเมื่อ Connection เปิดสำเร็จ"""
logger.info(f"เชื่อมต่อ WebSocket สำเร็จสำหรับ {self.symbol}")
self.is_running = True
self.reconnect_attempts = 0
# Subscribe ไปยัง Order Book Channel
subscribe_msg = {
"type": "subscribe",
"channel": "orderbook",
"symbol": self.symbol
}
ws.send(json.dumps(subscribe_msg))
def connect(self):
"""เริ่มเชื่อมต่อ WebSocket"""
ws_url = self.get_websocket_url()
logger.info(f"กำลังเชื่อมต่อไปยัง: {ws_url.split('?')[0]}")
self.ws = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
# รัน WebSocket ใน Thread แยก
self.ws_thread = threading.Thread(
target=self.ws.run_forever,
daemon=True
)
self.ws_thread.start()
def get_current_spread(self):
"""คำนวณ Spread ปัจจุบัน"""
with self.data_lock:
if not self.orderbook_bids or not self.orderbook_asks:
return None
best_bid = float(self.orderbook_bids[0][0])
best_ask = float(self.orderbook_asks[0][0])
spread = best_ask - best_bid
spread_percent = (spread / best_bid) * 100
return {
"best_bid": best_bid,
"best_ask": best_ask,
"spread": spread,
"spread_percent": round(spread_percent, 4),
"mid_price": (best_bid + best_ask) / 2
}
def get_orderbook_depth(self, levels=10):
"""ดึง Order Book Depth (ผลรวม Volume ใน N Levels)"""
with self.data_lock:
bid_depth = sum(float(v) for _, v in list(self.orderbook_bids)[:levels])
ask_depth = sum(float(v) for _, v in list(self.orderbook_asks)[:levels])
return {
"bid_depth": bid_depth,
"ask_depth": ask_depth,
"imbalance": (bid_depth - ask_depth) / (bid_depth + ask_depth) if (bid_depth + ask_depth) > 0 else 0
}
def disconnect(self):
"""ยกเลิกเชื่อมต่อ"""
self.is_running = False
if self.ws:
self.ws.close()
วิธีใช้งาน
client = RealTimeOrderBookClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
symbol="BTC-USDT"
)
client.connect()
รอให้ Connection สำเร็จ
time.sleep(3)
ทดสอบดึงข้อมูล
for i in range(10):
spread = client.get_current_spread()
if spread:
print(f"Spread: {spread['spread_percent']}% | Mid: ${spread['mid_price']:,.2f}")
depth = client.get_orderbook_depth(5)
print(f"Depth Imbalance: {depth['imbalance']:.4f}")
time.sleep(1)
client.disconnect()
ความแตกต่างระหว่าง REST vs WebSocket
สำหรับการพัฒนาระบบ Trading ที่ต้องการ Latency ต่ำ ความเข้าใจความแตกต่างระหว่าง REST API และ WebSocket มีความสำคัญมาก
| คุณสมบัติ | REST API | WebSocket |
|---|---|---|
| Latency เฉลี่ย | 100-300ms | <50ms (รวม HolySheep) |
| การใช้ Resource | สูง (สร้าง Connection ใหม่ทุกครั้ง) | ต่ำ (Connection คงที่) |
| ความถี่ในการดึงข้อมูล | 1-10 ครั้ง/วินาที | 100+ ครั้ง/วินาที |
| เหมาะกับ | ดึง Snapshot, Backtest | Trading Bot, Arbitrage |
| Rate Limit | มีจำกัด | ไม่มี (ถ้าใช้ Subscribe) |
ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข
กรณีที่ 1: 401 Unauthorized - API Key หมดอายุหรือไม่ถูกต้อง
# ❌ วิธีผิด: Hardcode API Key โดยตรงในโค้ด
API_KEY = "sk_live_xxxx" # ไม่ปลอดภัย!
✅ วิธีถูก: โหลดจาก Environment Variable
import os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.environ.get("HOLYSHEEP_API_KEY")
if not API_KEY:
raise ValueError("กรุณาตั้งค่า HOLYSHEEP_API_KEY ใน Environment Variable")
หรือใช้ Secret Manager
from keyring import get_password
API_KEY = get_password("holysheep", "api_key")
if not API_KEY:
raise ConnectionError("ไม่พบ API Key ใน Secret Manager")
วิธีตรวจสอบ API Key ก่อนใช้งาน
def validate_api_key(api_key):
"""ตรวจสอบความถูกต้องของ API Key"""
import requests
test_url = "https://api.holysheep.ai/v1/auth/validate"
headers = {"Authorization": f"Bearer {api_key}"}
try:
response = requests.get(test_url, headers=headers, timeout=5)
if response.status_code == 200:
return True, "API Key ถูกต้อง"
elif response.status_code == 401:
return False, "API Key ไม่ถูกต้องหรือหมดอายุ"
else:
return False, f"Error: {response.status_code}"
except Exception as e:
return False, f"ไม่สามารถตรวจสอบได้: {e}"
ทดสอบ
is_valid, message = validate_api_key(API_KEY)
print(message)
กรณีที่ 2: ConnectionError: timeout after 30000ms
# ❌ วิธีผิด: ใช้ Timeout คงที่ ไม่มี Retry Logic
response = requests.get(url, timeout=30)
✅ วิธีถูก: Exponential Backoff with Retry
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
def create_session_with_retry(max_retries=3, backoff_factor=0.5):
"""สร้าง Session ที่รองรับ Auto-retry พร้อม Exponential Backoff"""
session = requests.Session()
# Retry Strategy: ลองใหม่เมื่อเกิด Error
retry_strategy = Retry(
total=max_retries,
backoff_factor=backoff_factor,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"],
raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
วิธีใช้งาน
session = create_session_with_retry(max_retries=5, backoff_factor=1)
def fetch_with_timeout(url, api_key, timeout=10):
"""
ดึงข้อมูลพร้อม Timeout และ Auto-retry
:param url: URL ที่ต้องการ
:param api_key: API Key สำหรับ Authentication
:param timeout: Timeout ในวินาที
:return: Response JSON
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Accept": "application/json"
}
try:
response = session.get(url, headers=headers, timeout=timeout)
# จัดการ Rate Limit
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
print(f"Rate Limited! รอ {retry_after} วินาที")
import time
time.sleep(retry_after)
return fetch_with_timeout(url, api_key, timeout) # Retry
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise ConnectionError(f"Request Timeout หลัง {timeout} วินาที")
except requests.exceptions.ConnectionError as e:
raise ConnectionError(f"Connection Error: {e}")
except requests.exceptions.HTTPError as e:
raise ConnectionError(f"HTTP {e.response.status_code}: {e}")
ตัวอย่างการใช้งาน
try:
url = "https://api.holysheep.ai/v1/orderbook?symbol=BTC-USDT&limit=20"
data = fetch_with_timeout(url, "YOUR_HOLYSHEEP_API_KEY", timeout=15)
print(f"ดึงข้อมูลสำเร็จ: {data}")
except ConnectionError as e:
print(f"ข้อผิดพลาด: {e}")
กรณีที่ 3: Stale Data - Order Book ข้อมูลเก่า
# ❌ วิธีผิด: ไม่ตรวจสอบความสดของข้อมูล
def get_orderbook():
response = requests.get(url)
return response.json() # ไม่รู้ว่าข้อมูลเก่าแค่ไหน
✅ วิธีถูก: ตรวจสอบ Data Freshness พร้อม Fallback
import time
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class OrderBookData:
"""Data Class สำหรับ Order Book พร้อม Metadata"""
bids: list
asks: list
timestamp: float
received_at: float
@property
def age_ms(self) -> float:
"""อายุของข้อมูลในหน่วยมิลลิวินาที"""
return (time.time() - self.timestamp) * 1000
@property
def is_fresh(self, max_age_ms: float = 1000) -> bool:
"""ตรวจสอบว่าข้อมูลยังสดอยู่หรือไม่"""
return self.age_ms < max_age_ms
class FreshnessMonitor:
"""ตรวจสอบความสดของข้อมูล Order Book"""
def __init__(self, max_acceptable_age_ms: float = 1000):
self.max_acceptable_age_ms = max_acceptable_age_ms
self.stale_count = 0
self.total_requests = 0
def validate_and_log(self, orderbook: OrderBookData) -> bool:
"""ตรวจสอบความสดและ Log สถิติ"""
self.total_requests += 1
is_fresh = orderbook.is_fresh(self.max_acceptable_age_ms)
if not is_fresh:
self.stale_count += 1
logger.warning(
f"⚠️ Order Book เก่า: {orderbook.age_ms:.0f}ms "
f"(ยอมรับได้: {self.max_acceptable_age_ms}ms)"
)
return is_fresh
def get_stale_ratio(self) -> float:
"""สถิติเปอร์เซ็นต์ข้อมูลเก่า"""
if self.total_requests == 0:
return 0.0
return (self.stale_count / self.total_requests) * 100
วิธีใช้งานใน Trading Bot
def get_orderbook_with_freshness_check(client, symbol: str) -> Optional[OrderBookData]:
"""
ดึง Order Book พร้อมตรวจสอบความสด
ถ้าข้อมูลเก่าเกินไปจะลอง Request ใหม่
"""
monitor = FreshnessMonitor(max_acceptable_age_ms=500) # ยอมรับได้ 500ms
for attempt in range(3):
try:
raw_data = client.get_orderbook_snapshot(symbol, limit=50)
orderbook = OrderBookData(
bids=raw_data.get("bids", []),
asks=raw_data.get("asks", []),
timestamp=raw_data.get("exchange_timestamp", time.time()),
received_at=time.time()
)
if monitor.validate_and_log(orderbook):
return orderbook
else:
logger.info(f"ลอง Request ใหม่ (ครั้งที่ {attempt + 2})")
time.sleep(0.1 * (attempt + 1)) # Backoff ก่อน Retry
except ConnectionError as e:
logger.error(f"Connection Error: {e}")
time.sleep(1)
# ถ้า Retry 3 ครั้งแล้วยังเก่า ให้ใช้ WebSocket Data แทน
logger.warning("ใช้ WebSocket Data เป็น Fallback")
return None # หรือ Fallback ไปใช้ WebSocket
สถิติการทำงาน
monitor = FreshnessMonitor()
print(f"อัตราข้อมูลเก่า: {monitor.get_stale_ratio():.2f}%")
Best Practices สำหรับ Order Book Data
จากประสบการณ์การพัฒนาระบบ Trading ที่ใช้งานจริง มีหลักการสำคัญที่ต้องปฏิบัติตาม
- ใช้ WebSocket เป็นหลัก — Latency ต่ำกว่า 50ms จาก HolySheep เหมาะส