สถานการณ์ข้อผิดพลาดจริง: เมื่อ Bot เทรดของคุณ "หลับใน" กลางคัน

เช้าวันศุกร์ที่ผ่านมา ผมนั่งมองจอมอนิเตอร์พร้อมกาแฟแก้วโปรด ระบบ Trading Bot ที่พัฒนามา 3 เดือนกำลังทำงานได้ดีมาก กระทั่ง 09:15:32 น. — หน้าจอขึ้น ConnectionError: timeout after 30000ms ตามมาด้วย 401 Unauthorized หลังจาก restart เอง สิ่งที่เห็นคือ Order Book ที่มีข้อมูลล้าสมัยเกือบ 2 นาที สร้างความเสียหาย $4,230 ในไม่กี่วินาที

บทความนี้จะสอนวิธีสร้างระบบดึง Order Book ที่เสถียร พร้อมโค้ด Python ที่รันได้จริง ระบบ WebSocket reconnect อัตโนมัติ และวิธีจัดการ Error ทุกรูปแบบ

Order Book API คืออะไร และทำไมต้อง Real-time

Order Book คือรายการคำสั่งซื้อ-ขายที่รอดำเนินการในตลาด แสดงความลึกของราคา (Price Depth) และสภาพคล่องของตลาด

ข้อมูลที่ได้จาก Order Book:

โครงสร้าง API พื้นฐานสำหรับ Order Book

ก่อนเข้าสู่โค้ด Real-time เรามาดูโครงสร้าง REST API สำหรับดึง Snapshot ของ Order Book ก่อน

import requests
import time
from datetime import datetime

