ในโลกของการพัฒนาระบบเทรดและการวิเคราะห์ตลาดการเงิน การทดสอบกลยุทธ์กับข้อมูลประวัติ (Backtesting) เป็นสิ่งจำเป็นอย่างยิ่ง แต่การพึ่งพา API ภายนอกอย่างเดียวมักมีข้อจำกัดเรื่องค่าใช้จ่ายและ Rate Limit บทความนี้จะสอนวิธีสร้าง Local Replay Server ที่ทำหน้าที่เหมือนเครื่อง Tardis ย้อนเวลาข้อมูลตลาดได้ตามต้องการ โดยใช้ Python สำหรับ Backend และ Node.js สำหรับ WebSocket Server
ทำไมต้องสร้าง Local Replay Server?
จากประสบการณ์การพัฒนาระบบ Algo Trading มาหลายปี ผมพบปัญหาหลัก 3 อย่างเมื่อใช้งาน Replay API จากผู้ให้บริการภายนอก:
- ค่าใช้จ่ายสูงเกินไป: การเรียก API ทุก Tick ของข้อมูลตลาดทำให้ค่าใช้จ่ายพุ่งสูงอย่างรวดเร็ว
- Rate Limit ต้นทุน: การทดสอบกลยุทธ์หลายร้อยครั้งต่อวันถูกจำกัดด้วย Rate Limit
- Latency ที่ไม่คงที่: ข้อมูลมาถึงไม่พร้อมกัน ทำให้ผลการทดสอบไม่แม่นยำ
Local Replay Server ช่วยแก้ปัญหาทั้งหมดนี้ด้วยการ Cache ข้อมูลทั้งหมดไว้ในเครื่อง และจำลองการ stream ข้อมูลตาม timestamp ที่กำหนด เหมาะสำหรับนักพัฒนาที่ต้องการ ทดสอบ Backtest อย่างไม่จำกัด และผสมผสานกับ AI สำหรับวิเคราะห์รูปแบบตลาด
สถาปัตยกรรมระบบ
ระบบประกอบด้วย 3 ส่วนหลัก:
- Data Fetcher (Python): ดึงข้อมูล Historical จาก Exchange API และบันทึกลง Local Database
- Replay Engine (Python): อ่านข้อมูลและจำลองการ Stream ตามเวลาที่กำหนด (Speed: 1x, 10x, 100x)
- WebSocket Server (Node.js): รับคำขอจาก Client และส่งข้อมูล Real-time ผ่าน WebSocket
การติดตั้งและ Setup
# สร้าง Directory สำหรับโปรเจกต์
mkdir tardis-replay-server
cd tardis-replay-server
สร้าง Virtual Environment สำหรับ Python
python3 -m venv venv
source venv/bin/activate
ติดตั้ง Python Dependencies
pip install asyncpg aiohttp websockets pandas numpy redis asyncio
สร้าง Node.js project
npm init -y
npm install express ws express-async-errors cors dotenv
ติดตั้ง Redis สำหรับ Cache (ใช้ Docker)
docker run -d --name redis-replay -p 6379:6379 redis:alpine
ส่วนที่ 1: Data Fetcher - ดึงข้อมูล Historical
สร้าง Python Script สำหรับดึงข้อมูล OHLCV และ Trade Ticks จาก Exchange โดยผมแนะนำให้ดึงข้อมูลหลาย Exchange เพื่อความครบถ้วน และใช้ HolySheep AI สำหรับวิเคราะห์รูปแบบตลาดอัตโนมัติ
"""
Tardis Data Fetcher - ดึงข้อมูล Historical จาก Exchange
"""
import asyncio
import aiohttp
import asyncpg
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataFetcher:
def __init__(self, db_pool: asyncpg.Pool):
self.db_pool = db_pool
self.session: Optional[aiohttp.ClientSession] = None
async def init_session(self):
"""Initialize HTTP Session พร้อม Retry Logic"""
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=10,
ttl_dns_cache=300
)
retry_timeout = aiohttp.ClientTimeout(total=30, connect=10)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=retry_timeout
)
async def fetch_ohlcv(
self,
symbol: str,
exchange: str,
interval: str = "1m",
start_time: datetime = None,
end_time: datetime = None
) -> pd.DataFrame:
"""ดึงข้อมูล OHLCV จาก Exchange API
Args:
symbol: เช่น 'BTC/USDT'
exchange: 'binance', 'bybit', 'okx'
interval: '1m', '5m', '15m', '1h', '4h', '1d'
start_time: เวลาเริ่มต้น
end_time: เวลาสิ้นสุด
Returns:
DataFrame ที่มี columns: timestamp, open, high, low, close, volume
"""
if not self.session:
await self.init_session()
# Map interval เป็น API format
interval_map = {
"1m": "1m", "5m": "5m", "15m": "15m",
"1h": "1h", "4h": "4h", "1d": "1d"
}
api_interval = interval_map.get(interval, "1m")
# Convert time to milliseconds
start_ms = int(start_time.timestamp() * 1000) if start_time else None
end_ms = int(end_time.timestamp() * 1000) if end_time else None
# Endpoint ตาม Exchange
endpoints = {
"binance": f"https://api.binance.com/api/v3/klines",
"bybit": f"https://api.bybit.com/v5/market/kline",
"okx": f"https://www.okx.com/api/v5/market/history-candles"
}
params = {
"symbol": symbol.replace("/", ""),
"interval": api_interval,
"limit": 1000
}
if start_ms:
params["startTime"] = start_ms
if end_ms:
params["endTime"] = end_ms
try:
async with self.session.get(endpoints[exchange], params=params) as resp:
if resp.status == 200:
data = await resp.json()
return self._parse_ohlcv_response(data, exchange, symbol)
else:
logger.error(f"API Error: {resp.status} - {await resp.text()}")
return pd.DataFrame()
except Exception as e:
logger.error(f"Fetch Error: {e}")
return pd.DataFrame()
def _parse_ohlcv_response(self, data: List, exchange: str, symbol: str) -> pd.DataFrame:
"""Parse response ตาม format ของแต่ละ Exchange"""
records = []
if exchange == "binance":
for kline in data:
records.append({
"timestamp": int(kline[0]),
"datetime": datetime.fromtimestamp(kline[0] / 1000),
"symbol": symbol,
"exchange": exchange,
"open": float(kline[1]),
"high": float(kline[2]),
"low": float(kline[3]),
"close": float(kline[4]),
"volume": float(kline[5]),
"quote_volume": float(kline[7]),
"trades": int(kline[8])
})
elif exchange == "bybit":
for kline in data.get("result", {}).get("list", []):
records.append({
"timestamp": int(kline[0]),
"datetime": datetime.fromtimestamp(int(kline[0]) / 1000),
"symbol": symbol,
"exchange": exchange,
"open": float(kline[1]),
"high": float(kline[2]),
"low": float(kline[3]),
"close": float(kline[4]),
"volume": float(kline[5]),
"quote_volume": 0,
"trades": 0
})
return pd.DataFrame(records)
async def save_to_database(self, df: pd.DataFrame, table_name: str = "market_ohlcv"):
"""บันทึกข้อมูลลง PostgreSQL"""
if df.empty:
return
async with self.db_pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS market_ohlcv (
id BIGSERIAL PRIMARY KEY,
timestamp BIGINT NOT NULL,
datetime TIMESTAMP NOT NULL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(20) NOT NULL,
open DECIMAL(20, 8) NOT NULL,
high DECIMAL(20, 8) NOT NULL,
low DECIMAL(20, 8) NOT NULL,
close DECIMAL(20, 8) NOT NULL,
volume DECIMAL(20, 8) NOT NULL,
quote_volume DECIMAL(20, 8),
trades INTEGER,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(timestamp, symbol, exchange)
)
""")
# Insert โดยใช้ UPSERT
await conn.executemany("""
INSERT INTO market_ohlcv
(timestamp, datetime, symbol, exchange, open, high, low, close, volume, quote_volume, trades)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (timestamp, symbol, exchange) DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume
""", [
(row.timestamp, row.datetime, row.symbol, row.exchange,
row.open, row.high, row.low, row.close, row.volume,
row.get("quote_volume", 0), row.get("trades", 0))
for row in df.itertuples()
])
logger.info(f"Saved {len(df)} records to {table_name}")
async def main():
"""ตัวอย่างการใช้งาน Data Fetcher"""
# เชื่อมต่อ Database
db_pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="your_password",
database="tardis",
min_size=5,
max_size=20
)
fetcher = DataFetcher(db_pool)
await fetcher.init_session()
# ดึงข้อมูล BTC/USDT จาก Binance (30 วันล่าสุด)
end_time = datetime.now()
start_time = end_time - timedelta(days=30)
df = await fetcher.fetch_ohlcv(
symbol="BTC/USDT",
exchange="binance",
interval="1m",
start_time=start_time,
end_time=end_time
)
if not df.empty:
await fetcher.save_to_database(df)
print(f"✅ ดึงข้อมูลสำเร็จ: {len(df)} records")
print(df.head())
await db_pool.close()
if __name__ == "__main__":
asyncio.run(main())
ส่วนที่ 2: Replay Engine - จำลอง Time Travel
Replay Engine เป็นหัวใจหลักของระบบ ทำหน้าที่อ่านข้อมูลจาก Database และ Stream ตาม Timeline ที่กำหนด รองรับการปรับความเร็วได้หลายระดับ
"""
Tardis Replay Engine - จำลองการ Stream ข้อมูล Historical
"""
import asyncio
import asyncpg
import json
import logging
from datetime import datetime, timedelta
from typing import AsyncGenerator, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis.asyncio as redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReplaySpeed(Enum):
"""ความเร็วในการ Replay"""
REALTIME = 1
FAST_10X = 10
FAST_100X = 100
FAST_1000X = 1000
@dataclass
class ReplayConfig:
"""Configuration สำหรับ Replay Session"""
session_id: str
symbol: str
exchange: str
interval: str
start_time: int # timestamp in ms
end_time: int
speed: ReplaySpeed = ReplaySpeed.FAST_100X
on_tick_callback: Optional[callable] = None
@dataclass
class MarketTick:
"""Market Data Tick"""
timestamp: int
datetime: str
symbol: str
exchange: str
open: float
high: float
low: float
close: float
volume: float
replay_time: int # เวลาจริงที่ส่งออกไป (ms)
class ReplayEngine:
def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):
self.db_pool = db_pool
self.redis = redis_client
self.active_sessions: Dict[str, ReplayConfig] = {}
async def create_session(self, config: ReplayConfig) -> str:
"""สร้าง Replay Session ใหม่"""
# ตรวจสอบว่ามีข้อมูลในช่วงเวลาที่ต้องการหรือไม่
data_exists = await self._check_data_exists(
config.symbol,
config.exchange,
config.start_time,
config.end_time
)
if not data_exists:
raise ValueError(
f"ไม่พบข้อมูลสำหรับ {config.symbol} ({config.exchange}) "
f"ในช่วงเวลาที่กำหนด"
)
self.active_sessions[config.session_id] = config
# Cache session info ใน Redis
await self.redis.setex(
f"replay:session:{config.session_id}",
3600, # 1 hour TTL
json.dumps(asdict(config))
)
logger.info(f"Created replay session: {config.session_id}")
return config.session_id
async def _check_data_exists(
self,
symbol: str,
exchange: str,
start_time: int,
end_time: int
) -> bool:
"""ตรวจสอบว่ามีข้อมูลหรือไม่"""
async with self.db_pool.acquire() as conn:
result = await conn.fetchval("""
SELECT EXISTS(
SELECT 1 FROM market_ohlcv
WHERE symbol = $1
AND exchange = $2
AND timestamp >= $3
AND timestamp <= $4
LIMIT 1
)
""", symbol, exchange, start_time, end_time)
return result
async def stream(
self,
session_id: str,
batch_size: int = 100
) -> AsyncGenerator[MarketTick, None]:
"""Stream ข้อมูลตาม Timeline
Args:
session_id: Replay Session ID
batch_size: จำนวน records ต่อ batch
Yields:
MarketTick objects
"""
if session_id not in self.active_sessions:
raise ValueError(f"Session not found: {session_id}")
config = self.active_sessions[session_id]
start_wall_time = asyncio.get_event_loop().time() * 1000
# ดึงข้อมูลจาก Database
async with self.db_pool.acquire() as conn:
records = await conn.fetch("""
SELECT timestamp, datetime, symbol, exchange,
open, high, low, close, volume
FROM market_ohlcv
WHERE symbol = $1
AND exchange = $2
AND timestamp >= $3
AND timestamp <= $4
ORDER BY timestamp ASC
""", config.symbol, config.exchange, config.start_time, config.end_time)
if not records:
logger.warning(f"No data found for session {session_id}")
return
total_records = len(records)
logger.info(f"Starting replay: {total_records} records at {config.speed.name}")
for i, record in enumerate(records):
# คำนวณเวลาที่ต้องส่งออกไป (wall time simulation)
if i == 0:
next_wall_time = start_wall_time
else:
# คำนวณเวลาตามความเร็วที่กำหนด
time_diff = records[i].timestamp - records[i-1].timestamp
simulated_delay = time_diff / config.speed.value
next_wall_time += simulated_delay
# รอจนถึงเวลาที่ต้องส่ง
current_time = asyncio.get_event_loop().time() * 1000
if next_wall_time > current_time:
await asyncio.sleep((next_wall_time - current_time) / 1000)
tick = MarketTick(
timestamp=record["timestamp"],
datetime=record["datetime"].isoformat(),
symbol=record["symbol"],
exchange=record["exchange"],
open=float(record["open"]),
high=float(record["high"]),
low=float(record["low"]),
close=float(record["close"]),
volume=float(record["volume"]),
replay_time=int(asyncio.get_event_loop().time() * 1000)
)
# Publish ไปยัง Redis สำหรับ subscriber อื่น
await self.redis.publish(
f"replay:{session_id}",
json.dumps(asdict(tick))
)
yield tick
# Log progress ทุก 1000 records
if (i + 1) % 1000 == 0:
progress = (i + 1) / total_records * 100
logger.info(f"Session {session_id}: {progress:.1f}% complete")
async def pause_session(self, session_id: str):
"""หยุดชั่วคราว"""
if session_id in self.active_sessions:
await self.redis.sadd("replay:paused", session_id)
logger.info(f"Paused session: {session_id}")
async def resume_session(self, session_id: str):
"""resume session"""
if session_id in self.active_sessions:
await self.redis.srem("replay:paused", session_id)
logger.info(f"Resumed session: {session_id}")
async def stop_session(self, session_id: str):
"""หยุด Session และลบ"""
if session_id in self.active_sessions:
del self.active_sessions[session_id]
await self.redis.delete(f"replay:session:{session_id}")
logger.info(f"Stopped session: {session_id}")
async def get_session_status(self, session_id: str) -> Dict:
"""ดูสถานะ Session"""
is_active = session_id in self.active_sessions
is_paused = await self.redis.sismember("replay:paused", session_id)
return {
"session_id": session_id,
"active": is_active,
"paused": is_paused,
"config": asdict(self.active_sessions.get(session_id)) if is_active else None
}
ตัวอย่างการใช้งาน
async def demo():
db_pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="your_password",
database="tardis"
)
redis_client = redis.from_url("redis://localhost:6379")
engine = ReplayEngine(db_pool, redis_client)
# สร้าง session สำหรับ replay BTC/USDT
config = ReplayConfig(
session_id="demo-session-001",
symbol="BTCUSDT",
exchange="binance",
interval="1m",
start_time=int((datetime.now() - timedelta(days=7)).timestamp() * 1000),
end_time=int(datetime.now().timestamp() * 1000),
speed=ReplaySpeed.FAST_100X
)
await engine.create_session(config)
# Stream ข้อมูล
async for tick in engine.stream(config.session_id):
print(f"[{tick.datetime}] {tick.symbol}: {tick.close}")
await db_pool.close()
await redis_client.close()
if __name__ == "__main__":
asyncio.run(demo())
ส่วนที่ 3: WebSocket Server ด้วย Node.js
Node.js WebSocket Server ทำหน้าที่รับคำขอจาก Frontend และส่งข้อมูล Replay แบบ Real-time รองรับการเชื่อมต่อหลาย Client พร้อมกัน
/**
* Tardis WebSocket Server - Node.js
* รับคำขอ Replay และส่งข้อมูลแบบ Real-time
*/
const WebSocket = require('ws');
const http = require('http');
const express = require('express');
const Redis = require('ioredis');
const axios = require('axios');
// Configuration
const PORT = process.env.PORT || 8080;
const PYTHON_API_URL = process.env.PYTHON_API_URL || 'http://localhost:5000';
const HOLYSHEEP_API_KEY = process.env.HOLYSHEEP_API_KEY || 'YOUR_HOLYSHEEP_API_KEY';
const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';
// Initialize Express & HTTP Server
const app = express();
app.use(express.json());
app.use(require('cors')());
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
// Redis clients (Publisher/Subscriber pattern)
const redisPub = new Redis({ host: 'localhost', port: 6379 });
const redisSub = new Redis({ host: 'localhost', port: 6379 });
const redisClient = new Redis({ host: 'localhost', port: 6379 });
// ติดตาม active connections และ sessions
const clients = new Map(); // clientId -> { ws, subscriptions }
const sessionClients = new Map(); // sessionId -> Set of clientIds
// WebSocket Connection Handler
wss.on('connection', (ws, req) => {
const clientId = generateClientId();
clients.set(clientId, { ws, subscriptions: new Set() });
console.log([WS] Client connected: ${clientId} from ${req.socket.remoteAddress});
ws.on('message', async (message) => {
try {
const data = JSON.parse(message);
await handleMessage(clientId, data);
} catch (error) {
console.error([WS] Message error: ${error.message});
sendError(ws, 'Invalid message format');
}
});
ws.on('close', () => {
handleDisconnect(clientId);
});
ws.on('error', (error) => {
console.error([WS] Client error: ${error.message});
handleDisconnect(clientId);
});
// ส่ง welcome message
send(ws, {
type: 'connected',
clientId,
timestamp: Date.now()
});
});
/**
* จัดการ WebSocket Messages
*/
async function handleMessage(clientId, data) {
const client = clients.get(clientId);
if (!client || !client.ws) return;
switch (data.type) {
case 'create_session':
await handleCreateSession(clientId, data);
break;
case 'subscribe':
await handleSubscribe(clientId, data);
break;
case 'unsubscribe':
handleUnsubscribe(clientId, data);
break;
case 'pause':
await handlePause(clientId, data);
break;
case 'resume':
await handleResume(clientId, data);
break;
case 'speed_change':
await handleSpeedChange(clientId, data);
break;
case 'analyze_with_ai':
await handleAnalyzeWithAI(clientId, data);
break;
default:
sendError(client.ws, Unknown message type: ${data.type});
}
}
/**
* สร้าง Replay Session ใหม่
*/
async function handleCreateSession(clientId, data) {
const { session_id, symbol, exchange, interval, start_time, end_time, speed } = data;
try {
// เรียก Python API เพื่อสร้าง session
const response = await axios.post(${PYTHON_API_URL}/api/replay/create, {
session_id,
symbol,
exchange,
interval: interval || '1m',
start_time,
end_time,
speed: speed || 100
});
// Subscribe ไปยัง Redis channel
await redisSub.subscribe(replay:${session_id});
// ติดตาม client subscription
client.subscriptions.add(session_id);
if (!sessionClients.has(session_id)) {
sessionClients.set(session_id, new Set());
}
sessionClients.get(session_id).add(clientId);
send(client.ws, {
type: 'session_created',
session_id: response.data.session_id,
status: 'active'
});
console.log([Session] Created: ${session_id} for client ${clientId});
} catch (error) {
console.error([Session] Create error: ${error.message});
sendError(client.ws, Failed to create session: ${error.response?.data?.message || error.message});
}
}
/**
* Subscribe ไปยัง Session ที่มีอยู่
*/
async function handleSubscribe(clientId, data) {
const { session_id } = data;
await redisSub.subscribe(replay:${session_id});
client.subscriptions.add(session_id);
if (!sessionClients.has(session_id)) {
sessionClients.set(session_id, new Set());
}
sessionClients.get(session_id).add(clientId);
send(client.ws, {
type: 'subscribed',
session_id
});
}
/**
* วิเคราะห์ข้อมูลด้วย HolySheep AI
*/
async function handleAnalyzeWithAI(clientId, data) {
const { session_id, tick_data, analysis_type } = data;
try {
// เรียก HolySheep AI API สำหรับวิเคราะห์ตลาด
const response = await axios.post(
${HOLYSHEEP_BASE_URL}/chat/completions,
{
model: 'gpt-4.1',
messages: [
{
role: 'system',
content: You are a market analysis AI assistant. Analyze the provided market data and identify patterns, trends, and potential trading signals.
},
{
role: 'user',
content: Analyze this market tick data for ${tick_data.symbol}:\n${JSON.stringify(tick_data, null, 2)}
}
],
temperature: 0.3,
max_tokens: 500
},
{
headers: {
'Authorization': Bearer ${HOLYSHEEP_API_KEY},
'Content-Type': 'application/json'
}
}
);
send(client.ws, {
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง