บทนำ: ทำไม WebSocket ถึงสำคัญสำหรับ Trading System

ในโลกของการเทรดคริปโตเคอร์เรนซี ความเร็วคือทุกอย่าง ความหน่วง (latency) เพียง 100 มิลลิวินาทีอาจหมายถึงความแตกต่างระหว่างกำไรกับขาดทุน บทความนี้จะพาคุณสร้างระบบรับข้อมูลราคาแบบเรียลไทม์ด้วย WebSocket ที่มีความหน่วงต่ำกว่า 50 มิลลิวินาที พร้อมสำหรับการใช้งานจริงในระดับ Production

สถาปัตยกรรมระบบ WebSocket สำหรับ Crypto Exchange

หลักการทำงานของ WebSocket Connection

WebSocket เป็น protocol การสื่อสารแบบ full-duplex ผ่าน TCP connection เดียว ต่างจาก HTTP ที่ต้องสร้าง connection ใหม่ทุกครั้ง WebSocket รักษา connection ไว้ตลอดเวลา ทำให้เหมาะสำหรับการรับข้อมูลแบบ real-time

// สถาปัตยกรรมการเชื่อมต่อ WebSocket
┌─────────────────────────────────────────────────────────────────┐
│                        Trading Client                            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │   WebSocket  │───▶│   Message    │───▶│   Trading    │       │
│  │   Manager    │    │   Parser     │    │   Engine     │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                   │                   │                │
│         ▼                   ▼                   ▼                │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │ Reconnection │    │  JSON/Binary │    │    Order     │       │
│  │   Handler   │    │   Decoder    │    │   Executor   │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Exchange WebSocket API                       │
│  wss://stream.binance.com:9443/ws/btcusdt@trade                │
│  wss://ws-feed.pro.coinbase.com                                │
│  wss://fstream.bittrex.com/ws                                  │
└─────────────────────────────────────────────────────────────────┘

Protocol ที่ใช้งานจริง: Binance WebSocket Stream

# การเชื่อมต่อ WebSocket กับ Binance Exchange

WebSocket URL: wss://stream.binance.com:9443/ws/<stream_name>

