Tháng 6/2024, tôi đang xây dựng một hệ thống arbitrage bot cho thị trường USDT-M Futures. Khi xử lý data feed từ 5 sàn khác nhau cùng lúc, độ trễ trung bình lên đến 450ms — quá chậm để capture spread. Sau 3 tuần tối ưu hóa connection pooling và chuyển sang WebSocket stream thuần, độ trễ giảm xuống còn 67ms trung bình. Bài viết này sẽ chia sẻ toàn bộ kiến thức tôi đã đúc kết được, từ connection setup đến strategy implementation.

Tại sao Bybit API là lựa chọn tối ưu cho quant trader

Bybit cung cấp 3 loại endpoint chính: REST API cho historical data và order execution, WebSocket public channel cho real-time market data ( không cần authentication ), và WebSocket private channel cho account operations. Với tần suất update 100ms cho orderbook và 10ms cho trade stream, đây là một trong những sàn có data quality tốt nhất thị trường.

Ưu điểm nổi bật của Bybit API:

Cài đặt môi trường và dependencies

# Python 3.10+ được khuyến nghị
python -m venv quant_env
source quant_env/bin/activate  # Linux/Mac

quant_env\Scripts\activate # Windows

Core libraries cho WebSocket và data processing

pip install websockets==12.0 pip install aiohttp==3.9.1 pip install pandas==2.1.4 pip install numpy==1.26.2 pip install TA-Lib==0.4.28 # Technical analysis

Redis cho orderbook caching (optional nhưng recommended)

pip install redis==5.0.1

Monitoring và logging

pip install prometheus-client==0.19.0 pip install python-json-logger==2.0.7

Verify installation

python -c "import websockets; print(f'websockets {websockets.__version__}')"

Kết nối WebSocket Public Channel - Real-time Market Data

Đây là phần quan trọng nhất — data feed sẽ là foundation cho mọi strategy. Tôi sử dụng websockets library với async pattern để handle multiple streams hiệu quả.

# bybit_realtime.py
import asyncio
import json
import time
from websockets.asyncio.client import connect
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Trade:
    symbol: str
    side: str
    price: float
    size: float
    timestamp: int
    
@dataclass 
class OrderbookLevel:
    price: float
    size: float

@dataclass
class OrderbookSnapshot:
    symbol: str
    bids: List[OrderbookLevel] = field(default_factory=list)
    asks: List[OrderbookLevel] = field(default_factory=list)
    update_time: int = 0
    
class BybitWebSocketClient:
    """Client cho Bybit WebSocket public channels"""
    
    BYBIT_WS_URL = "wss://stream.bybit.com/v5/public/linear"
    
    def __init__(self):
        self.trades: Dict[str, List[Trade]] = defaultdict(list)
        self.orderbooks: Dict[str, OrderbookSnapshot] = {}
        self.connection_latency: List[float] = []
        self._running = False
        
    async def subscribe(self, ws, channels: List[Dict]):
        """Subscribe vào các channels mong muốn"""
        subscribe_msg = {
            "op": "subscribe",
            "args": channels
        }
        await ws.send(json.dumps(subscribe_msg))
        logger.info(f"Đã subscribe: {channels}")
        
    async def connect_and_subscribe(self, symbols: List[str]):
        """Kết nối WebSocket và subscribe market data"""
        self._running = True
        
        # Subscribe channels
        channels = []
        for symbol in symbols:
            channels.append(f"publicTrade.{symbol}")      # Trade stream
            channels.append(f"orderbook.50.{symbol}")     # Orderbook 50 levels
            
        # Subscribe kline/candlestick (1m interval)
        for symbol in symbols:
            channels.append(f"kline.1.{symbol}")
            
        try:
            async with connect(self.BYBIT_WS_URL, ping_interval=20) as ws:
                await self.subscribe(ws, channels)
                
                while self._running:
                    try:
                        start = time.perf_counter()
                        message = await asyncio.wait_for(ws.recv(), timeout=30)
                        latency = (time.perf_counter() - start) * 1000
                        self.connection_latency.append(latency)
                        
                        data = json.loads(message)
                        await self._handle_message(data)
                        
                    except asyncio.TimeoutError:
                        # Ping để giữ connection alive
                        await ws.ping()
                        
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
            self._running = False
            
    async def _handle_message(self, data: Dict):
        """Parse và xử lý message từ Bybit"""
        topic = data.get("topic", "")
        
        if "publicTrade" in topic:
            for trade_data in data.get("data", []):
                trade = Trade