ในโลกของการพัฒนาระบบเทรดและการวิเคราะห์ตลาดการเงิน การทดสอบกลยุทธ์กับข้อมูลประวัติ (Backtesting) เป็นสิ่งจำเป็นอย่างยิ่ง แต่การพึ่งพา API ภายนอกอย่างเดียวมักมีข้อจำกัดเรื่องค่าใช้จ่ายและ Rate Limit บทความนี้จะสอนวิธีสร้าง Local Replay Server ที่ทำหน้าที่เหมือนเครื่อง Tardis ย้อนเวลาข้อมูลตลาดได้ตามต้องการ โดยใช้ Python สำหรับ Backend และ Node.js สำหรับ WebSocket Server

ทำไมต้องสร้าง Local Replay Server?

จากประสบการณ์การพัฒนาระบบ Algo Trading มาหลายปี ผมพบปัญหาหลัก 3 อย่างเมื่อใช้งาน Replay API จากผู้ให้บริการภายนอก:

Local Replay Server ช่วยแก้ปัญหาทั้งหมดนี้ด้วยการ Cache ข้อมูลทั้งหมดไว้ในเครื่อง และจำลองการ stream ข้อมูลตาม timestamp ที่กำหนด เหมาะสำหรับนักพัฒนาที่ต้องการ ทดสอบ Backtest อย่างไม่จำกัด และผสมผสานกับ AI สำหรับวิเคราะห์รูปแบบตลาด

สถาปัตยกรรมระบบ

ระบบประกอบด้วย 3 ส่วนหลัก:

  1. Data Fetcher (Python): ดึงข้อมูล Historical จาก Exchange API และบันทึกลง Local Database
  2. Replay Engine (Python): อ่านข้อมูลและจำลองการ Stream ตามเวลาที่กำหนด (Speed: 1x, 10x, 100x)
  3. WebSocket Server (Node.js): รับคำขอจาก Client และส่งข้อมูล Real-time ผ่าน WebSocket

การติดตั้งและ Setup

# สร้าง Directory สำหรับโปรเจกต์
mkdir tardis-replay-server
cd tardis-replay-server

สร้าง Virtual Environment สำหรับ Python

python3 -m venv venv source venv/bin/activate

ติดตั้ง Python Dependencies

pip install asyncpg aiohttp websockets pandas numpy redis asyncio

สร้าง Node.js project

npm init -y npm install express ws express-async-errors cors dotenv

ติดตั้ง Redis สำหรับ Cache (ใช้ Docker)

docker run -d --name redis-replay -p 6379:6379 redis:alpine

ส่วนที่ 1: Data Fetcher - ดึงข้อมูล Historical

สร้าง Python Script สำหรับดึงข้อมูล OHLCV และ Trade Ticks จาก Exchange โดยผมแนะนำให้ดึงข้อมูลหลาย Exchange เพื่อความครบถ้วน และใช้ HolySheep AI สำหรับวิเคราะห์รูปแบบตลาดอัตโนมัติ

