金融アルゴリズムのバックテストにおいて、「HistoricalDataNotFoundError: timestamp 2024-03-15T09:30:00Z not available」というエラーに遭遇したことがあるだろうか。この記事を書いている私も、実際のヘッジファンドでのクオンツ開発時代に、この問題で深夜まで対応した経験がある。本番環境のリアルタイム данные とテスト環境の историческая база の不一致导致的再現困難なバグは、致命的だ。

本稿では、「Tardis Machine」——過去に戻って行情データを自在に再生するローカルリプレイサーバ——をPythonとNode.jsで構築する方法を解説する。

なぜローカルリプレイサーバが必要か

リアルタイム市場データへの依存を排除し、確定的なバックテスト環境を構築することは、アルファ運用において至关重要である。ローカルリプレイサーバ主要有以下の利点がある:

アーキテクチャ概要

リプレイサーバは以下3つのコンポーネントで構成される:

Python実装:コアリプレイエンジン

import sqlite3
import asyncio
import json
from datetime import datetime, timedelta
from typing import Optional, Callable, Dict, List
from dataclasses import dataclass
import heapq

@dataclass
class TickData:
    timestamp: datetime
    symbol: str
    price: float
    volume: int
    bid: float
    ask: float

class TardisReplayServer:
    """過去行情データのリプレイサーバ
    
    使用例:
        server = TardisReplayServer("./market_data.db")
        await server.start(start_time="2024-03-15T09:30:00", speed=1.0)
    """
    
    def __init__(self, db_path: str, port: int = 8765):
        self.db_path = db_path
        self.port = port
        self.conn: Optional[sqlite3.Connection] = None
        self.subscribers: List[Callable] = []
        self.is_running = False
        self.current_time: Optional[datetime] = None
        self.speed: float = 1.0
        self._tick_buffer: List[TickData] = []
        self._last_flush = datetime.now()
        
    def connect(self) -> None:
        """データベース接続を確立"""
        try:
            self.conn = sqlite3.connect(
                self.db_path,
                check_same_thread=False,
                timeout=30.0
            )
            # パフォーマンス最適化
            self.conn.execute("PRAGMA journal_mode=WAL")
            self.conn.execute("PRAGMA synchronous=NORMAL")
            self.conn.execute("PRAGMA cache_size=-64000")  # 64MBキャッシュ
            print(f"✅ Connected to {self.db_path}")
        except sqlite3.OperationalError as e:
            raise ConnectionError(f"Database connection failed: {e}")
    
    def load_ticks(
        self, 
        symbol: str, 
        start_time: datetime, 
        end_time: datetime
    ) -> int:
        """指定期間のティックデータをメモリにロード"""
        query = """
            SELECT timestamp, symbol, price, volume, bid, ask
            FROM ticks
            WHERE symbol = ? 
            AND timestamp >= ? 
            AND timestamp <= ?
            ORDER BY timestamp ASC
        """
        cursor = self.conn.execute(
            query, 
            (symbol, start_time.isoformat(), end_time.isoformat())
        )
        
        self._tick_buffer = [
            TickData(
                timestamp=datetime.fromisoformat(row[0]),
                symbol=row[1],
                price=row[2],
                volume=row[3],
                bid=row[4],
                ask=row[5]
            )
            for row in cursor.fetchall()
        ]
        
        # ヒープ化して時間順アクセスを最適化
        heapq.heapify(self._tick_buffer)
        return len(self._tick_buffer)
    
    async def replay_loop(self) -> None:
        """リプレイメインループ"""
        self.is_running = True
        last_tick_time = None
        
        while self.is_running and self._tick_buffer:
            # ヒープから最小時間のティックを取得
            if not self._tick_buffer:
                break
                
            next_tick = heapq.heappop(self._tick_buffer)
            
            # 時間待機(リプレイ速度調整)
            if last_tick_time:
                elapsed = (next_tick.timestamp - last_tick_time).total_seconds()
                wait_time = elapsed / self.speed
                if wait_time > 0:
                    await asyncio.sleep(min(wait_time, 60.0))  # 最大60秒
            
            self.current_time = next_tick.timestamp
            last_tick_time = next_tick.timestamp
            
            # 全購読者へ配信
            await self._broadcast(next_tick)
    
    async def _broadcast(self, tick: TickData) -> None:
        """全購読者へティックデータを配信"""
        message = json.dumps({
            "type": "tick",
            "data": {
                "symbol": tick.symbol,
                "price": tick.price,
                "volume": tick.volume,
                "bid": tick.bid,
                "ask": tick.ask,
                "timestamp": tick.timestamp.isoformat()
            }
        })
        
        for callback in self.subscribers:
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(message)
                else:
                    callback(message)
            except Exception as e:
                print(f"Subscriber error: {e}")
    
    def subscribe(self, callback: Callable) -> None:
        """行情データ購読者登録"""
        self.subscribers.append(callback)
    
    def stop(self) -> None:
        """リプレイ停止"""
        self.is_running = False
        print("🛑 Replay server stopped")
    
    def get_status(self) -> Dict:
        """現在の状態を返す"""
        return {
            "is_running": self.is_running,
            "current_time": self.current_time.isoformat() if self.current_time else None,
            "buffer_remaining": len(self._tick_buffer),
            "subscribers": len(self.subscribers),
            "speed": self.speed
        }

使用例

async def main(): server = TardisReplayServer("./data/eq_us_2024.db") server.connect() # 2024年3月15日のIBM株データをロード count = server.load_ticks( symbol="IBM", start_time=datetime(2024, 3, 15, 9, 30), end_time=datetime(2024, 3, 15, 16, 0) ) print(f"Loaded {count} ticks") # 2倍速で再生 server.speed = 2.0 # データ購読者登録 async def on_tick(message): data = json.loads(message) print(f"[{data['data']['timestamp']}] {data['data']['symbol']}: ${data['data']['price']}") server.subscribe(on_tick) # リプレイ開始 try: await server.replay_loop() except KeyboardInterrupt: server.stop() if __name__ == "__main__": asyncio.run(main())

Node.js実装:WebSocket APIゲートウェイ

const WebSocket = require('ws');
const sqlite3 = require