Dans cet article, je vais partager mon retour d'expérience complet sur la construction d'un data warehouse dédié aux données historiques de cryptomonnaies. Après avoir testé plusieurs architectures pendant 6 mois sur des projets de trading algorithmique, j'ai trouvé que la combinaison ClickHouse + APIs d'échanges constitue la solution la plus robuste et performante. Je détaillerai chaque étape, des choix d'infrastructure aux optimisations de requêtes, en passant par la gestion des erreurs courantes que j'ai rencontrées en production.

为什么选择 ClickHouse 存储加密货币数据

Le choix de ClickHouse pour stocker des données de cryptomonnaies n'est pas anodin. Avec des volumes de ticks pouvant atteindre plusieurs millions de lignes par seconde sur les principales plateformes, la capacité de compression native ( jusqu'à 10:1 sur des données OHLCV) et les performances de requête agrégée (moins de 100ms sur 1 milliard de lignes) font la différence. J'ai comparé cette solution avec TimescaleDB et InfluxDB : ClickHouse surpasse clairement sur les workloads analytiques massifs.

Architecture du système

L'architecture que j'ai déployée comprend trois composants principaux. Premièrement, les collecteurs Python qui interrogent les APIs REST et WebSocket des exchanges. Deuxièmement, une instance ClickHouse Auto-hebergée (ou ClickHouse Cloud) avec replication pour la haute disponibilité. Troisièmement, un service de transformation des données en tables MergeTree optimisées pour les analyses temporelles.

Configuration initiale de ClickHouse

La création du schéma de base de données est critique pour les performances. Je recommande fortement d'utiliser le moteur MergeTree avec un partitionnement par jour et un tri par (symbol, timestamp). Pour les données tick-by-tick, le moteur ReplacingMergeTree peut être utilisé pour dédupliquer automatiquement les entrées.

CREATE DATABASE IF NOT EXISTS crypto_data;

CREATE TABLE crypto_data.ohlcv_1m
(
    symbol String,
    timestamp DateTime64(3),
    open Decimal(18, 8),
    high Decimal(18, 8),
    low Decimal(18, 8),
    close Decimal(18, 8),
    volume Decimal(18, 8),
    quote_volume Decimal(18, 8),
    trades UInt32,
    inserted_at DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (symbol, timestamp)
TTL timestamp + INTERVAL 2 YEAR
SETTINGS index_granularity = 8192;

-- Table pour les trades individuels (tick data)
CREATE TABLE crypto_data.trades
(
    trade_id String,
    symbol String,
    timestamp DateTime64(3),
    price Decimal(18, 8),
    quantity Decimal(18, 8),
    side Enum8('buy' = 1, 'sell' = 2),
    is_maker UInt8
)
ENGINE = ReplacingMergeTree(trade_id)
ORDER BY (symbol, timestamp)
SETTINGS index_granularity = 8192;

-- Table pour les orderbooks snapshot
CREATE TABLE crypto_data.orderbook_snapshots
(
    symbol String,
    timestamp DateTime64(3),
    bids String,
    asks String,
    level_depth UInt8
)
ENGINE = MergeTree()
ORDER BY (symbol, timestamp)
SAMPLE BY timestamp;

Collecteur Python pour Binance API

Le code suivant implémente un collecteur robuste pour l'API Binance. J'ai ajouté une gestion des Rate Limits, une reconnectique automatique sur failure, et un batching des insertions pour optimiser les performances ClickHouse. Ce script fonctionne 24/7 en production sur mon infrastructure.

# crypto_collector.py
import asyncio
import aiohttp
import clickhouse_driver
from datetime import datetime
from typing import List, Dict
import logging
import signal
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BinanceCollector:
    def __init__(self, symbols: List[str], clickhouse_host: str = "localhost"):
        self.symbols = [s.upper() for s in symbols]
        self.base_url = "https://api.binance.com/api/v3"
        self.session = None
        self.client = clickhouse_driver.Client(host=clickhouse_host)
        self.running = True
        self.batch_size = 500
        self.batch = []
        
    async def fetch_ohlcv(self, symbol: str, interval: str = "1m", limit: int = 1000):
        """Récupère les données OHLCV historiques"""
        url = f"{self.base_url}/klines"
        params = {"symbol": symbol, "interval": interval, "limit": limit}
        
        async with self.session.get(url, params=params) as resp:
            if resp.status == 429:
                logger.warning(f"Rate limit atteint pour {symbol}, attente 60s")
                await asyncio.sleep(60)
                return await self.fetch_ohlcv(symbol, interval, limit)
            
            if resp.status != 200:
                logger.error(f"Erreur API {resp.status} pour {symbol}")
                return []
                
            data = await resp.json()
            return self._parse_klines(data, symbol)
    
    def _parse_klines(self, data: List, symbol: str) -> List[Dict]:
        """Parse les données klines Binance"""
        parsed = []
        for k in data:
            parsed.append({
                'symbol': symbol,
                'timestamp': datetime.fromtimestamp(k[0] / 1000),
                'open': float(k[1]),
                'high': float(k[2]),
                'low': float(k[3]),
                'close': float(k[4]),
                'volume': float(k[5]),
                'quote_volume': float(k[7]),
                'trades': int(k[8])
            })
        return parsed
    
    def insert_batch(self, data: List[Dict]):
        """Insère un lot de données dans ClickHouse"""
        if not data:
            return
            
        query = """INSERT INTO crypto_data.ohlcv_1m 
                   (symbol, timestamp, open, high, low, close, volume, quote_volume, trades) 
                   VALUES"""