金融アルゴリズムのバックテストにおいて、「HistoricalDataNotFoundError: timestamp 2024-03-15T09:30:00Z not available」というエラーに遭遇したことがあるだろうか。この記事を書いている私も、実際のヘッジファンドでのクオンツ開発時代に、この問題で深夜まで対応した経験がある。本番環境のリアルタイム данные とテスト環境の историческая база の不一致导致的再現困難なバグは、致命的だ。
本稿では、「Tardis Machine」——過去に戻って行情データを自在に再生するローカルリプレイサーバ——をPythonとNode.jsで構築する方法を解説する。
なぜローカルリプレイサーバが必要か
リアルタイム市場データへの依存を排除し、確定的なバックテスト環境を構築することは、アルファ運用において至关重要である。ローカルリプレイサーバ主要有以下の利点がある:
- ネットワーク遅延やAPI制限の影響を受けない
- 複数回同じ条件でバックテストを再実行可能
- 特定の日時の異常値を繰り返し検証できる
- 機密データを外部に送信せずに分析可能
アーキテクチャ概要
リプレイサーバは以下3つのコンポーネントで構成される:
- データストア層:SQLite/Parquet形式での行情データ永続化
- リプレイエンジン:指定時間に基づいてデータを逐次配信
- APIゲートウェイ:WebSocket/RESTでクライアントにデータ供給
Python実装:コアリプレイエンジン
import sqlite3
import asyncio
import json
from datetime import datetime, timedelta
from typing import Optional, Callable, Dict, List
from dataclasses import dataclass
import heapq
@dataclass
class TickData:
timestamp: datetime
symbol: str
price: float
volume: int
bid: float
ask: float
class TardisReplayServer:
"""過去行情データのリプレイサーバ
使用例:
server = TardisReplayServer("./market_data.db")
await server.start(start_time="2024-03-15T09:30:00", speed=1.0)
"""
def __init__(self, db_path: str, port: int = 8765):
self.db_path = db_path
self.port = port
self.conn: Optional[sqlite3.Connection] = None
self.subscribers: List[Callable] = []
self.is_running = False
self.current_time: Optional[datetime] = None
self.speed: float = 1.0
self._tick_buffer: List[TickData] = []
self._last_flush = datetime.now()
def connect(self) -> None:
"""データベース接続を確立"""
try:
self.conn = sqlite3.connect(
self.db_path,
check_same_thread=False,
timeout=30.0
)
# パフォーマンス最適化
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA synchronous=NORMAL")
self.conn.execute("PRAGMA cache_size=-64000") # 64MBキャッシュ
print(f"✅ Connected to {self.db_path}")
except sqlite3.OperationalError as e:
raise ConnectionError(f"Database connection failed: {e}")
def load_ticks(
self,
symbol: str,
start_time: datetime,
end_time: datetime
) -> int:
"""指定期間のティックデータをメモリにロード"""
query = """
SELECT timestamp, symbol, price, volume, bid, ask
FROM ticks
WHERE symbol = ?
AND timestamp >= ?
AND timestamp <= ?
ORDER BY timestamp ASC
"""
cursor = self.conn.execute(
query,
(symbol, start_time.isoformat(), end_time.isoformat())
)
self._tick_buffer = [
TickData(
timestamp=datetime.fromisoformat(row[0]),
symbol=row[1],
price=row[2],
volume=row[3],
bid=row[4],
ask=row[5]
)
for row in cursor.fetchall()
]
# ヒープ化して時間順アクセスを最適化
heapq.heapify(self._tick_buffer)
return len(self._tick_buffer)
async def replay_loop(self) -> None:
"""リプレイメインループ"""
self.is_running = True
last_tick_time = None
while self.is_running and self._tick_buffer:
# ヒープから最小時間のティックを取得
if not self._tick_buffer:
break
next_tick = heapq.heappop(self._tick_buffer)
# 時間待機(リプレイ速度調整)
if last_tick_time:
elapsed = (next_tick.timestamp - last_tick_time).total_seconds()
wait_time = elapsed / self.speed
if wait_time > 0:
await asyncio.sleep(min(wait_time, 60.0)) # 最大60秒
self.current_time = next_tick.timestamp
last_tick_time = next_tick.timestamp
# 全購読者へ配信
await self._broadcast(next_tick)
async def _broadcast(self, tick: TickData) -> None:
"""全購読者へティックデータを配信"""
message = json.dumps({
"type": "tick",
"data": {
"symbol": tick.symbol,
"price": tick.price,
"volume": tick.volume,
"bid": tick.bid,
"ask": tick.ask,
"timestamp": tick.timestamp.isoformat()
}
})
for callback in self.subscribers:
try:
if asyncio.iscoroutinefunction(callback):
await callback(message)
else:
callback(message)
except Exception as e:
print(f"Subscriber error: {e}")
def subscribe(self, callback: Callable) -> None:
"""行情データ購読者登録"""
self.subscribers.append(callback)
def stop(self) -> None:
"""リプレイ停止"""
self.is_running = False
print("🛑 Replay server stopped")
def get_status(self) -> Dict:
"""現在の状態を返す"""
return {
"is_running": self.is_running,
"current_time": self.current_time.isoformat() if self.current_time else None,
"buffer_remaining": len(self._tick_buffer),
"subscribers": len(self.subscribers),
"speed": self.speed
}
使用例
async def main():
server = TardisReplayServer("./data/eq_us_2024.db")
server.connect()
# 2024年3月15日のIBM株データをロード
count = server.load_ticks(
symbol="IBM",
start_time=datetime(2024, 3, 15, 9, 30),
end_time=datetime(2024, 3, 15, 16, 0)
)
print(f"Loaded {count} ticks")
# 2倍速で再生
server.speed = 2.0
# データ購読者登録
async def on_tick(message):
data = json.loads(message)
print(f"[{data['data']['timestamp']}] {data['data']['symbol']}: ${data['data']['price']}")
server.subscribe(on_tick)
# リプレイ開始
try:
await server.replay_loop()
except KeyboardInterrupt:
server.stop()
if __name__ == "__main__":
asyncio.run(main())
Node.js実装:WebSocket APIゲートウェイ
const WebSocket = require('ws');
const sqlite3 = require