"""
Tardis Data Fetcher - ดึงข้อมูล Historical จาก Exchange
"""
import asyncio
import aiohttp
import asyncpg
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataFetcher:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db_pool = db_pool
        self.session: Optional[aiohttp.ClientSession] = None
        
    async def init_session(self):
        """Initialize HTTP Session พร้อม Retry Logic"""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=10,
            ttl_dns_cache=300
        )
        retry_timeout = aiohttp.ClientTimeout(total=30, connect=10)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=retry_timeout
        )
    
    async def fetch_ohlcv(
        self, 
        symbol: str, 
        exchange: str,
        interval: str = "1m",
        start_time: datetime = None,
        end_time: datetime = None
    ) -> pd.DataFrame:
        """ดึงข้อมูล OHLCV จาก Exchange API
        
        Args:
            symbol: เช่น 'BTC/USDT'
            exchange: 'binance', 'bybit', 'okx'
            interval: '1m', '5m', '15m', '1h', '4h', '1d'
            start_time: เวลาเริ่มต้น
            end_time: เวลาสิ้นสุด
            
        Returns:
            DataFrame ที่มี columns: timestamp, open, high, low, close, volume
        """
        if not self.session:
            await self.init_session()
        
        # Map interval เป็น API format
        interval_map = {
            "1m": "1m", "5m": "5m", "15m": "15m",
            "1h": "1h", "4h": "4h", "1d": "1d"
        }
        api_interval = interval_map.get(interval, "1m")
        
        # Convert time to milliseconds
        start_ms = int(start_time.timestamp() * 1000) if start_time else None
        end_ms = int(end_time.timestamp() * 1000) if end_time else None
        
        # Endpoint ตาม Exchange
        endpoints = {
            "binance": f"https://api.binance.com/api/v3/klines",
            "bybit": f"https://api.bybit.com/v5/market/kline",
            "okx": f"https://www.okx.com/api/v5/market/history-candles"
        }
        
        params = {
            "symbol": symbol.replace("/", ""),
            "interval": api_interval,
            "limit": 1000
        }
        if start_ms:
            params["startTime"] = start_ms
        if end_ms:
            params["endTime"] = end_ms
            
        try:
            async with self.session.get(endpoints[exchange], params=params) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    return self._parse_ohlcv_response(data, exchange, symbol)
                else:
                    logger.error(f"API Error: {resp.status} - {await resp.text()}")
                    return pd.DataFrame()
        except Exception as e:
            logger.error(f"Fetch Error: {e}")
            return pd.DataFrame()
    
    def _parse_ohlcv_response(self, data: List, exchange: str, symbol: str) -> pd.DataFrame:
        """Parse response ตาม format ของแต่ละ Exchange"""
        records = []
        
        if exchange == "binance":
            for kline in data:
                records.append({
                    "timestamp": int(kline[0]),
                    "datetime": datetime.fromtimestamp(kline[0] / 1000),
                    "symbol": symbol,
                    "exchange": exchange,
                    "open": float(kline[1]),
                    "high": float(kline[2]),
                    "low": float(kline[3]),
                    "close": float(kline[4]),
                    "volume": float(kline[5]),
                    "quote_volume": float(kline[7]),
                    "trades": int(kline[8])
                })
        elif exchange == "bybit":
            for kline in data.get("result", {}).get("list", []):
                records.append({
                    "timestamp": int(kline[0]),
                    "datetime": datetime.fromtimestamp(int(kline[0]) / 1000),
                    "symbol": symbol,
                    "exchange": exchange,
                    "open": float(kline[1]),
                    "high": float(kline[2]),
                    "low": float(kline[3]),
                    "close": float(kline[4]),
                    "volume": float(kline[5]),
                    "quote_volume": 0,
                    "trades": 0
                })
                
        return pd.DataFrame(records)
    
    async def save_to_database(self, df: pd.DataFrame, table_name: str = "market_ohlcv"):
        """บันทึกข้อมูลลง PostgreSQL"""
        if df.empty:
            return
            
        async with self.db_pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS market_ohlcv (
                    id BIGSERIAL PRIMARY KEY,
                    timestamp BIGINT NOT NULL,
                    datetime TIMESTAMP NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    exchange VARCHAR(20) NOT NULL,
                    open DECIMAL(20, 8) NOT NULL,
                    high DECIMAL(20, 8) NOT NULL,
                    low DECIMAL(20, 8) NOT NULL,
                    close DECIMAL(20, 8) NOT NULL,
                    volume DECIMAL(20, 8) NOT NULL,
                    quote_volume DECIMAL(20, 8),
                    trades INTEGER,
                    created_at TIMESTAMP DEFAULT NOW(),
                    UNIQUE(timestamp, symbol, exchange)
                )
            """)
            
            # Insert โดยใช้ UPSERT
            await conn.executemany("""
                INSERT INTO market_ohlcv 
                (timestamp, datetime, symbol, exchange, open, high, low, close, volume, quote_volume, trades)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ON CONFLICT (timestamp, symbol, exchange) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume
            """, [
                (row.timestamp, row.datetime, row.symbol, row.exchange,
                 row.open, row.high, row.low, row.close, row.volume,
                 row.get("quote_volume", 0), row.get("trades", 0))
                for row in df.itertuples()
            ])
            
        logger.info(f"Saved {len(df)} records to {table_name}")


async def main():
    """ตัวอย่างการใช้งาน Data Fetcher"""
    # เชื่อมต่อ Database
    db_pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="your_password",
        database="tardis",
        min_size=5,
        max_size=20
    )
    
    fetcher = DataFetcher(db_pool)
    await fetcher.init_session()
    
    # ดึงข้อมูล BTC/USDT จาก Binance (30 วันล่าสุด)
    end_time = datetime.now()
    start_time = end_time - timedelta(days=30)
    
    df = await fetcher.fetch_ohlcv(
        symbol="BTC/USDT",
        exchange="binance",
        interval="1m",
        start_time=start_time,
        end_time=end_time
    )
    
    if not df.empty:
        await fetcher.save_to_database(df)
        print(f"✅ ดึงข้อมูลสำเร็จ: {len(df)} records")
        print(df.head())
    
    await db_pool.close()


if __name__ == "__main__":
    asyncio.run(main())

ส่วนที่ 2: Replay Engine - จำลอง Time Travel

Replay Engine เป็นหัวใจหลักของระบบ ทำหน้าที่อ่านข้อมูลจาก Database และ Stream ตาม Timeline ที่กำหนด รองรับการปรับความเร็วได้หลายระดับ

"""
Tardis Replay Engine - จำลองการ Stream ข้อมูล Historical
"""
import asyncio
import asyncpg
import json
import logging
from datetime import datetime, timedelta
from typing import AsyncGenerator, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis.asyncio as redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ReplaySpeed(Enum):
    """ความเร็วในการ Replay"""
    REALTIME = 1
    FAST_10X = 10
    FAST_100X = 100
    FAST_1000X = 1000

@dataclass
class ReplayConfig:
    """Configuration สำหรับ Replay Session"""
    session_id: str
    symbol: str
    exchange: str
    interval: str
    start_time: int  # timestamp in ms
    end_time: int
    speed: ReplaySpeed = ReplaySpeed.FAST_100X
    on_tick_callback: Optional[callable] = None

@dataclass
class MarketTick:
    """Market Data Tick"""
    timestamp: int
    datetime: str
    symbol: str
    exchange: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    replay_time: int  # เวลาจริงที่ส่งออกไป (ms)

class ReplayEngine:
    def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):
        self.db_pool = db_pool
        self.redis = redis_client
        self.active_sessions: Dict[str, ReplayConfig] = {}
        
    async def create_session(self, config: ReplayConfig) -> str:
        """สร้าง Replay Session ใหม่"""
        # ตรวจสอบว่ามีข้อมูลในช่วงเวลาที่ต้องการหรือไม่
        data_exists = await self._check_data_exists(
            config.symbol, 
            config.exchange,
            config.start_time,
            config.end_time
        )
        
        if not data_exists:
            raise ValueError(
                f"ไม่พบข้อมูลสำหรับ {config.symbol} ({config.exchange}) "
                f"ในช่วงเวลาที่กำหนด"
            )
        
        self.active_sessions[config.session_id] = config
        
        # Cache session info ใน Redis
        await self.redis.setex(
            f"replay:session:{config.session_id}",
            3600,  # 1 hour TTL
            json.dumps(asdict(config))
        )
        
        logger.info(f"Created replay session: {config.session_id}")
        return config.session_id
    
    async def _check_data_exists(
        self, 
        symbol: str, 
        exchange: str,
        start_time: int,
        end_time: int
    ) -> bool:
        """ตรวจสอบว่ามีข้อมูลหรือไม่"""
        async with self.db_pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT EXISTS(
                    SELECT 1 FROM market_ohlcv 
                    WHERE symbol = $1 
                    AND exchange = $2 
                    AND timestamp >= $3 
                    AND timestamp <= $4
                    LIMIT 1
                )
            """, symbol, exchange, start_time, end_time)
        return result
    
    async def stream(
        self, 
        session_id: str,
        batch_size: int = 100
    ) -> AsyncGenerator[MarketTick, None]:
        """Stream ข้อมูลตาม Timeline
        
        Args:
            session_id: Replay Session ID
            batch_size: จำนวน records ต่อ batch
            
        Yields:
            MarketTick objects
        """
        if session_id not in self.active_sessions:
            raise ValueError(f"Session not found: {session_id}")
            
        config = self.active_sessions[session_id]
        start_wall_time = asyncio.get_event_loop().time() * 1000
        
        # ดึงข้อมูลจาก Database
        async with self.db_pool.acquire() as conn:
            records = await conn.fetch("""
                SELECT timestamp, datetime, symbol, exchange,
                       open, high, low, close, volume
                FROM market_ohlcv
                WHERE symbol = $1 
                AND exchange = $2 
                AND timestamp >= $3 
                AND timestamp <= $4
                ORDER BY timestamp ASC
            """, config.symbol, config.exchange, config.start_time, config.end_time)
        
        if not records:
            logger.warning(f"No data found for session {session_id}")
            return
            
        total_records = len(records)
        logger.info(f"Starting replay: {total_records} records at {config.speed.name}")
        
        for i, record in enumerate(records):
            # คำนวณเวลาที่ต้องส่งออกไป (wall time simulation)
            if i == 0:
                next_wall_time = start_wall_time
            else:
                # คำนวณเวลาตามความเร็วที่กำหนด
                time_diff = records[i].timestamp - records[i-1].timestamp
                simulated_delay = time_diff / config.speed.value
                next_wall_time += simulated_delay
            
            # รอจนถึงเวลาที่ต้องส่ง
            current_time = asyncio.get_event_loop().time() * 1000
            if next_wall_time > current_time:
                await asyncio.sleep((next_wall_time - current_time) / 1000)
            
            tick = MarketTick(
                timestamp=record["timestamp"],
                datetime=record["datetime"].isoformat(),
                symbol=record["symbol"],
                exchange=record["exchange"],
                open=float(record["open"]),
                high=float(record["high"]),
                low=float(record["low"]),
                close=float(record["close"]),
                volume=float(record["volume"]),
                replay_time=int(asyncio.get_event_loop().time() * 1000)
            )
            
            # Publish ไปยัง Redis สำหรับ subscriber อื่น
            await self.redis.publish(
                f"replay:{session_id}",
                json.dumps(asdict(tick))
            )
            
            yield tick
            
            # Log progress ทุก 1000 records
            if (i + 1) % 1000 == 0:
                progress = (i + 1) / total_records * 100
                logger.info(f"Session {session_id}: {progress:.1f}% complete")
    
    async def pause_session(self, session_id: str):
        """หยุดชั่วคราว"""
        if session_id in self.active_sessions:
            await self.redis.sadd("replay:paused", session_id)
            logger.info(f"Paused session: {session_id}")
    
    async def resume_session(self, session_id: str):
        """resume session"""
        if session_id in self.active_sessions:
            await self.redis.srem("replay:paused", session_id)
            logger.info(f"Resumed session: {session_id}")
    
    async def stop_session(self, session_id: str):
        """หยุด Session และลบ"""
        if session_id in self.active_sessions:
            del self.active_sessions[session_id]
            await self.redis.delete(f"replay:session:{session_id}")
            logger.info(f"Stopped session: {session_id}")
    
    async def get_session_status(self, session_id: str) -> Dict:
        """ดูสถานะ Session"""
        is_active = session_id in self.active_sessions
        is_paused = await self.redis.sismember("replay:paused", session_id)
        
        return {
            "session_id": session_id,
            "active": is_active,
            "paused": is_paused,
            "config": asdict(self.active_sessions.get(session_id)) if is_active else None
        }


ตัวอย่างการใช้งาน

async def demo(): db_pool = await asyncpg.create_pool( host="localhost", port=5432, user="postgres", password="your_password", database="tardis" ) redis_client = redis.from_url("redis://localhost:6379") engine = ReplayEngine(db_pool, redis_client) # สร้าง session สำหรับ replay BTC/USDT config = ReplayConfig( session_id="demo-session-001", symbol="BTCUSDT", exchange="binance", interval="1m", start_time=int((datetime.now() - timedelta(days=7)).timestamp() * 1000), end_time=int(datetime.now().timestamp() * 1000), speed=ReplaySpeed.FAST_100X ) await engine.create_session(config) # Stream ข้อมูล async for tick in engine.stream(config.session_id): print(f"[{tick.datetime}] {tick.symbol}: {tick.close}") await db_pool.close() await redis_client.close() if __name__ == "__main__": asyncio.run(demo())

ส่วนที่ 3: WebSocket Server ด้วย Node.js

Node.js WebSocket Server ทำหน้าที่รับคำขอจาก Frontend และส่งข้อมูล Replay แบบ Real-time รองรับการเชื่อมต่อหลาย Client พร้อมกัน

/**
 * Tardis WebSocket Server - Node.js
 * รับคำขอ Replay และส่งข้อมูลแบบ Real-time
 */
