ในโลกของ High-Frequency Trading หรือการเทรดความถี่สูงนั้น ข้อมูล Level 2 (Order Book) คือหัวใจสำคัญที่ทำให้เทรดเดอร์สามารถวิเคราะห์แนวโน้มราคาและความลึกของตลาดได้อย่างแม่นยำ ผมเคยเจอปัญหา ConnectionError: timeout ที่ทำให้ระบบหยุดทำงานกลางคัน และ 401 Unauthorized ที่ไม่สามารถเชื่อมต่อ WebSocket ได้ จนกระทั่งศึกษาจนเข้าใจกลไกที่แท้จริง
บทความนี้จะพาคุณสร้าง Data Pipeline สำหรับเก็บข้อมูล Binance Level2 ผ่าน WebSocket อย่างมืออาชีพ พร้อมวิธีแก้ปัญหาที่พบบ่อยและ Best Practices จากประสบการณ์ตรง
Binance Level2 Data คืออะไร และทำไมต้องเก็บแบบ Real-time
Level 2 ของ Binance คือข้อมูล Order Book ที่แสดงคำสั่งซื้อ-ขายทั้งหมดในแต่ละระดับราคา โดยประกอบด้วย:
- Bids — คำสั่งซื้อที่รอการจับคู่ จากราคาสูงไปต่ำ
- Asks — คำสั่งขายที่รอการจับคู่ จากราคาต่ำไปสูง
- Depth — ความลึกของตลาดในแต่ละช่วงราคา
- Update ID — ลำดับการอัพเดทสำหรับตรวจสอบความต่อเนื่อง
ในการพัฒนา Trading Bot หรือระบบวิเคราะห์ตลาด ข้อมูลเหล่านี้ต้องถูกเก็บแบบ Real-time เพื่อให้ได้ข้อมูลที่สมบูรณ์ที่สุด ไม่มีการ Miss ข้อมูล
การติดตั้งสถาปัตยกรรม WebSocket Data Pipeline
สำหรับการเก็บข้อมูล Level 2 แบบ Real-time เราจะใช้สถาปัตยกรรมดังนี้:
- WebSocket Client — เชื่อมต่อกับ Binance Stream
- Message Queue — Buffer ข้อมูลระหว่างรับและประมวลผล
- Data Processor — ประมวลผลและ Normalize ข้อมูล
- Storage — จัดเก็บลง Database หรือ File System
โค้ด Python สำหรับเชื่อมต่อ Binance Level2 WebSocket
นี่คือโค้ดพื้นฐานสำหรับการเชื่อมต่อ WebSocket กับ Binance โดยใช้ Library websockets ที่เป็นมาตรฐาน:
# ติดตั้ง dependencies ก่อนใช้งาน
pip install websockets asyncio aiofiles pandas
import asyncio
import json
import websockets
from datetime import datetime
from collections import defaultdict
class BinanceLevel2Collector:
"""
Binance Level2 WebSocket Collector
เก็บข้อมูล Order Book แบบ Real-time
"""
def __init__(self, symbol: str = "btcusdt", depth: int = 20):
self.symbol = symbol.lower()
self.depth = depth
self.bids = {} # price -> quantity
self.asks = {} # price -> quantity
self.last_update_id = 0
self.stream_url = "wss://stream.binance.com:9443/ws"
def get_stream_name(self) -> str:
"""สร้างชื่อ Stream สำหรับ Level2 Depth@100ms"""
return f"{self.symbol}@depth@100ms"
async def connect(self):
"""เชื่อมต่อ WebSocket กับ Binance"""
stream_name = self.get_stream_name()
full_url = f"{self.stream_url}/{stream_name}"
print(f"🔌 กำลังเชื่อมต่อ: {full_url}")
try:
async with websockets.connect(full_url, ping_interval=20) as websocket:
print(f"✅ เชื่อมต่อสำเร็จ! กำลังรับข้อมูล Level2 สำหรับ {self.symbol.upper()}")
# รับข้อมูลแบบ Infinite Loop
while True:
try:
message = await asyncio.wait_for(
websocket.recv(),
timeout=30.0
)
await self.process_message(message)
except asyncio.TimeoutError:
# ส่ง Ping เพื่อรักษาการเชื่อมต่อ
await websocket.ping()
print("📡 Ping ส่งไปแล้ว...")
except websockets.exceptions.ConnectionClosed as e:
print(f"❌ การเชื่อมต่อถูกปิด: {e}")
print("🔄 กำลังพยายามเชื่อมต่อใหม่ในอีก 5 วินาที...")
await asyncio.sleep(5)
await self.connect()
async def process_message(self, message: str):
"""ประมวลผลข้อมูลที่ได้รับจาก WebSocket"""
data = json.loads(message)
# ตรวจสอบว่าเป็นข้อมูล Depth หรือไม่
if "e" in data and data["e"] == "depthUpdate":
self.last_update_id = data["u"]
# อัพเดท Bids (คำสั่งซื้อ)
for price, qty in data.get("b", []):
price_float = float(price)
qty_float = float(qty)
if qty_float == 0:
# ลบ Order ที่ถูก Cancel
self.bids.pop(price_float, None)
else:
self.bids[price_float] = qty_float
# อัพเดท Asks (คำสั่งขาย)
for price, qty in data.get("a", []):
price_float = float(price)
qty_float = float(qty)
if qty_float == 0:
self.asks.pop(price_float, None)
else:
self.asks[price_float] = qty_float
# แสดงผล Order Book ปัจจุบัน
best_bid = max(self.bids.keys()) if self.bids else 0
best_ask = min(self.asks.keys()) if self.asks else 0
spread = best_ask - best_bid if best_bid and best_ask else 0
print(f"📊 [{data['E']}] "
f"BID: {best_bid:.2f} | ASK: {best_ask:.2f} | "
f"Spread: {spread:.2f} | "
f"Update: {self.last_update_id}")
async def main():
"""Entry Point สำหรับรัน Collector"""
collector = BinanceLevel2Collector(symbol="btcusdt", depth=20)
await collector.connect()
if __name__ == "__main__":
print("=" * 60)
print("🚀 Binance Level2 WebSocket Data Collector")
print("=" * 60)
asyncio.run(main())
การสร้าง Order Book Aggregator พร้อมระบบ Reconnection
ในการใช้งานจริง เราต้องมีระบบ Reconnection ที่แข็งแกร่ง เพราะ WebSocket อาจหลุดการเชื่อมต่อได้ ผมเคยเสียข้อมูลไปหลายชั่วโมงเพราะไม่มีระบบ Reconnection ที่ดี นี่คือโค้ดที่พัฒนาต่อมาจากประสบการณ์จริง:
# advanced_level2_collector.py
รุ่นพัฒนาต่อยอด พร้อมระบบ Reconnection และ Data Persistence
import asyncio
import json
import time
import aiofiles
import logging
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Dict, Optional, List
from collections import OrderedDict
import websockets
from websockets.exceptions import ConnectionClosed, InvalidStatusCode
ตั้งค่า Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(message)s',
handlers=[
logging.FileHandler('level2_collector.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class OrderBookEntry:
"""โครงสร้างข้อมูล Order Book"""
price: float
quantity: float
timestamp: int
is_bid: bool
@dataclass
class Level2Snapshot:
"""Snapshot ของ Order Book ในแต่ละช่วงเวลา"""
symbol: str
timestamp: int
update_id: int
bids: List[OrderBookEntry]
asks: List[OrderBookEntry]
best_bid: float
best_ask: float
spread: float
total_bid_volume: float
total_ask_volume: float
class AdvancedLevel2Collector:
"""
Advanced Binance Level2 Collector
- รองรับการ Reconnection อัตโนมัติ
- บันทึกข้อมูลแบบ Batch
- คำนวณ Volume Weighted Average Price (VWAP)
"""
# ค่าคงที่
MAX_RECONNECT_ATTEMPTS = 10
RECONNECT_DELAY = 5 # วินาที
BATCH_SIZE = 100 # บันทึกทุก 100 updates
SNAPSHOT_INTERVAL = 60 # Snapshot ทุก 60 วินาที
def __init__(self, symbol: str, data_dir: str = "./data"):
self.symbol = symbol.lower()
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
# Order Book State
self.bids: OrderedDict[float, float] = OrderedDict()
self.asks: OrderedDict[float, float] = OrderedDict()
self.last_update_id: int = 0
self.last_snapshot_time: int = 0
# Statistics
self.messages_received: int = 0
self.reconnect_count: int = 0
self.start_time: float = time.time()
# Buffer สำหรับ Batch Writing
self.update_buffer: List[Dict] = []
# WebSocket State
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.running: bool = True
@property
def base_url(self) -> str:
"""URL สำหรับ Combined Stream"""
return "wss://stream.binance.com:9443/stream"
@property
def stream_params(self) -> Dict:
"""พารามิเตอร์สำหรับ Subscribe"""
return {
"method": "SUBSCRIBE",
"params": [
f"{self.symbol}@depth@100ms",
f"{self.symbol}@depth@1s@100ms"
],
"id": 1
}
async def start(self):
"""เริ่มต้น Collector"""
logger.info(f"🚀 เริ่มต้น AdvancedLevel2Collector สำหรับ {self.symbol.upper()}")
while self.running and self.reconnect_count < self.MAX_RECONNECT_ATTEMPTS:
try:
await self._connect_and_subscribe()
except ConnectionClosed as e:
self.reconnect_count += 1
logger.warning(
f"⚠️ Connection closed: {e.code} - "
f"พยายามเชื่อมต่อใหม่ครั้งที่ {self.reconnect_count}"
)
await asyncio.sleep(self.RECONNECT_DELAY)
except Exception as e:
logger.error(f"❌ เกิดข้อผิดพลาด: {e}")
await asyncio.sleep(self.RECONNECT_DELAY)
if self.reconnect_count >= self.MAX_RECONNECT_ATTEMPTS:
logger.error(f"❌ เชื่อมต่อไม่ได้หลังจาก {self.MAX_RECONNECT_ATTEMPTS} ครั้ง")
async def _connect_and_subscribe(self):
"""เชื่อมต่อและ Subscribe ไปยัง Stream"""
logger.info(f"🔌 กำลังเชื่อมต่อไปยัง Binance WebSocket...")
async with websockets.connect(
self.base_url,
ping_interval=20,
ping_timeout=10,
close_timeout=10
) as websocket:
self.ws = websocket
# Subscribe ไปยัง Streams ที่ต้องการ
await websocket.send(json.dumps(self.stream_params))
logger.info("📡 Subscribe ไปยัง Level2 streams แล้ว")
# รับข้อมูลใน Loop
async for message in websocket:
if not self.running:
break
await self._process_message(message)
# Batch write ทุก BATCH_SIZE messages
if len(self.update_buffer) >= self.BATCH_SIZE:
await self._flush_buffer()
async def _process_message(self, message: str):
"""ประมวลผลข้อความจาก WebSocket"""
self.messages_received += 1
try:
data = json.loads(message)
# ข้าม Response ของ Subscribe
if "result" in data:
return
# ตรวจสอบว่าเป็นข้อมูล Depth
stream_data = data.get("data", {})
if stream_data.get("e") != "depthUpdate":
return
# ประมวลผล Update
await self._process_depth_update(stream_data)
except json.JSONDecodeError as e:
logger.error(f"❌ JSON Decode Error: {e}")
except Exception as e:
logger.error(f"❌ ประมวลผลข้อความผิดพลาด: {e}")
async def _process_depth_update(self, data: Dict):
"""ประมวลผล Depth Update"""
update_id = data["u"]
event_time = data["E"]
# อัพเดท Bids
for price_str, qty_str in data.get("b", []):
price = float(price_str)
qty = float(qty_str)
if qty == 0:
self.bids.pop(price, None)
else:
# รักษา Order โดยใช้ OrderedDict
self.bids[price] = qty
# จำกัดจำนวน Order ไม่เกิน 1000 รายการ
while len(self.bids) > 1000:
self.bids.popitem(last=False)
# อัพเดท Asks
for price_str, qty_str in data.get("a", []):
price = float(price_str)
qty = float(qty_str)
if qty == 0:
self.asks.pop(price, None)
else:
self.asks[price] = qty
while len(self.asks) > 1000:
self.asks.popitem(last=False)
self.last_update_id = update_id
# คำนวณ Statistics
best_bid = max(self.bids.keys()) if self.bids else 0
best_ask = min(self.asks.keys()) if self.asks else float('inf')
spread = (best_ask - best_bid) if best_bid and best_ask != float('inf') else 0
total_bid_vol = sum(self.bids.values())
total_ask_vol = sum(self.asks.values())
# เพิ่มลงใน Buffer
snapshot = {
"timestamp": event_time,
"update_id": update_id,
"best_bid": best_bid,
"best_ask": best_ask,
"spread": spread,
"total_bid_volume": total_bid_vol,
"total_ask_volume": total_ask_vol,
"bid_count": len(self.bids),
"ask_count": len(self.asks)
}
self.update_buffer.append(snapshot)
# แสดงผลทุก 10 Updates
if self.messages_received % 10 == 0:
elapsed = time.time() - self.start_time
rate = self.messages_received / elapsed
logger.info(
f"📊 Update #{self.messages_received} | "
f"BID: {best_bid:.2f} | ASK: {best_ask:.2f} | "
f"Spread: {spread:.4f} | Rate: {rate:.1f} msg/s"
)
async def _flush_buffer(self):
"""บันทึก Buffer ลงไฟล์"""
if not self.update_buffer:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = self.data_dir / f"{self.symbol}_{timestamp}.jsonl"
async with aiofiles.open(filename, mode='a') as f:
for record in self.update_buffer:
await f.write(json.dumps(record) + "\n")
logger.info(f"💾 บันทึก {len(self.update_buffer)} records ไปยัง {filename}")
self.update_buffer.clear()
async def stop(self):
"""หยุด Collector"""
logger.info("🛑 กำลังหยุด Collector...")
self.running = False
# Flush Buffer สุดท้าย
await self._flush_buffer()
elapsed = time.time() - self.start_time
logger.info(
f"📈 สรุป: รับ {self.messages_received} ข้อความใน {elapsed:.1f} วินาที "
f"({self.messages_received/elapsed:.1f} msg/s)"
)
async def main():
"""Entry Point"""
collector = AdvancedLevel2Collector(
symbol="btcusdt",
data_dir="./level2_data"
)
try:
await collector.start()
except KeyboardInterrupt:
await collector.stop()
if __name__ == "__main__":
asyncio.run(main())
การใช้งานร่วมกับ AI API สำหรับวิเคราะห์ข้อมูล
หลังจากเก็บข้อมูล Level2 ได้แล้ว อีกหนึ่งการใช้งานที่ทรงพลังคือการนำข้อมูลไปวิเคราะห์ด้วย AI ผมใช้ HolySheep AI ซึ่งมีความเร็วในการตอบสนองน้อยกว่า 50ms และราคาประหยัดกว่า 85% เมื่อเทียบกับ OpenAI โดยมีราคาเพียง $0.42 ต่อล้าน tokens สำหรับ DeepSeek V3.2
นี่คือตัวอย่างการใช้งาน AI API สำหรับวิเคราะห์ Order Book Pattern:
# ai_orderbook_analyzer.py
ใช้ AI วิเคราะห์ Order Book Pattern
import httpx
import json
import asyncio
from typing import List, Dict, Optional
class OrderBookAnalyzer:
"""
วิเคราะห์ Order Book ด้วย AI
ใช้ HolySheep AI API สำหรับการวิเคราะห์
"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def analyze_order_book(
self,
symbol: str,
bids: Dict[float, float],
asks: Dict[float, float],
model: str = "gpt-4.1"
) -> str:
"""
วิเคราะห์ Order Book Pattern ด้วย AI
Args:
symbol: สัญลักษณ์คู่เทรด เช่น BTCUSDT
bids: Dict ของราคา -> ปริมาณ (คำสั่งซื้อ)
asks: Dict ของราคา -> ปริมาณ (คำสั่งขาย)
model: โมเดลที่ใช้ (gpt-4.1, claude-sonnet-4.5, deepseek-v3.2)
Returns:
ผลการวิเคราะห์จาก AI
"""
# คำนวณข้อมูลสถิติ
top_bids = sorted(bids.items(), reverse=True)[:10]
top_asks = sorted(asks.items())[:10]
best_bid = top_bids[0] if top_bids else (0, 0)
best_ask = top_asks[0] if top_asks else (0, 0)
total_bid_vol = sum(bids.values())
total_ask_vol = sum(asks.values())
imbalance = (total_bid_vol - total_ask_vol) / (total_bid_vol + total_ask_vol + 1e-8)
# สร้าง Prompt สำหรับ AI
prompt = f"""คุณคือผู้เชี่ยวชาญด้านการวิเคราะห์ Order Book สำหรับ {symbol}
ข้อมูล Order Book ปัจจุบัน:
- Best Bid: {best_bid[0]:.2f} (ปริมาณ: {best_bid[1]:.4f})
- Best Ask: {best_ask[0]:.2f} (ปริมาณ: {best_ask[1]:.4f})
- Spread: {(best_ask[0] - best_bid[0]):.2f} ({(best_ask[0] - best_bid[0])/best_bid[0]*100:.4f}%)
10 อันดับคำสั่งซื้อ (Bids):
{chr(10).join([f" ราคา {p:.2f}: ปริมาณ {q:.4f}" for p, q in top_bids])}
10 อันดับคำสั่งขาย (Asks):
{chr(10).join([f" ราคา {p:.2f}: ปริมาณ {q:.4f}" for p, q in top_asks])}
สถิติรวม:
- ปริมาณ Bid ทั้งหมด: {total_bid_vol:.4f}
- ปริมาณ Ask ทั้งหมด: {total_ask_vol:.4f}
- Order Imbalance: {imbalance:.4f} ({'Bullish' if imbalance > 0 else 'Bearish'})
กรุณาวิเคราะห์:
1. แนวโน้มของตลาด (Bullish/Bearish/Neutral)
2. ระดับแนวรับและแนวต้านท