class CryptoOrderBookAPI:
    """คลาสสำหรับเชื่อมต่อ Order Book API พื้นฐาน"""
    
    def __init__(self, api_key, base_url="https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "OrderBook-Client/1.0"
        })
    
    def get_orderbook_snapshot(self, symbol="BTC-USDT", limit=20):
        """
        ดึง Order Book Snapshot
        :param symbol: คู่เทรด เช่น BTC-USDT, ETH-USDT
        :param limit: จำนวน Price Levels ที่ต้องการ (max 100)
        :return: Dictionary ที่มี bids, asks, timestamp
        """
        endpoint = f"{self.base_url}/orderbook"
        params = {
            "symbol": symbol,
            "limit": limit
        }
        
        try:
            response = self.session.get(endpoint, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            
            return {
                "symbol": symbol,
                "bids": data.get("bids", []),  # [[price, volume], ...]
                "asks": data.get("asks", []),
                "last_update": datetime.now().isoformat(),
                "exchange_timestamp": data.get("timestamp")
            }
            
        except requests.exceptions.Timeout:
            raise ConnectionError(f"Request timeout หลัง 10 วินาทีสำหรับ {symbol}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                raise ConnectionError("API Key ไม่ถูกต้อง หรือหมดอายุ")
            raise ConnectionError(f"HTTP Error: {e}")
        except requests.exceptions.RequestException as e:
            raise ConnectionError(f"Connection Error: {e}")

วิธีใช้งาน

api_client = CryptoOrderBookAPI( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" ) try: orderbook = api_client.get_orderbook_snapshot("BTC-USDT", limit=50) print(f"ดึงข้อมูลสำเร็จ: {len(orderbook['bids'])} bids, {len(orderbook['asks'])} asks") print(f"Bid สูงสุด: {orderbook['bids'][0][0]} @ {orderbook['bids'][0][1]} BTC") print(f"Ask ต่ำสุด: {orderbook['asks'][0][0]} @ {orderbook['asks'][0][1]} BTC") except ConnectionError as e: print(f"เกิดข้อผิดพลาด: {e}")

ระบบ WebSocket แบบ Real-time พร้อม Auto-Reconnect

การใช้ REST API อย่างเดียวไม่เพียงพอสำหรับ Trading Bot เพราะมี Latency สูง เราต้องใช้ WebSocket เพื่อรับข้อมูลอัพเดททันที

import websocket
import json
import threading
import time
import logging
from collections import deque

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RealTimeOrderBookClient:
    """
    WebSocket Client สำหรับ Order Book Real-time
    รองรับ Auto-reconnect, Heartbeat, Message Buffer
    """
    
    def __init__(self, api_key, symbol="BTC-USDT"):
        self.api_key = api_key
        self.symbol = symbol
        self.ws = None
        self.is_running = False
        self.reconnect_attempts = 0
        self.max_reconnect = 5
        self.reconnect_delay = 2  # วินาที
        
        # เก็บ Order Book ล่าสุด
        self.orderbook_bids = deque(maxlen=100)  # ราคาซื้อ
        self.orderbook_asks = deque(maxlen=100)  # ราคาขาย
        self.last_heartbeat = time.time()
        self.last_message_time = time.time()
        
        # Lock สำหรับ Thread Safety
        self.data_lock = threading.Lock()
    
    def get_websocket_url(self):
        """สร้าง WebSocket URL พร้อม Authentication"""
        return f"wss://stream.holysheep.ai/v1/orderbook?symbol={self.symbol}&token={self.api_key}"
    
    def on_message(self, ws, message):
        """รับ Message จาก WebSocket"""
        try:
            data = json.loads(message)
            self.last_message_time = time.time()
            
            # ตรวจสอบประเภท Message
            msg_type = data.get("type")
            
            if msg_type == "snapshot":
                # ข้อมูล Snapshot เริ่มต้น
                with self.data_lock:
                    self.orderbook_bids = deque(data.get("bids", []), maxlen=100)
                    self.orderbook_asks = deque(data.get("asks", []), maxlen=100)
                logger.info(f"ได้รับ Snapshot: {len(data.get('bids', []))} bids")
                
            elif msg_type == "update":
                # อัพเดท增量 (Delta Updates)
                updates = data.get("updates", [])
                with self.data_lock:
                    for update in updates:
                        side = update.get("side")  # "bid" หรือ "ask"
                        price = float(update.get("price"))
                        volume = float(update.get("volume"))
                        
                        if side == "bid":
                            self._update_price_level(self.orderbook_bids, price, volume)
                        else:
                            self._update_price_level(self.orderbook_asks, price, volume)
                            
            elif msg_type == "heartbeat":
                self.last_heartbeat = time.time()
                # ส่ง Heartbeat Response กลับ
                ws.send(json.dumps({"type": "pong"}))
                
            elif msg_type == "error":
                error_msg = data.get("message", "Unknown error")
                logger.error(f"Server Error: {error_msg}")
                
        except json.JSONDecodeError:
            logger.warning(f"ไม่สามารถ Decode JSON: {message[:100]}")
        except Exception as e:
            logger.error(f"Error ใน on_message: {e}")
    
    def _update_price_level(self, price_levels, price, volume):
        """อัพเดท Price Level ใน Order Book"""
        # หา Position ของ Price นี้
        for i, (p, v) in enumerate(price_levels):
            if float(p) == price:
                if volume == 0:
                    # ลบ Order นี้ (Volume = 0 หมายถึงลบ)
                    del price_levels[i]
                else:
                    # อัพเดท Volume
                    price_levels[i] = (price, volume)
                return
        
        # ถ้าไม่เจอ และ Volume > 0 ให้เพิ่มใหม่
        if volume > 0:
            price_levels.append((price, volume))
    
    def on_error(self, ws, error):
        """จัดการ Error"""
        logger.error(f"WebSocket Error: {error}")
        if "401" in str(error) or "Unauthorized" in str(error):
            logger.critical("API Key ไม่ถูกต้อง! กรุณาตรวจสอบ")
            self.is_running = False
    
    def on_close(self, ws, close_status_code, close_msg):
        """เรียกเมื่อ Connection ปิด"""
        logger.warning(f"Connection ปิด: {close_status_code} - {close_msg}")
        self.is_running = False
        
        # ลอง Reconnect อัตโนมัติ
        if self.reconnect_attempts < self.max_reconnect:
            self.reconnect_attempts += 1
            delay = self.reconnect_delay * self.reconnect_attempts
            logger.info(f"จะพยายาม Reconnect ใน {delay} วินาที (ครั้งที่ {self.reconnect_attempts})")
            time.sleep(delay)
            self.connect()
        else:
            logger.error("เลยจำนวนครั้ง Reconnect สูงสุดแล้ว")
    
    def on_open(self, ws):
        """เรียกเมื่อ Connection เปิดสำเร็จ"""
        logger.info(f"เชื่อมต่อ WebSocket สำเร็จสำหรับ {self.symbol}")
        self.is_running = True
        self.reconnect_attempts = 0
        
        # Subscribe ไปยัง Order Book Channel
        subscribe_msg = {
            "type": "subscribe",
            "channel": "orderbook",
            "symbol": self.symbol
        }
        ws.send(json.dumps(subscribe_msg))
    
    def connect(self):
        """เริ่มเชื่อมต่อ WebSocket"""
        ws_url = self.get_websocket_url()
        logger.info(f"กำลังเชื่อมต่อไปยัง: {ws_url.split('?')[0]}")
        
        self.ws = websocket.WebSocketApp(
            ws_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        
        # รัน WebSocket ใน Thread แยก
        self.ws_thread = threading.Thread(
            target=self.ws.run_forever,
            daemon=True
        )
        self.ws_thread.start()
    
    def get_current_spread(self):
        """คำนวณ Spread ปัจจุบัน"""
        with self.data_lock:
            if not self.orderbook_bids or not self.orderbook_asks:
                return None
            
            best_bid = float(self.orderbook_bids[0][0])
            best_ask = float(self.orderbook_asks[0][0])
            spread = best_ask - best_bid
            spread_percent = (spread / best_bid) * 100
            
            return {
                "best_bid": best_bid,
                "best_ask": best_ask,
                "spread": spread,
                "spread_percent": round(spread_percent, 4),
                "mid_price": (best_bid + best_ask) / 2
            }
    
    def get_orderbook_depth(self, levels=10):
        """ดึง Order Book Depth (ผลรวม Volume ใน N Levels)"""
        with self.data_lock:
            bid_depth = sum(float(v) for _, v in list(self.orderbook_bids)[:levels])
            ask_depth = sum(float(v) for _, v in list(self.orderbook_asks)[:levels])
            
            return {
                "bid_depth": bid_depth,
                "ask_depth": ask_depth,
                "imbalance": (bid_depth - ask_depth) / (bid_depth + ask_depth) if (bid_depth + ask_depth) > 0 else 0
            }
    
    def disconnect(self):
        """ยกเลิกเชื่อมต่อ"""
        self.is_running = False
        if self.ws:
            self.ws.close()

วิธีใช้งาน

client = RealTimeOrderBookClient( api_key="YOUR_HOLYSHEEP_API_KEY", symbol="BTC-USDT" ) client.connect()

รอให้ Connection สำเร็จ

time.sleep(3)

ทดสอบดึงข้อมูล

for i in range(10): spread = client.get_current_spread() if spread: print(f"Spread: {spread['spread_percent']}% | Mid: ${spread['mid_price']:,.2f}") depth = client.get_orderbook_depth(5) print(f"Depth Imbalance: {depth['imbalance']:.4f}") time.sleep(1) client.disconnect()

ความแตกต่างระหว่าง REST vs WebSocket

สำหรับการพัฒนาระบบ Trading ที่ต้องการ Latency ต่ำ ความเข้าใจความแตกต่างระหว่าง REST API และ WebSocket มีความสำคัญมาก

คุณสมบัติ REST API WebSocket
Latency เฉลี่ย 100-300ms <50ms (รวม HolySheep)
การใช้ Resource สูง (สร้าง Connection ใหม่ทุกครั้ง) ต่ำ (Connection คงที่)
ความถี่ในการดึงข้อมูล 1-10 ครั้ง/วินาที 100+ ครั้ง/วินาที
เหมาะกับ ดึง Snapshot, Backtest Trading Bot, Arbitrage
Rate Limit มีจำกัด ไม่มี (ถ้าใช้ Subscribe)

ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข

กรณีที่ 1: 401 Unauthorized - API Key หมดอายุหรือไม่ถูกต้อง

# ❌ วิธีผิด: Hardcode API Key โดยตรงในโค้ด
API_KEY = "sk_live_xxxx"  # ไม่ปลอดภัย!

✅ วิธีถูก: โหลดจาก Environment Variable

import os from dotenv import load_dotenv load_dotenv() API_KEY = os.environ.get("HOLYSHEEP_API_KEY") if not API_KEY: raise ValueError("กรุณาตั้งค่า HOLYSHEEP_API_KEY ใน Environment Variable")

หรือใช้ Secret Manager

from keyring import get_password API_KEY = get_password("holysheep", "api_key") if not API_KEY: raise ConnectionError("ไม่พบ API Key ใน Secret Manager")

วิธีตรวจสอบ API Key ก่อนใช้งาน

def validate_api_key(api_key): """ตรวจสอบความถูกต้องของ API Key""" import requests test_url = "https://api.holysheep.ai/v1/auth/validate" headers = {"Authorization": f"Bearer {api_key}"} try: response = requests.get(test_url, headers=headers, timeout=5) if response.status_code == 200: return True, "API Key ถูกต้อง" elif response.status_code == 401: return False, "API Key ไม่ถูกต้องหรือหมดอายุ" else: return False, f"Error: {response.status_code}" except Exception as e: return False, f"ไม่สามารถตรวจสอบได้: {e}"

ทดสอบ

is_valid, message = validate_api_key(API_KEY) print(message)

กรณีที่ 2: ConnectionError: timeout after 30000ms

# ❌ วิธีผิด: ใช้ Timeout คงที่ ไม่มี Retry Logic
response = requests.get(url, timeout=30)

✅ วิธีถูก: Exponential Backoff with Retry

from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import requests def create_session_with_retry(max_retries=3, backoff_factor=0.5): """สร้าง Session ที่รองรับ Auto-retry พร้อม Exponential Backoff""" session = requests.Session() # Retry Strategy: ลองใหม่เมื่อเกิด Error retry_strategy = Retry( total=max_retries, backoff_factor=backoff_factor, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS"], raise_on_status=False ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) return session

วิธีใช้งาน

session = create_session_with_retry(max_retries=5, backoff_factor=1) def fetch_with_timeout(url, api_key, timeout=10): """ ดึงข้อมูลพร้อม Timeout และ Auto-retry :param url: URL ที่ต้องการ :param api_key: API Key สำหรับ Authentication :param timeout: Timeout ในวินาที :return: Response JSON """ headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json" } try: response = session.get(url, headers=headers, timeout=timeout) # จัดการ Rate Limit if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", 60)) print(f"Rate Limited! รอ {retry_after} วินาที") import time time.sleep(retry_after) return fetch_with_timeout(url, api_key, timeout) # Retry response.raise_for_status() return response.json() except requests.exceptions.Timeout: raise ConnectionError(f"Request Timeout หลัง {timeout} วินาที") except requests.exceptions.ConnectionError as e: raise ConnectionError(f"Connection Error: {e}") except requests.exceptions.HTTPError as e: raise ConnectionError(f"HTTP {e.response.status_code}: {e}")

ตัวอย่างการใช้งาน

try: url = "https://api.holysheep.ai/v1/orderbook?symbol=BTC-USDT&limit=20" data = fetch_with_timeout(url, "YOUR_HOLYSHEEP_API_KEY", timeout=15) print(f"ดึงข้อมูลสำเร็จ: {data}") except ConnectionError as e: print(f"ข้อผิดพลาด: {e}")

กรณีที่ 3: Stale Data - Order Book ข้อมูลเก่า

# ❌ วิธีผิด: ไม่ตรวจสอบความสดของข้อมูล
def get_orderbook():
    response = requests.get(url)
    return response.json()  # ไม่รู้ว่าข้อมูลเก่าแค่ไหน

✅ วิธีถูก: ตรวจสอบ Data Freshness พร้อม Fallback

import time from datetime import datetime, timezone from dataclasses import dataclass from typing import Optional import logging logger = logging.getLogger(__name__) @dataclass class OrderBookData: """Data Class สำหรับ Order Book พร้อม Metadata""" bids: list asks: list timestamp: float received_at: float @property def age_ms(self) -> float: """อายุของข้อมูลในหน่วยมิลลิวินาที""" return (time.time() - self.timestamp) * 1000 @property def is_fresh(self, max_age_ms: float = 1000) -> bool: """ตรวจสอบว่าข้อมูลยังสดอยู่หรือไม่""" return self.age_ms < max_age_ms class FreshnessMonitor: """ตรวจสอบความสดของข้อมูล Order Book""" def __init__(self, max_acceptable_age_ms: float = 1000): self.max_acceptable_age_ms = max_acceptable_age_ms self.stale_count = 0 self.total_requests = 0 def validate_and_log(self, orderbook: OrderBookData) -> bool: """ตรวจสอบความสดและ Log สถิติ""" self.total_requests += 1 is_fresh = orderbook.is_fresh(self.max_acceptable_age_ms) if not is_fresh: self.stale_count += 1 logger.warning( f"⚠️ Order Book เก่า: {orderbook.age_ms:.0f}ms " f"(ยอมรับได้: {self.max_acceptable_age_ms}ms)" ) return is_fresh def get_stale_ratio(self) -> float: """สถิติเปอร์เซ็นต์ข้อมูลเก่า""" if self.total_requests == 0: return 0.0 return (self.stale_count / self.total_requests) * 100

วิธีใช้งานใน Trading Bot

def get_orderbook_with_freshness_check(client, symbol: str) -> Optional[OrderBookData]: """ ดึง Order Book พร้อมตรวจสอบความสด ถ้าข้อมูลเก่าเกินไปจะลอง Request ใหม่ """ monitor = FreshnessMonitor(max_acceptable_age_ms=500) # ยอมรับได้ 500ms for attempt in range(3): try: raw_data = client.get_orderbook_snapshot(symbol, limit=50) orderbook = OrderBookData( bids=raw_data.get("bids", []), asks=raw_data.get("asks", []), timestamp=raw_data.get("exchange_timestamp", time.time()), received_at=time.time() ) if monitor.validate_and_log(orderbook): return orderbook else: logger.info(f"ลอง Request ใหม่ (ครั้งที่ {attempt + 2})") time.sleep(0.1 * (attempt + 1)) # Backoff ก่อน Retry except ConnectionError as e: logger.error(f"Connection Error: {e}") time.sleep(1) # ถ้า Retry 3 ครั้งแล้วยังเก่า ให้ใช้ WebSocket Data แทน logger.warning("ใช้ WebSocket Data เป็น Fallback") return None # หรือ Fallback ไปใช้ WebSocket

สถิติการทำงาน

monitor = FreshnessMonitor() print(f"อัตราข้อมูลเก่า: {monitor.get_stale_ratio():.2f}%")

Best Practices สำหรับ Order Book Data

จากประสบการณ์การพัฒนาระบบ Trading ที่ใช้งานจริง มีหลักการสำคัญที่ต้องปฏิบัติตาม