import websocket import json import time from typing import Dict, List, Callable from dataclasses import dataclass from threading import Thread, Lock import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class TradeMessage: """โครงสร้างข้อมูล Trade จาก Exchange""" symbol: str # เช่น "BTCUSDT" price: float # ราคาที่เทรด quantity: float # จำนวนที่เทรด timestamp: int # Unix timestamp (มิลลิวินาที) is_buyer_maker: bool # True = ผู้ขายเป็น maker @property def trade_time_ms(self) -> int: return self.timestamp class CryptoWebSocketClient: """ WebSocket Client สำหรับรับข้อมูลราคาคริปโตแบบเรียลไทม์ ออกแบบมาสำหรับ Production โดยเฉพาะ """ def __init__( self, on_trade: Callable[[TradeMessage], None] = None, reconnect_delay: float = 1.0, max_reconnect_attempts: int = 10 ): self.ws = None self.on_trade = on_trade self.reconnect_delay = reconnect_delay self.max_reconnect_attempts = max_reconnect_attempts self.is_running = False self.reconnect_count = 0 self.message_count = 0 self.last_latency_check = time.time() self.latencies: List[float] = [] self._lock = Lock() # สถิติประสิทธิภาพ self.stats = { 'total_messages': 0, 'messages_per_second': 0.0, 'avg_latency_ms': 0.0, 'reconnections': 0 } def connect(self, streams: List[str]) -> None: """ เชื่อมต่อกับ Exchange WebSocket streams: รายชื่อ streams เช่น ["btcusdt@trade", "ethusdt@trade"] """ # Binance Combined Stream Format param_str = '/'.join(streams) url = f"wss://stream.binance.com:9443/stream?streams={param_str}" logger.info(f"กำลังเชื่อมต่อกับ {len(streams)} streams...") self.ws = websocket.WebSocketApp( url, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, on_open=self._on_open ) self.is_running = True # รันใน thread แยกเพื่อไม่บล็อก main thread self.ws_thread = Thread(target=self.ws.run_forever, daemon=True) self.ws_thread.start() # รัน thread สำหรับสถิติ self.stats_thread = Thread(target=self._stats_loop, daemon=True) self.stats_thread.start() def _on_open(self, ws) -> None: logger.info("✅ WebSocket เชื่อมต่อสำเร็จ") self.reconnect_count = 0 def _on_message(self, ws, message: str) -> None: """ประมวลผลข้อความที่ได้รับ""" start_time = time.perf_counter() try: data = json.loads(message) # Binance Combined Stream Format if 'stream' in data and 'data' in data: stream_data = data['data'] if stream_data.get('e') == 'trade': trade = TradeMessage( symbol=stream_data['s'], price=float(stream_data['p']), quantity=float(stream_data['q']), timestamp=stream_data['T'], is_buyer_maker=stream_data['m'] ) # คำนวณ latency latency_ms = (time.time() * 1000) - trade.timestamp with self._lock: self.latencies.append(latency_ms) if len(self.latencies) > 1000: self.latencies = self.latencies[-500:] self.message_count += 1 self.stats['total_messages'] += 1 # เรียก callback function if self.on_trade: self.on_trade(trade) except json.JSONDecodeError as e: logger.error(f"JSON Parse Error: {e}") except Exception as e: logger.error(f"Message Processing Error: {e}") processing_time = (time.perf_counter() - start_time) * 1000 if processing_time > 10: logger.warning(f"Slow message processing: {processing_time:.2f}ms") def _on_error(self, ws, error) -> None: logger.error(f"WebSocket Error: {error}") def _on_close(self, ws, close_status_code, close_msg) -> None: logger.warning(f"WebSocket ปิดการเชื่อมต่อ: {close_status_code}") self.is_running = False # พยายามเชื่อมต่อใหม่ if self.reconnect_count < self.max_reconnect_attempts: self._reconnect() def _reconnect(self) -> None: """จัดการการเชื่อมต่อใหม่อัตโนมัติ""" self.reconnect_count += 1 delay = min(self.reconnect_delay * (2 ** self.reconnect_count), 30) logger.info(f"พยายามเชื่อมต่อใหม่ใน {delay:.1f} วินาที (ครั้งที่ {self.reconnect_count})") time.sleep(delay) self.is_running = True self.ws_thread = Thread(target=self.ws.run_forever, daemon=True) self.ws_thread.start() def _stats_loop(self) -> None: """อัปเดตสถิติประจำวินาที""" last_count = 0 while self.is_running: time.sleep(1) with self._lock: messages_in_last_second = self.message_count - last_count last_count = self.message_count self.stats['messages_per_second'] = messages_in_last_second if self.latencies: self.stats['avg_latency_ms'] = sum(self.latencies) / len(self.latencies) self.stats['p99_latency_ms'] = sorted(self.latencies)[int(len(self.latencies) * 0.99)] if len(self.latencies) > 10 else 0 logger.info( f"Stats: {self.stats['messages_per_second']} msg/s | " f"Avg Latency: {self.stats['avg_latency_ms']:.2f}ms | " f"Total: {self.stats['total_messages']:,}" ) def close(self) -> None: """ปิดการเชื่อมต่อ""" self.is_running = False if self.ws: self.ws.close() logger.info("WebSocket ปิดการเชื่อมต่อแล้ว")

ตัวอย่างการใช้งาน

if __name__ == "__main__": last_prices: Dict[str, float] = {} def on_trade(trade: TradeMessage): """Callback function สำหรับจัดการ trade data""" last_prices[trade.symbol] = trade.price # แสดงข้อมูล trade ล่าสุด if trade.symbol == "BTCUSDT": print(f"[{trade.trade_time_ms}] {trade.symbol}: ${trade.price:,.2f} | Qty: {trade.quantity}") # สร้าง client และเชื่อมต่อ client = CryptoWebSocketClient( on_trade=on_trade, reconnect_delay=1.0 ) # เชื่อมต่อกับหลาย streams streams = [ "btcusdt@trade", "ethusdt@trade", "bnbusdt@trade", "solusdt@trade" ] client.connect(streams) # รันเป็นเวลา 60 วินาที try: time.sleep(60) except KeyboardInterrupt: print("\nหยุดการทำงาน...") finally: client.close() # แสดงสรุปสถิติ print(f"\n📊 สรุปสถิติ:") print(f" ข้อความทั้งหมด: {client.stats['total_messages']:,}") print(f" การเชื่อมต่อใหม่: {client.stats['reconnections']}") print(f" Latency เฉลี่ย: {client.stats.get('avg_latency_ms', 0):.2f}ms")

