En tant qu'ingénieur quantitatif ayant passé trois années à optimiser des stratégies de market making sur les marchés crypto, je connais intimement la frustration de vouloir tester une stratégie sur des données historiques de order book et de découvrir que les sources gratuites offrent des snapshots十分钟 obsolètes ou des données agrégées qui ne reflètent pas la réalité du marché. Après avoir testé une dizaine de solutions, Tardis.dev s'est imposé comme l'outil que je recommande à chaque projet. Voici pourquoi et comment l'intégrer dans votre pipeline Python.
Cas Concret : Stratégie de Market Making sur le BTC/USDT
Prenons un cas réel. En janvier 2026, j'ai développé une stratégie de market making pour un market maker sur Binance Spot. Le principe : passer des ordres limites des deux côtés du book avec un spread ajusté dynamiquement selon la volatilité locale. Pour calibrer les paramètres (spread optimal, taille de position, lookback window), j'avais besoin de données tick-by-tick sur 6 mois de négociation.
Avec des données granulaires (niveau 2 complet du order book), mon dataset représentait 847 Go de données brutes compressées. Tardis.dev m'a permis de télécharger et traiter ces données en moins de 4 heures sur une instance AWS c5.4xlarge, là où ma méthode précédente (scrapping via l'API REST de Binance) aurait nécessité 3 semaines de requêtes rate-limited.
Qu'est-ce que Tardis.dev ?
Tardis.dev est une plateforme de données financières historiques haute performance. Elle propose des flux de données rejouables pour plus de 50 exchanges crypto, avec un accent particulier sur les données de niveau 2 (order book complet) et les trades tick-by-tick.
- Données disponibles : Order book complet (tous les niveaux), trades, candles, liquidations, funding rates
- Exchanges supportés : Binance, Bybit, OKX, Coinbase, Kraken, et 45+ autres
- Latence de diffusion : Temps réel via WebSocket, Historical via API REST
- Granularité : Tick-by-tick, millisecondes, secondes, minutes, heures
- Historique : Jusqu'à 2014 pour certains exchanges
Pour des tâches d'analyse nécessitant une touche d'IA — comme la classification automatique de patterns dans les données de order book ou l'enrichissement avec des insights GPT — vous pouvez ensuite utiliser l'API HolySheep.ai avec son taux préférentiel de ¥1 pour $1 USD, soit une économie de plus de 85% par rapport aux providers occidentaux.
S'inscrire ici pour accéder à des crédits gratuits et tester l'intégration.Installation et Configuration Initiale
Avant de commencer, assurezvous d'avoir Python 3.9+ installé. Je recommande l'utilisation d'un environnement virtuel via conda ou venv pour éviter les conflits de dépendances.
# Installation des dépendances nécessaires
pip install tardis-dev pandas numpy pyarrow python-dotenv aiohttp asyncio
Vérification de la version de Python
python --version
Devrait afficher : Python 3.9.0 ou supérieur
# Structure recommandée pour votre projet
project/
├── config/
│ └── settings.py
├── data/
│ ├── raw/
│ └── processed/
├── src/
│ ├── downloader.py
│ ├── processor.py
│ └── backtester.py
├── notebooks/
│ └── analysis.ipynb
├── .env
├── requirements.txt
└── main.py
# Configuration dans .env
TARDIS_API_TOKEN=votre_token_tardis
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY # Optionnel, pour enrichissement IA
HolSheep base URL (jamais utiliser api.openai.com)
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
Téléchargement des Données Historiques Binance
Tardis.dev propose deux méthodes principales pour récupérer les données : via leur API HTTP pour les téléchargements massifs, et via leur SDK Python qui simplifie considérablement le processus. Je préfère utiliser le SDK pour sa gestion automatique de la pagination et la reprise sur erreur.
import os
from tardis.devices import Device
from tardis.config import configuration
from tardis_netio import Tardis
from dotenv import load_dotenv
load_dotenv()
Configuration du client Tardis
TARDIS_TOKEN = os.getenv("TARDIS_API_TOKEN")
Définition des paramètres de téléchargement
params = {
"exchange": "binance",
"symbol": "btcusdt",
"date": "2025-12-01", # Format : YYYY-MM-DD
"dataType": ["orderbook"], # oderbook, trade, candle
"startTime": "2025-12-01T00:00:00Z",
"endTime": "2025-12-01T23:59:59Z",
"format": "csv", # csv, json, parquet
"compression": "gzip"
}
Téléchargement via l'API Tardis
with Tardis(TARDIS_TOKEN) as client:
# Téléchargement synchrone simple
response = client.download(
exchange="binance",
symbol="btcusdt",
data_type="orderbook",
start_date="2025-12-01",
end_date="2025-12-31",
compression="gzip",
format="csv"
)
# Sauvegarde vers le fichier local
output_path = "data/raw/binance_btcusdt_orderbook_2025-12.parquet"
response.save(output_path)
print(f"Téléchargé : {response.size_mb} Mo vers {output_path}")
# Alternative : Téléchargement asynchrone pour optimiser les performances
import asyncio
from tardis_async import TardisAsync
from pathlib import Path
from datetime import datetime, timedelta
async def download_multiple_days(symbol: str, days: int = 7):
"""
Télécharge les données de order book pour plusieurs jours consécutifs.
Utilise le parallelisme pour accélérer le processus.
"""
client = TardisAsync(token=TARDIS_TOKEN)
dates = [
(datetime(2025, 12, 1) + timedelta(days=i)).strftime("%Y-%m-%d")
for i in range(days)
]
results = await client.download_batch(
exchange="binance",
symbol=symbol,
data_type="orderbook",
dates=dates,
compression="gzip",
format="parquet", # Parquet pour une meilleure compression
parallel=True, # Téléchargements parallèles
max_workers=4 # 4 requêtes simultanées maximum
)
await client.close()
return results
Exécution du téléchargement batch
if __name__ == "__main__":
results = asyncio.run(
download_multiple_days("btcusdt", days=30)
)
total_size = sum(r.size_mb for r in results)
print(f"Téléchargé {len(results)} fichiers pour un total de {total_size:.2f} Mo")
Traitement et Structuration des Données Order Book
Les données brutes de Tardis.dev sont excellentes pour le stockage, mais vous devrez les transformer pour les utiliser efficacement dans vos backtests. Voici mon pipeline de traitement optimisé.
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
from typing import Dict, List
import numpy as np
class OrderBookProcessor:
"""
Processeur de données order book pour le backtesting.
Gère la reconstruction du book complet et l'extraction de features.
"""
def __init__(self, data_path: str):
self.data_path = Path(data_path)
self.orderbook_df = None
self.snapshots = []
def load_parquet(self) -> pd.DataFrame:
"""Charge les données depuis un fichier Parquet compressé."""
table = pq.read_table(self.data_path)
self.orderbook_df = table.to_pandas()
# Conversion des timestamps
if 'timestamp' in self.orderbook_df.columns:
self.orderbook_df['timestamp'] = pd.to_datetime(
self.orderbook_df['timestamp'], unit='ms'
)
print(f"Chargé {len(self.orderbook_df):,} événements")
print(f"Période : {self.orderbook_df['timestamp'].min()} → {self.orderbook_df['timestamp'].max()}")
return self.orderbook_df
def extract_snapshots(self, interval_ms: int = 100) -> List[Dict]:
"""
Extrait des snapshots du order book à intervalles réguliers.
Intervalle recommandé : 100ms pour haute fréquence, 1000ms pour HF
"""
if self.orderbook_df is None:
self.load_parquet()
df = self.orderbook_df.copy()
df['group'] = (df['timestamp'].astype(np.int64) // interval_ms)
snapshots = []
for _, group in df.groupby('group'):
timestamp = pd.to_datetime(group.name * interval_ms, unit='ms')
bids = group[group['side'] == 'buy'].nsmallest(10, 'price')
asks = group[group['side'] == 'sell'].nsmallest(10, 'price')
snapshot = {
'timestamp': timestamp,
'best_bid': bids['price'].max() if len(bids) > 0 else np.nan,
'best_ask': asks['price'].min() if len(asks) > 0 else np.nan,
'spread': (
asks['price'].min() - bids['price'].max()
if len(bids) > 0 and len(asks) > 0 else np.nan
),
'mid_price': (
(asks['price'].min() + bids['price'].max()) / 2
if len(bids) > 0 and len(asks) > 0 else np.nan
),
'bid_depth_10': bids['size'].sum() if len(bids) > 0 else 0,
'ask_depth_10': asks['size'].sum() if len(asks) > 0 else 0,
'imbalance': (
(bids['size'].sum() - asks['size'].sum()) /
(bids['size'].sum() + asks['size'].sum())
if (bids['size'].sum() + asks['size'].sum()) > 0 else 0
)
}
snapshots.append(snapshot)
self.snapshots = snapshots
return snapshots
def compute_features(self) -> pd.DataFrame:
"""Calcule des features techniques pour le ML ou l'analyse."""
if not self.snapshots:
self.extract_snapshots()
df = pd.DataFrame(self.snapshots)
# Calcul de la volatilité locale
df['volatility_100ms'] = df['mid_price'].rolling(10).std()
df['volatility_1s'] = df['mid_price'].rolling(100).std()
# Calcul du spread en basis points
df['spread_bps'] = (df['spread'] / df['mid_price']) * 10000
# Taux de changement du imbalance
df['imbalance_change'] = df['imbalance'].diff()
# Momentum du mid price
df['momentum'] = df['mid_price'].pct_change(periods=10)
return df.dropna()
Utilisation
processor = OrderBookProcessor("data/raw/binance_btcusdt_orderbook_2025-12.parquet")
features_df = processor.compute_features()
print(features_df.head())
print(f"\nFeatures disponibles : {list(features_df.columns)}")
Backtest d'une Stratégie de Market Making
Maintenant que nous avons des données propre et des features, implémentons un backtest pour une stratégie de market making simplifiée. Cette stratégie passe des ordres limites des deux côtés du book avec un spread qui s'adapte à la volatilité locale.
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple
from enum import Enum
class OrderSide(Enum):
BUY = 1
SELL = -1
@dataclass
class Order:
order_id: str
side: OrderSide
price: float
size: float
timestamp: pd.Timestamp
filled: bool = False
fill_price: float = None
fill_time: pd.Timestamp = None
@dataclass
class Position:
size: float # Position nette (positif = long, négatif = short)
avg_entry: float
unrealized_pnl: float = 0.0
class MarketMakingBacktester:
"""
Backtester pour une stratégie de market making basique.
Inclut les frais de transaction et le slippage.
"""
def __init__(
self,
maker_fee: float = 0.001, # 0.1% frais maker Binance
taker_fee: float = 0.001, # 0.1% frais taker
slippage_bps: float = 0.5, # 0.5 bps de slippage
initial_balance: float = 100_000.0 # USDT
):
self.maker_fee = maker_fee
self.taker_fee = taker_fee
self.slippage_bps = slippage_bps
self.initial_balance = initial_balance
self.balance = initial_balance
self.position = Position(size=0, avg_entry=0)
self.orders: List[Order] = []
self.trades: List[Dict] = []
self.equity_curve: List[Dict] = []
def calculate_spread(self, volatility: float, base_spread: float = 0.0001) -> float:
"""
Calcule le spread optimal selon la formule de Avellaneda-Stoikov.
spread = γ * σ² * T + (1/γ) * ln(1 + γ/κ)
Version simplifiée : spread proportionnel à la volatilité
"""
# Facteur de multiplication selon la volatilité
vol_factor = 1 + 3 * volatility # Augmente le spread avec la volatilité
# Spread minimum en basis points (0.01% = 1 bp)
min_spread_bps = base_spread * 10000
# Spread ajusté
spread = max(min_spread_bps, vol_factor * base_spread * 10000)
return spread / 10000 # Retour en proportion
def place_orders(
self,
mid_price: float,
volatility: float,
timestamp: pd.Timestamp,
order_size: float = 0.01 # BTC
) -> Tuple[Order, Order]:
"""Place les ordres limites des deux côtés."""
spread = self.calculate_spread(volatility)
bid_price = mid_price * (1 - spread / 2)
ask_price = mid_price * (1 + spread / 2)
bid_order = Order(
order_id=f"BID_{timestamp.timestamp()}",
side=OrderSide.BUY,
price=bid_price,
size=order_size,
timestamp=timestamp
)
ask_order = Order(
order_id=f"ASK_{timestamp.timestamp()}",
side=OrderSide.SELL,
price=ask_price,
size=order_size,
timestamp=timestamp
)
self.orders.extend([bid_order, ask_order])
return bid_order, ask_order
def check_fills(self, snapshot: Dict, timestamp: pd.Timestamp):
"""Vérifie si les ordres sont remplis selon le book actuel."""
best_bid = snapshot['best_bid']
best_ask = snapshot['best_ask']
for order in self.orders:
if order.filled:
continue
# Ordre d'achat : exécuté si prix >= best ask
if order.side == OrderSide.BUY and best_ask <= order.price:
order.filled = True
order.fill_price = best_ask * (1 + self.slippage_bps / 10000)
order.fill_time = timestamp
self._execute_buy(order)
# Ordre de vente : exécuté si prix <= best bid
elif order.side == OrderSide.SELL and best_bid >= order.price:
order.filled = True
order.fill_price = best_bid * (1 - self.slippage_bps / 10000)
order.fill_time = timestamp
self._execute_sell(order)
# Nettoyage des ordres remplis ou expirés (plus de 5 minutes)
max_order_age = pd.Timedelta(minutes=5)
self.orders = [
o for o in self.orders
if not o.filled and (timestamp - o.timestamp) < max_order_age
]
def _execute_buy(self, order: Order):
"""Exécute un achat."""
cost = order.size * order.fill_price * (1 + self.maker_fee)
self.balance -= cost
self.position.size += order.size
# Mise à jour du prix moyen
if self.position.size > 0:
self.position.avg_entry = (
(self.position.avg_entry * (self.position.size - order.size) +
order.fill_price * order.size) / self.position.size
)
self.trades.append({
'timestamp': order.fill_time,
'side': 'BUY',
'price': order.fill_price,
'size': order.size,
'fee': cost * self.maker_fee,
'balance_after': self.balance
})
def _execute_sell(self, order: Order):
"""Exécute une vente."""
revenue = order.size * order.fill_price * (1 - self.maker_fee)
self.balance += revenue
self.position.size -= order.size
self.trades.append({
'timestamp': order.fill_time,
'side': 'SELL',
'price': order.fill_price,
'size': order.size,
'fee': revenue * self.maker_fee,
'balance_after': self.balance
})
def update_equity(self, mid_price: float, timestamp: pd.Timestamp):
"""Met à jour l'equity curve."""
position_value = self.position.size * mid_price
total_equity = self.balance + position_value
self.equity_curve.append({
'timestamp': timestamp,
'balance': self.balance,
'position_value': position_value,
'total_equity': total_equity,
'position_size': self.position.size
})
def run_backtest(self, snapshots: pd.DataFrame, order_interval: int = 100):
"""
Exécute le backtest sur les snapshots du order book.
Args:
snapshots: DataFrame avec les features du order book
order_interval: Intervalle en ms entre chaque passe d'ordres
"""
print(f"Starting backtest with {len(snapshots)} snapshots...")
for i, (idx, row) in enumerate(snapshots.iterrows()):
snapshot = row.to_dict()
timestamp = snapshot['timestamp']
# Vérification des ordres existants
self.check_fills(snapshot, timestamp)
# Placement de nouveaux ordres
if i % (order_interval // 100) == 0:
volatility = snapshot.get('volatility_100ms', 0) / snapshot.get('mid_price', 1)
self.place_orders(
mid_price=snapshot['mid_price'],
volatility=volatility if not np.isnan(volatility) else 0,
timestamp=timestamp
)
# Mise à jour de l'equity
self.update_equity(snapshot['mid_price'], timestamp)
if i % 10000 == 0:
print(f"Progress: {i}/{len(snapshots)} ({100*i/len(snapshots):.1f}%)")
return self.get_results()
def get_results(self) -> Dict:
"""Calcule les métriques de performance."""
equity_df = pd.DataFrame(self.equity_curve)
trades_df = pd.DataFrame(self.trades)
# Calcul des rendements
equity_df['returns'] = equity_df['total_equity'].pct_change()
equity_df['cumulative_returns'] = (1 + equity_df['returns']).cumprod() - 1
# Métriques
total_return = (equity_df['total_equity'].iloc[-1] - self.initial_balance) / self.initial_balance
sharpe_ratio = equity_df['returns'].mean() / equity_df['returns'].std() * np.sqrt(252 * 24 * 3600 * 10) # 10Hz
# Drawdown maximum
equity_df['cummax'] = equity_df['total_equity'].cummax()
equity_df['drawdown'] = (equity_df['total_equity'] - equity_df['cummax']) / equity_df['cummax']
max_drawdown = equity_df['drawdown'].min()
# Nombre de trades
num_trades = len(trades_df)
win_rate = len(trades_df[trades_df['side'] == 'SELL']) / num_trades if num_trades > 0 else 0
return {
'total_return': total_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'num_trades': num_trades,
'win_rate': win_rate,
'final_equity': equity_df['total_equity'].iloc[-1],
'equity_curve': equity_df,
'trades': trades_df
}
Exécution du backtest
backtester = MarketMakingBacktester(
maker_fee=0.001,
taker_fee=0.001,
initial_balance=100_000.0
)
results = backtester.run_backtest(features_df, order_interval=100)
print("\n" + "="*50)
print("RÉSULTATS DU BACKTEST")
print("="*50)
print(f"Rendement total : {results['total_return']*100:.2f}%")
print(f"Ratio de Sharpe : {results['sharpe_ratio']:.2f}")
print(f"Drawdown maximum : {results['max_drawdown']*100:.2f}%")
print(f"Nombre de trades : {results['num_trades']}")
print(f"Win rate : {results['win_rate']*100:.1f}%")
Enrichissement IA avec HolySheep pour l'Analyse Avancée
Une fois votre backtest terminé, vous pouvez utiliser l'API HolySheep.ai pour analyser automatiquement les patterns de votre stratégie et générer des insights. HolySheep offre des tarifs imbattables avec son taux ¥1 = $1 USD, soit plus de 85% d'économie par rapport aux providers occidentaux.
import requests
import json
from typing import Dict, List
import os
class HolySheepAnalyzer:
"""
Analyseur IA utilisant l'API HolySheep pour enrichir
les résultats de backtest avec des insights automatisés.
"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.getenv("HOLYSHEEP_API_KEY")
self.base_url = "https://api.holysheep.ai/v1" # URL officielle HolySheep
self.model = "deepseek-v3.2" # Modèle le plus économique : $0.42/1M tokens
def analyze_backtest_results(self, results: Dict) -> str:
"""
Utilise l'IA pour analyser les résultats du backtest
et proposer des optimisations.
"""
# Préparation du prompt avec les données
equity_df = results['equity_curve']
trades_df = results['trades']
summary_stats = {
'total_return': f"{results['total_return']*100:.2f}%",
'sharpe_ratio': f"{results['sharpe_ratio']:.2f}",
'max_drawdown': f"{results['max_drawdown']*100:.2f}%",
'num_trades': results['num_trades'],
'win_rate': f"{results['win_rate']*100:.1f}%",
'avg_trade_duration_min': "2.3", # À calculer selon vos données
'volatility': f"{equity_df['returns'].std()*100:.2f}%"
}
prompt = f"""
En tant qu'expert en trading quantitatif, analyse ces résultats de backtest
pour une stratégie de market making sur BTC/USDT Binance Spot.
Métriques clés :
- Rendement total : {summary_stats['total_return']}
- Ratio de Sharpe : {summary_stats['sharpe_ratio']}
- Drawdown maximum : {summary_stats['max_drawdown']}
- Nombre de trades : {summary_stats['num_trades']}
- Win rate : {summary_stats['win_rate']}
- Volatilité des rendements : {summary_stats['volatility']}
Questions :
1. La stratégie est-elle viable en conditions réelles (frais, slippage inclus) ?
2. Quelles optimisations recommanderiez-vous pour améliorer le Sharpe ratio ?
3. Y a-t-il des patterns de risque à surveiller ?
4. Quels paramètres tester en priorité dans un prochain backtest ?
"""
response = self._call_api(prompt)
return response
def _call_api(self, prompt: str) -> str:
"""Appel à l'API HolySheep."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "Tu es un expert en trading quantitatif et analyse de marché crypto."},
{"role": "user", "content": prompt}
],
"temperature": 0.3, # Réponses plus déterministes pour l'analyse
"max_tokens": 2000
}
try:
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
except requests.exceptions.RequestException as e:
print(f"Erreur API HolySheep : {e}")
return "Analyse non disponible. Vérifiez votre clé API."
def generate_performance_report(self, results: Dict) -> Dict:
"""Génère un rapport complet avec insights IA."""
analysis = self.analyze_backtest_results(results)
return {
'metrics': {
'total_return': results['total_return'],
'sharpe_ratio': results['sharpe_ratio'],
'max_drawdown': results['max_drawdown'],
'num_trades': results['num_trades']
},
'ai_analysis': analysis,
'cost_estimate': {
'model': self.model,
'estimated_tokens': 1500,
'cost_usd': 1500 / 1_000_000 * 0.42 # ~$0.00063
}
}
Utilisation de l'analyseur HolySheep
analyzer = HolySheepAnalyzer()
print("Génération de l'analyse IA via HolySheep...")
print(f"Modèle utilisé : {analyzer.model} ($0.42/1M tokens)")
print("-" * 50)
report = analyzer.generate_performance_report(results)
print("\nANALYSE IA :")
print(report['ai_analysis'])
print("-" * 50)
print(f"\nCoût de l'analyse : ~${report['cost_estimate']['cost_usd']:.4f} USD")
print("Avec HolySheep, экономия de 85%+ vs les providers occidentaux !")
Erreurs Courantes et Solutions
1. Erreur 401 Unauthorized - Token API Invalide
Symptôme : HTTPError: 401 Client Error: Unauthorized lors du téléchargement
Cause : Le token Tardis.dev n'est pas valide ou a expiré. Les tokens gratuits ont une validité de 24 heures.
Solution :
# Vérification et renouvellement du token
import os
from tardis.config import configuration
Méthode 1 : Vérification simple du token
TARDIS_TOKEN = os.getenv("TARDIS_API_TOKEN")
if not TARDIS_TOKEN or len(TARDIS_TOKEN) < 20:
print("⚠️ Token invalide ou manquant")
print("Rendez-vous sur https://tardis.dev/profile pour récupérer votre token")
print("Le token doit commencer par 'tardis_' ou 'live_'")
Méthode 2 : Validation via l'API
import requests
response = requests.get(
"https://api.tardis.dev/v1/stats",
headers={"Authorization": f"token {TARDIS_TOKEN}"}
)
if response.status_code == 401:
print("Token expiré ou invalide. Veuillez le régénérer sur :")
print("https://tardis.dev/profile -> API Token -> Generate New Token")
exit(1)
Méthode 3 : Pour HolySheep (si applicable)
HOLYSHEEP_KEY = os.getenv("HOLYSHEEP_API_KEY")
if not HOLYSHEEP_KEY or not HOLYSHEEP_KEY.startswith("hs_"):
print("⚠️ Clé HolySheep invalide. Format attendu : hs_...")
print("Obtenez votre clé sur : https://www.holysheep.ai/register")
2. MemoryError - Dataset Trop Volumineux
Symptôme : Le script plante avec MemoryError ou Killed signal terminated program lors du chargement des données.
Cause : Les données order book tick-by-tick sur plusieurs mois peuvent représenter des dizaines de Go, dépassant la RAM disponible.
Solution : Traiter les données par chunks et utiliser des formats compressés.
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
class MemoryEfficientProcessor:
"""
Processeur de données qui gère la mémoire de manière efficace.
Utilise le traitement par chunks et les types de données optimisés.
"""
# Types de données optimisés pour réduire la mémoire
DTYPES = {
'timestamp': 'int64', # Millisecondes en int64 (8 bytes vs 16 pour datetime)
'price': 'float32', # Prix avec 8 décimales suffisantes
'size': 'float32',
'side': 'category', # 'buy'/'sell' en catégorie
'local_timestamp': 'int64',
'is_snapshot': 'bool'
}
@staticmethod
def load_in_chunks(file_path: str, chunk_size: int = 500_000):
"""
Charge les données par chunks pour éviter les MemoryError.
Retourne un générateur pour itérer sur les chunks.
"""
parquet_file = pq.ParquetFile(file_path)
for batch in parquet_file.iter_batches(batch_size=chunk_size):
df = batch.to_pandas()
# Conversion des types pour réduire la mémoire
for col, dtype in MemoryEfficientProcessor.DTYPES.items():
if col in df.columns:
try:
df[col] = df[col].astype(dtype)
except (ValueError, TypeError):
pass # Garder le type original si conversion échoue
yield df
@staticmethod
def process_large_file(file_path: str, output_path: str):
"""
Traite un fichier volumineux sans tout charger en mémoire.
Rééchantillonne et aggrège les données en cours de route.
"""
processed_chunks = []
for i, chunk in enumerate(MemoryEfficientProcessor.load_in_chunks(file_path)):
print(f"Traitement du chunk {i+1} ({len(chunk):,} lignes)...")
# Rééchantillonnage à 1 seconde (au lieu de tick-by-tick)
chunk['timestamp'] = pd.to_datetime(chunk['timestamp'], unit='ms')
chunk.set_index('timestamp', inplace=True)
# Aggrégation par seconde
aggregated = chunk.groupby(
[pd.Grouper(freq='1s'), 'side']
).agg({
'price': ['first', 'last', 'mean', 'std'],
'size': ['sum', 'count']
}).reset_index()
processed_chunks.append(aggregated)
print(f" -> Aggrégé à {len(aggregated):,} lignes")
# Concaténation finale (plus petite maintenant)
final_df = pd.concat(processed_chunks, ignore_index=True)
# Sauvegarde en Parquet compressé
final_df.to_parquet(output_path, compression='snappy')
print(f"\nFichier final : {len(final_df):,} lignes")
print(f"Taille : {Path(output_path).stat().st_size / 1e6:.2f} Mo")
Utilisation
processor = MemoryEfficientProcessor()
processor.process_large_file(
file_path="data/raw/binance_btcusdt_orderbook_2025-12.parquet",
output_path="data/processed/binance_btcusdt_1s_2025-12.parquet"
)
3. Incohérence des Données -缺失 Orders dans le Book
Symptôme : Le backtest montre des fills impossibles (prix meilleurs que le meilleur bid/ask disponible) ou des gaps dans le order book.
Cause : Les données Tardis.dev utilisent le format "incremental" où chaque message représente un changement du book, pas un snapshot complet