This article explores how to build a replay server for historical financial data. While the "Tardis Machine" is a popular concept in algorithmic trading for simulating time, we focus here on the actual technical architecture needed to replay OHLCV (Open, High, Low, Close, Volume) data for backtesting and simulation.

Use Case: Algorithmic Trading Team in Paris

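A quantitative research team at a Paris-based hedge fund needed a local replay server. Their previous setup relied on a commercial data feed, which was expensive and suffered from serious latency problems.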

System Architecture

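The system consists of three main layers:

- Ingestion (Python): downloads historical candles and stores them in SQLite.
- Replay engine (Node.js): serves stored candles over a REST API and streams them over WebSocket.
- Backtesting client: consumes the replayed candles and feeds them to trading strategies.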

Python Implementation

The ingestion layer handles downloading and normalizing the historical data:

import sqlite3
from datetime import datetime
import asyncio
import aiohttp

class HistoricalDataIngestion:
    def __init__(self, db_path: str = "market_data.db"):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS ohlcv (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                open REAL NOT NULL,
                high REAL NOT NULL,
                low REAL NOT NULL,
                close REAL NOT NULL,
                volume REAL NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol, timestamp)
            )
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_symbol_timestamp 
            ON ohlcv(symbol, timestamp)
        ''')
        conn.commit()
        conn.close()
    
    async def fetch_candles(self, symbol: str, start_date: datetime, 
                            end_date: datetime, interval: str = "1h"):
        """Download candles from a data source (a fictional API is used as an example)."""
        # Interval durations in milliseconds
        interval_ms = {
            "1m": 60000, "5m": 300000, "15m": 900000,
            "1h": 3600000, "4h": 14400000, "1d": 86400000
        }
        # Each request covers at most 1000 candles
        window_ms = interval_ms[interval] * 1000
        
        # datetime.timestamp() treats naive datetimes as local time:
        # pass timezone-aware UTC datetimes to avoid offset errors
        start_ts = int(start_date.timestamp() * 1000)
        end_ts = int(end_date.timestamp() * 1000)
        
        candles = []
        current_ts = start_ts
        url = "https://api.example.com/v1/candles"  # fictional data source
        
        # Reuse a single HTTP session for every request
        async with aiohttp.ClientSession() as session:
            while current_ts < end_ts:
                params = {
                    "symbol": symbol,
                    "interval": interval,
                    "startTime": current_ts,
                    "endTime": min(current_ts + window_ms, end_ts)
                }
                async with session.get(url, params=params) as response:
                    # Fail loudly instead of silently leaving gaps in the data
                    response.raise_for_status()
                    data = await response.json()
                    candles.extend(data.get("data", []))
                
                current_ts += window_ms
                await asyncio.sleep(0.5)  # rate limiting
        
        return candles
    
    def store_candles(self, symbol: str, candles: list):
        """Store candles in SQLite (existing (symbol, timestamp) rows are replaced)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        data = [
            (symbol, c["timestamp"], c["open"], c["high"], 
             c["low"], c["close"], c["volume"])
            for c in candles
        ]
        
        cursor.executemany('''
            INSERT OR REPLACE INTO ohlcv 
            (symbol, timestamp, open, high, low, close, volume)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', data)
        
        conn.commit()
        rows_affected = cursor.rowcount
        conn.close()
        return rows_affected

# Usage
ingestion = HistoricalDataIngestion()
print(f"Database initialized: {ingestion.db_path}")
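The ingestion layer is meant to normalize data, but store_candles expects each candle to already be a dict with a millisecond timestamp. A minimal sketch of that normalization step follows. It assumes, hypothetically, that the source returns either dicts or [timestamp, open, high, low, close, volume] lists, with timestamps in seconds, milliseconds, or ISO 8601 strings; `to_utc_ms` and `normalize_candle` are illustrative helpers, not part of any library.

```python
from datetime import datetime, timezone


def to_utc_ms(ts) -> int:
    """Convert a timestamp (seconds, milliseconds, or ISO 8601 string) to UTC ms."""
    if isinstance(ts, str):
        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            # Assumption: naive strings are already expressed in UTC
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp() * 1000)
    ts = float(ts)
    # Heuristic: millisecond timestamps after March 1973 exceed 1e11,
    # so smaller values are treated as seconds
    return int(ts * 1000) if ts < 1e11 else int(ts)


def normalize_candle(raw) -> dict:
    """Map a raw candle (dict or list) to the dict shape expected by store_candles."""
    if isinstance(raw, (list, tuple)):
        ts, op, hi, lo, cl, vol = raw[:6]
    else:
        ts, op, hi, lo, cl, vol = (raw["timestamp"], raw["open"], raw["high"],
                                   raw["low"], raw["close"], raw["volume"])
    return {
        "timestamp": to_utc_ms(ts),
        "open": float(op), "high": float(hi), "low": float(lo),
        "close": float(cl), "volume": float(vol),
    }
```

In this sketch, the output of fetch_candles would be passed through the helper before storage, for example ingestion.store_candles("BTCUSD", [normalize_candle(c) for c in raw_candles]).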

Implementing the Replay Engine in Node.js

The Node.js server drives the time replay: it aggregates stored candles on request over REST and streams them to WebSocket subscribers at a configurable speed:

const express = require('express');
const sqlite3 = require('better-sqlite3');
const WebSocket = require('ws');

class TimeMachine {
    constructor(dbPath = './market_data.db') {
        this.db = new sqlite3(dbPath);
        this.subscribers = new Map();
        this.currentTimestamp = null;
        this.isPlaying = false;
        this.playbackSpeed = 1.0;
    }
    
    getCandles(symbol, startTime, endTime, interval = '1h') {
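        // Roll stored candles up into buckets of `interval`:
        // open = first candle's open, close = last candle's close,
        // high = max(high), low = min(low), volume = sum(volume).
        // CAST(... AS INTEGER) truncates the division (a floor for positive
        // timestamps) regardless of how the parameter is bound.
        const query = `
            SELECT
                b.bucket AS bucket,
                (SELECT open FROM ohlcv
                  WHERE symbol = b.symbol AND timestamp = b.first_ts) AS open,
                b.high AS high,
                b.low AS low,
                (SELECT close FROM ohlcv
                  WHERE symbol = b.symbol AND timestamp = b.last_ts) AS close,
                b.volume AS volume
            FROM (
                SELECT
                    symbol,
                    CAST(timestamp / ? AS INTEGER) * ? AS bucket,
                    MIN(timestamp) AS first_ts,
                    MAX(timestamp) AS last_ts,
                    MAX(high) AS high,
                    MIN(low) AS low,
                    SUM(volume) AS volume
                FROM ohlcv
                WHERE symbol = ?
                  AND timestamp >= ?
                  AND timestamp < ?
                GROUP BY bucket
            ) AS b
            ORDER BY b.bucket
        `;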
        
        const intervalMs = this.parseInterval(interval);
        const rows = this.db.prepare(query).all(
            intervalMs, intervalMs, symbol, startTime, endTime
        );
        
        return rows.map(row => ({
            timestamp: row.bucket,
            open: row.open,
            high: row.high,
            low: row.low,
            close: row.close,
            volume: row.volume
        }));
    }
    
    parseInterval(interval) {
        const units = {
            'm': 60000,
            'h': 3600000,
            'd': 86400000
        };
        const match = interval.match(/^(\d+)([mhd])$/);
        if (!match) return 3600000;
        return parseInt(match[1]) * units[match[2]];
    }
    
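    startPlayback(symbol, startTime, endTime, speed = 1.0) {
        // Cancel any playback already running so two loops never overlap
        this.stopPlayback();
        this.isPlaying = true;
        this.playbackSpeed = speed;
        this.currentTimestamp = startTime;
        
        const candles = this.getCandles(symbol, startTime, endTime);
        let index = 0;
        
        const playNext = () => {
            if (index >= candles.length || !this.isPlaying) {
                this.isPlaying = false;
                return;
            }
            const candle = candles[index];
            this.broadcast(symbol, candle);
            this.currentTimestamp = candle.timestamp;
            index++;
            
            // speed = candles emitted per second (1 candle/s at speed 1),
            // with a minimum delay of 10 ms between candles
            const delay = 1000 / this.playbackSpeed;
            this.timer = setTimeout(playNext, Math.max(delay, 10));
        };
        
        playNext();
    }
    
    stopPlayback() {
        this.isPlaying = false;
        clearTimeout(this.timer);
        this.timer = null;
    }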
    
    broadcast(symbol, candle) {
        const message = JSON.stringify({
            type: 'candle',
            symbol,
            data: candle
        });
        
        this.subscribers.forEach(ws => {
            if (ws.readyState === WebSocket.OPEN) {
                ws.send(message);
            }
        });
    }
    
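    subscribe(ws) {
        // Monotonic counter: Date.now() could give two clients the same id
        this.nextSubscriberId = (this.nextSubscriberId || 0) + 1;
        const id = this.nextSubscriberId;
        this.subscribers.set(id, ws);
        return id;
    }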
    
    unsubscribe(id) {
        this.subscribers.delete(id);
    }
}

// Server setup
const app = express();
const wss = new WebSocket.Server({ port: 8080 });
const timeMachine = new TimeMachine();

app.use(express.json());

app.get('/api/candles', (req, res) => {
    const { symbol, start, end, interval } = req.query;
    
    if (!symbol || !start || !end) {
        return res.status(400).json({ 
            error: 'Required parameters: symbol, start, end' 
        });
    }
    
    try {
        const candles = timeMachine.getCandles(
            symbol,
            parseInt(start),
            parseInt(end),
            interval || '1h'
        );
        res.json({ candles });
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

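app.post('/api/playback/start', (req, res) => {
    const { symbol, start, end, speed } = req.body;
    
    // start and end are expected as numbers (UTC milliseconds)
    if (!symbol || !Number.isFinite(start) || !Number.isFinite(end)) {
        return res.status(400).json({
            error: 'Required parameters: symbol, start, end (UTC milliseconds)'
        });
    }
    
    timeMachine.startPlayback(symbol, start, end, speed || 1.0);
    res.json({ status: 'started' });
});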

app.post('/api/playback/stop', (req, res) => {
    timeMachine.stopPlayback();
    res.json({ status: 'stopped' });
});

wss.on('connection', (ws) => {
    const subscriberId = timeMachine.subscribe(ws);
    
    ws.send(JSON.stringify({
        type: 'subscribed',
        subscriberId
    }));
    
    ws.on('close', () => {
        timeMachine.unsubscribe(subscriberId);
    });
});

app.listen(3000, () => {
    console.log('TimeMachine server listening on http://localhost:3000');
    console.log('WebSocket available on ws://localhost:8080');
});
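getCandles rolls fine-grained candles up into larger buckets. The standard OHLCV rollup rule is: open of the first candle, close of the last, maximum of the highs, minimum of the lows, and sum of the volumes. The following pure-Python sketch of that rule can serve as a cross-check of getCandles output on a small, known input; `resample` is a hypothetical helper name, and candles are assumed to be dicts with UTC millisecond timestamps.

```python
from typing import Dict, List


def resample(candles: List[dict], interval_ms: int) -> List[dict]:
    """Aggregate candles into buckets of interval_ms milliseconds."""
    buckets: Dict[int, dict] = {}  # dicts keep insertion order (Python 3.7+)
    for c in sorted(candles, key=lambda c: c["timestamp"]):
        # Floor the timestamp to the start of its bucket
        start = (c["timestamp"] // interval_ms) * interval_ms
        b = buckets.get(start)
        if b is None:
            # The first candle of the bucket supplies the open
            buckets[start] = {"timestamp": start, "open": c["open"],
                              "high": c["high"], "low": c["low"],
                              "close": c["close"], "volume": c["volume"]}
        else:
            b["high"] = max(b["high"], c["high"])
            b["low"] = min(b["low"], c["low"])
            b["close"] = c["close"]  # the latest candle so far supplies the close
            b["volume"] += c["volume"]
    return list(buckets.values())
```

Sorting first is what makes "first" and "last" well defined; SQL has no implicit row order inside a GROUP BY, which is why an aggregate such as AVG cannot stand in for the open or close.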

Client de Backtesting

A simple client that consumes the replayed data:

const WebSocket = require('ws');

class BacktestClient {
    constructor(wsUrl = 'ws://localhost:8080') {
        this.wsUrl = wsUrl;
        this.ws = null;
        this.strategies = [];
        this.trades = [];
    }
    
    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.wsUrl);
            
            this.ws.on('open', () => {
                console.log('Connected to TimeMachine');
                resolve();
            });
            
            this.ws.on('message', (data) => {
                const message = JSON.parse(data);
                this.handleMessage(message);
            });
            
            this.ws.on('error', reject);
        });
    }
    
    handleMessage(message) {
        if (message.type === 'candle') {
            this.onCandle(message.symbol, message.data);
        } else if (message.type === 'subscribed') {
            console.log(`Subscribed: ${message.subscriberId}`);
        }
    }
    
    onCandle(symbol, candle) {
        for (const strategy of this.strategies) {
            strategy.evaluate(symbol, candle);
        }
    }
    
    registerStrategy(strategy) {
        this.strategies.push(strategy);
    }
    
    disconnect() {
        if (this.ws) {
            this.ws.close();
        }
    }
}

// Simple example strategy (logs the short and long moving averages)
class MovingAverageCrossover {
    constructor(shortPeriod = 10, longPeriod = 20) {
        this.shortPeriod = shortPeriod;
        this.longPeriod = longPeriod;
        this.prices = {};
    }
    
    evaluate(symbol, candle) {
        if (!this.prices[symbol]) {
            this.prices[symbol] = [];
        }
        
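        this.prices[symbol].push(candle.close);
        // Keep only the closes the long MA needs, so memory stays bounded
        if (this.prices[symbol].length > this.longPeriod) {
            this.prices[symbol].shift();
        }
        
        // Evaluate as soon as a full long-MA window is available
        if (this.prices[symbol].length >= this.longPeriod) {
            const shortMA = this.calculateMA(
                this.prices[symbol].slice(-this.shortPeriod)
            );
            const longMA = this.calculateMA(
                this.prices[symbol].slice(-this.longPeriod)
            );
            
            console.log(`${symbol}: MA${this.shortPeriod}=${shortMA.toFixed(2)}, ` +
                `MA${this.longPeriod}=${longMA.toFixed(2)}`);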
        }
    }
    
    calculateMA(prices) {
        return prices.reduce((a, b) => a + b, 0) / prices.length;
    }
}

// Usage
const client = new BacktestClient();

async function run() {
    await client.connect();
    client.registerStrategy(new MovingAverageCrossover(10, 20));
    
    // Start playback through the REST API (the global fetch needs Node.js 18+)
    const end = Date.now();
    const start = end - 30 * 86400 * 1000; // last 30 days, in UTC milliseconds
    
    const res = await fetch('http://localhost:3000/api/playback/start', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
            symbol: 'BTCUSD',
            start,
            end,
            speed: 60 // 60 candles per second
        })
    });
    console.log(`Playback started: ${res.status}`);
}

run().catch(console.error);
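MovingAverageCrossover only prints the two averages; it never reports the moment they cross. The following minimal sketch, written in Python like the other added examples, detects the cross itself. It assumes the common convention that the short MA moving from at-or-below to above the long MA is a "buy" signal and the reverse is a "sell"; the class and method names are hypothetical.

```python
from collections import deque
from typing import Optional


class CrossoverDetector:
    """Emit 'buy' or 'sell' when the short moving average crosses the long one."""

    def __init__(self, short_period: int = 10, long_period: int = 20):
        if short_period >= long_period:
            raise ValueError("short_period must be smaller than long_period")
        self.short_period = short_period
        self.long_period = long_period
        # Keep only the closes the long MA needs (bounded memory)
        self.closes: deque = deque(maxlen=long_period)
        self.prev_diff: Optional[float] = None

    def update(self, close: float) -> Optional[str]:
        self.closes.append(close)
        if len(self.closes) < self.long_period:
            return None  # not enough history yet
        closes = list(self.closes)
        short_ma = sum(closes[-self.short_period:]) / self.short_period
        long_ma = sum(closes) / self.long_period
        diff = short_ma - long_ma
        signal = None
        if self.prev_diff is not None:
            if self.prev_diff <= 0 < diff:
                signal = "buy"   # short MA crossed above the long MA
            elif self.prev_diff >= 0 > diff:
                signal = "sell"  # short MA crossed below the long MA
        self.prev_diff = diff
        return signal
```

Comparing the sign of the MA difference between consecutive candles, rather than the averages themselves, is what turns "short is above long" into the discrete event "short has just crossed long".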

Performance Considerations

To optimize the performance of the local server:

Common Errors and Solutions

| Error | Cause | Solution |
|---|---|---|
| SQLite locked ("database is locked") | Concurrent access while writing | Enable WAL mode: PRAGMA journal_mode=WAL |
| MemoryError | Data set too large to load in one go | Implement pagination and streaming |
| Timestamp mismatch | Inconsistent date formats | Normalize to UTC milliseconds |
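The first two fixes in the table can be sketched with Python's standard sqlite3 module against the ohlcv table created by HistoricalDataIngestion. The approach is to enable WAL mode once, then read a time range in fixed-size pages using keyset pagination (resuming after the last timestamp seen) instead of loading everything into memory. The function names and the default page size are illustrative assumptions.

```python
import sqlite3
from typing import Iterator


def enable_wal(conn: sqlite3.Connection) -> str:
    """Switch the database to write-ahead logging and return the resulting mode."""
    # In WAL mode readers no longer block the writer (and vice versa); the
    # setting persists in the database file but does not apply to :memory: DBs
    return conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]


def iter_candles(conn: sqlite3.Connection, symbol: str, start: int, end: int,
                 page_size: int = 10_000) -> Iterator[tuple]:
    """Yield (timestamp, open, high, low, close, volume) rows, one page at a time."""
    last_ts = start - 1  # timestamps are integers, so "> start - 1" means ">= start"
    while True:
        rows = conn.execute(
            """SELECT timestamp, open, high, low, close, volume
               FROM ohlcv
               WHERE symbol = ? AND timestamp > ? AND timestamp < ?
               ORDER BY timestamp
               LIMIT ?""",
            (symbol, last_ts, end, page_size),
        ).fetchall()
        if not rows:
            return
        yield from rows
        last_ts = rows[-1][0]  # keyset: resume right after the last row seen
```

Keyset pagination relies on (symbol, timestamp) being unique, which the schema's UNIQUE constraint guarantees, and each page is served by the (symbol, timestamp) index instead of an OFFSET scan.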

Conclusion

This replay server makes it possible to backtest trading strategies against locally stored historical data. Splitting the work between Python and Node.js keeps ingestion flexible (asynchronous downloads into SQLite) while Node.js handles real-time distribution of the replay over WebSocket.