ในโลกของ High-Frequency Trading หรือการเทรดความถี่สูงนั้น ข้อมูล Level 2 (Order Book) คือหัวใจสำคัญที่ทำให้เทรดเดอร์สามารถวิเคราะห์แนวโน้มราคาและความลึกของตลาดได้อย่างแม่นยำ ผมเคยเจอปัญหา ConnectionError: timeout ที่ทำให้ระบบหยุดทำงานกลางคัน และ 401 Unauthorized ที่ไม่สามารถเชื่อมต่อ WebSocket ได้ จนกระทั่งศึกษาจนเข้าใจกลไกที่แท้จริง

บทความนี้จะพาคุณสร้าง Data Pipeline สำหรับเก็บข้อมูล Binance Level2 ผ่าน WebSocket อย่างมืออาชีพ พร้อมวิธีแก้ปัญหาที่พบบ่อยและ Best Practices จากประสบการณ์ตรง

Binance Level2 Data คืออะไร และทำไมต้องเก็บแบบ Real-time

Level 2 ของ Binance คือข้อมูล Order Book ที่แสดงคำสั่งซื้อ-ขายทั้งหมดในแต่ละระดับราคา โดยประกอบด้วย:

ในการพัฒนา Trading Bot หรือระบบวิเคราะห์ตลาด ข้อมูลเหล่านี้ต้องถูกเก็บแบบ Real-time เพื่อให้ได้ข้อมูลที่สมบูรณ์ที่สุด ไม่มีการ Miss ข้อมูล

การติดตั้งสถาปัตยกรรม WebSocket Data Pipeline

สำหรับการเก็บข้อมูล Level 2 แบบ Real-time เราจะใช้สถาปัตยกรรมดังนี้:

โค้ด Python สำหรับเชื่อมต่อ Binance Level2 WebSocket

นี่คือโค้ดพื้นฐานสำหรับการเชื่อมต่อ WebSocket กับ Binance โดยใช้ Library websockets ที่เป็นมาตรฐาน:

# ติดตั้ง dependencies ก่อนใช้งาน

pip install websockets asyncio aiofiles pandas

import asyncio import json import websockets from datetime import datetime from collections import defaultdict class BinanceLevel2Collector: """ Binance Level2 WebSocket Collector เก็บข้อมูล Order Book แบบ Real-time """ def __init__(self, symbol: str = "btcusdt", depth: int = 20): self.symbol = symbol.lower() self.depth = depth self.bids = {} # price -> quantity self.asks = {} # price -> quantity self.last_update_id = 0 self.stream_url = "wss://stream.binance.com:9443/ws" def get_stream_name(self) -> str: """สร้างชื่อ Stream สำหรับ Level2 Depth@100ms""" return f"{self.symbol}@depth@100ms" async def connect(self): """เชื่อมต่อ WebSocket กับ Binance""" stream_name = self.get_stream_name() full_url = f"{self.stream_url}/{stream_name}" print(f"🔌 กำลังเชื่อมต่อ: {full_url}") try: async with websockets.connect(full_url, ping_interval=20) as websocket: print(f"✅ เชื่อมต่อสำเร็จ! กำลังรับข้อมูล Level2 สำหรับ {self.symbol.upper()}") # รับข้อมูลแบบ Infinite Loop while True: try: message = await asyncio.wait_for( websocket.recv(), timeout=30.0 ) await self.process_message(message) except asyncio.TimeoutError: # ส่ง Ping เพื่อรักษาการเชื่อมต่อ await websocket.ping() print("📡 Ping ส่งไปแล้ว...") except websockets.exceptions.ConnectionClosed as e: print(f"❌ การเชื่อมต่อถูกปิด: {e}") print("🔄 กำลังพยายามเชื่อมต่อใหม่ในอีก 5 วินาที...") await asyncio.sleep(5) await self.connect() async def process_message(self, message: str): """ประมวลผลข้อมูลที่ได้รับจาก WebSocket""" data = json.loads(message) # ตรวจสอบว่าเป็นข้อมูล Depth หรือไม่ if "e" in data and data["e"] == "depthUpdate": self.last_update_id = data["u"] # อัพเดท Bids (คำสั่งซื้อ) for price, qty in data.get("b", []): price_float = float(price) qty_float = float(qty) if qty_float == 0: # ลบ Order ที่ถูก Cancel self.bids.pop(price_float, None) else: self.bids[price_float] = qty_float # อัพเดท Asks (คำสั่งขาย) for price, qty in data.get("a", []): price_float = float(price) qty_float = float(qty) if qty_float == 0: self.asks.pop(price_float, None) else: self.asks[price_float] = qty_float # แสดงผล Order Book ปัจจุบัน best_bid = max(self.bids.keys()) if self.bids else 0 best_ask = min(self.asks.keys()) if self.asks else 0 spread = best_ask - best_bid if best_bid and best_ask else 0 print(f"📊 [{data['E']}] " f"BID: {best_bid:.2f} | ASK: {best_ask:.2f} | " f"Spread: {spread:.2f} | " f"Update: {self.last_update_id}") async def main(): """Entry Point สำหรับรัน Collector""" collector = BinanceLevel2Collector(symbol="btcusdt", depth=20) await collector.connect() if __name__ == "__main__": print("=" * 60) print("🚀 Binance Level2 WebSocket Data Collector") print("=" * 60) asyncio.run(main())

