สวัสดีครับ นักพัฒนาและนักเทรดทุกคน วันนี้ผมจะมาแชร์ประสบการณ์ตรงในการใช้ Tardis Machine API เพื่อจำลองและสร้างใหม่ (reconstruct) ข้อมูล limit order book ณ เวลาใดก็ได้ในอดีตของตลาดคริปโต
สำหรับโปรเจกต์ที่ผมทำอยู่ ผมต้องการ backtest กลยุทธ์การเทรดแบบ market making โดยต้องการข้อมูล order book ที่ละเอียดถึงระดับ tick และทุกการเปลี่ยนแปลงของราคา ซึ่ง Tardis Machine เป็นบริการที่ให้ข้อมูล historical market data ที่ครบถ้วนมาก แต่ระหว่างทางผมเจอปัญหาหลายอย่างที่อยากจะเล่าให้ฟัง
ปัญหาจริงที่เจอ: เมื่อ Historical Data API ตอบสนองไม่ได้ตามที่คาดหวัง
ตอนแรกที่ผมเริ่มใช้งาน ผมได้รับข้อผิดพลาดนี้:
ConnectionError: HTTPSConnectionPool(host='tardis-devip.io', port=443):
Max retries exceeded with url: /v1/replays/btcusdt.binance/2024-01-15/bookchange_100.json
(Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x...>:
Failed to establish a new connection: [Errno 110] Connection timed out'))
ปัญหานี้เกิดจาก:
- การเชื่อมต่อ network จากเซิร์ฟเวอร์ของผมไปยัง Tardis Machine server มี latency สูงเกินไป
- การดึงข้อมูล historical tick-by-tick มีขนาดไฟล์ใหญ่ ทำให้เกิด timeout
- บางครั้ง rate limit ของ API ทำให้การทำ backtest ใช้เวลานานเกินไป
หลังจากทดลองและแก้ปัญหามาหลายวัน ผมได้เทคนิคที่ดีมาก รวมถึงการใช้ HolySheep AI เข้ามาช่วยในการ optimize กระบวนการทำ market analysis ซึ่งมีประสิทธิภาพสูงและราคาถูกกว่ามาก
Tardis Machine API คืออะไร?
Tardis Machine เป็นบริการที่ให้ข้อมูล market data ระดับ granular สำหรับตลาดคริปโต โดยมีข้อมูล:
- Order book snapshots และ deltas
- Trade ticks ทุกรายการ
- Funding rates
- Liquidation data
- Support หลาย exchange ทั้ง Binance, Bybit, OKX, Deribit เป็นต้น
การติดตั้งและ Setup เบื้องต้น
# ติดตั้ง dependencies ที่จำเป็น
pip install tardis-machine-client
pip install pandas
pip install aiohttp
pip install asyncio
สำหรับ visualization
pip install plotly
pip install kaleido
Code พื้นฐานสำหรับดึงข้อมูล Order Book Historical
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
import pandas as pd
class TardisReplayer:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.tardis-devip.io/v1"
self.session = None
async def get_replay_info(self, exchange: str, symbol: str, date: str):
"""ดึงข้อมูล metadata ของ replay ที่ต้องการ"""
url = f"{self.base_url}/replays/{exchange}.{symbol}/{date}"
headers = {"Authorization": f"Bearer {self.api_key}"}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
return await response.json()
elif response.status == 401:
raise Exception("401 Unauthorized: ตรวจสอบ API key ของคุณ")
elif response.status == 404:
raise Exception(f"404 Not Found: ไม่พบข้อมูลสำหรับ {exchange}.{symbol} วันที่ {date}")
else:
raise Exception(f"API Error: {response.status}")
async def fetch_orderbook_snapshots(self, exchange: str, symbol: str,
date: str, limit: int = 100):
"""ดึงข้อมูล order book snapshots"""
url = f"{self.base_url}/replays/{exchange}.{symbol}/{date}/bookchange_{limit}.json"
headers = {"Authorization": f"Bearer {self.api_key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=60) as response:
content = await response.text()
data = json.loads(content)
return data
except asyncio.TimeoutError:
raise Exception("Connection timeout: เครือข่ายช้าเกินไป ลองใช้ chunked download")
except aiohttp.ClientConnectorError as e:
raise Exception(f"Connection error: {str(e)}")
ตัวอย่างการใช้งาน
async def main():
replayer = TardisReplayer(api_key="YOUR_TARDIS_API_KEY")
try:
# ดึงข้อมูล BTC/USDT จาก Binance วันที่ 15 มกราคม 2024
data = await replayer.fetch_orderbook_snapshots(
exchange="binance",
symbol="btcusdt",
date="2024-01-15",
limit=100
)
print(f"ได้รับ {len(data)} snapshots")
except Exception as e:
print(f"เกิดข้อผิดพลาด: {e}")
asyncio.run(main())
Advanced: สร้าง Order Book ณ เวลาที่ต้องการ
นี่คือส่วนสำคัญที่ผมอยากแชร์ วิธีการ reconstruct order book ณ เวลาเฉพาะเจาะจงจาก delta updates
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional
from decimal import Decimal
import heapq
@dataclass
class OrderBookLevel:
"""แทนระดับราคาใน order book"""
price: Decimal
quantity: Decimal
order_count: int = 0
@dataclass
class OrderBook:
"""โครงสร้าง Order Book ที่สมบูรณ์"""
symbol: str
exchange: str
timestamp: int
bids: Dict[Decimal, OrderBookLevel] = field(default_factory=dict)
asks: Dict[Decimal, OrderBookLevel] = field(default_factory=dict)
def get_best_bid(self) -> Optional[Tuple[Decimal, Decimal]]:
if not self.bids:
return None
best_price = max(self.bids.keys())
level = self.bids[best_price]
return (best_price, level.quantity)
def get_best_ask(self) -> Optional[Tuple[Decimal, Decimal]]:
if not self.asks:
return None
best_price = min(self.asks.keys())
level = self.asks[best_price]
return (best_price, level.quantity)
def get_mid_price(self) -> Optional[Decimal]:
bid = self.get_best_bid()
ask = self.get_best_ask()
if bid and ask:
return (bid[0] + ask[0]) / 2
return None
def get_spread(self) -> Optional[Decimal]:
bid = self.get_best_bid()
ask = self.get_best_ask()
if bid and ask:
return ask[0] - bid[0]
return None
class OrderBookReconstructor:
"""
คลาสสำหรับ reconstruct order book ณ เวลาที่ต้องการ
ใช้ algorithm จาก BookKeeper library ที่ปรับปรุงใหม่
"""
def __init__(self, symbol: str, exchange: str):
self.symbol = symbol
self.exchange = exchange
self.current_book: Optional[OrderBook] = None
self.snapshots: List[Tuple[int, OrderBook]] = []
self.delta_updates: Dict[int, List] = {}
def apply_snapshot(self, timestamp: int, snapshot_data: dict):
"""นำ snapshot data มาสร้าง order book"""
book = OrderBook(
symbol=self.symbol,
exchange=self.exchange,
timestamp=timestamp
)
# ประมวลผล bids
for price_str, bid_data in snapshot_data.get('bids', []):
price = Decimal(price_str)
quantity = Decimal(str(bid_data.get('quantity', 0)))
book.bids[price] = OrderBookLevel(price, quantity)
# ประมวลผล asks
for price_str, ask_data in snapshot_data.get('asks', []):
price = Decimal(price_str)
quantity = Decimal(str(ask_data.get('quantity', 0)))
book.asks[price] = OrderBookLevel(price, quantity)
self.current_book = book
self.snapshots.append((timestamp, book))
def apply_delta(self, timestamp: int, delta_data: dict):
"""นำ delta update มาปรับปรุง order book"""
if not self.current_book:
return
# ประมวลผล delta สำหรับ bids
for update in delta_data.get('b', []): # bids updates
price = Decimal(str(update[0]))
quantity = Decimal(str(update[1]))
if quantity == 0:
# ลบ order
if price in self.current_book.bids:
del self.current_book.bids[price]
else:
# เพิ่มหรือแก้ไข order
self.current_book.bids[price] = OrderBookLevel(price, quantity)
# ประมวลผล delta สำหรับ asks
for update in delta_data.get('a', []): # asks updates
price = Decimal(str(update[0]))
quantity = Decimal(str(update[1]))
if quantity == 0:
if price in self.current_book.asks:
del self.current_book.asks[price]
else:
self.current_book.asks[price] = OrderBookLevel(price, quantity)
self.current_book.timestamp = timestamp
def get_book_at_time(self, target_timestamp: int) -> Optional[OrderBook]:
"""ดึง order book ณ เวลาที่ต้องการ"""
# หา snapshot ล่าสุดก่อน target time
applicable_snapshot = None
for ts, book in self.snapshots:
if ts <= target_timestamp:
applicable_snapshot = (ts, book)
else:
break
if not applicable_snapshot:
return None
start_ts, book = applicable_snapshot
reconstructed_book = OrderBook(
symbol=book.symbol,
exchange=book.exchange,
timestamp=target_timestamp,
bids=book.bids.copy(),
asks=book.asks.copy()
)
# หา delta updates ที่เกิดขึ้นระหว่าง snapshot และ target time
deltas_to_apply = []
for ts, deltas in sorted(self.delta_updates.items()):
if start_ts < ts <= target_timestamp:
deltas_to_apply.extend(deltas)
# Apply deltas
for delta in deltas_to_apply:
for update in delta.get('b', []):
price = Decimal(str(update[0]))
quantity = Decimal(str(update[1]))
if quantity == 0:
if price in reconstructed_book.bids:
del reconstructed_book.bids[price]
else:
reconstructed_book.bids[price] = OrderBookLevel(price, quantity)
for update in delta.get('a', []):
price = Decimal(str(update[0]))
quantity = Decimal(str(update[1]))
if quantity == 0:
if price in reconstructed_book.asks:
del reconstructed_book.asks[price]
else:
reconstructed_book.asks[price] = OrderBookLevel(price, quantity)
reconstructed_book.timestamp = target_timestamp
return reconstructed_book
def analyze_spread(self, book: OrderBook) -> dict:
"""วิเคราะห์ spread และ liquidity"""
bid = book.get_best_bid()
ask = book.get_best_ask()
if not bid or not ask:
return {}
spread = ask[0] - bid[0]
spread_bps = (spread / bid[0]) * 10000 # basis points
# คำนวณ VWAP ของ top N levels
def calculate_vwap(levels: dict, top_n: int = 5) -> Optional[Decimal]:
sorted_prices = sorted(levels.keys(), reverse=True)[:top_n]
total_value = Decimal('0')
total_quantity = Decimal('0')
for price in sorted_prices:
level = levels[price]
total_value += price * level.quantity
total_quantity += level.quantity
if total_quantity > 0:
return total_value / total_quantity
return None
return {
'timestamp': book.timestamp,
'best_bid': bid,
'best_ask': ask,
'spread': spread,
'spread_bps': spread_bps,
'mid_price': book.get_mid_price(),
'vwap_bid_5': calculate_vwap(book.bids, 5),
'vwap_ask_5': calculate_vwap(book.asks, 5),
'bid_levels': len(book.bids),
'ask_levels': len(book.asks)
}
ตัวอย่างการใช้งาน
def example_usage():
reconstructor = OrderBookReconstructor("BTCUSDT", "binance")
# สมมติว่าได้ข้อมูลจาก API แล้ว
# 1. นำ snapshot ไปสร้าง base book
snapshot = {
'bids': [
['50000.00', {'quantity': '1.5', 'order_count': 3}],
['49999.00', {'quantity': '2.0', 'order_count': 5}],
],
'asks': [
['50001.00', {'quantity': '1.2', 'order_count': 2}],
['50002.00', {'quantity': '3.0', 'order_count': 4}],
]
}
reconstructor.apply_snapshot(1705334400000, snapshot)
# 2. ประมวลผล delta updates
delta_1 = {'b': [['49998.00', '1.0']], 'a': [['50003.00', '2.5']]}
reconstructor.delta_updates[1705334460000] = [delta_1]
# 3. ดึง order book ณ เวลาที่ต้องการ
target_time = 1705334500000
book_at_time = reconstructor.get_book_at_time(target_time)
if book_at_time:
analysis = reconstructor.analyze_spread(book_at_time)
print(f"ณ เวลา {target_time}:")
print(f" Mid Price: {analysis.get('mid_price')}")
print(f" Spread: {analysis.get('spread')} ({analysis.get('spread_bps')} bps)")
print(f" Bid Levels: {analysis.get('bid_levels')}")
print(f" Ask Levels: {analysis.get('ask_levels')}")
example_usage()
การ Optimize สำหรับ Large-scale Backtesting
สำหรับการทำ backtest ที่ต้องประมวลผลข้อมูลจำนวนมาก ผมแนะนำวิธีการต่อไปนี้:
- Streaming Processing: ใช้ async/await เพื่อดึงข้อมูลแบบ parallel
- Caching: เก็บ snapshot ที่ใช้บ่อยไว้ใน memory หรือ Redis
- Batch Processing: ประมวลผลข้อมูลเป็น chunk แทนที่จะทีละ record
- Local Replay: ดาวน์โหลดข้อมูลมาเก็บไว้ในเครื่องก่อนแล้วค่อยประมวลผล
import asyncio
from typing import List, Dict, Any
import aiofiles
import json
class BatchOrderBookProcessor:
"""Processor สำหรับประมวลผล order book data จำนวนมาก"""
def __init__(self, cache_dir: str = "./orderbook_cache"):
self.cache_dir = cache_dir
self.reconstructor = None
self.processed_count = 0
async def download_and_cache(self, exchange: str, symbol: str,
start_date: str, end_date: str):
"""ดาวน์โหลดข้อมูลและเก็บไว้ใน cache"""
import os
os.makedirs(self.cache_dir, exist_ok=True)
# สร้าง async tasks สำหรับดาวน์โหลดหลายวันพร้อมกัน
from datetime import datetime, timedelta
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
tasks = []
current = start
while current <= end:
date_str = current.strftime("%Y-%m-%d")
tasks.append(self._download_single_day(exchange, symbol, date_str))
current += timedelta(days=1)
# ประมวลผลพร้อมกัน (concurrent)
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for r in results if not isinstance(r, Exception))
print(f"ดาวน์โหลดสำเร็จ {successful}/{len(tasks)} วัน")
async def _download_single_day(self, exchange: str, symbol: str, date: str):
"""ดาวน์โหลดข้อมูลของวันเดียว"""
cache_file = f"{self.cache_dir}/{exchange}_{symbol}_{date}.json"
# ตรวจสอบ cache ก่อน
import os
if os.path.exists(cache_file):
print(f"ใช้ cache สำหรับ {date}")
return cache_file
# ดาวน์โหลดจาก API
url = f"https://api.tardis-devip.io/v1/replays/{exchange}.{symbol}/{date}/bookchange_100.json"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=300)) as response:
if response.status == 200:
content = await response.read()
async with aiofiles.open(cache_file, 'wb') as f:
await f.write(content)
return cache_file
else:
raise Exception(f"HTTP {response.status}")
except Exception as e:
print(f"ดาวน์โหลด {date} ล้มเหลว: {e}")
raise
def process_cached_data(self, exchange: str, symbol: str, date: str,
target_times: List[int]) -> List[Dict]:
"""ประมวลผลข้อมูลจาก cache เพื่อหา order book ณ เวลาที่ต้องการ"""
cache_file = f"{self.cache_dir}/{exchange}_{symbol}_{date}.json"
if not os.path.exists(cache_file):
return []
reconstructor = OrderBookReconstructor(symbol, exchange)
results = []
with open(cache_file, 'r') as f:
data = json.load(f)
# หา snapshot แรกและ delta updates
for record in data:
if record.get('type') == 'snapshot':
reconstructor.apply_snapshot(record['timestamp'], record['data'])
elif record.get('type') == 'delta':
if record['timestamp'] not in reconstructor.delta_updates:
reconstructor.delta_updates[record['timestamp']] = []
reconstructor.delta_updates[record['timestamp']].append(record['data'])
# ดึง order book สำหรับแต่ละ target time
for target_time in target_times:
book = reconstructor.get_book_at_time(target_time)
if book:
analysis = reconstructor.analyze_spread(book)
results.append(analysis)
return results
การใช้งาน batch processor
async def run_batch_analysis():
processor = BatchOrderBookProcessor(cache_dir="./btc_cache")
# ดาวน์โหลดข้อมูล 7 วัน
await processor.download_and_cache(
exchange="binance",
symbol="btcusdt",
start_date="2024-01-01",
end_date="2024-01-07"
)
# วิเคราะห์ spread ณ เวลาต่างๆ
target_times = [
1704067200000, # 2024-01-01 00:00:00 UTC
1704153600000, # 2024-01-02 00:00:00 UTC
1704240000000, # 2024-01-03 00:00:00 UTC
]
results = processor.process_cached_data(
exchange="binance",
symbol="btcusdt",
date="2024-01-01",
target_times=target_times
)
# แปลงเป็น DataFrame เพื่อวิเคราะห์
df = pd.DataFrame(results)
print(df[['timestamp', 'mid_price', 'spread', 'spread_bps']].to_string())
asyncio.run(run_batch_analysis())
ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข
1. ConnectionError: Connection timeout
# ปัญหา: เครือข่ายช้าหรือ API server ตอบสนองช้า
สาเหตุ:
- Server ของ Tardis Machine อยู่ใน region ที่ไกลจากเรา
- ข้อมูลที่ดึงมีขนาดใหญ่เกินไป
- Rate limiting
วิธีแก้ไข:
วิธีที่ 1: ใช้ chunked download พร้อม retry logic
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def download_with_retry(url: str, headers: dict):
timeout = aiohttp.ClientTimeout(total=120, connect=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
return await response.read()
วิธีที่ 2: ใช้ streaming แทนการดึงทั้งไฟล์
async def stream_download(url: str, headers: dict, chunk_size: int = 8192):
timeout = aiohttp.ClientTimeout(total=300, connect=60)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
async for chunk in response.content.iter_chunked(chunk_size):
yield chunk
วิธีที่ 3: ใช้ proxy ที่ใกล้กว่า
proxies = {
'http': 'http://proxy-server:8080',
'https': 'http://proxy-server:8080'
}
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy='http://proxy-server:8080') as response:
data = await response.read()
2. 401 Unauthorized Error
# ปัญหา: API key ไม่ถูกต้องหรือหมดอายุ
สาเหตุ:
- API key ผิดพลาด
- API key หมดอายุ
- ไม่ได้ใส่ header ที่ถูกต้อง
วิธีแก้ไข:
import os
class TardisAPIClient:
def __init__(self, api_key: str = None):
# อ่านจาก environment variable
self.api_key = api_key or os.environ.get('TARDIS_API_KEY')
if not self.api_key:
raise ValueError("TARDIS_API_KEY not found. กรุณาตั้งค่า API key")
# ตรวจสอบ format ของ API key
if len(self.api_key) < 20:
raise ValueError("API key สั้นเกินไป อาจไม่ถูกต้อง")
def get_headers(self) -> dict:
"""สร้าง headers ที่ถูกต้อง"""
return {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
'User-Agent': 'TardisClient/1.0'
}
async def verify_connection(self) -> bool:
"""ตรวจสอบว่า API key ใช้งานได้หรือไม่"""
url = "https://api.tardis-devip.io/v1/status"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.get_headers()) as response:
if response.status == 200:
return True
elif response.status == 401:
print("401 Unauthorized: ตรวจสอบ API key ของคุณ")
return False
else:
print(f"HTTP {response.status}")
return False
except Exception as e:
print(f"Connection error: {e}")
return False
การใช้งาน
client = TardisAPIClient()
is_valid = await client.verify_connection()
if not is_valid:
print("กรุณาตรวจสอบ API key ที่ https://app.tardis-machine.io")
3. Memory Error เมื่อประมวลผลข้อมูลจำนวนมาก
# ปัญหา: Memory ไม่พอสำหรับประมวลผลข้อมูลทั้งหมด
สาเหตุ:
- ข้อมูล historical มีขนาดใหญ่มาก (หลาย GB)
- เก็บ order books ทุก snapshot ไว้ใน memory
- ไม่ได้ clear old data
วิธีแก้ไข:
import gc
from typing import Generator
import pickle
class MemoryEfficientProcessor:
"""Processor ที่ประหยัด memory"""
def __init__(self, batch_size: int = 1000):
self.batch_size