Tháng 6/2024, tôi đang xây dựng một hệ thống arbitrage bot cho thị trường USDT-M Futures. Khi xử lý data feed từ 5 sàn khác nhau cùng lúc, độ trễ trung bình lên đến 450ms — quá chậm để capture spread. Sau 3 tuần tối ưu hóa connection pooling và chuyển sang WebSocket stream thuần, độ trễ giảm xuống còn 67ms trung bình. Bài viết này sẽ chia sẻ toàn bộ kiến thức tôi đã đúc kết được, từ connection setup đến strategy implementation.
Tại sao Bybit API là lựa chọn tối ưu cho quant trader
Bybit cung cấp 3 loại endpoint chính: REST API cho historical data và order execution, WebSocket public channel cho real-time market data ( không cần authentication ), và WebSocket private channel cho account operations. Với tần suất update 100ms cho orderbook và 10ms cho trade stream, đây là một trong những sàn có data quality tốt nhất thị trường.
Ưu điểm nổi bật của Bybit API:
- Độ trễ thấp: WebSocket server deploy tại Singapore và Tokyo, ping trung bình 12ms từ Việt Nam
- Data completeness: Full orderbook 50 levels, trade tape, funding rate, liquidations
- Rate limit hào phóng: 6000 requests/phút cho public endpoints
- Documentation chi tiết: Swagger spec đầy đủ, code examples cho Python, Node.js, Go
Cài đặt môi trường và dependencies
# Python 3.10+ được khuyến nghị
python -m venv quant_env
source quant_env/bin/activate # Linux/Mac
quant_env\Scripts\activate # Windows
Core libraries cho WebSocket và data processing
pip install websockets==12.0
pip install aiohttp==3.9.1
pip install pandas==2.1.4
pip install numpy==1.26.2
pip install TA-Lib==0.4.28 # Technical analysis
Redis cho orderbook caching (optional nhưng recommended)
pip install redis==5.0.1
Monitoring và logging
pip install prometheus-client==0.19.0
pip install python-json-logger==2.0.7
Verify installation
python -c "import websockets; print(f'websockets {websockets.__version__}')"
Kết nối WebSocket Public Channel - Real-time Market Data
Đây là phần quan trọng nhất — data feed sẽ là foundation cho mọi strategy. Tôi sử dụng websockets library với async pattern để handle multiple streams hiệu quả.
# bybit_realtime.py
import asyncio
import json
import time
from websockets.asyncio.client import connect
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Trade:
symbol: str
side: str
price: float
size: float
timestamp: int
@dataclass
class OrderbookLevel:
price: float
size: float
@dataclass
class OrderbookSnapshot:
symbol: str
bids: List[OrderbookLevel] = field(default_factory=list)
asks: List[OrderbookLevel] = field(default_factory=list)
update_time: int = 0
class BybitWebSocketClient:
"""Client cho Bybit WebSocket public channels"""
BYBIT_WS_URL = "wss://stream.bybit.com/v5/public/linear"
def __init__(self):
self.trades: Dict[str, List[Trade]] = defaultdict(list)
self.orderbooks: Dict[str, OrderbookSnapshot] = {}
self.connection_latency: List[float] = []
self._running = False
async def subscribe(self, ws, channels: List[Dict]):
"""Subscribe vào các channels mong muốn"""
subscribe_msg = {
"op": "subscribe",
"args": channels
}
await ws.send(json.dumps(subscribe_msg))
logger.info(f"Đã subscribe: {channels}")
async def connect_and_subscribe(self, symbols: List[str]):
"""Kết nối WebSocket và subscribe market data"""
self._running = True
# Subscribe channels
channels = []
for symbol in symbols:
channels.append(f"publicTrade.{symbol}") # Trade stream
channels.append(f"orderbook.50.{symbol}") # Orderbook 50 levels
# Subscribe kline/candlestick (1m interval)
for symbol in symbols:
channels.append(f"kline.1.{symbol}")
try:
async with connect(self.BYBIT_WS_URL, ping_interval=20) as ws:
await self.subscribe(ws, channels)
while self._running:
try:
start = time.perf_counter()
message = await asyncio.wait_for(ws.recv(), timeout=30)
latency = (time.perf_counter() - start) * 1000
self.connection_latency.append(latency)
data = json.loads(message)
await self._handle_message(data)
except asyncio.TimeoutError:
# Ping để giữ connection alive
await ws.ping()
except Exception as e:
logger.error(f"WebSocket error: {e}")
self._running = False
async def _handle_message(self, data: Dict):
"""Parse và xử lý message từ Bybit"""
topic = data.get("topic", "")
if "publicTrade" in topic:
for trade_data in data.get("data", []):
trade = Trade