การสร้าง Order Book Aggregator พร้อมระบบ Reconnection

ในการใช้งานจริง เราต้องมีระบบ Reconnection ที่แข็งแกร่ง เพราะ WebSocket อาจหลุดการเชื่อมต่อได้ ผมเคยเสียข้อมูลไปหลายชั่วโมงเพราะไม่มีระบบ Reconnection ที่ดี นี่คือโค้ดที่พัฒนาต่อมาจากประสบการณ์จริง:

# advanced_level2_collector.py

รุ่นพัฒนาต่อยอด พร้อมระบบ Reconnection และ Data Persistence

import asyncio import json import time import aiofiles import logging from pathlib import Path from dataclasses import dataclass, asdict from typing import Dict, Optional, List from collections import OrderedDict import websockets from websockets.exceptions import ConnectionClosed, InvalidStatusCode

ตั้งค่า Logging

logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(message)s', handlers=[ logging.FileHandler('level2_collector.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class OrderBookEntry: """โครงสร้างข้อมูล Order Book""" price: float quantity: float timestamp: int is_bid: bool @dataclass class Level2Snapshot: """Snapshot ของ Order Book ในแต่ละช่วงเวลา""" symbol: str timestamp: int update_id: int bids: List[OrderBookEntry] asks: List[OrderBookEntry] best_bid: float best_ask: float spread: float total_bid_volume: float total_ask_volume: float class AdvancedLevel2Collector: """ Advanced Binance Level2 Collector - รองรับการ Reconnection อัตโนมัติ - บันทึกข้อมูลแบบ Batch - คำนวณ Volume Weighted Average Price (VWAP) """ # ค่าคงที่ MAX_RECONNECT_ATTEMPTS = 10 RECONNECT_DELAY = 5 # วินาที BATCH_SIZE = 100 # บันทึกทุก 100 updates SNAPSHOT_INTERVAL = 60 # Snapshot ทุก 60 วินาที def __init__(self, symbol: str, data_dir: str = "./data"): self.symbol = symbol.lower() self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) # Order Book State self.bids: OrderedDict[float, float] = OrderedDict() self.asks: OrderedDict[float, float] = OrderedDict() self.last_update_id: int = 0 self.last_snapshot_time: int = 0 # Statistics self.messages_received: int = 0 self.reconnect_count: int = 0 self.start_time: float = time.time() # Buffer สำหรับ Batch Writing self.update_buffer: List[Dict] = [] # WebSocket State self.ws: Optional[websockets.WebSocketClientProtocol] = None self.running: bool = True @property def base_url(self) -> str: """URL สำหรับ Combined Stream""" return "wss://stream.binance.com:9443/stream" @property def stream_params(self) -> Dict: """พารามิเตอร์สำหรับ Subscribe""" return { "method": "SUBSCRIBE", "params": [ f"{self.symbol}@depth@100ms", f"{self.symbol}@depth@1s@100ms" ], "id": 1 } async def start(self): """เริ่มต้น Collector""" logger.info(f"🚀 เริ่มต้น AdvancedLevel2Collector สำหรับ {self.symbol.upper()}") while self.running and self.reconnect_count < self.MAX_RECONNECT_ATTEMPTS: try: await self._connect_and_subscribe() except ConnectionClosed as e: self.reconnect_count += 1 logger.warning( f"⚠️ Connection closed: {e.code} - " f"พยายามเชื่อมต่อใหม่ครั้งที่ {self.reconnect_count}" ) await asyncio.sleep(self.RECONNECT_DELAY) except Exception as e: logger.error(f"❌ เกิดข้อผิดพลาด: {e}") await asyncio.sleep(self.RECONNECT_DELAY) if self.reconnect_count >= self.MAX_RECONNECT_ATTEMPTS: logger.error(f"❌ เชื่อมต่อไม่ได้หลังจาก {self.MAX_RECONNECT_ATTEMPTS} ครั้ง") async def _connect_and_subscribe(self): """เชื่อมต่อและ Subscribe ไปยัง Stream""" logger.info(f"🔌 กำลังเชื่อมต่อไปยัง Binance WebSocket...") async with websockets.connect( self.base_url, ping_interval=20, ping_timeout=10, close_timeout=10 ) as websocket: self.ws = websocket # Subscribe ไปยัง Streams ที่ต้องการ await websocket.send(json.dumps(self.stream_params)) logger.info("📡 Subscribe ไปยัง Level2 streams แล้ว") # รับข้อมูลใน Loop async for message in websocket: if not self.running: break await self._process_message(message) # Batch write ทุก BATCH_SIZE messages if len(self.update_buffer) >= self.BATCH_SIZE: await self._flush_buffer() async def _process_message(self, message: str): """ประมวลผลข้อความจาก WebSocket""" self.messages_received += 1 try: data = json.loads(message) # ข้าม Response ของ Subscribe if "result" in data: return # ตรวจสอบว่าเป็นข้อมูล Depth stream_data = data.get("data", {}) if stream_data.get("e") != "depthUpdate": return # ประมวลผล Update await self._process_depth_update(stream_data) except json.JSONDecodeError as e: logger.error(f"❌ JSON Decode Error: {e}") except Exception as e: logger.error(f"❌ ประมวลผลข้อความผิดพลาด: {e}") async def _process_depth_update(self, data: Dict): """ประมวลผล Depth Update""" update_id = data["u"] event_time = data["E"] # อัพเดท Bids for price_str, qty_str in data.get("b", []): price = float(price_str) qty = float(qty_str) if qty == 0: self.bids.pop(price, None) else: # รักษา Order โดยใช้ OrderedDict self.bids[price] = qty # จำกัดจำนวน Order ไม่เกิน 1000 รายการ while len(self.bids) > 1000: self.bids.popitem(last=False) # อัพเดท Asks for price_str, qty_str in data.get("a", []): price = float(price_str) qty = float(qty_str) if qty == 0: self.asks.pop(price, None) else: self.asks[price] = qty while len(self.asks) > 1000: self.asks.popitem(last=False) self.last_update_id = update_id # คำนวณ Statistics best_bid = max(self.bids.keys()) if self.bids else 0 best_ask = min(self.asks.keys()) if self.asks else float('inf') spread = (best_ask - best_bid) if best_bid and best_ask != float('inf') else 0 total_bid_vol = sum(self.bids.values()) total_ask_vol = sum(self.asks.values()) # เพิ่มลงใน Buffer snapshot = { "timestamp": event_time, "update_id": update_id, "best_bid": best_bid, "best_ask": best_ask, "spread": spread, "total_bid_volume": total_bid_vol, "total_ask_volume": total_ask_vol, "bid_count": len(self.bids), "ask_count": len(self.asks) } self.update_buffer.append(snapshot) # แสดงผลทุก 10 Updates if self.messages_received % 10 == 0: elapsed = time.time() - self.start_time rate = self.messages_received / elapsed logger.info( f"📊 Update #{self.messages_received} | " f"BID: {best_bid:.2f} | ASK: {best_ask:.2f} | " f"Spread: {spread:.4f} | Rate: {rate:.1f} msg/s" ) async def _flush_buffer(self): """บันทึก Buffer ลงไฟล์""" if not self.update_buffer: return timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = self.data_dir / f"{self.symbol}_{timestamp}.jsonl" async with aiofiles.open(filename, mode='a') as f: for record in self.update_buffer: await f.write(json.dumps(record) + "\n") logger.info(f"💾 บันทึก {len(self.update_buffer)} records ไปยัง {filename}") self.update_buffer.clear() async def stop(self): """หยุด Collector""" logger.info("🛑 กำลังหยุด Collector...") self.running = False # Flush Buffer สุดท้าย await self._flush_buffer() elapsed = time.time() - self.start_time logger.info( f"📈 สรุป: รับ {self.messages_received} ข้อความใน {elapsed:.1f} วินาที " f"({self.messages_received/elapsed:.1f} msg/s)" ) async def main(): """Entry Point""" collector = AdvancedLevel2Collector( symbol="btcusdt", data_dir="./level2_data" ) try: await collector.start() except KeyboardInterrupt: await collector.stop() if __name__ == "__main__": asyncio.run(main())

การใช้งานร่วมกับ AI API สำหรับวิเคราะห์ข้อมูล

หลังจากเก็บข้อมูล Level2 ได้แล้ว อีกหนึ่งการใช้งานที่ทรงพลังคือการนำข้อมูลไปวิเคราะห์ด้วย AI ผมใช้ HolySheep AI ซึ่งมีความเร็วในการตอบสนองน้อยกว่า 50ms และราคาประหยัดกว่า 85% เมื่อเทียบกับ OpenAI โดยมีราคาเพียง $0.42 ต่อล้าน tokens สำหรับ DeepSeek V3.2

นี่คือตัวอย่างการใช้งาน AI API สำหรับวิเคราะห์ Order Book Pattern:

# ai_orderbook_analyzer.py

ใช้ AI วิเคราะห์ Order Book Pattern

import httpx import json import asyncio from typing import List, Dict, Optional class OrderBookAnalyzer: """ วิเคราะห์ Order Book ด้วย AI ใช้ HolySheep AI API สำหรับการวิเคราะห์ """ BASE_URL = "https://api.holysheep.ai/v1" def __init__(self, api_key: str): self.api_key = api_key self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } async def analyze_order_book( self, symbol: str, bids: Dict[float, float], asks: Dict[float, float], model: str = "gpt-4.1" ) -> str: """ วิเคราะห์ Order Book Pattern ด้วย AI Args: symbol: สัญลักษณ์คู่เทรด เช่น BTCUSDT bids: Dict ของราคา -> ปริมาณ (คำสั่งซื้อ) asks: Dict ของราคา -> ปริมาณ (คำสั่งขาย) model: โมเดลที่ใช้ (gpt-4.1, claude-sonnet-4.5, deepseek-v3.2) Returns: ผลการวิเคราะห์จาก AI """ # คำนวณข้อมูลสถิติ top_bids = sorted(bids.items(), reverse=True)[:10] top_asks = sorted(asks.items())[:10] best_bid = top_bids[0] if top_bids else (0, 0) best_ask = top_asks[0] if top_asks else (0, 0) total_bid_vol = sum(bids.values()) total_ask_vol = sum(asks.values()) imbalance = (total_bid_vol - total_ask_vol) / (total_bid_vol + total_ask_vol + 1e-8) # สร้าง Prompt สำหรับ AI prompt = f"""คุณคือผู้เชี่ยวชาญด้านการวิเคราะห์ Order Book สำหรับ {symbol} ข้อมูล Order Book ปัจจุบัน: - Best Bid: {best_bid[0]:.2f} (ปริมาณ: {best_bid[1]:.4f}) - Best Ask: {best_ask[0]:.2f} (ปริมาณ: {best_ask[1]:.4f}) - Spread: {(best_ask[0] - best_bid[0]):.2f} ({(best_ask[0] - best_bid[0])/best_bid[0]*100:.4f}%) 10 อันดับคำสั่งซื้อ (Bids): {chr(10).join([f" ราคา {p:.2f}: ปริมาณ {q:.4f}" for p, q in top_bids])} 10 อันดับคำสั่งขาย (Asks): {chr(10).join([f" ราคา {p:.2f}: ปริมาณ {q:.4f}" for p, q in top_asks])} สถิติรวม: - ปริมาณ Bid ทั้งหมด: {total_bid_vol:.4f} - ปริมาณ Ask ทั้งหมด: {total_ask_vol:.4f} - Order Imbalance: {imbalance:.4f} ({'Bullish' if imbalance > 0 else 'Bearish'}) กรุณาวิเคราะห์: 1. แนวโน้มของตลาด (Bullish/Bearish/Neutral) 2. ระดับแนวรับและแนวต้านท