const WebSocket = require('ws');
const http = require('http');
const express = require('express');
const Redis = require('ioredis');
const axios = require('axios');

// Configuration
const PORT = process.env.PORT || 8080;
const PYTHON_API_URL = process.env.PYTHON_API_URL || 'http://localhost:5000';
const HOLYSHEEP_API_KEY = process.env.HOLYSHEEP_API_KEY || 'YOUR_HOLYSHEEP_API_KEY';
const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';

// Initialize Express & HTTP Server
const app = express();
app.use(express.json());
app.use(require('cors')());

const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Redis clients (Publisher/Subscriber pattern)
const redisPub = new Redis({ host: 'localhost', port: 6379 });
const redisSub = new Redis({ host: 'localhost', port: 6379 });
const redisClient = new Redis({ host: 'localhost', port: 6379 });

// ติดตาม active connections และ sessions
const clients = new Map(); // clientId -> { ws, subscriptions }
const sessionClients = new Map(); // sessionId -> Set of clientIds

// WebSocket Connection Handler
wss.on('connection', (ws, req) => {
    const clientId = generateClientId();
    clients.set(clientId, { ws, subscriptions: new Set() });
    
    console.log([WS] Client connected: ${clientId} from ${req.socket.remoteAddress});
    
    ws.on('message', async (message) => {
        try {
            const data = JSON.parse(message);
            await handleMessage(clientId, data);
        } catch (error) {
            console.error([WS] Message error: ${error.message});
            sendError(ws, 'Invalid message format');
        }
    });
    
    ws.on('close', () => {
        handleDisconnect(clientId);
    });
    
    ws.on('error', (error) => {
        console.error([WS] Client error: ${error.message});
        handleDisconnect(clientId);
    });
    
    // ส่ง welcome message
    send(ws, {
        type: 'connected',
        clientId,
        timestamp: Date.now()
    });
});

