บทนำ: ทำไม WebSocket ถึงสำคัญสำหรับ Trading System
ในโลกของการเทรดคริปโตเคอร์เรนซี ความเร็วคือทุกอย่าง ความหน่วง (latency) เพียง 100 มิลลิวินาทีอาจหมายถึงความแตกต่างระหว่างกำไรกับขาดทุน บทความนี้จะพาคุณสร้างระบบรับข้อมูลราคาแบบเรียลไทม์ด้วย WebSocket ที่มีความหน่วงต่ำกว่า 50 มิลลิวินาที พร้อมสำหรับการใช้งานจริงในระดับ Production
สถาปัตยกรรมระบบ WebSocket สำหรับ Crypto Exchange
หลักการทำงานของ WebSocket Connection
WebSocket เป็น protocol การสื่อสารแบบ full-duplex ผ่าน TCP connection เดียว ต่างจาก HTTP ที่ต้องสร้าง connection ใหม่ทุกครั้ง WebSocket รักษา connection ไว้ตลอดเวลา ทำให้เหมาะสำหรับการรับข้อมูลแบบ real-time
// สถาปัตยกรรมการเชื่อมต่อ WebSocket
┌─────────────────────────────────────────────────────────────────┐
│ Trading Client │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WebSocket │───▶│ Message │───▶│ Trading │ │
│ │ Manager │ │ Parser │ │ Engine │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Reconnection │ │ JSON/Binary │ │ Order │ │
│ │ Handler │ │ Decoder │ │ Executor │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Exchange WebSocket API │
│ wss://stream.binance.com:9443/ws/btcusdt@trade │
│ wss://ws-feed.pro.coinbase.com │
│ wss://fstream.bittrex.com/ws │
└─────────────────────────────────────────────────────────────────┘
Protocol ที่ใช้งานจริง: Binance WebSocket Stream
# การเชื่อมต่อ WebSocket กับ Binance Exchange
WebSocket URL: wss://stream.binance.com:9443/ws/<stream_name>
import websocket
import json
import time
from typing import Dict, List, Callable
from dataclasses import dataclass
from threading import Thread, Lock
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TradeMessage:
"""โครงสร้างข้อมูล Trade จาก Exchange"""
symbol: str # เช่น "BTCUSDT"
price: float # ราคาที่เทรด
quantity: float # จำนวนที่เทรด
timestamp: int # Unix timestamp (มิลลิวินาที)
is_buyer_maker: bool # True = ผู้ขายเป็น maker
@property
def trade_time_ms(self) -> int:
return self.timestamp
class CryptoWebSocketClient:
"""
WebSocket Client สำหรับรับข้อมูลราคาคริปโตแบบเรียลไทม์
ออกแบบมาสำหรับ Production โดยเฉพาะ
"""
def __init__(
self,
on_trade: Callable[[TradeMessage], None] = None,
reconnect_delay: float = 1.0,
max_reconnect_attempts: int = 10
):
self.ws = None
self.on_trade = on_trade
self.reconnect_delay = reconnect_delay
self.max_reconnect_attempts = max_reconnect_attempts
self.is_running = False
self.reconnect_count = 0
self.message_count = 0
self.last_latency_check = time.time()
self.latencies: List[float] = []
self._lock = Lock()
# สถิติประสิทธิภาพ
self.stats = {
'total_messages': 0,
'messages_per_second': 0.0,
'avg_latency_ms': 0.0,
'reconnections': 0
}
def connect(self, streams: List[str]) -> None:
"""
เชื่อมต่อกับ Exchange WebSocket
streams: รายชื่อ streams เช่น ["btcusdt@trade", "ethusdt@trade"]
"""
# Binance Combined Stream Format
param_str = '/'.join(streams)
url = f"wss://stream.binance.com:9443/stream?streams={param_str}"
logger.info(f"กำลังเชื่อมต่อกับ {len(streams)} streams...")
self.ws = websocket.WebSocketApp(
url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
self.is_running = True
# รันใน thread แยกเพื่อไม่บล็อก main thread
self.ws_thread = Thread(target=self.ws.run_forever, daemon=True)
self.ws_thread.start()
# รัน thread สำหรับสถิติ
self.stats_thread = Thread(target=self._stats_loop, daemon=True)
self.stats_thread.start()
def _on_open(self, ws) -> None:
logger.info("✅ WebSocket เชื่อมต่อสำเร็จ")
self.reconnect_count = 0
def _on_message(self, ws, message: str) -> None:
"""ประมวลผลข้อความที่ได้รับ"""
start_time = time.perf_counter()
try:
data = json.loads(message)
# Binance Combined Stream Format
if 'stream' in data and 'data' in data:
stream_data = data['data']
if stream_data.get('e') == 'trade':
trade = TradeMessage(
symbol=stream_data['s'],
price=float(stream_data['p']),
quantity=float(stream_data['q']),
timestamp=stream_data['T'],
is_buyer_maker=stream_data['m']
)
# คำนวณ latency
latency_ms = (time.time() * 1000) - trade.timestamp
with self._lock:
self.latencies.append(latency_ms)
if len(self.latencies) > 1000:
self.latencies = self.latencies[-500:]
self.message_count += 1
self.stats['total_messages'] += 1
# เรียก callback function
if self.on_trade:
self.on_trade(trade)
except json.JSONDecodeError as e:
logger.error(f"JSON Parse Error: {e}")
except Exception as e:
logger.error(f"Message Processing Error: {e}")
processing_time = (time.perf_counter() - start_time) * 1000
if processing_time > 10:
logger.warning(f"Slow message processing: {processing_time:.2f}ms")
def _on_error(self, ws, error) -> None:
logger.error(f"WebSocket Error: {error}")
def _on_close(self, ws, close_status_code, close_msg) -> None:
logger.warning(f"WebSocket ปิดการเชื่อมต่อ: {close_status_code}")
self.is_running = False
# พยายามเชื่อมต่อใหม่
if self.reconnect_count < self.max_reconnect_attempts:
self._reconnect()
def _reconnect(self) -> None:
"""จัดการการเชื่อมต่อใหม่อัตโนมัติ"""
self.reconnect_count += 1
delay = min(self.reconnect_delay * (2 ** self.reconnect_count), 30)
logger.info(f"พยายามเชื่อมต่อใหม่ใน {delay:.1f} วินาที (ครั้งที่ {self.reconnect_count})")
time.sleep(delay)
self.is_running = True
self.ws_thread = Thread(target=self.ws.run_forever, daemon=True)
self.ws_thread.start()
def _stats_loop(self) -> None:
"""อัปเดตสถิติประจำวินาที"""
last_count = 0
while self.is_running:
time.sleep(1)
with self._lock:
messages_in_last_second = self.message_count - last_count
last_count = self.message_count
self.stats['messages_per_second'] = messages_in_last_second
if self.latencies:
self.stats['avg_latency_ms'] = sum(self.latencies) / len(self.latencies)
self.stats['p99_latency_ms'] = sorted(self.latencies)[int(len(self.latencies) * 0.99)] if len(self.latencies) > 10 else 0
logger.info(
f"Stats: {self.stats['messages_per_second']} msg/s | "
f"Avg Latency: {self.stats['avg_latency_ms']:.2f}ms | "
f"Total: {self.stats['total_messages']:,}"
)
def close(self) -> None:
"""ปิดการเชื่อมต่อ"""
self.is_running = False
if self.ws:
self.ws.close()
logger.info("WebSocket ปิดการเชื่อมต่อแล้ว")
ตัวอย่างการใช้งาน
if __name__ == "__main__":
last_prices: Dict[str, float] = {}
def on_trade(trade: TradeMessage):
"""Callback function สำหรับจัดการ trade data"""
last_prices[trade.symbol] = trade.price
# แสดงข้อมูล trade ล่าสุด
if trade.symbol == "BTCUSDT":
print(f"[{trade.trade_time_ms}] {trade.symbol}: ${trade.price:,.2f} | Qty: {trade.quantity}")
# สร้าง client และเชื่อมต่อ
client = CryptoWebSocketClient(
on_trade=on_trade,
reconnect_delay=1.0
)
# เชื่อมต่อกับหลาย streams
streams = [
"btcusdt@trade",
"ethusdt@trade",
"bnbusdt@trade",
"solusdt@trade"
]
client.connect(streams)
# รันเป็นเวลา 60 วินาที
try:
time.sleep(60)
except KeyboardInterrupt:
print("\nหยุดการทำงาน...")
finally:
client.close()
# แสดงสรุปสถิติ
print(f"\n📊 สรุปสถิติ:")
print(f" ข้อความทั้งหมด: {client.stats['total_messages']:,}")
print(f" การเชื่อมต่อใหม่: {client.stats['reconnections']}")
print(f" Latency เฉลี่ย: {client.stats.get('avg_latency_ms', 0):.2f}ms")
การเพิ่มประสิทธิภาพและลด Latency
1. Connection Pooling และ Reuse
การสร้าง WebSocket connection ใหม่ทุกครั้งมี cost สูง ควร reuse connection และใช้ connection pool เพื่อลด overhead
"""
High-Performance WebSocket Connection Pool
ออกแบบสำหรับ low-latency trading system
"""
import asyncio
import aiohttp
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import time
import logging
import json
logger = logging.getLogger(__name__)
@dataclass
class ConnectionStats:
"""สถิติของ connection"""
messages_received: int = 0
messages_per_second: float = 0.0
last_message_time: float = 0.0
avg_latency_ms: float = 0.0
errors: int = 0
reconnect_count: int = 0
@dataclass
class WebSocketPoolConfig:
"""Configuration สำหรับ Connection Pool"""
max_connections: int = 10
connect_timeout: float = 10.0
read_timeout: float = 30.0
heartbeat_interval: float = 20.0
max_message_size: int = 1024 * 1024 # 1MB
auto_reconnect: bool = True
max_reconnect_attempts: int = 100
backoff_base: float = 1.0
backoff_max: float = 60.0
class WebSocketConnection:
"""
Single WebSocket Connection พร้อม auto-reconnect
"""
def __init__(
self,
url: str,
session: aiohttp.ClientSession,
config: WebSocketPoolConfig
):
self.url = url
self.session = session
self.config = config
self.ws: Optional[aiohttp.ClientWebSocketResponse] = None
self.stats = ConnectionStats()
self._is_connected = False
self._should_reconnect = True
self._subscriptions: Set[str] = set()
@property
def is_connected(self) -> bool:
return self._is_connected and self.ws is not None
async def connect(self) -> bool:
"""เชื่อมต่อกับ WebSocket server"""
try:
self.ws = await self.session.ws_connect(
self.url,
timeout=aiohttp.ClientWSTimeout(
connect=self.config.connect_timeout,
total=self.config.read_timeout
),
heartbeat=self.config.heartbeat_interval,
max_msg_size=self.config.max_message_size
)
self._is_connected = True
# Re-subscribe to previous streams
for stream in self._subscriptions:
await self.send(json.dumps({"method": "subscribe", "params": [stream]}))
logger.info(f"✅ เชื่อมต่อสำเร็จ: {self.url}")
return True
except Exception as e:
logger.error(f"❌ เชื่อมต่อล้มเหลว: {e}")
self.stats.errors += 1
return False
async def subscribe(self, streams: List[str]) -> None:
"""Subscribe ไปยัง streams"""
self._subscriptions.update(streams)
if self.is_connected:
# Binance format
subscribe_msg = {
"method": "SUBSCRIBE",
"params": streams,
"id": int(time.time() * 1000)
}
await self.send(json.dumps(subscribe_msg))
logger.info(f"✅ Subscribe สำเร็จ: {streams}")
async def send(self, data: str) -> None:
"""ส่งข้อความ"""
if self.is_connected:
await self.ws.send_str(data)
async def receive(self) -> Optional[dict]:
"""รับข้อความ"""
if not self.is_connected:
return None
try:
msg = await self.ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
self.stats.messages_received += 1
self.stats.last_message_time = time.time()
return json.loads(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.warning("WebSocket closed by server")
self._is_connected = False
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f"WebSocket error: {msg.data}")
self.stats.errors += 1
self._is_connected = False
except asyncio.TimeoutError:
pass
except Exception as e:
logger.error(f"Receive error: {e}")
self.stats.errors += 1
return None
async def close(self) -> None:
"""ปิด connection"""
self._should_reconnect = False
if self.ws:
await self.ws.close()
self._is_connected = False
class WebSocketConnectionPool:
"""
Connection Pool สำหรับจัดการหลาย WebSocket connections
รองรับ load balancing และ automatic failover
"""
def __init__(self, config: WebSocketPoolConfig = None):
self.config = config or WebSocketPoolConfig()
self.connections: Dict[str, WebSocketConnection] = {}
self.session: Optional[aiohttp.ClientSession] = None
self._lock = asyncio.Lock()
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
async def start(self) -> None:
"""เริ่มต้น connection pool"""
connector = aiohttp.TCPConnector(
limit=self.config.max_connections,
force_close=True, # ปิด connection ทันทีเมื่อใช้เสร็จ
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(connector=connector)
logger.info("WebSocket Connection Pool เริ่มต้นแล้ว")
async def stop(self) -> None:
"""หยุด connection pool"""
for conn in self.connections.values():
await conn.close()
if self.session:
await self.session.close()
logger.info("WebSocket Connection Pool หยุดแล้ว")
async def get_connection(
self,
url: str,
streams: List[str] = None
) -> WebSocketConnection:
"""
รับ connection จาก pool
สร้างใหม่ถ้ายังไม่มี
"""
async with self._lock:
if url not in self.connections:
conn = WebSocketConnection(url, self.session, self.config)
self.connections[url] = conn
# เชื่อมต่อและ subscribe
await conn.connect()
if streams:
await conn.subscribe(streams)
return self.connections[url]
@asynccontextmanager
async def connection_context(
self,
url: str,
streams: List[str] = None
):
"""Context manager สำหรับใช้ connection"""
conn = await self.get_connection(url, streams)
try:
yield conn
except Exception as e:
logger.error(f"Connection error: {e}")
# ลอง reconnect
if self.config.auto_reconnect:
await conn.connect()
if streams:
await conn.subscribe(streams)
raise
async def get_all_stats(self) -> Dict[str, ConnectionStats]:
"""รวบรวมสถิติทั้งหมด"""
return {url: conn.stats for url, conn in self.connections.items()}
ตัวอย่างการใช้งาน
async def main():
config = WebSocketPoolConfig(
max_connections=5,
auto_reconnect=True,
backoff_base=1.0
)
async with WebSocketConnectionPool(config) as pool:
# เชื่อมต่อกับ Binance
binance_url = "wss://stream.binance.com:9443/ws"
async with pool.connection_context(
binance_url,
streams=["btcusdt@trade", "ethusdt@trade"]
) as conn:
print("กำลังรับข้อมูล...")
# รับข้อความเป็นเวลา 10 วินาที
start = time.time()
while time.time() - start < 10:
msg = await conn.receive()
if msg:
print(f"📩 ได้รับ: {msg}")
# วัด latency
if 'data' in msg and 'T' in msg['data']:
server_time = msg['data']['T']
local_time = int(time.time() * 1000)
latency = local_time - server_time
print(f"⏱️ Latency: {latency}ms")
await asyncio.sleep(0.001) # หยุดเล็กน้อยเพื่อไม่ให้ CPU 100%
# แสดงสถิติ
stats = await pool.get_all_stats()
print("\n📊 สถิติการเชื่อมต่อ:")
for url, stat in stats.items():
print(f" {url}:")
print(f" ข้อความทั้งหมด: {stat.messages_received:,}")
print(f" Errors: {stat.errors}")
print(f" Reconnections: {stat.reconnect_count}")
if __name__ == "__main__":
asyncio.run(main())
2. Binary Protocol เพื่อลด Bandwidth และ Latency
การใช้ binary protocol แทน JSON สามารถลดขนาดข้อมูลได้ถึง 70% และเพิ่มความเร็วในการ parse
"""
Binary Message Encoder/Decoder สำหรับ Crypto Data
ใช้ MessagePack ซึ่งเร็วกว่า JSON ถึง 10 เท่า
"""
import msgpack
import struct
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import IntEnum
class MessageType(IntEnum):
"""ประเภทของ binary message"""
TRADE = 1
DEPTH_UPDATE = 2
KLINE = 3
TICKER = 4
AGG_TRADE = 5
@dataclass
class BinaryTradeMessage:
"""Trade message ในรูปแบบ binary"""
message_type: MessageType = MessageType.TRADE
symbol: int = 0 # Symbol ID (mapped from string)
price: int = 0 # Price * 100000 (8 decimal places)
quantity: int = 0 # Quantity * 100000
timestamp: int = 0 # Unix timestamp in ms
trade_id: int = 0
is_buyer_maker: bool = False
def encode(self) -> bytes:
"""Encode เป็น binary"""
# struct format: type(1) + symbol(4) + price(8) + qty(8) + timestamp(8) + id(8) + flags(1)
flags = 1 if self.is_buyer_maker else 0
return struct.pack(
'<BIQIQIB',
self.message_type,
self.symbol,
self.price,
self.quantity,
self.timestamp,
self.trade_id,
flags
)
@classmethod
def decode(cls, data: bytes) -> 'BinaryTradeMessage':
"""Decode จาก binary"""
unpacked = struct.unpack('<BIQIQIB', data)
return cls(
message_type=MessageType(unpacked[0]),
symbol=unpacked[1],
price=unpacked[2],
quantity=unpacked[3],
timestamp=unpacked[4],
trade_id=unpacked[5],
is_buyer_maker=bool(unpacked[6])
)
@property
def price_float(self) -> float:
return self.price / 100000
@property
def quantity_float(self) -> float:
return self.quantity / 100000
class SymbolRegistry:
"""จัดการ symbol mapping"""
def __init__(self):
self._symbol_to_id: Dict[str, int] = {}
self._id_to_symbol: Dict[int, str] = {}
self._next_id = 1
def register(self, symbol: str) -> int:
if symbol not in self._symbol_to_id:
self._symbol_to_id[symbol] = self._next_id
self._id_to_symbol[self._next_id] = symbol
self._next_id += 1
return self._symbol_to_id[symbol]
def get_id(self, symbol: str) -> int:
return self._symbol_to_id.get(symbol, 0)
def get_symbol(self, symbol_id: int) -> str:
return self._id_to_symbol.get(symbol_id, "")
class HighPerformanceParser:
"""
Parser ที่ประมวลผล binary message ได้เร็ว
เหมาะสำหรับ high-frequency trading
"""
def __init__(self):
self.symbols = SymbolRegistry()
self._cache = {}
self._stats = {
'messages_parsed': 0,
'cache_hits': 0,
'parse_time_us': 0
}
def parse_binance_trade(self, data: dict) -> BinaryTradeMessage:
"""Parse trade จาก Binance แล้วแปลงเป็น binary"""
symbol = data['s']
symbol_id = self.symbols.register(symbol)
# แปลง price/quantity เป็น fixed-point
price = int(float(data['p']) * 100000)
quantity = int(float(data['q']) * 100000)
return BinaryTradeMessage(
message_type=MessageType.TRADE,
symbol=symbol_id,
price=price,
quantity=quantity,
timestamp=int(data['T']),
trade_id=int(data['t']),
is_buyer_maker=data['m']
)
def parse_binary(self, data: bytes) -> BinaryTradeMessage:
"""Parse binary message"""
return BinaryTradeMessage.decode(data)
def batch_parse_binary(self, data_list: bytes) -> list:
"""Parseหลาย binary messages ในครั้งเดียว"""
messages = []
offset = 0
msg_size = 40 # ขนาดของ BinaryTradeMessage
while offset + msg_size <= len(data_list):
msg_data = data_list[offset:offset + msg_size]
messages.append(self.parse_binary(msg_data))
offset += msg_size
return messages
def get_stats(self) -> dict:
return {
**self._stats,
'cache_hit_rate': self._stats['cache_hits'] / max(1, self._stats['messages_parsed'])
}
Benchmark
def benchmark():
"""ทดสอบประสิทธิภาพของ binary vs JSON parsing"""
import time
# �