การเพิ่มประสิทธิภาพและลด Latency

1. Connection Pooling และ Reuse

การสร้าง WebSocket connection ใหม่ทุกครั้งมี cost สูง ควร reuse connection และใช้ connection pool เพื่อลด overhead

"""
High-Performance WebSocket Connection Pool
ออกแบบสำหรับ low-latency trading system
"""

import asyncio
import aiohttp
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import time
import logging
import json

logger = logging.getLogger(__name__)

@dataclass
class ConnectionStats:
    """สถิติของ connection"""
    messages_received: int = 0
    messages_per_second: float = 0.0
    last_message_time: float = 0.0
    avg_latency_ms: float = 0.0
    errors: int = 0
    reconnect_count: int = 0

@dataclass
class WebSocketPoolConfig:
    """Configuration สำหรับ Connection Pool"""
    max_connections: int = 10
    connect_timeout: float = 10.0
    read_timeout: float = 30.0
    heartbeat_interval: float = 20.0
    max_message_size: int = 1024 * 1024  # 1MB
    auto_reconnect: bool = True
    max_reconnect_attempts: int = 100
    backoff_base: float = 1.0
    backoff_max: float = 60.0

class WebSocketConnection:
    """
    Single WebSocket Connection พร้อม auto-reconnect
    """
    
    def __init__(
        self,
        url: str,
        session: aiohttp.ClientSession,
        config: WebSocketPoolConfig
    ):
        self.url = url
        self.session = session
        self.config = config
        self.ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self.stats = ConnectionStats()
        self._is_connected = False
        self._should_reconnect = True
        self._subscriptions: Set[str] = set()
    
    @property
    def is_connected(self) -> bool:
        return self._is_connected and self.ws is not None
    
    async def connect(self) -> bool:
        """เชื่อมต่อกับ WebSocket server"""
        try:
            self.ws = await self.session.ws_connect(
                self.url,
                timeout=aiohttp.ClientWSTimeout(
                    connect=self.config.connect_timeout,
                    total=self.config.read_timeout
                ),
                heartbeat=self.config.heartbeat_interval,
                max_msg_size=self.config.max_message_size
            )
            self._is_connected = True
            
            # Re-subscribe to previous streams
            for stream in self._subscriptions:
                await self.send(json.dumps({"method": "subscribe", "params": [stream]}))
            
            logger.info(f"✅ เชื่อมต่อสำเร็จ: {self.url}")
            return True
            
        except Exception as e:
            logger.error(f"❌ เชื่อมต่อล้มเหลว: {e}")
            self.stats.errors += 1
            return False
    
    async def subscribe(self, streams: List[str]) -> None:
        """Subscribe ไปยัง streams"""
        self._subscriptions.update(streams)
        
        if self.is_connected:
            # Binance format
            subscribe_msg = {
                "method": "SUBSCRIBE",
                "params": streams,
                "id": int(time.time() * 1000)
            }
            await self.send(json.dumps(subscribe_msg))
            logger.info(f"✅ Subscribe สำเร็จ: {streams}")
    
    async def send(self, data: str) -> None:
        """ส่งข้อความ"""
        if self.is_connected:
            await self.ws.send_str(data)
    
    async def receive(self) -> Optional[dict]:
        """รับข้อความ"""
        if not self.is_connected:
            return None
        
        try:
            msg = await self.ws.receive()
            
            if msg.type == aiohttp.WSMsgType.TEXT:
                self.stats.messages_received += 1
                self.stats.last_message_time = time.time()
                return json.loads(msg.data)
            
            elif msg.type == aiohttp.WSMsgType.CLOSED:
                logger.warning("WebSocket closed by server")
                self._is_connected = False
                
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.error(f"WebSocket error: {msg.data}")
                self.stats.errors += 1
                self._is_connected = False
                
        except asyncio.TimeoutError:
            pass
        except Exception as e:
            logger.error(f"Receive error: {e}")
            self.stats.errors += 1
        
        return None
    
    async def close(self) -> None:
        """ปิด connection"""
        self._should_reconnect = False
        if self.ws:
            await self.ws.close()
        self._is_connected = False


