Trong thị trường crypto, tốc độ là tất cả. Nếu bạn đang xây dựng trading bot hoặc ứng dụng phân tích thị trường, việc lấy order book data từ OKX API là kỹ năng không thể thiếu. Bài viết này sẽ hướng dẫn chi tiết cách kết nối, xử lý và tối ưu real-time depth data, đồng thời so sánh các phương án API hiện có.
Bảng so sánh: HolySheep vs OKX API chính thức vs Dịch vụ Relay
| Tiêu chí | HolySheep AI | OKX API chính thức | Dịch vụ Relay khác |
|---|---|---|---|
| Chi phí | ¥1 = $1 (tiết kiệm 85%+) | Miễn phí (rate limit) | $5-50/tháng |
| Thanh toán | WeChat/Alipay, Visa | Chỉ card quốc tế | Card quốc tế |
| Độ trễ | <50ms | 20-100ms | 50-200ms |
| Tín dụng miễn phí | Có khi đăng ký | Không | Ít khi |
| Hỗ trợ WebSocket | Có | Có | Tùy nhà cung cấp |
| Độ ổn định | 99.9% uptime | Tốt | Biến đổi |
OKX Order Book API là gì?
OKX cung cấp REST API và WebSocket API để lấy order book data. Order book là danh sách các lệnh mua/bán chưa khớp, thể hiện cung-cầu của thị trường. Depth data cho biết giá và khối lượng tại mỗi mức giá.
Cách kết nối OKX REST API
Với OKX REST API, bạn cần lấy order book depth data qua endpoint:
import requests
import time
class OKXOrderBook:
def __init__(self, api_key, api_secret, passphrase):
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.base_url = "https://www.okx.com"
def get_order_book(self, inst_id="BTC-USDT", sz="400"):
"""Lấy order book depth data từ OKX REST API"""
endpoint = "/api/v5/market/books"
params = {
"instId": inst_id,
"sz": sz # Số lượng level (tối đa 400)
}
url = f"{self.base_url}{endpoint}"
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data.get("code") == "0":
return data["data"][0]
else:
print(f"Lỗi API: {data.get('msg')}")
return None
else:
print(f"Lỗi HTTP: {response.status_code}")
return None
def parse_depth_data(self, order_book):
"""Parse order book thành bid/ask lists"""
if not order_book:
return None, None
bids = [] # Lệnh mua
asks = [] # Lệnh bán
for item in order_book.get("bids", []):
bids.append({
"price": float(item[0]),
"size": float(item[1]),
"orders": int(item[2]) if len(item) > 2 else 1
})
for item in order_book.get("asks", []):
asks.append({
"price": float(item[0]),
"size": float(item[1]),
"orders": int(item[2]) if len(item) > 2 else 1
})
return bids, asks
Sử dụng
okx = OKXOrderBook("your_api_key", "your_secret", "your_passphrase")
start_time = time.time()
order_book = okx.get_order_book("BTC-USDT", "100")
latency = (time.time() - start_time) * 1000
if order_book:
bids, asks = okx.parse_depth_data(order_book)
print(f"Độ trễ: {latency:.2f}ms")
print(f"Số lượng bid levels: {len(bids)}")
print(f"Số lượng ask levels: {len(asks)}")
print(f"Best bid: {bids[0]['price']}, Best ask: {asks[0]['price']}")
spread = asks[0]['price'] - bids[0]['price']
print(f"Spread: {spread:.2f} USDT")
Kết nối OKX WebSocket cho Real-time Data
Để nhận dữ liệu real-time, WebSocket là lựa chọn tốt hơn REST vì độ trễ thấp hơn đáng kể:
import websocket
import json
import time
import threading
from collections import deque
class OKXWebSocket:
def __init__(self):
self.ws = None
self.order_book = {"bids": {}, "asks": {}}
self.update_history = deque(maxlen=1000)
self.is_running = False
self.last_update_time = 0
self.latencies = []
def on_message(self, ws, message):
"""Xử lý message từ WebSocket"""
recv_time = time.time()
data = json.loads(message)
if data.get("arg", {}).get("channel") == "books":
for item in data.get("data", []):
self.process_order_book_update(item, recv_time)
def process_order_book_update(self, update, recv_time):
"""Xử lý order book update với độ trễ chính xác"""
# Timestamp từ OKX (milliseconds)
if "ts" in update:
okx_timestamp = int(update["ts"])
self.latencies.append((recv_time * 1000) - okx_timestamp)
# Full snapshot (ngày mới bắt đầu)
if update.get("action") == "snapshot" or "bids" in update:
self.order_book["bids"].clear()
self.order_book["asks"].clear()
for bid in update.get("bids", []):
price = float(bid[0])
size = float(bid[1])
self.order_book["bids"][price] = size
for ask in update.get("asks", []):
price = float(ask[0])
size = float(ask[1])
self.order_book["asks"][price] = size
# Incremental update
elif update.get("action") == "update":
for bid in update.get("bids", []):
price = float(bid[0])
size = float(bid[1])
if size == 0:
self.order_book["bids"].pop(price, None)
else:
self.order_book["bids"][price] = size
for ask in update.get("asks", []):
price = float(ask[0])
size = float(ask[1])
if size == 0:
self.order_book["asks"].pop(price, None)
else:
self.order_book["asks"][price] = size
self.last_update_time = time.time()
# Tính spread trung bình
if self.order_book["bids"] and self.order_book["asks"]:
best_bid = max(self.order_book["bids"].keys())
best_ask = min(self.order_book["asks"].keys())
spread = best_ask - best_bid
def on_error(self, ws, error):
print(f"WebSocket Error: {error}")
def on_close(self, ws, close_status_code, close_msg):
print(f"WebSocket đóng: {close_status_code} - {close_msg}")
self.is_running = False
def on_open(self, ws):
"""Subscribe vào order book channel"""
subscribe_msg = {
"op": "subscribe",
"args": [{
"channel": "books",
"instId": "BTC-USDT"
}]
}
ws.send(json.dumps(subscribe_msg))
print("Đã subscribe BTC-USDT order book")
def start(self):
"""Khởi động WebSocket connection"""
self.ws = websocket.WebSocketApp(
"wss://ws.okx.com:8443/ws/v5/public",
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
self.is_running = True
# Chạy trong thread riêng
ws_thread = threading.Thread(target=self.ws.run_forever)
ws_thread.daemon = True
ws_thread.start()
return self
def get_best_bid_ask(self):
"""Lấy best bid/ask hiện tại"""
if not self.order_book["bids"] or not self.order_book["asks"]:
return None, None
best_bid = max(self.order_book["bids"].keys())
best_ask = min(self.order_book["asks"].keys())
return best_bid, best_ask
def get_depth_summary(self, levels=10):
"""Lấy tóm tắt depth data"""
bids = sorted(self.order_book["bids"].items(), reverse=True)[:levels]
asks = sorted(self.order_book["asks"].items())[:levels]
total_bid_size = sum(size for _, size in bids)
total_ask_size = sum(size for _, size in asks)
return {
"bids": bids,
"asks": asks,
"total_bid_size": total_bid_size,
"total_ask_size": total_ask_size,
"bid_ask_ratio": total_bid_size / total_ask_size if total_ask_size > 0 else 0
}
def get_average_latency(self):
"""Tính độ trễ trung bình (ms)"""
if not self.latencies:
return None
return sum(self.latencies) / len(self.latencies)
def stop(self):
"""Dừng WebSocket"""
if self.ws:
self.ws.close()
self.is_running = False
Sử dụng
okx_ws = OKXWebSocket()
okx_ws.start()
Đợi một chút để nhận data
time.sleep(2)
bid, ask = okx_ws.get_best_bid_ask()
print(f"Best Bid: {bid}, Best Ask: {ask}")
summary = okx_ws.get_depth_summary(5)
print(f"Bid/Ask Ratio: {summary['bid_ask_ratio']:.4f}")
print(f"Độ trễ trung bình: {okx_ws.get_average_latency():.2f}ms")
okx_ws.stop()
Xử lý Order Book Data cho Trading Strategy
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
class OrderBookAnalyzer:
"""Phân tích order book để đưa ra trading signals"""
def __init__(self, order_book_data: Dict):
self.bids = order_book_data.get("bids", {})
self.asks = order_book_data.get("asks", {})
def calculate_spread(self) -> float:
"""Tính spread giữa best bid và best ask"""
if not self.bids or not self.asks:
return 0
best_bid = max(self.bids.keys())
best_ask = min(self.asks.keys())
return best_ask - best_bid
def calculate_spread_percentage(self) -> float:
"""Tính spread dưới dạng phần trăm"""
best_bid = max(self.bids.keys())
spread = self.calculate_spread()
return (spread / best_bid) * 100
def get_mid_price(self) -> float:
"""Tính giá trung vị"""
if not self.bids or not self.asks:
return 0
best_bid = max(self.bids.keys())
best_ask = min(self.asks.keys())
return (best_bid + best_ask) / 2
def calculate_vwap(self, levels: int = 20) -> float:
"""Volume Weighted Average Price"""
bid_prices = sorted(self.bids.keys(), reverse=True)[:levels]
ask_prices = sorted(self.asks.keys())[:levels]
bid_volumes = [self.bids[p] for p in bid_prices]
ask_volumes = [self.asks[p] for p in ask_prices]
total_volume = sum(bid_volumes) + sum(ask_volumes)
if total_volume == 0:
return 0
vwap = (
sum(p * v for p, v in zip(bid_prices, bid_volumes)) +
sum(p * v for p, v in zip(ask_prices, ask_volumes))
) / total_volume
return vwap
def calculate_imbalance(self, levels: int = 10) -> float:
"""Order book imbalance: >0 = buy pressure, <0 = sell pressure"""
bid_prices = sorted(self.bids.keys(), reverse=True)[:levels]
ask_prices = sorted(self.asks.keys())[:levels]
bid_volumes = sum(self.bids.get(p, 0) for p in bid_prices)
ask_volumes = sum(self.asks.get(p, 0) for p in ask_prices)
total = bid_volumes + ask_volumes
if total == 0:
return 0
return (bid_volumes - ask_volumes) / total
def get_depth_levels(self, price_range_pct: float = 1.0) -> Tuple[List, List]:
"""Lấy các mức giá trong khoảng % từ mid price"""
mid_price = self.get_mid_price()
if mid_price == 0:
return [], []
lower_bound = mid_price * (1 - price_range_pct / 100)
upper_bound = mid_price * (1 + price_range_pct / 100)
bid_levels = [(p, self.bids[p]) for p in self.bids
if lower_bound <= p <= mid_price]
ask_levels = [(p, self.asks[p]) for p in self.asks
if mid_price <= p <= upper_bound]
return sorted(bid_levels, reverse=True), sorted(ask_levels)
def detect_wall(self, threshold: float = 10.0) -> Dict:
"""Phát hiện các 'wall' - khối lượng lớn bất thường"""
avg_bid_size = np.mean(list(self.bids.values())) if self.bids else 0
avg_ask_size = np.mean(list(self.asks.values())) if self.asks else 0
bid_walls = {p: v for p, v in self.bids.items()
if v > avg_bid_size * threshold}
ask_walls = {p: v for p, v in self.asks.items()
if v > avg_ask_size * threshold}
return {
"bid_walls": bid_walls,
"ask_walls": ask_walls,
"bid_avg": avg_bid_size,
"ask_avg": avg_ask_size
}
def generate_signals(self) -> Dict:
"""Tạo các trading signals từ order book"""
spread_pct = self.calculate_spread_percentage()
imbalance = self.calculate_imbalance()
vwap = self.calculate_vwap()
mid_price = self.get_mid_price()
signals = {
"spread_pct": spread_pct,
"imbalance": imbalance,
"vwap": vwap,
"mid_price": mid_price,
"signals": []
}
# Signal: Strong buy pressure
if imbalance > 0.3:
signals["signals"].append({
"type": "BUY_PRESSURE",
"strength": imbalance,
"reason": "Order book imbalance nghiêng về phía bid"
})
# Signal: Strong sell pressure
if imbalance < -0.3:
signals["signals"].append({
"type": "SELL_PRESSURE",
"strength": abs(imbalance),
"reason": "Order book imbalance nghiêng về phía ask"
})
# Signal: VWAP deviation
if vwap > 0 and abs(vwap - mid_price) / mid_price > 0.001:
if vwap > mid_price:
signals["signals"].append({
"type": "VWAP_ABOVE_MID",
"reason": "Giá hợp lý cao hơn mid price"
})
else:
signals["signals"].append({
"type": "VWAP_BELOW_MID",
"reason": "Giá hợp lý thấp hơn mid price"
})
return signals
Ví dụ sử dụng
sample_order_book = {
"bids": {
64500.0: 2.5,
64499.0: 1.8,
64498.0: 3.2,
64497.0: 0.9,
64496.0: 5.1,
64495.0: 2.3
},
"asks": {
64501.0: 1.5,
64502.0: 2.9,
64503.0: 1.2,
64504.0: 4.0,
64505.0: 0.8
}
}
analyzer = OrderBookAnalyzer(sample_order_book)
print(f"Spread: ${analyzer.calculate_spread():.2f}")
print(f"Spread %: {analyzer.calculate_spread_percentage():.4f}%")
print(f"Mid Price: ${analyzer.get_mid_price():.2f}")
print(f"VWAP: ${analyzer.calculate_vwap():.2f}")
print(f"Imbalance: {analyzer.calculate_imbalance():.4f}")
signals = analyzer.generate_signals()
for signal in signals["signals"]:
print(f"Signal: {signal['type']} - {signal['reason']}")
Sử dụng AI để phân tích Order Book với HolySheep
Nếu bạn cần phân tích phức tạp hơn với AI, đăng ký tại đây để sử dụng HolySheep AI API — nơi cung cấp các model AI hàng đầu với chi phí chỉ ¥1=$1 (tiết kiệm 85%+).
import requests
import json
class HolySheepAIAnalyzer:
"""Sử dụng AI để phân tích order book data"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.model = "gpt-4.1" # $8/M tokens
def analyze_order_book_with_ai(self, order_book_data: dict,
market_context: str = "") -> dict:
"""
Gửi order book data lên AI để phân tích và đưa ra khuyến nghị
"""
# Chuẩn bị prompt
system_prompt = """Bạn là chuyên gia phân tích thị trường crypto.
Dựa trên order book data được cung cấp, hãy phân tích và đưa ra:
1. Đánh giá tổng quan thị trường (bullish/bearish/neutral)
2. Các mức hỗ trợ và kháng cự tiềm năng
3. Đánh giá thanh khoản
4. Khuyến nghị hành động (nếu có)
Trả lời ngắn gọn, dễ hiểu, có con số cụ thể."""
user_prompt = f"""
Market Context: {market_context}
Order Book Data:
- Best Bid: {max(order_book_data.get('bids', {}).keys()) if order_book_data.get('bids') else 'N/A'}
- Best Ask: {min(order_book_data.get('asks', {}).keys()) if order_book_data.get('asks') else 'N/A'}
- Top 5 Bids: {sorted(order_book_data.get('bids', {}).items(), reverse=True)[:5]}
- Top 5 Asks: {sorted(order_book_data.get('asks', {}).items())[:5]}
"""
# Gọi HolySheep AI API
response = requests.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"temperature": 0.7,
"max_tokens": 500
}
)
if response.status_code == 200:
result = response.json()
return {
"analysis": result["choices"][0]["message"]["content"],
"usage": result.get("usage", {}),
"model": self.model
}
else:
raise Exception(f"API Error: {response.status_code} - {response.text}")
def detect_anomalies(self, order_book_data: dict) -> dict:
"""Sử dụng AI để phát hiện bất thường trong order book"""
system_prompt = """Bạn là chuyên gia phát hiện gian lận trong thị trường crypto.
Phân tích order book để tìm các dấu hiệu bất thường:
- Wash trading indicators
- Spoofing patterns
- Large wall placements
- Unusual spread changes
Trả lời dạng JSON với các trường: anomaly_detected (boolean),
risk_level (low/medium/high), details (string)"""
# Convert order book to string for AI analysis
bids_str = str(sorted(order_book_data.get('bids', {}).items(),
reverse=True)[:10])
asks_str = str(sorted(order_book_data.get('asks', {}).items())[:10])
response = requests.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Bids: {bids_str}\nAsks: {asks_str}"}
],
"temperature": 0.3,
"max_tokens": 300
}
)
if response.status_code == 200:
result = response.json()
try:
return json.loads(result["choices"][0]["message"]["content"])
except:
return {"error": "Failed to parse AI response"}
return {"error": f"API Error: {response.status_code}"}
Ví dụ sử dụng
analyzer = HolySheepAIAnalyzer("YOUR_HOLYSHEEP_API_KEY")
order_book = {
"bids": {
64500.0: 15.5, # Large wall
64499.0: 2.5,
64498.0: 3.2,
64497.0: 0.9,
64496.0: 5.1
},
"asks": {
64501.0: 1.5,
64502.0: 2.9,
64503.0: 50.0, # Suspicious large wall
64504.0: 4.0,
64505.0: 0.8
}
}
Phân tích với AI
analysis = analyzer.analyze_order_book_with_ai(
order_book,
market_context="BTC đang trong xu hướng tăng, ETF approval expected"
)
print(f"AI Analysis:\n{analysis['analysis']}")
print(f"Tokens used: {analysis['usage'].get('total_tokens', 'N/A')}")
Lỗi thường gặp và cách khắc phục
1. Lỗi WebSocket Connection Failed (1006)
Nguyên nhân: Server đóng kết nối đột ngột, thường do rate limit hoặc network issue.
# Khắc phục: Implement reconnection logic
import websocket
import time
import threading
class WebSocketWithReconnect:
def __init__(self, url, on_message, max_retries=5, retry_delay=2):
self.url = url
self.on_message = on_message
self.max_retries = max_retries
self.retry_delay = retry_delay
self.ws = None
self.should_reconnect = True
def connect(self):
retries = 0
while retries < self.max_retries and self.should_reconnect:
try:
self.ws = websocket.WebSocketApp(
self.url,
on_message=self.on_message,
on_error=self.handle_error,
on_close=self.handle_close
)
# Chạy với ping interval để giữ connection alive
self.ws.run_forever(
ping_interval=20,
ping_timeout=10
)
except Exception as e:
print(f"Connection error: {e}")
retries += 1
if retries < self.max_retries:
wait_time = self.retry_delay * (2 ** retries) # Exponential backoff
print(f"Retrying in {wait_time}s... (attempt {retries}/{self.max_retries})")
time.sleep(wait_time)
if retries >= self.max_retries:
print("Max retries exceeded. Please check network connection.")
def handle_error(self, ws, error):
print(f"WebSocket Error: {error}")
def handle_close(self, ws, close_status_code, close_msg):
print(f"Connection closed: {close_status_code} - {close_msg}")
def close(self):
self.should_reconnect = False
if self.ws:
self.ws.close()
2. Lỗi Rate Limit (429) trên REST API
Nguyên nhân: Gọi API quá nhiều lần trong thời gian ngắn.
import time
import requests
from functools import wraps
from collections import defaultdict
class RateLimitedClient:
"""Client với rate limit handling thông minh"""
def __init__(self, base_url, requests_per_second=10):
self.base_url = base_url
self.requests_per_second = requests_per_second
self.request_times = defaultdict(list)
self.min_interval = 1.0 / requests_per_second
def wait_if_needed(self, endpoint):
"""Đợi nếu cần để tránh rate limit"""
current_time = time.time()
# Xóa các request cũ (quá 1 giây)
self.request_times[endpoint] = [
t for t in self.request_times[endpoint]
if current_time - t < 1.0
]
# Nếu đã đạt limit, đợi
if len(self.request_times[endpoint]) >= self.requests_per_second:
oldest = self.request_times[endpoint][0]
wait_time = 1.0 - (current_time - oldest)
if wait_time > 0:
time.sleep(wait_time)
self.request_times[endpoint].append(time.time())
def get_with_retry(self, endpoint, params=None, max_retries=3):
"""GET request với retry logic"""
for attempt in range(max_retries):
try:
self.wait_if_needed(endpoint)
response = requests.get(
f"{self.base_url}{endpoint}",
params=params,
timeout=10
)
if response.status_code == 429:
# Rate limited - đợi và thử lại
retry_after = int(response.headers.get("Retry-After", 1))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
continue
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
print(f"Request failed (attempt {attempt + 1}): {e}")
time.sleep(2 ** attempt) # Exponential backoff
return None
Sử dụng
client = RateLimitedClient("https://www.okx.com", requests_per_second=5)
data = client.get_with_retry("/api/v5/market/books", params={"instId": "BTC-USDT"})
3. Order Book Data Lag hoặc Stale
Nguyên nhân: Không update đúng cách, snapshot out-of-date.
import time
from threading import Lock
class OrderBookManager:
"""Quản lý order book với stale detection"""
def __init__(self, stale_threshold_ms=5000):
self.order_book = {"bids": {}, "asks": {}}
self.last_update_time = 0
self.stale_threshold = stale_threshold_ms / 1000
self.lock = Lock()
self.update_count = 0
def update_snapshot(self, bids, asks, timestamp_ms):
"""Cập nhật full snapshot"""
with self.lock:
self.order_book["bids"] = {float(p): float(s) for p, s in bids}
self.order_book["asks"] = {float(p): float(s) for p, s in asks}
self.last_update_time = timestamp_ms / 1000
self.update_count += 1
def apply_update(self, updates, timestamp_ms):
"""Áp dụng incremental update"""
with self.lock:
for side, price, size in updates:
price = float(price)
size = float(size)
if side == "bid":
if size == 0:
self.order_book["bids"].pop(price, None)
else:
self.order_book["bids"][price] = size
elif side == "ask":
if size == 0:
self.order_book["asks"].pop(price, None)
else:
self.order_book["asks"][price] = size
self.last_update_time = timestamp_ms / 1000
self.update_count += 1
def is_stale(self):
"""Kiểm tra data có stale không"""
if self.last_update_time == 0:
return True
current_time = time.time()
time_since_update = current_time - self.last_update_time
return time_since_update > self.stale_threshold
def get