En tant qu'ingénieur qui a passé trois ans à construire des systèmes de market making haute fréquence pour des exchanges centralisés et décentralisés, je peux vous dire que la gestion temps réel du order book est le cœur battant de toute stratégie de trading algorithmique. Aujourd'hui, je vais vous montrer comment traiter ces flux de données massifs tout en optimisant vos coûts d'infrastructure avec les meilleures API IA du marché.
Le défi du order book en temps réel
Un order book de exchange crypto standard peut générer entre 1 000 et 100 000 mises à jour par seconde selon la volatilité du marché. Chaque modification (nouvel ordre, annulation, exécution) doit être ingérée, normalisée et analysée en moins de 10 millisecondes pour rester compétitif. C'est exactement là que l'intelligence artificielle peut faire la différence : analyse prédictive du microstructure, détection de patterns de liquidation, et ajustement dynamique des spreads.
Comparatif des coûts API IA pour le trading algorithmique
Avant de rentrer dans le code, établissons une comparaison précise des coûts 2026 pour vos besoins en inference IA (analyse de sentiment, prédiction de volatilité, optimisation des paramètres de market making) :
| Modèle IA | Output ($/MTok) | Latence médiane | 10M tokens/mois | Score trading |
|---|---|---|---|---|
| DeepSeek V3.2 | 0,42 $ | ~800ms | 4,20 $ | ⭐⭐⭐⭐ |
| Gemini 2.5 Flash | 2,50 $ | ~400ms | 25,00 $ | ⭐⭐⭐⭐⭐ |
| GPT-4.1 | 8,00 $ | ~600ms | 80,00 $ | ⭐⭐⭐⭐ |
| Claude Sonnet 4.5 | 15,00 $ | ~700ms | 150,00 $ | ⭐⭐⭐ |
Pour un système de market making typique处理 10 millions de tokens par mois (analyses de marché, ajustements de stratégie, génération de rapports), DeepSeek V3.2 offre le meilleur rapport coût-efficacité avec seulement 4,20 $/mois contre 150 $/mois avec Claude Sonnet 4.5. L'économie de 97% est significative pour un bot de trading où la marge par transaction se compte en basis points.
Architecture du système de processing temps réel
Mon implémentation utilise une architecture Event-Driven avec WebSocket pour l'ingestion du order book et une pipeline de processing par lots pour les analyses IA plus lourdes (qui peuvent tolérer quelques secondes de latence). Voici le schéma complet :
┌─────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE MARKET MAKING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ WebSocket ┌────────────────────────┐ │
│ │ Exchange │ ──────────────▶ │ WebSocket Collector │ │
│ │ (Binance, │ 1-100K msg/s │ (Buffer 100ms) │ │
│ │ Coinbase) │ └───────────┬────────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ Order Book Manager │ │
│ │ - Side: bid/ask │ │
│ │ - Levels: 20-100 │ │
│ │ - Update: delta/full │ │
│ └───────────┬────────────┘ │
│ │ │
│ ┌────────────────────────────────────┼────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────┐ │
│ │ Risk Monitor │ │ Strategy Engine │ │ ML │ │
│ │ (Position/PnL) │ │ (Spread calc) │ │ Module│ │
│ └────────┬────────┘ └────────┬────────┘ └──┬───┘ │
│ │ │ │ │
│ └────────────┬───────────────────┴──────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Order Executor │ │
│ │ (Rate limiting)│ │
│ └────────┬────────┘ │
│ │ REST API / WebSocket │
└────────────────────────┼─────────────────────────────────────────┘
▼
┌──────────────────────┐
│ HolySheep AI │
│ (Market Analysis) │
│ DeepSeek V3.2 │
│ $0.42/MTok │
└──────────────────────┘
Implémentation du WebSocket Collector
La première brique critique est le collector WebSocket qui maintient une connexion persistante et gère la reconnexion automatique. Voici mon implémentation professionnelle en Python avec gestion des erreurs avancées :
import asyncio
import websockets
import json
import logging
from collections import deque
from datetime import datetime
from typing import Callable, Optional
import signal
import sys
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class OrderBookWebSocketCollector:
"""
Collector WebSocket haute performance pour order book.
Supporte Binance, Coinbase, Kraken avec interface unifiée.
"""
def __init__(
self,
exchange: str,
symbol: str,
on_update: Callable[[dict], None],
buffer_size: int = 1000,
max_reconnect_attempts: int = 10
):
self.exchange = exchange.lower()
self.symbol = symbol.upper()
self.on_update = on_update
self.buffer = deque(maxlen=buffer_size)
self.max_reconnect = max_reconnect_attempts
self._running = False
self._reconnect_delay = 1
self._last_message_time = None
# Configuration des endpoints par exchange
self._endpoints = {
'binance': f'wss://stream.binance.com:9443/ws/{self.symbol.lower()}@depth@100ms',
'coinbase': f'wss://ws-feed.exchange.coinbase.com',
'kraken': 'wss://ws.kraken.com'
}
async def start(self):
"""Démarre le collector avec gestion de reconnexion."""
self._running = True
reconnect_count = 0
while self._running and reconnect_count < self.max_reconnect:
try:
url = self._endpoints.get(self.exchange)
if not url:
raise ValueError(f"Exchange non supporté: {self.exchange}")
logger.info(f"Connexion WebSocket vers {self.exchange}...")
async with websockets.connect(
url,
ping_interval=20,
ping_timeout=10,
close_timeout=5
) as ws:
# Subscribe selon l'exchange
await self._subscribe(ws)
reconnect_count = 0
self._reconnect_delay = 1
async for message in ws:
if not self._running:
break
await self._process_message(message)
except websockets.ConnectionClosed as e:
reconnect_count += 1
logger.warning(
f"Connexion fermée ({reconnect_count}/{self.max_reconnect}): {e}"
)
await asyncio.sleep(self._reconnect_delay)
self._reconnect_delay = min(self._reconnect_delay * 2, 60)
except Exception as e:
logger.error(f"Erreur critique: {e}")
reconnect_count += 1
await asyncio.sleep(self._reconnect_delay)
if reconnect_count >= self.max_reconnect:
logger.error("Max reconnect attempts atteint - arrêt du collector")
async def _subscribe(self, ws):
"""Configure l'abonnement selon le format de l'exchange."""
if self.exchange == 'binance':
# Binance utilise le path pour l'abonnement
pass
elif self.exchange == 'coinbase':
subscribe_msg = {
"type": "subscribe",
"product_ids": [f"{self.symbol}-USDT"],
"channels": ["level2"]
}
await ws.send(json.dumps(subscribe_msg))
elif self.exchange == 'kraken':
subscribe_msg = {
"event": "subscribe",
"pair": [f"{self.symbol}/USDT"],
"subscription": {"name": "book", "depth": 25}
}
await ws.send(json.dumps(subscribe_msg))
async def _process_message(self, message: str):
"""Parse et normalise le message selon l'exchange."""
try:
data = json.loads(message)
self._last_message_time = datetime.now()
# Normalisation vers format interne unifié
normalized = self._normalize_message(data)
if normalized:
self.buffer.append(normalized)
self.on_update(normalized)
except json.JSONDecodeError as e:
logger.debug(f"Message non-JSON ignoré: {e}")
def _normalize_message(self, data: dict) -> Optional[dict]:
"""Normalise les données du order book vers format standard."""
normalized = {
'timestamp': datetime.now().isoformat(),
'exchange': self.exchange,
'symbol': self.symbol,
'bids': [],
'asks': [],
'type': 'snapshot' if data.get('type') == 'snapshot' else 'update'
}
if self.exchange == 'binance':
# Format Binance: {"b": [[price, qty], ...], "a": [[price, qty], ...]}
normalized['bids'] = [[float(p), float(q)] for p, q in data.get('b', [])]
normalized['asks'] = [[float(p), float(q)] for p, q in data.get('a', [])]
elif self.exchange == 'coinbase':
# Format Coinbase: {"changes": [["buy/sell", price, qty], ...]}
if 'changes' in data:
for side, price, qty in data['changes']:
entry = [float(price), float(qty)]
if side == 'buy':
normalized['bids'].append(entry)
else:
normalized['asks'].append(entry)
elif self.exchange == 'kraken':
# Format Kraken: [channel_id, {"b": [...], "a": [...]}, "book-25", "XBT/USD"]
if isinstance(data, list) and len(data) > 1:
book_data = data[1] if isinstance(data[1], dict) else {}
normalized['bids'] = [[float(p), float(q)] for p, q in book_data.get('b', [])]
normalized['asks'] = [[float(p), float(q)] for p, q in book_data.get('a', [])]
return normalized if normalized['bids'] or normalized['asks'] else None
def stop(self):
"""Arrête proprement le collector."""
logger.info("Arrêt du collector...")
self._running = False
Exemple d'utilisation
async def on_orderbook_update(orderbook_data: dict):
"""Callback appelé à chaque mise à jour du order book."""
best_bid = orderbook_data['bids'][0] if orderbook_data['bids'] else None
best_ask = orderbook_data['asks'][0] if orderbook_data['asks'] else None
if best_bid and best_ask:
spread = (best_ask[0] - best_bid[0]) / best_bid[0] * 100
logger.info(
f"{orderbook_data['symbol']} | "
f"Bid: {best_bid[0]:.2f} ({best_bid[1]:.4f}) | "
f"Ask: {best_ask[0]:.2f} ({best_ask[1]:.4f}) | "
f"Spread: {spread:.4f}%"
)
async def main():
collector = OrderBookWebSocketCollector(
exchange='binance',
symbol='BTCUSDT',
on_update=on_orderbook_update,
buffer_size=5000
)
# Gestion优雅 de l'arrêt
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, collector.stop)
await collector.start()
if __name__ == '__main__':
asyncio.run(main())
Module d'analyse IA avec HolySheep API
Maintenant, la partie où l'intelligence artificielle entre en jeu. Mon système utilise HolySheep AI pour analyser le order book et générer des recommandations de spread动态调整. Les avantages clés :
- Coût 85%+ inférieur que les providers occidentaux (taux ¥1=$1)
- Latence <50ms pour les appels API standards
- Paiement WeChat/Alipay disponible pour les utilisateurs chinois
- Crédits gratuits pour tester avant d'investir
import aiohttp
import asyncio
import json
from typing import List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class SpreadRecommendation:
"""Recommandation de spread pour le market making."""
bid_spread_bps: float # Spread acheteur en basis points
ask_spread_bps: float # Spread vendeur en basis points
target_position_pct: float # Position cible (% du capital)
confidence: float # Confiance de 0 à 1
reasoning: str # Explication de la recommandation
model_used: str
tokens_used: int
cost_usd: float
class HolySheepMarketAnalyzer:
"""
Analyseur de marché basé sur HolySheep AI.
Utilise DeepSeek V3.2 pour l'analyse coût-efficace ou GPT-4.1 pour plus de précision.
"""
# Configuration des modèles disponibles
MODELS = {
'deepseek': {
'name': 'deepseek-chat',
'cost_per_mtok': 0.42,
'latency_priority': False,
'quality_score': 0.85
},
'gpt41': {
'name': 'gpt-4.1',
'cost_per_mtok': 8.00,
'latency_priority': False,
'quality_score': 0.95
},
'gemini': {
'name': 'gemini-2.5-flash',
'cost_per_mtok': 2.50,
'latency_priority': True,
'quality_score': 0.92
}
}
def __init__(
self,
api_key: str = 'YOUR_HOLYSHEEP_API_KEY',
base_url: str = 'https://api.holysheep.ai/v1',
default_model: str = 'deepseek',
rate_limit_rpm: int = 60
):
self.api_key = api_key
self.base_url = base_url
self.default_model = default_model
self.rate_limiter = asyncio.Semaphore(rate_limit_rpm // 10)
self._total_tokens = 0
self._total_cost = 0.0
async def analyze_market_conditions(
self,
symbol: str,
orderbook_snapshot: dict,
recent_trades: List[dict],
volatility_24h: float,
funding_rate: float
) -> SpreadRecommendation:
"""
Analyse les conditions de marché et retourne une recommandation de spread.
Args:
symbol: Paire de trading (ex: BTCUSDT)
orderbook_snapshot: État actuel du order book
recent_trades: 100 derniers trades
volatility_24h: Volatilité sur 24h en %
funding_rate: Taux de funding actuel
Returns:
SpreadRecommendation avec les paramètres optimaux
"""
async with self.rate_limiter:
# Construction du prompt optimisé
system_prompt = """Tu es un analyste quantitatif expert en market making.
Analyse les données de marché et recommande les paramètres optimaux pour un bot market maker.
Réponds UNIQUEMENT en JSON avec ce format exact:
{
"bid_spread_bps": number,
"ask_spread_bps": number,
"target_position_pct": number,
"confidence": number,
"reasoning": string
}"""
# Formatage efficient des données d'entrée
best_bid = orderbook_snapshot['bids'][0] if orderbook_snapshot['bids'] else [0, 0]
best_ask = orderbook_snapshot['asks'][0] if orderbook_snapshot['asks'] else [0, 0]
mid_price = (best_bid[0] + best_ask[0]) / 2
bid_depth = sum(qty for _, qty in orderbook_snapshot['bids'][:10])
ask_depth = sum(qty for _, qty in orderbook_snapshot['asks'][:10])
recent_volume = sum(t.get('qty', 0) for t in recent_trades[-50:])
buy_pressure = sum(1 for t in recent_trades[-20:] if t.get('side') == 'buy') / 20
user_prompt = f"""Analyse pour {symbol}:
Prix mid: ${mid_price:,.2f}
Best Bid: ${best_bid[0]:,.2f} (qty: {best_bid[1]:.4f})
Best Ask: ${best_ask[0]:,.2f} (qty: {best_ask[1]:.4f})
Depth Bid (10 niveaux): {bid_depth:.4f}
Depth Ask (10 niveaux): {ask_depth:.4f}
Volume recent 50 trades: {recent_volume:.4f}
Buy pressure: {buy_pressure:.1%}
Volatilité 24h: {volatility_24h:.2f}%
Funding rate: {funding_rate:.4f}%
Contexte: {'Marché baissier' if buy_pressure < 0.45 else 'Marché haussier' if buy_pressure > 0.55 else 'Neutre'}
Imbalance depth: {'Acheteuse' if bid_depth > ask_depth * 1.2 else 'Vendeuse' if ask_depth > bid_depth * 1.2 else 'Équilibrée'}
Quel spread recommander pour maximiser le PnL tout en restant compétitif?"""
try:
async with aiohttp.ClientSession() as session:
url = f'{self.base_url}/chat/completions'
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
payload = {
'model': self.MODELS[self.default_model]['name'],
'messages': [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': user_prompt}
],
'temperature': 0.3,
'max_tokens': 500,
'response_format': {'type': 'json_object'}
}
start_time = datetime.now()
async with session.post(
url,
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"API Error {response.status}: {error_text}")
return self._fallback_recommendation(symbol)
result = await response.json()
latency_ms = (datetime.now() - start_time).total_seconds() * 1000
# Tracking des coûts
usage = result.get('usage', {})
total_tokens = usage.get('total_tokens', 0)
self._total_tokens += total_tokens
self._total_cost += (total_tokens / 1_000_000) * self.MODELS[self.default_model]['cost_per_mtok']
logger.info(
f"Analyse {symbol}: {total_tokens} tokens, "
f"${self._total_cost:.4f} cumulatif, "
f"latence {latency_ms:.0f}ms"
)
content = result['choices'][0]['message']['content']
analysis = json.loads(content)
return SpreadRecommendation(
bid_spread_bps=analysis['bid_spread_bps'],
ask_spread_bps=analysis['ask_spread_bps'],
target_position_pct=analysis['target_position_pct'],
confidence=analysis['confidence'],
reasoning=analysis['reasoning'],
model_used=self.default_model,
tokens_used=total_tokens,
cost_usd=(total_tokens / 1_000_000) * self.MODELS[self.default_model]['cost_per_mtok']
)
except asyncio.TimeoutError:
logger.warning("Timeout API - utilisation de la recommandation fallback")
return self._fallback_recommendation(symbol)
except Exception as e:
logger.error(f"Erreur analyse: {e}")
return self._fallback_recommendation(symbol)
def _fallback_recommendation(self, symbol: str) -> SpreadRecommendation:
"""Recommandation par défaut en cas d'erreur."""
return SpreadRecommendation(
bid_spread_bps=15.0,
ask_spread_bps=15.0,
target_position_pct=50.0,
confidence=0.5,
reasoning="Recommandation fallback (erreur API)",
model_used="fallback",
tokens_used=0,
cost_usd=0.0
)
def get_cost_summary(self) -> dict:
"""Retourne un résumé des coûts pour cette session."""
return {
'total_tokens': self._total_tokens,
'total_cost_usd': self._total_cost,
'model': self.default_model,
'cost_per_mtok': self.MODELS[self.default_model]['cost_per_mtok']
}
Exemple d'utilisation intégrée
async def example_integrated_system():
"""Exemple complet d'intégration avec le collector WebSocket."""
API_KEY = 'YOUR_HOLYSHEEP_API_KEY' # Remplacez par votre clé HolySheep
# Initialisation des composants
analyzer = HolySheepMarketAnalyzer(
api_key=API_KEY,
default_model='deepseek' # Modèle le plus économique
)
# Order book simulé pour l'exemple
sample_orderbook = {
'bids': [
[64150.50, 2.543],
[64149.00, 1.234],
[64148.50, 3.456]
],
'asks': [
[64151.00, 1.892],
[64152.50, 2.111],
[64153.00, 0.543]
]
}
sample_trades = [
{'side': 'buy', 'qty': 0.5, 'price': 64150},
{'side': 'sell', 'qty': 0.3, 'price': 64151},
# ... 98 autres trades
] * 34 # Simuler 100 trades
# Analyse
recommendation = await analyzer.analyze_market_conditions(
symbol='BTCUSDT',
orderbook_snapshot=sample_orderbook,
recent_trades=sample_trades,
volatility_24h=3.45,
funding_rate=0.0001
)
print(f"=== Recommandation pour BTCUSDT ===")
print(f"Spread Bid: {recommendation.bid_spread_bps:.2f} bps")
print(f"Spread Ask: {recommendation.ask_spread_bps:.2f} bps")
print(f"Position cible: {recommendation.target_position_pct:.1f}%")
print(f"Confiance: {recommendation.confidence:.0%}")
print(f"Raisonnement: {recommendation.reasoning}")
print(f"Tokens utilisés: {recommendation.tokens_used}")
print(f"Coût: ${recommendation.cost_usd:.6f}")
# Statistiques de session
costs = analyzer.get_cost_summary()
print(f"\n=== Coût total session ===")
print(f"Tokens totaux: {costs['total_tokens']}")
print(f"Coût total: ${costs['total_cost_usd']:.6f}")
if __name__ == '__main__':
asyncio.run(example_integrated_system())
Intégration du Order Book Manager
Le Order Book Manager est le cœur de notre système. Il maintient l'état complet du livre d'ordres, calcule les métriques clés (imbalance, depth ratio, realized spread) et prépare les données pour l'analyse IA :
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import threading
import logging
logger = logging.getLogger(__name__)
@dataclass
class OrderBookLevel:
"""Représente un niveau de prix dans le order book."""
price: float
quantity: float
orders_count: int = 1
timestamp: datetime = field(default_factory=datetime.now)
class OrderBookManager:
"""
Gestionnaire d'order book haute performance.
Supporte les mises à jour delta et snapshot.
Thread-safe pour accès multi-processus.
"""
def __init__(
self,
max_levels: int = 100,
depth_calculation_levels: int = 20
):
self.max_levels = max_levels
self.depth_levels = depth_calculation_levels
# Structure interne: OrderedDict pour maintenir l'ordre de prix
self._bids: OrderedDict[float, OrderBookLevel] = OrderedDict()
self._asks: OrderedDict[float, OrderBookLevel] = OrderedDict()
# Métriques calculées
self._spread_bps: float = 0.0
self._mid_price: float = 0.0
self._last_update: datetime = datetime.now()
self._lock = threading.RLock()
# Historique pour analyse
self._spread_history: List[float] = []
self._volume_history: Dict[str, List[float]] = {'bid': [], 'ask': []}
def update_snapshot(self, bids: List[List[float]], asks: List[List[float]]):
"""
Applique un snapshot complet du order book.
bids/asks: [[price, quantity], ...] triés par mérite.
"""
with self._lock:
self._bids.clear()
self._asks.clear()
for price, qty in bids[:self.max_levels]:
self._bids[price] = OrderBookLevel(price=price, quantity=qty)
for price, qty in asks[:self.max_levels]:
self._asks[price] = OrderBookLevel(price=price, quantity=qty)
self._recalculate_metrics()
def update_delta(
self,
bids: List[List[float]] = None,
asks: List[List[float]] = None
):
"""
Applique une mise à jour delta (diff).
Price = 0 avec quantity = 0 signifie suppression.
"""
with self._lock:
if bids:
for price, qty in bids:
if qty == 0:
self._bids.pop(price, None)
else:
self._bids[price] = OrderBookLevel(price=price, quantity=qty)
if asks:
for price, qty in asks:
if qty == 0:
self._asks.pop(price, None)
else:
self._asks[price] = OrderBookLevel(price=price, quantity=qty)
self._recalculate_metrics()
def _recalculate_metrics(self):
"""Recalcule les métriques clés du order book."""
best_bid = self.get_best_bid()
best_ask = self.get_best_ask()
if best_bid and best_ask:
self._mid_price = (best_bid.price + best_ask.price) / 2
self._spread_bps = (best_ask.price - best_bid.price) / self._mid_price * 10000
# Historique
self._spread_history.append(self._spread_bps)
if len(self._spread_history) > 1000:
self._spread_history = self._spread_history[-1000:]
self._last_update = datetime.now()
# ============ Getters ============
def get_best_bid(self) -> Optional[OrderBookLevel]:
with self._lock:
return next(iter(self._bids.values()), None)
def get_best_ask(self) -> Optional[OrderBookLevel]:
with self._lock:
return next(iter(self._asks.values()), None)
def get_spread(self) -> Tuple[Optional[float], float]:
"""Retourne (spread absolute, spread en bps)."""
with self._lock:
return (
self._asks.get(next(iter(self._asks)), OrderBookLevel(0,0)).price
- self._bids.get(next(iter(self._bids)), OrderBookLevel(0,0)).price,
self._spread_bps
)
def get_imbalance(self, levels: int = None) -> float:
"""
Calcule l'imbalance du order book.
Retourne valeur entre -1 (tout ask) et +1 (tout bid).
"""
if levels is None:
levels = self.depth_levels
with self._lock:
bid_volume = sum(
level.quantity
for level in list(self._bids.values())[:levels]
)
ask_volume = sum(
level.quantity
for level in list(self._asks.values())[:levels]
)
total = bid_volume + ask_volume
if total == 0:
return 0.0
return (bid_volume - ask_volume) / total
def get_depth_ratio(self, levels: int = None) -> float:
"""Ratio bid_depth / ask_depth pour les N premiers niveaux."""
if levels is None:
levels = self.depth_levels
with self._lock:
bid_depth = sum(
level.quantity
for level in list(self._bids.values())[:levels]
)
ask_depth = sum(
level.quantity
for level in list(self._asks.values())[:levels]
)
if ask_depth == 0:
return float('inf') if bid_depth > 0 else 1.0
return bid_depth / ask_depth
def get_vwap_impact(self, side: str, quantity: float) -> float:
"""
Calcule l'impact sur le prix d'un ordre de taille donnée.
Utilise le TWAP (Time-Weighted Average Price) sur les niveaux.
"""
with self._lock:
levels = list(self._asks.values()) if side == 'buy' else list(self._bids.values())
remaining_qty = quantity
total_cost = 0.0
filled_qty = 0.0
for level in levels:
fill_qty = min(remaining_qty, level.quantity)
total_cost += fill_qty * level.price
filled_qty += fill_qty
remaining_qty -= fill_qty
if remaining_qty <= 0:
break
if filled_qty == 0:
return 0.0
return (total_cost / filled_qty) - self._mid_price
def get_snapshot(self) -> dict:
"""Retourne un snapshot complet pour l'analyse IA."""
with self._lock:
return {
'timestamp': self._last_update.isoformat(),
'mid_price': self._mid_price,
'spread_bps': self._spread_bps,
'imbalance': self.get_imbalance(),
'depth_ratio': self.get_depth_ratio(),
'bids': [[p, l.quantity] for p, l in list(self._bids.items())[:20]],
'asks': [[p, l.quantity] for p, l in list(self._asks.items())[:20]],
'total_bid_depth': sum(l.quantity for l in self._bids.values()),
'total_ask_depth': sum(l.quantity for l in self._asks.values())
}
def get_ohlc_from_history(self, window_bars: int = 20) -> dict:
"""Extrait des métriques style OHLC du spread history."""
if len(self._spread_history) < window_bars:
return {'error': 'Pas assez de données'}
recent = self._spread_history[-window_bars:]
return {
'spread_high': max(recent),
'spread_low': min(recent),
'spread_avg': sum(recent) / len(recent),
'spread_std': (sum((x - sum(recent)/len(recent))**2 for x in recent) / len(recent)) ** 0.5
}
Exemple d'utilisation
if __name__ == '__main__':
obm = OrderBookManager(max_levels=100)
# Application d'un snapshot initial
obm.update_snapshot(
bids=[
[64150.00, 2.5],
[64149.50, 1.2],
[64149.00, 3.1],
[64148.50, 0.8],
[64148.00, 2.0]
],
asks=[
[64151.00, 1.8],
[64152.00, 2.3],
[64152.50, 0.5],
[64153.00, 1.5],
[64153.50, 2.1]
]
)
print(f"Mid Price: ${obm.get_snapshot()['mid_price']:.2f}")
print(f"Spread: {obm.get_spread()[1