class WebSocketConnectionPool:
    """
    Connection Pool สำหรับจัดการหลาย WebSocket connections
    รองรับ load balancing และ automatic failover
    """
    
    def __init__(self, config: WebSocketPoolConfig = None):
        self.config = config or WebSocketPoolConfig()
        self.connections: Dict[str, WebSocketConnection] = {}
        self.session: Optional[aiohttp.ClientSession] = None
        self._lock = asyncio.Lock()
    
    async def __aenter__(self):
        await self.start()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()
    
    async def start(self) -> None:
        """เริ่มต้น connection pool"""
        connector = aiohttp.TCPConnector(
            limit=self.config.max_connections,
            force_close=True,  # ปิด connection ทันทีเมื่อใช้เสร็จ
            enable_cleanup_closed=True
        )
        
        self.session = aiohttp.ClientSession(connector=connector)
        logger.info("WebSocket Connection Pool เริ่มต้นแล้ว")
    
    async def stop(self) -> None:
        """หยุด connection pool"""
        for conn in self.connections.values():
            await conn.close()
        
        if self.session:
            await self.session.close()
        
        logger.info("WebSocket Connection Pool หยุดแล้ว")
    
    async def get_connection(
        self,
        url: str,
        streams: List[str] = None
    ) -> WebSocketConnection:
        """
        รับ connection จาก pool
        สร้างใหม่ถ้ายังไม่มี
        """
        async with self._lock:
            if url not in self.connections:
                conn = WebSocketConnection(url, self.session, self.config)
                self.connections[url] = conn
                
                # เชื่อมต่อและ subscribe
                await conn.connect()
                
                if streams:
                    await conn.subscribe(streams)
            
            return self.connections[url]
    
    @asynccontextmanager
    async def connection_context(
        self,
        url: str,
        streams: List[str] = None
    ):
        """Context manager สำหรับใช้ connection"""
        conn = await self.get_connection(url, streams)
        try:
            yield conn
        except Exception as e:
            logger.error(f"Connection error: {e}")
            
            # ลอง reconnect
            if self.config.auto_reconnect:
                await conn.connect()
                if streams:
                    await conn.subscribe(streams)
            raise
    
    async def get_all_stats(self) -> Dict[str, ConnectionStats]:
        """รวบรวมสถิติทั้งหมด"""
        return {url: conn.stats for url, conn in self.connections.items()}


ตัวอย่างการใช้งาน

async def main(): config = WebSocketPoolConfig( max_connections=5, auto_reconnect=True, backoff_base=1.0 ) async with WebSocketConnectionPool(config) as pool: # เชื่อมต่อกับ Binance binance_url = "wss://stream.binance.com:9443/ws" async with pool.connection_context( binance_url, streams=["btcusdt@trade", "ethusdt@trade"] ) as conn: print("กำลังรับข้อมูล...") # รับข้อความเป็นเวลา 10 วินาที start = time.time() while time.time() - start < 10: msg = await conn.receive() if msg: print(f"📩 ได้รับ: {msg}") # วัด latency if 'data' in msg and 'T' in msg['data']: server_time = msg['data']['T'] local_time = int(time.time() * 1000) latency = local_time - server_time print(f"⏱️ Latency: {latency}ms") await asyncio.sleep(0.001) # หยุดเล็กน้อยเพื่อไม่ให้ CPU 100% # แสดงสถิติ stats = await pool.get_all_stats() print("\n📊 สถิติการเชื่อมต่อ:") for url, stat in stats.items(): print(f" {url}:") print(f" ข้อความทั้งหมด: {stat.messages_received:,}") print(f" Errors: {stat.errors}") print(f" Reconnections: {stat.reconnect_count}") if __name__ == "__main__": asyncio.run(main())

2. Binary Protocol เพื่อลด Bandwidth และ Latency

การใช้ binary protocol แทน JSON สามารถลดขนาดข้อมูลได้ถึง 70% และเพิ่มความเร็วในการ parse

"""
Binary Message Encoder/Decoder สำหรับ Crypto Data
ใช้ MessagePack ซึ่งเร็วกว่า JSON ถึง 10 เท่า
"""

import msgpack
import struct
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import IntEnum

class MessageType(IntEnum):
    """ประเภทของ binary message"""
    TRADE = 1
    DEPTH_UPDATE = 2
    KLINE = 3
    TICKER = 4
    AGG_TRADE = 5