/**
 * จัดการ WebSocket Messages
 */
async function handleMessage(clientId, data) {
    const client = clients.get(clientId);
    if (!client || !client.ws) return;
    
    switch (data.type) {
        case 'create_session':
            await handleCreateSession(clientId, data);
            break;
            
        case 'subscribe':
            await handleSubscribe(clientId, data);
            break;
            
        case 'unsubscribe':
            handleUnsubscribe(clientId, data);
            break;
            
        case 'pause':
            await handlePause(clientId, data);
            break;
            
        case 'resume':
            await handleResume(clientId, data);
            break;
            
        case 'speed_change':
            await handleSpeedChange(clientId, data);
            break;
            
        case 'analyze_with_ai':
            await handleAnalyzeWithAI(clientId, data);
            break;
            
        default:
            sendError(client.ws, Unknown message type: ${data.type});
    }
}

/**
 * สร้าง Replay Session ใหม่
 */
async function handleCreateSession(clientId, data) {
    const { session_id, symbol, exchange, interval, start_time, end_time, speed } = data;
    
    try {
        // เรียก Python API เพื่อสร้าง session
        const response = await axios.post(${PYTHON_API_URL}/api/replay/create, {
            session_id,
            symbol,
            exchange,
            interval: interval || '1m',
            start_time,
            end_time,
            speed: speed || 100
        });
        
        // Subscribe ไปยัง Redis channel
        await redisSub.subscribe(replay:${session_id});
        
        // ติดตาม client subscription
        client.subscriptions.add(session_id);
        if (!sessionClients.has(session_id)) {
            sessionClients.set(session_id, new Set());
        }
        sessionClients.get(session_id).add(clientId);
        
        send(client.ws, {
            type: 'session_created',
            session_id: response.data.session_id,
            status: 'active'
        });
        
        console.log([Session] Created: ${session_id} for client ${clientId});
        
    } catch (error) {
        console.error([Session] Create error: ${error.message});
        sendError(client.ws, Failed to create session: ${error.response?.data?.message || error.message});
    }
}

/**
 * Subscribe ไปยัง Session ที่มีอยู่
 */
async function handleSubscribe(clientId, data) {
    const { session_id } = data;
    
    await redisSub.subscribe(replay:${session_id});
    client.subscriptions.add(session_id);
    
    if (!sessionClients.has(session_id)) {
        sessionClients.set(session_id, new Set());
    }
    sessionClients.get(session_id).add(clientId);
    
    send(client.ws, {
        type: 'subscribed',
        session_id
    });
}

/**
 * วิเคราะห์ข้อมูลด้วย HolySheep AI
 */
async function handleAnalyzeWithAI(clientId, data) {
    const { session_id, tick_data, analysis_type } = data;
    
    try {
        // เรียก HolySheep AI API สำหรับวิเคราะห์ตลาด
        const response = await axios.post(
            ${HOLYSHEEP_BASE_URL}/chat/completions,
            {
                model: 'gpt-4.1',
                messages: [
                    {
                        role: 'system',
                        content: You are a market analysis AI assistant. Analyze the provided market data and identify patterns, trends, and potential trading signals.
                    },
                    {
                        role: 'user',
                        content: Analyze this market tick data for ${tick_data.symbol}:\n${JSON.stringify(tick_data, null, 2)}
                    }
                ],
                temperature: 0.3,
                max_tokens: 500
            },
            {
                headers: {
                    'Authorization': Bearer ${HOLYSHEEP_API_KEY},
                    'Content-Type': 'application/json'
                }
            }
        );
        
        send(client.ws, {