It's 3:47 a.m. when my phone buzzes. Slack is exploding with messages from the trading desk: "The dashboards are completely frozen." I log in in a panic, eyes still half-shut, and land on an error that makes my blood run cold: ConnectionResetError: [Errno 104] Connection reset by peer. Months of tick-by-tick data, years of BTC/USD history, all collapsing because of poorly handled WebSocket reconnections.
This tutorial is the product of that sleepless night. I'm going to show you how to build an industrial-grade crypto data warehouse with ClickHouse: one that ingests millions of rows per second, handles connection losses gracefully, and spares you this kind of alert at 3 a.m.
System Architecture
Our architecture rests on three pillars: ClickHouse as the analytical backbone, exchange WebSocket streams for real-time data, and an intelligent recovery layer for fault tolerance.
+-------------------+   +-------------------+   +-------------------+
|    Binance WS     |   |   Coinbase Pro    |   |     Kraken WS     |
|   wss://stream    |   |   wss://ws-feed   |   |     wss://ws      |
+--------+----------+   +--------+----------+   +--------+----------+
         |                       |                       |
         +-----------------------+-----------------------+
                                 |
                         +-------v-------+
                         |  Kafka/Redis  |
                         |   Buffering   |
                         +-------+-------+
                                 |
                         +-------v-------+
                         |  ClickHouse   |
                         |   Warehouse   |
                         +-------+-------+
                                 |
                         +-------v-------+
                         |  Dashboards   |
                         |  Grafana/Met  |
                         +---------------+
Installing and Configuring ClickHouse
ClickHouse is not a database like the others. It excels at analytical processing of massive volumes with millisecond-scale latencies. For a crypto data warehouse, it is the industrial-strength choice.
# Install ClickHouse on Ubuntu 22.04
sudo apt-get update && sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main" | \
sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update && sudo apt-get install -y clickhouse-server clickhouse-client
Memory and Performance Tuning
# Note: 'sudo cat > file' fails because the redirection runs in the
# unprivileged shell; use 'sudo tee' instead. Server-wide settings go in config.d:
sudo tee /etc/clickhouse-server/config.d/custom_config.xml > /dev/null << 'EOF'
<?xml version="1.0"?>
<clickhouse>
    <max_server_memory_usage>64382566400</max_server_memory_usage> <!-- ~60GB RAM -->
    <mark_cache_size>5368709120</mark_cache_size>
</clickhouse>
EOF

# Per-query limits are user-profile settings and belong in users.d:
sudo tee /etc/clickhouse-server/users.d/custom_limits.xml > /dev/null << 'EOF'
<?xml version="1.0"?>
<clickhouse>
    <profiles>
        <default>
            <max_memory_usage>64382566400</max_memory_usage>
            <max_threads>32</max_threads>
            <use_uncompressed_cache>0</use_uncompressed_cache>
            <max_execution_time>300</max_execution_time>
            <max_rows_to_read>10000000000</max_rows_to_read>
        </default>
    </profiles>
</clickhouse>
EOF
sudo service clickhouse-server start
clickhouse-client -h 127.0.0.1
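# Sanity check (a sketch): confirm the custom limits are active for the default profile
clickhouse-client --query "SELECT name, value FROM system.settings WHERE name IN ('max_memory_usage', 'max_threads', 'max_execution_time')"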
The configuration above allocates ~60GB of RAM and 32 threads, sized for a dedicated server. On a more modest instance (8 vCPU, 32GB RAM), scale these values down proportionally.
-- Create the database and tables optimized for crypto trading
CREATE DATABASE IF NOT EXISTS crypto_warehouse;
-- Main table for aggregated trades (OHLCV)
CREATE TABLE IF NOT EXISTS crypto_warehouse.ohlcv_1m (
symbol String,
timeframe String,
timestamp DateTime('UTC'),
open Decimal(18, 8),
high Decimal(18, 8),
low Decimal(18, 8),
close Decimal(18, 8),
volume Decimal(18, 8),
quote_volume Decimal(18, 8),
trades UInt32,
buy_volume Decimal(18, 8),
created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY (toYYYYMM(timestamp), symbol)
ORDER BY (symbol, timeframe, timestamp)
TTL timestamp + INTERVAL 365 DAY
SETTINGS index_granularity = 8192;
-- Table for individual trades (high-frequency)
CREATE TABLE IF NOT EXISTS crypto_warehouse.trades_raw (
id UInt64,
symbol String,
price Decimal(18, 8),
quantity Decimal(18, 8),
quote_quantity Decimal(18, 8),
timestamp DateTime64(3, 'UTC'),
is_buyer_maker UInt8,
is_best_match UInt8
) ENGINE = ReplacingMergeTree(timestamp)
ORDER BY (symbol, timestamp, id)
SETTINGS index_granularity = 8192;
-- Table for order book snapshots
CREATE TABLE IF NOT EXISTS crypto_warehouse.orderbook_snapshots (
symbol String,
timestamp DateTime('UTC'),
bids Array(Tuple(Decimal(18, 8), Decimal(18, 8))),
asks Array(Tuple(Decimal(18, 8), Decimal(18, 8))),
last_update_id UInt64
) ENGINE = MergeTree()
ORDER BY (symbol, timestamp)
SETTINGS index_granularity = 8192;
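-- Example read (a sketch): best bid/ask and spread from the most recent
-- snapshot. ClickHouse arrays are 1-indexed; tuple elements use .1 / .2.
SELECT
    symbol,
    timestamp,
    bids[1].1 AS best_bid,
    asks[1].1 AS best_ask,
    asks[1].1 - bids[1].1 AS spread
FROM crypto_warehouse.orderbook_snapshots
ORDER BY timestamp DESC
LIMIT 1;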
-- Materialized view for hourly roll-ups. SummingMergeTree would also sum
-- open/high/low/close on background merges, corrupting them, so we use
-- AggregatingMergeTree with -State combinators instead. Note that open must
-- be the first value of the hour (argMin), and close the last (argMax).
CREATE MATERIALIZED VIEW crypto_warehouse.mv_1h_agg
ENGINE = AggregatingMergeTree()
PARTITION BY (toYYYYMM(ts), symbol)
ORDER BY (symbol, ts)
AS SELECT
    symbol,
    toStartOfHour(timestamp) AS ts,
    argMinState(open, timestamp) AS open,
    maxState(high) AS high,
    minState(low) AS low,
    argMaxState(close, timestamp) AS close,
    sumState(volume) AS volume,
    sumState(quote_volume) AS quote_volume,
    sumState(trades) AS trades
FROM crypto_warehouse.ohlcv_1m
GROUP BY symbol, ts;
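Because the view stores aggregate-function states, reads must finalize them with the matching -Merge combinators. A minimal hourly query against the view:
SELECT
    symbol,
    ts,
    argMinMerge(open) AS open,
    maxMerge(high) AS high,
    minMerge(low) AS low,
    argMaxMerge(close) AS close,
    sumMerge(volume) AS volume
FROM crypto_warehouse.mv_1h_agg
WHERE symbol = 'BTCUSDT'
GROUP BY symbol, ts
ORDER BY ts DESC
LIMIT 24;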
Connecting to Exchange APIs
Each exchange has its quirks. Binance offers the best performance through its wss://stream.binance.com:9443 endpoint, Coinbase is more dependable for US-regulated data, and Kraken remains the champion of EUR liquidity.
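For reference, here are the three stream endpoints mentioned above collected in one place. Only the Binance path is implemented in this article; the Coinbase and Kraken URLs are a sketch and should be checked against each exchange's current documentation:
# WebSocket endpoints referenced in this article (verify before relying on them)
EXCHANGE_WS = {
    'binance': 'wss://stream.binance.com:9443',
    'coinbase': 'wss://ws-feed.exchange.coinbase.com',
    'kraken': 'wss://ws.kraken.com',
}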
# Install the Python dependencies
pip install websockets pandas numpy clickhouse-driver aiohttp asyncio-redis
# crypto_data_collector.py - WebSocket collector and ClickHouse writer
import asyncio
import aiohttp
import websockets
import json
import hmac
import hashlib
import time
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from clickhouse_driver import Client
from collections import deque
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Trade:
id: int
symbol: str
price: float
quantity: float
quote_quantity: float
timestamp: datetime
is_buyer_maker: bool
@dataclass
class OHLCV:
symbol: str
timeframe: str
timestamp: datetime
open: float
high: float
low: float
close: float
volume: float
quote_volume: float
trades: int
class ClickHouseWriter:
def __init__(self, host: str = 'localhost', database: str = 'crypto_warehouse'):
self.client = Client(host=host, database=database, compression='lz4')
self.batch_size = 5000
self.trade_buffer = deque(maxlen=100000)
self.ohlcv_buffer = deque(maxlen=50000)
self.last_flush = time.time()
def insert_trades(self, trades: List[Trade]):
if not trades:
return
query = 'INSERT INTO crypto_warehouse.trades_raw VALUES'
data = [
(t.id, t.symbol, t.price, t.quantity, t.quote_quantity,
t.timestamp, int(t.is_buyer_maker), 0)
for t in trades
]
try:
self.client.execute(query, data)
logger.info(f"Inserted {len(trades)} trades")
except Exception as e:
logger.error(f"Trade insert error: {e}")
# Retry with smaller batch
for i in range(0, len(data), 1000):
self.client.execute(query, data[i:i+1000])
def insert_ohlcv(self, ohlcvs: List[OHLCV]):
if not ohlcvs:
return
query = '''INSERT INTO crypto_warehouse.ohlcv_1m
(symbol, timeframe, timestamp, open, high, low, close,
volume, quote_volume, trades) VALUES'''
data = [
(o.symbol, o.timeframe, o.timestamp, o.open, o.high, o.low,
o.close, o.volume, o.quote_volume, o.trades)
for o in ohlcvs
]
try:
self.client.execute(query, data)
logger.info(f"Inserted {len(ohlcv)} OHLCV candles")
except Exception as e:
logger.error(f"OHLCV insert error: {e}")
class BinanceWebSocket:
def __init__(self, symbols: List[str], writer: ClickHouseWriter):
self.base_url = "wss://stream.binance.com:9443/ws"
self.symbols = [s.lower() for s in symbols]
self.writer = writer
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self._running = True
        # Key by upper-case symbol: Binance sends 's' in upper case
        self.trade_buffers: Dict[str, deque] = {
            s.upper(): deque(maxlen=10000) for s in symbols
        }
    async def get_combined_stream_url(self) -> str:
        streams = []
        for symbol in self.symbols:
            streams.append(f"{symbol}@aggTrade")
            streams.append(f"{symbol}@depth20@100ms")
        # Multiple streams must use the combined endpoint (/stream?streams=...);
        # the /ws path only accepts a single raw stream
        return f"{self.base_url}/stream?streams={'/'.join(streams)}"
async def process_agg_trade(self, msg: dict):
"""Traite un trade agrégé de Binance"""
symbol = msg['s']
trade = Trade(
id=int(msg['a']),
symbol=symbol,
price=float(msg['p']),
quantity=float(msg['q']),
quote_quantity=float(msg['p']) * float(msg['q']),
timestamp=datetime.utcfromtimestamp(msg['T'] / 1000),
is_buyer_maker=msg['m']
)
self.trade_buffers[symbol].append(trade)
        # Flush every 5 seconds (the deque maxlen caps the buffer size)
now = time.time()
if now - self.writer.last_flush > 5:
all_trades = []
for buf in self.trade_buffers.values():
all_trades.extend(buf)
buf.clear()
if all_trades:
self.writer.insert_trades(all_trades)
self.writer.last_flush = now
async def run(self):
while self._running:
try:
url = await self.get_combined_stream_url()
logger.info(f"Connecting to Binance WebSocket: {url}")
async with websockets.connect(url, ping_interval=20) as ws:
self.reconnect_delay = 1 # Reset on successful connection
logger.info("Binance WebSocket connected successfully")
                    while self._running:
                        try:
                            raw = await asyncio.wait_for(ws.recv(), timeout=30)
                            payload = json.loads(raw)
                            # Combined streams wrap each message as
                            # {"stream": "<name>", "data": {...}}
                            stream = payload.get('stream', '')
                            data = payload.get('data', payload)
                            if data.get('e') == 'aggTrade':
                                await self.process_agg_trade(data)
                            elif 'lastUpdateId' in data:
                                # Depth snapshots carry no symbol field;
                                # recover it from the stream name
                                await self.process_orderbook(stream.split('@')[0], data)
                        except asyncio.TimeoutError:
                            # Keepalive ping
                            await ws.ping()
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"Binance connection closed: {e.code} {e.reason}")
await self._reconnect()
except Exception as e:
logger.error(f"Binance WebSocket error: {e}")
await self._reconnect()
async def _reconnect(self):
logger.info(f"Reconnecting in {self.reconnect_delay}s...")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
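    # run() dispatches depth snapshots here; a minimal sketch that writes them
    # into the orderbook_snapshots table created earlier (synchronous insert;
    # batching and retries omitted):
    async def process_orderbook(self, symbol_lower: str, msg: dict):
        """Persist a partial book snapshot (sketch)."""
        snapshot = (
            symbol_lower.upper(),
            datetime.utcnow(),
            [(float(p), float(q)) for p, q in msg.get('bids', [])],
            [(float(p), float(q)) for p, q in msg.get('asks', [])],
            int(msg.get('lastUpdateId', 0)),
        )
        try:
            self.writer.client.execute(
                'INSERT INTO crypto_warehouse.orderbook_snapshots VALUES',
                [snapshot],
            )
        except Exception as e:
            logger.error(f"Orderbook insert error: {e}")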
class ExchangeAPIClient:
"""Client REST pour requêtage historique avec rate limiting intelligent"""
def __init__(self, base_url: str = "https://api.binance.com"):
self.base_url = base_url
self.rate_limit = 1200 # requests per minute
self.request_timestamps = deque(maxlen=self.rate_limit)
self.session: Optional[aiohttp.ClientSession] = None
async def _rate_limit(self):
now = time.time()
        # Drop timestamps older than one minute
while self.request_timestamps and now - self.request_timestamps[0] > 60:
self.request_timestamps.popleft()
if len(self.request_timestamps) >= self.rate_limit:
sleep_time = 60 - (now - self.request_timestamps[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.request_timestamps.append(time.time())
    async def fetch_klines(self, symbol: str, interval: str,
                           start_time: Optional[int] = None, limit: int = 1000) -> List[List]:
        """Fetch historical OHLCV klines"""
await self._rate_limit()
if not self.session:
self.session = aiohttp.ClientSession()
params = {
'symbol': symbol.upper(),
'interval': interval,
'limit': limit
}
if start_time:
params['startTime'] = start_time
url = f"{self.base_url}/api/v3/klines"
async with self.session.get(url, params=params) as resp:
if resp.status == 429:
logger.warning("Rate limited, backing off...")
await asyncio.sleep(60)
return await self.fetch_klines(symbol, interval, start_time, limit)
resp.raise_for_status()
return await resp.json()
async def fetch_historical_backfill(self, symbol: str, start_date: datetime):
"""Backfill historique avec gestion intelligente des limites"""
start_time = int(start_date.timestamp() * 1000)
all_klines = []
oldest_timestamp = start_time
logger.info(f"Starting historical backfill for {symbol} from {start_date}")
while True:
klines = await self.fetch_klines(
symbol=symbol,
interval='1m',
start_time=oldest_timestamp,
limit=1000
)
if not klines:
break
all_klines.extend(klines)
oldest_timestamp = int(klines[-1][0]) + 1
logger.info(f"Fetched {len(klines)} klines, total: {len(all_klines)}")
            # Stay under Binance limits (1200 req/min); 0.1s per call leaves ample margin
await asyncio.sleep(0.1)
return all_klines
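To tie the collector together, a minimal entry point might look like this (a sketch: host, symbols, and the lack of graceful shutdown are illustrative choices):
async def main():
    writer = ClickHouseWriter(host='localhost')
    collector = BinanceWebSocket(['BTCUSDT', 'ETHUSDT'], writer)
    await collector.run()

if __name__ == '__main__':
    asyncio.run(main())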
Integrating the HolySheep APIs for AI Analysis
Once your data lands in ClickHouse, you can use the HolySheep API for advanced AI-driven analysis. HolySheep's sub-50ms latency makes analytical round-trips feel near-instant.
# holy_analysis.py - AI-powered crypto analysis
import aiohttp
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from clickhouse_driver import Client

logger = logging.getLogger(__name__)
class CryptoAnalysisEngine:
"""
Moteur d'analyse utilisant l'API HolySheep pour le NLP financier.
Taux de change: ¥1 = $1 (économie 85%+ vs alternatives)
"""
    BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str, clickhouse_client: Client):
self.api_key = api_key
self.ch = clickhouse_client
        self.session: Optional[aiohttp.ClientSession] = None
async def _make_request(self, endpoint: str, payload: dict) -> dict:
"""Requête vers l'API HolySheep avec gestion d'erreurs"""
if not self.session:
self.session = aiohttp.ClientSession()
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
url = f"{self.BASE_URL}{endpoint}"
try:
async with self.session.post(url, json=payload, headers=headers) as resp:
if resp.status == 401:
raise Exception("Clé API HolySheep invalide — vérifiez https://www.holysheep.ai/register")
if resp.status == 429:
await asyncio.sleep(5)
return await self._make_request(endpoint, payload)
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError as e:
logger.error(f"HTTP error: {e}")
raise
def get_price_data_for_analysis(self, symbol: str, days: int = 30) -> str:
"""Récupère les données de prix des 30 derniers jours"""
query = f"""
SELECT
toDate(timestamp) as date,
round(avg(open), 2) as avg_open,
round(min(low), 2) as min_low,
round(max(high), 2) as max_high,
round(sum(volume), 2) as total_volume,
round(sum(trades) / count() / 1440, 0) as avg_trades_per_min
FROM crypto_warehouse.ohlcv_1m
WHERE symbol = '{symbol}'
AND timestamp >= now() - INTERVAL {days} DAY
GROUP BY toDate(timestamp)
ORDER BY date
        """
        # clickhouse_driver uses the native protocol, so no FORMAT clause;
        # serialize the rows ourselves (dates handled via default=str)
        result = self.ch.execute(query)
        return json.dumps(result, indent=2, default=str)
async def analyze_market_sentiment(self, symbol: str) -> Dict:
"""
Analyse le sentiment du marché en utilisant GPT-4.1 via HolySheep.
Coût: ~$8/1M tokens (vs $15 pour Claude Sonnet 4.5)
"""
price_data = self.get_price_data_for_analysis(symbol, days=30)
prompt = f"""Analyse le sentiment du marché pour {symbol} basé sur ces données:
{price_data}
Fournis:
1. Résumé exécutif du comportement des prix
2. Indicateurs techniques clés (support/résistance)
3. Recommandation trading (ACHAT/VENTE/NEUTRE) avec justification
4. Niveau de confiance (0-100%)
5. Horizon de temps recommandé
Réponds en JSON structuré."""
response = await self._make_request('/chat/completions', {
'model': 'gpt-4.1',
'messages': [
                {'role': 'system', 'content': 'You are an expert crypto financial analyst.'},
{'role': 'user', 'content': prompt}
],
'temperature': 0.3,
'max_tokens': 2000
})
return {
'analysis': response['choices'][0]['message']['content'],
'usage': response.get('usage', {}),
'symbol': symbol,
'timestamp': datetime.now().isoformat()
}
    async def detect_anomalies(self, symbol: str, std_dev_threshold: float = 3.0) -> Dict:
        """
        Detect price anomalies using ClickHouse's statistical functions,
        then validate them with HolySheep's AI.
        """
query = f"""
WITH stats AS (
SELECT
avg(close) as mean_price,
stddevPop(close) as std_price,
quantile(0.99)(close) as p99,
quantile(0.01)(close) as p01
FROM crypto_warehouse.ohlcv_1m
WHERE symbol = '{symbol}'
AND timestamp >= now() - INTERVAL 7 DAY
)
SELECT
timestamp,
close,
(close - mean_price) / std_price as zscore,
volume
FROM crypto_warehouse.ohlcv_1m
CROSS JOIN stats
WHERE symbol = '{symbol}'
AND abs((close - mean_price) / std_price) > {std_dev_threshold}
ORDER BY timestamp DESC
LIMIT 100
"""
result = self.ch.execute(query)
anomalies = [
{
'timestamp': row[0].isoformat() if isinstance(row[0], datetime) else row[0],
'close': float(row[1]),
'zscore': float(row[2]),
'volume': float(row[3])
}
            for row in (result or [])
]
if anomalies:
            # Validate with the AI
validation = await self._make_request('/chat/completions', {
'model': 'gpt-4.1',
'messages': [
{'role': 'user', 'content': f"Anomalies détectées pour {symbol}: {json.dumps(anomalies[:5])}. Explique ces anomalies en termes de facteurs de marché possibles."}
]
})
return {'anomalies': anomalies, 'ai_analysis': validation}
return {'anomalies': [], 'ai_analysis': None}
async def generate_trading_report(self, symbols: List[str]) -> str:
"""Génère un rapport de trading complet avec DeepSeek V3.2 (économique: $0.42/1M tokens)"""
reports = []
for symbol in symbols:
data = self.get_price_data_for_analysis(symbol, days=7)
reports.append(f"## {symbol}\n``json\n{data}\n``")
combined_report = "\n---\n".join(reports)
        # Use DeepSeek for the report (cheaper)
response = await self._make_request('/chat/completions', {
'model': 'deepseek-v3.2',
'messages': [
                {'role': 'system', 'content': 'You are an expert quantitative analyst.'},
                {'role': 'user', 'content': f"Generate a consolidated trading report:\n{combined_report}"}
],
'temperature': 0.2
})
return response['choices'][0]['message']['content']
# Usage
async def main():
ch_client = Client(host='localhost', database='crypto_warehouse')
analyzer = CryptoAnalysisEngine(
        api_key='YOUR_HOLYSHEEP_API_KEY',  # Replace with your key
clickhouse_client=ch_client
)
    # Sentiment analysis
btc_analysis = await analyzer.analyze_market_sentiment('BTCUSDT')
print(f"Bitcoin Analysis:\n{btc_analysis['analysis']}")
print(f"Coût: ${btc_analysis['usage']['total_tokens'] / 1000000 * 8:.4f}")
if __name__ == '__main__':
asyncio.run(main())
Backfill and Historical Data Recovery
Rebuilding history is critical. Binance caps historical requests at blocks of 1000 candles, but with the rate limiter above you can pull years of data in a few hours without ever tripping the limit.
# backfill_manager.py - Recovery and backfill manager
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List

from crypto_data_collector import ClickHouseWriter, ExchangeAPIClient

logger = logging.getLogger(__name__)
class BackfillManager:
"""Gestionnaire de backfill avec recovery intelligent"""
def __init__(self, api_client: ExchangeAPIClient, writer: ClickHouseWriter):
self.api_client = api_client
self.writer = writer
self.checkpoint_file = 'backfill_checkpoint.json'
def get_last_timestamp(self, symbol: str, table: str = 'ohlcv_1m') -> datetime:
"""Récupère le dernier timestamp présent dans ClickHouse"""
query = f"""
SELECT max(timestamp)
FROM crypto_warehouse.{table}
WHERE symbol = '{symbol}'
"""
result = self.writer.client.execute(query)
return result[0][0] if result and result[0][0] else None
def calculate_progress(self, symbol: str, start_date: datetime) -> float:
"""Calcule le pourcentage de progression du backfill"""
last_ts = self.get_last_timestamp(symbol)
if not last_ts:
return 0.0
total_seconds = (datetime.utcnow() - start_date).total_seconds()
elapsed_seconds = (last_ts - start_date).total_seconds()
return min(100.0, (elapsed_seconds / total_seconds) * 100)
async def backfill_symbol(self, symbol: str,
start_date: datetime,
force_reload: bool = False) -> int:
"""
Backfill complet d'un symbole depuis start_date.
Gère automatiquement les checkpoints et la reprise sur erreur.
"""
total_rows = 0
last_success_ts = start_date
        # Resume from a checkpoint when possible
if not force_reload:
checkpoint_ts = self.get_last_timestamp(symbol)
if checkpoint_ts:
start_date = checkpoint_ts - timedelta(minutes=1)
logger.info(f"Resuming from checkpoint: {start_date}")
        # Backfill in blocks of 1000 (Binance limit)
current_start = start_date
batch_count = 0
while True:
try:
klines = await self.api_client.fetch_klines(
symbol=symbol,
interval='1m',
start_time=int(current_start.timestamp() * 1000),
limit=1000
)
if not klines:
logger.info(f"No more data for {symbol}, backfill complete")
break
                # Map Binance kline fields onto our schema
ohlcv_data = [
(symbol, '1m',
datetime.utcfromtimestamp(k[0] / 1000),
float(k[1]), float(k[2]), float(k[3]), float(k[4]),
float(k[5]), float(k[7]), int(k[8]))
for k in klines
]
                # Insert each fetched block directly
self.writer.client.execute(
'''INSERT INTO crypto_warehouse.ohlcv_1m
(symbol, timeframe, timestamp, open, high, low, close,
volume, quote_volume, trades) VALUES''',
ohlcv_data
)
total_rows += len(ohlcv_data)
last_success_ts = datetime.utcfromtimestamp(klines[-1][0] / 1000)
current_start = last_success_ts + timedelta(minutes=1)
batch_count += 1
logger.info(f"{symbol}: {total_rows} rows, last: {last_success_ts}")
                # Progress logging
progress = self.calculate_progress(symbol, start_date)
logger.info(f"Progress: {progress:.2f}%")
                # Pause to stay under the rate limit (Binance: 1200 req/min)
                await asyncio.sleep(0.06)  # ~1000 req/min with margin
                # Safety check: cap each session at one month of ingested data
                if (current_start - start_date).days > 30:
                    logger.warning("Stopping backfill after 1 month of data to prevent timeout")
                    break
except Exception as e:
logger.error(f"Backfill error for {symbol}: {e}")
# Exponential backoff
await asyncio.sleep(min(300, 2 ** batch_count))
continue
return total_rows
async def backfill_multiple_symbols(self, symbols: List[str],
start_date: datetime,
max_concurrent: int = 3) -> Dict[str, int]:
"""Backfill parallèle de plusieurs symboles avec sémaphore"""
semaphore = asyncio.Semaphore(max_concurrent)
async def backfill_with_semaphore(symbol: str):
async with semaphore:
rows = await self.backfill_symbol(symbol, start_date)
return symbol, rows
tasks = [
backfill_with_semaphore(symbol)
for symbol in symbols
]
results = await asyncio.gather(*tasks, return_exceptions=True)
summary = {}
for result in results:
if isinstance(result, Exception):
logger.error(f"Task failed: {result}")
else:
symbol, rows = result
summary[symbol] = rows
return summary
# Running the backfill
async def run_backfill():
from crypto_data_collector import ExchangeAPIClient, ClickHouseWriter
writer = ClickHouseWriter()
api = ExchangeAPIClient()
manager = BackfillManager(api, writer)
    # Backfill the majors over 2 years
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'ADAUSDT']
start_date = datetime(2022, 1, 1)
results = await manager.backfill_multiple_symbols(symbols, start_date)
print("\n=== BACKFILL SUMMARY ===")
for symbol, count in results.items():
print(f"{symbol}: {count:,} rows")
if __name__ == '__main__':
asyncio.run(run_backfill())
ClickHouse Performance Tuning
ClickHouse can ingest millions of rows per second, but without tuning your performance will stay mediocre. Here are the optimizations that separate a 5ms query from a 30-second one.
- index_granularity = 8192: reduces indexing overhead for analytics tables
- LZ4 compression: roughly 3x less disk usage with minimal CPU impact
- PARTITION BY month + symbol: enables efficient partition pruning (see the sketch after this list)
- A well-chosen ORDER BY: clusters data by symbol, then timestamp
- 365-day TTL: automatic cleanup of old data
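To confirm that pruning actually kicks in, EXPLAIN with indexes = 1 (available in recent ClickHouse releases; output format varies by version) should report only a handful of selected parts for a one-day query:
EXPLAIN indexes = 1
SELECT count()
FROM crypto_warehouse.ohlcv_1m
WHERE symbol = 'BTCUSDT'
  AND timestamp >= now() - INTERVAL 1 DAY;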
-- Performance monitoring queries
SELECT
database,
table,
formatReadableSize(sum(bytes_on_disk)) as disk_size,
formatReadableSize(sum(data_compressed_bytes)) as compressed_size,
round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes) * 100, 2) as compression_ratio,
sum(rows) as total_rows,
count() as parts
FROM system.parts
WHERE database = 'crypto_warehouse'
AND active = 1
GROUP BY database, table
ORDER BY sum(bytes_on_disk) DESC;
-- Active query monitoring
SELECT
query_id,
user,
query,
elapsed,
formatReadableSize(memory_usage) as memory,
read_rows,
formatReadableSize(read_bytes) as read_size
FROM system.processes
WHERE user = 'default'
ORDER BY elapsed DESC
LIMIT 10;
-- Merge statistics (completed merges live in system.part_log;
-- system.merges only shows merges currently in flight)
SELECT
    table,
    sum(rows) as total_rows_merged,
    sum(bytes_uncompressed) as uncompressed_bytes,
    count() as merge_operations,
    avg(duration_ms) as avg_duration_ms
FROM system.part_log
WHERE database = 'crypto_warehouse'
  AND event_type = 'MergeParts'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY table;
Managing Real-Time Streaming
# real_time_aggregator.py - Real-time candle aggregation with periodic flushes
import threading
import time
from datetime import datetime
from typing import Dict

from crypto_data_collector import ClickHouseWriter, OHLCV
class TimeBucketAggregator:
"""
Agrégateur de candles temps réel avec flushs périodiques.
Garantit exactly-once semantics via buffering intelligent.
"""
def __init__(self, writer: ClickHouseWriter, flush_interval: int = 60):
self.writer = writer
self.flush_interval = flush_interval
self.candles: Dict[tuple, dict] = {}
self.lock = threading.Lock()
self.running = True
def process_trade(self, symbol: str, price: float, quantity: float,
quote_qty: float, timestamp: datetime, is_buyer_maker: bool):
"""Incrémentation atomique d'une candle"""
bucket_ts = timestamp.replace(second=0, microsecond=0)
key = (symbol, bucket_ts)
with self.lock:
if key not in self.candles:
self.candles[key] = {
'symbol': symbol,
'timeframe': '1m',
'timestamp': bucket_ts,
'open': price,
'high': price,
'low': price,
'close': price,
'volume': quantity,
'quote_volume': quote_qty,
'trades': 1,
'buy_volume': quantity if not is_buyer_maker else 0,
'first_trade': timestamp,
'last_trade': timestamp
}
else:
c = self.candles[key]
c['high'] = max(c['high'], price)
c['low'] = min(c['low'], price)
c['close'] = price
c['volume'] += quantity
c['quote_volume'] += quote_qty
c['trades'] += 1
if not is_buyer_maker:
c['buy_volume'] += quantity
c['last_trade'] = timestamp
def flush_candles(self):
"""Flush toutes les candles complètes vers ClickHouse"""
with self.lock:
now = datetime.utcnow()
to_flush = []
to_keep = {}
for key, candle in self.candles.items():
                # Flush candles that closed more than 2 minutes ago
age = (now - candle['timestamp']).total_seconds()
if age > 120:
to_flush.append(candle)
else:
to_keep[key] = candle
self.candles = to_keep
if to_flush:
ohlcvs = [
OHLCV(
symbol=c['symbol'],
timeframe=c['timeframe'],
timestamp=c['timestamp'],
open=c['open'],
high=c['high'],
low=c['low'],
close=c['close'],
volume=c['volume'],
quote_volume=c['quote_volume'],
trades=c['trades']
)
for c in to_flush
]
            self.writer.insert_ohlcv(ohlcvs)
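    # The flush_interval and running attributes set in __init__ imply a
    # periodic flush driver. A minimal sketch, assuming flush_candles above:
    def start_flush_loop(self):
        """Run flush_candles every flush_interval seconds in a daemon thread."""
        def _loop():
            while self.running:
                time.sleep(self.flush_interval)
                self.flush_candles()
        threading.Thread(target=_loop, daemon=True).start()

    def stop(self):
        """Stop the loop and push out any candles already closed."""
        self.running = False
        self.flush_candles()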