@dataclass
class BinaryTradeMessage:
    """Trade message ในรูปแบบ binary"""
    message_type: MessageType = MessageType.TRADE
    symbol: int = 0           # Symbol ID (mapped from string)
    price: int = 0           # Price * 100000 (8 decimal places)
    quantity: int = 0         # Quantity * 100000
    timestamp: int = 0        # Unix timestamp in ms
    trade_id: int = 0
    is_buyer_maker: bool = False
    
    def encode(self) -> bytes:
        """Encode เป็น binary"""
        # struct format: type(1) + symbol(4) + price(8) + qty(8) + timestamp(8) + id(8) + flags(1)
        flags = 1 if self.is_buyer_maker else 0
        
        return struct.pack(
            '<BIQIQIB',
            self.message_type,
            self.symbol,
            self.price,
            self.quantity,
            self.timestamp,
            self.trade_id,
            flags
        )
    
    @classmethod
    def decode(cls, data: bytes) -> 'BinaryTradeMessage':
        """Decode จาก binary"""
        unpacked = struct.unpack('<BIQIQIB', data)
        
        return cls(
            message_type=MessageType(unpacked[0]),
            symbol=unpacked[1],
            price=unpacked[2],
            quantity=unpacked[3],
            timestamp=unpacked[4],
            trade_id=unpacked[5],
            is_buyer_maker=bool(unpacked[6])
        )
    
    @property
    def price_float(self) -> float:
        return self.price / 100000
    
    @property
    def quantity_float(self) -> float:
        return self.quantity / 100000


class SymbolRegistry:
    """จัดการ symbol mapping"""
    
    def __init__(self):
        self._symbol_to_id: Dict[str, int] = {}
        self._id_to_symbol: Dict[int, str] = {}
        self._next_id = 1
    
    def register(self, symbol: str) -> int:
        if symbol not in self._symbol_to_id:
            self._symbol_to_id[symbol] = self._next_id
            self._id_to_symbol[self._next_id] = symbol
            self._next_id += 1
        return self._symbol_to_id[symbol]
    
    def get_id(self, symbol: str) -> int:
        return self._symbol_to_id.get(symbol, 0)
    
    def get_symbol(self, symbol_id: int) -> str:
        return self._id_to_symbol.get(symbol_id, "")


class HighPerformanceParser:
    """
    Parser ที่ประมวลผล binary message ได้เร็ว
    เหมาะสำหรับ high-frequency trading
    """
    
    def __init__(self):
        self.symbols = SymbolRegistry()
        self._cache = {}
        self._stats = {
            'messages_parsed': 0,
            'cache_hits': 0,
            'parse_time_us': 0
        }
    
    def parse_binance_trade(self, data: dict) -> BinaryTradeMessage:
        """Parse trade จาก Binance แล้วแปลงเป็น binary"""
        symbol = data['s']
        symbol_id = self.symbols.register(symbol)
        
        # แปลง price/quantity เป็น fixed-point
        price = int(float(data['p']) * 100000)
        quantity = int(float(data['q']) * 100000)
        
        return BinaryTradeMessage(
            message_type=MessageType.TRADE,
            symbol=symbol_id,
            price=price,
            quantity=quantity,
            timestamp=int(data['T']),
            trade_id=int(data['t']),
            is_buyer_maker=data['m']
        )
    
    def parse_binary(self, data: bytes) -> BinaryTradeMessage:
        """Parse binary message"""
        return BinaryTradeMessage.decode(data)
    
    def batch_parse_binary(self, data_list: bytes) -> list:
        """Parseหลาย binary messages ในครั้งเดียว"""
        messages = []
        offset = 0
        msg_size = 40  # ขนาดของ BinaryTradeMessage
        
        while offset + msg_size <= len(data_list):
            msg_data = data_list[offset:offset + msg_size]
            messages.append(self.parse_binary(msg_data))
            offset += msg_size
        
        return messages
    
    def get_stats(self) -> dict:
        return {
            **self._stats,
            'cache_hit_rate': self._stats['cache_hits'] / max(1, self._stats['messages_parsed'])
        }


Benchmark

def benchmark(): """ทดสอบประสิทธิภาพของ binary vs JSON parsing""" import time # �