En tant qu'analyste quantitatif ayant travaillé sur des stratégies de trading algorithmique depuis 2019, j'ai testé des dizaines d'API de données crypto. La leçon la plus importante ? La qualité des données historiques déterminent directement la performance de vos modèles. Une latence de 50 ms supplémentaire ou un taux de données corrompues de 0,1 % peuvent transformer une stratégie rentable en catastrophe. Dans ce tutoriel terrain, je partage ma méthodologie complète de监控 de fiabilité des API crypto, avec des tests concrets, des scripts Python exécutables, et une comparaison des meilleures solutions du marché.
Pourquoi la fiabilité des API crypto est cruciale pour vos modèles
Les données crypto présentent des défis uniques par rapport aux actifs traditionnels. Le marché fonctionne 24h/24, les sources sont fragmentées entre exchanges centralisés et protocoles DeFi, et les erreurs de données peuvent provenir de réorganisations de blockchain (reorgs), de fourchettes mal gérées ou de problèmes de synchronisation d'horloge. Une API peu fiable peut introduire des biais systématiques dans vos analyses, menant à des décisions de trading erronées.
Pour moi, le révélateur a été un week‑end de 2022 : mon modèle de market‑making sur les paires ETH/USD affichait des performances parfaites en backtesting mais perdait de l'argent en production. Après une semaine d'investigation, j'ai découvert que mon fournisseur d'API utilisait des prix synthétiques pour les périodes de faible liquidité, créant des artefacts statistiques que mes modèles interprétaient comme des signaux.
Architecture d'un système de surveillance de qualité des données
Un système robuste de monitoring doit capturer plusieurs métriques clés. Je recommande une architecture en trois couches :
- Couche de collecte : polling健康管理 des réponses API avec timestamps précis
- Couche d'analyse : validation statistique des données réceptionnées
- Couche d'alerte : détection d'anomalies et notifications en temps réel
# Système de monitoring de qualité des données crypto
Compatible avec les principales API : CoinGecko, Binance, CoinCap, Kraken
import asyncio
import aiohttp
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import statistics
import json
import hashlib
@dataclass
class APIMetrics:
"""Métriques de surveillance pour une API"""
provider_name: str
endpoint: str
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
timeout_count: int = 0
error_codes: Dict[int, int] = field(default_factory=dict)
latencies_ms: List[float] = field(default_factory=list)
data_integrity_failures: int = 0
last_success: Optional[datetime] = None
last_failure: Optional[datetime] = None
@property
def success_rate(self) -> float:
if self.total_requests == 0:
return 0.0
return (self.successful_requests / self.total_requests) * 100
@property
def average_latency(self) -> float:
if not self.latencies_ms:
return 0.0
return statistics.mean(self.latencies_ms)
@property
def p99_latency(self) -> float:
if len(self.latencies_ms) < 10:
return max(self.latencies_ms) if self.latencies_ms else 0.0
sorted_latencies = sorted(self.latencies_ms)
index = int(len(sorted_latencies) * 0.99)
return sorted_latencies[index]
@property
def health_score(self) -> float:
"""Score de santé composite (0-100)"""
success_weight = 0.4
latency_weight = 0.3
integrity_weight = 0.3
success_score = min(self.success_rate, 100)
# Latence : 100 si < 100ms, dégradé jusqu'à 0 si > 2000ms
latency_score = max(0, 100 - (self.average_latency - 100) * 0.05)
integrity_score = max(0, 100 - (self.data_integrity_failures / max(1, self.total_requests) * 100))
return (success_score * success_weight +
latency_score * latency_weight +
integrity_score * integrity_weight)
class CryptoDataQualityMonitor:
"""Moniteur de qualité pour les données historiques crypto"""
def __init__(self, alert_webhook: Optional[str] = None):
self.metrics: Dict[str, APIMetrics] = {}
self.alert_webhook = alert_webhook
self.integrity_hash_cache: Dict[str, str] = {}
def register_provider(self, provider_name: str, endpoint: str):
"""Enregistrer un nouveau fournisseur API"""
if provider_name not in self.metrics:
self.metrics[provider_name] = APIMetrics(
provider_name=provider_name,
endpoint=endpoint
)
async def fetch_with_monitoring(
self,
session: aiohttp.ClientSession,
provider: str,
url: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None,
timeout: int = 10
) -> Optional[dict]:
"""Effectuer une requête avec mesure des métriques"""
metrics = self.metrics.get(provider)
if not metrics:
self.register_provider(provider, url)
metrics = self.metrics[provider]
metrics.total_requests += 1
start_time = time.perf_counter()
try:
async with session.get(
url,
params=params,
headers=headers,
timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
latency_ms = (time.perf_counter() - start_time) * 1000
metrics.latencies_ms.append(latency_ms)
# Limiter l'historique des latences à 1000 points
if len(metrics.latencies_ms) > 1000:
metrics.latencies_ms = metrics.latencies_ms[-1000:]
if response.status == 200:
metrics.successful_requests += 1
metrics.last_success = datetime.now()
data = await response.json()
# Validation d'intégrité des données
if not self._validate_data_integrity(data, provider, url):
metrics.data_integrity_failures += 1
await self._send_alert(
provider, "DATA_INTEGRITY_FAILURE",
f"Échec validation données pour {url}"
)
return data
else:
metrics.failed_requests += 1
metrics.last_failure = datetime.now()
metrics.error_codes[response.status] = \
metrics.error_codes.get(response.status, 0) + 1
if response.status == 429:
await self._send_alert(
provider, "RATE_LIMIT",
f"Rate limit atteint sur {url}"
)
return None
except asyncio.TimeoutError:
metrics.timeout_count += 1
metrics.failed_requests += 1
metrics.last_failure = datetime.now()
await self._send_alert(
provider, "TIMEOUT",
f"Timeout {timeout}s pour {url}"
)
return None
except Exception as e:
metrics.failed_requests += 1
metrics.last_failure = datetime.now()
await self._send_alert(
provider, "ERROR",
f"Erreur {type(e).__name__} sur {url}: {str(e)}"
)
return None
def _validate_data_integrity(
self,
data: dict,
provider: str,
url: str
) -> bool:
"""Valider l'intégrité des données reçues"""
data_str = json.dumps(data, sort_keys=True, default=str)
data_hash = hashlib.sha256(data_str.encode()).hexdigest()
cache_key = f"{provider}:{url}"
if cache_key in self.integrity_hash_cache:
# Vérifier que les données sont différentes (pas de cache empoisonné)
if data_hash == self.integrity_hash_cache[cache_key]:
# Si même hash, vérifier timestamp
return False
self.integrity_hash_cache[cache_key] = data_hash
# Validation selon le type de données
if isinstance(data, dict):
# Vérifier présence des champs essentiels
if "prices" in data or "data" in data or "result" in data:
return True
return True
async def _send_alert(self, provider: str, alert_type: str, message: str):
"""Envoyer une alerte via webhook"""
if not self.alert_webhook:
return
alert = {
"provider": provider,
"type": alert_type,
"message": message,
"timestamp": datetime.now().isoformat(),
"current_metrics": {
"success_rate": self.metrics[provider].success_rate,
"avg_latency_ms": self.metrics[provider].average_latency,
"health_score": self.metrics[provider].health_score
}
}
async with aiohttp.ClientSession() as session:
try:
await session.post(
self.alert_webhook,
json=alert,
timeout=aiohttp.ClientTimeout(total=5)
)
except Exception:
pass # Ne pas échouer sur erreur d'alerte
def get_health_report(self) -> Dict:
"""Générer un rapport de santé pour tous les fournisseurs"""
report = {}
for name, metrics in self.metrics.items():
report[name] = {
"success_rate": f"{metrics.success_rate:.2f}%",
"avg_latency_ms": f"{metrics.average_latency:.2f}",
"p99_latency_ms": f"{metrics.p99_latency:.2f}",
"total_requests": metrics.total_requests,
"failed_requests": metrics.failed_requests,
"integrity_failures": metrics.data_integrity_failures,
"health_score": f"{metrics.health_score:.1f}/100",
"last_success": metrics.last_success.isoformat() if metrics.last_success else None,
"last_failure": metrics.last_failure.isoformat() if metrics.last_failure else None
}
return report
Exemple d'utilisation avec CoinGecko API
async def demo_monitoring():
monitor = CryptoDataQualityMonitor(
alert_webhook="https://your-webhook.com/alerts" # Configurer votre webhook
)
# Enregistrer les fournisseurs à surveiller
monitor.register_provider(
"CoinGecko",
"https://api.coingecko.com/api/v3"
)
monitor.register_provider(
"Binance",
"https://api.binance.com/api/v3"
)
async with aiohttp.ClientSession() as session:
# Test de l'API CoinGecko - Prix historique BTC
for i in range(5):
await monitor.fetch_with_monitoring(
session,
"CoinGecko",
"https://api.coingecko.com/api/v3/coins/bitcoin/market_chart",
params={
"vs_currency": "usd",
"days": "7",
"interval": "hourly"
}
)
await asyncio.sleep(1)
# Test de l'API Binance - Kline BTC/USDT
for i in range(5):
await monitor.fetch_with_monitoring(
session,
"Binance",
"https://api.binance.com/api/v3/klines",
params={
"symbol": "BTCUSDT",
"interval": "1h",
"limit": 100
}
)
await asyncio.sleep(0.5)
# Afficher le rapport de santé
report = monitor.get_health_report()
print("=" * 60)
print("RAPPORT DE SANTÉ DES API CRYPTO")
print("=" * 60)
for provider, metrics in report.items():
print(f"\n📊 {provider}:")
for key, value in metrics.items():
print(f" {key}: {value}")
return report
Exécuter le test
if __name__ == "__main__":
asyncio.run(demo_monitoring())
Critères d'évaluation des API de données crypto
Après des mois de tests systématiques, j'ai identifié six critères fondamentaux pour évaluer la fiabilité d'une API de données historiques sur les crypto‑monnaies. Ces critères doivent être mesurés sur une période d'au moins 30 jours pour obtenir des données statistiquement significatives.
Taux de réussite et gestion des erreurs
Le taux de réussite (success rate) représente le pourcentage de requêtes qui retournent des données valides. Un bon fournisseur devrait maintenir un taux supérieur à 99,5 % sur une période prolongée. Attention aux fournisseurs qui compensent les pannes par des données synthétiques ou interpolées — cette pratique peut compromettre l'intégrité de vos analyses.
Latence et stabilité temporelle
La latence moyenne doit être inférieure à 200 ms pour les données temps réel et 500 ms pour les données historiques. Plus important encore, la variance de latence (ou jitter) révèle la stabilité de l'infrastructure. Une latence moyenne de 100 ms avec un écart-type de 50 ms est préférable à une latence moyenne de 80 ms avec un écart-type de 200 ms.
Couverture des données et profondeur historique
| Provider | Depth historique | Exchanges couverts | Données disponibles |
|---|---|---|---|
| CoinGecko | Depuis 2013 | 150+ | Prix, volumes, market cap, OHLCV |
| Binance | Depuis 2017 | Binance uniquement | Prix, klines, trades, depth |
| CoinCap | Depuis 2018 | 25+ | Prix, volumes, taux de change |
| Kraken | Depuis 2013 | Kraken uniquement | Prix, OHLC, spreads |
| Glassnode | Variable | On-chain uniquement | Métriques on-chain, addresses, flux |
Intégrité et cohérence des données
La cohérence entre différents fournisseurs pour un même actif et une même période est un test révélateur. Les écarts de prix entre exchanges légitimes ne devraient pas dépasser 0,5 % pour les actifs à forte liquidité. Les différences plus importantes indiquent soit un problème de qualité des données, soit une manipulation synthétique des prix.
Protocole de test terrain : ma méthodologie complète
Je teste chaque API pendant 30 jours consécutifs avec un ensemble standardisé de requêtes. Cette approche permet d'identifier non seulement les performances moyennes, mais aussi les comportements anormaux qui n'apparaissent qu'en période de stress du marché.
# Script de test complet pour évaluer la fiabilité des API crypto
Inclut tests de charge, validation de cohérence et détection d'anomalies
import asyncio
import aiohttp
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import numpy as np
from scipy import stats
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class APITester:
"""Testeur complet pour les API de données crypto"""
def __init__(self):
self.results = {}
self.consistency_checks = []
async def test_coingecko(self, session: aiohttp.ClientSession) -> Dict:
"""Test de l'API CoinGecko"""
results = {
"provider": "CoinGecko",
"tests": [],
"issues": [],
"score": 0
}
test_cases = [
{
"name": "Prix actuel BTC",
"url": "https://api.coingecko.com/api/v3/simple/price",
"params": {"ids": "bitcoin", "vs_currencies": "usd,eur"},
"expected_keys": ["bitcoin"],
"timeout": 5
},
{
"name": "Market chart BTC 7 jours",
"url": "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart",
"params": {"vs_currency": "usd", "days": "7", "interval": "hourly"},
"expected_keys": ["prices", "market_caps", "total_volumes"],
"timeout": 10
},
{
"name": "OHLC BTC",
"url": "https://api.coingecko.com/api/v3/coins/bitcoin/ohlc",
"params": {"vs_currency": "usd", "days": "7"},
"timeout": 10
},
{
"name": "Liste des top 100 cryptos",
"url": "https://api.coingecko.com/api/v3/coins/markets",
"params": {"vs_currency": "usd", "order": "market_cap_desc", "per_page": "100"},
"timeout": 15
}
]
success_count = 0
total_latency = 0
for test in test_cases:
try:
start = datetime.now()
async with session.get(
test["url"],
params=test["params"],
timeout=aiohttp.ClientTimeout(total=test["timeout"])
) as resp:
latency = (datetime.now() - start).total_seconds() * 1000
total_latency += latency
if resp.status == 200:
data = await resp.json()
# Validation des données
if test.get("expected_keys"):
if isinstance(data, dict):
valid = all(k in data for k in test["expected_keys"])
elif isinstance(data, list):
valid = len(data) > 0
else:
valid = False
else:
valid = True
if valid:
success_count += 1
results["tests"].append({
"name": test["name"],
"status": "PASS",
"latency_ms": round(latency, 2),
"data_size": len(json.dumps(data))
})
else:
results["tests"].append({
"name": test["name"],
"status": "FAIL_DATA",
"latency_ms": round(latency, 2),
"error": "Missing expected keys"
})
results["issues"].append(f"Data validation failed for {test['name']}")
elif resp.status == 429:
results["tests"].append({
"name": test["name"],
"status": "RATE_LIMIT",
"latency_ms": round(latency, 2)
})
results["issues"].append(f"Rate limit hit for {test['name']}")
else:
results["tests"].append({
"name": test["name"],
"status": f"HTTP_{resp.status}",
"latency_ms": round(latency, 2)
})
except asyncio.TimeoutError:
results["tests"].append({
"name": test["name"],
"status": "TIMEOUT",
"timeout_value": test["timeout"]
})
results["issues"].append(f"Timeout for {test['name']}")
except Exception as e:
results["tests"].append({
"name": test["name"],
"status": "ERROR",
"error": str(e)
})
results["issues"].append(f"Error in {test['name']}: {str(e)}")
# Calcul du score
success_rate = success_count / len(test_cases)
avg_latency = total_latency / len(test_cases)
# Score composite : 60% fiabilité, 40% performance
latency_score = max(0, 100 - (avg_latency - 50) * 0.5) if avg_latency < 1000 else 0
results["score"] = (success_rate * 60) + (latency_score * 0.4)
results["success_rate"] = f"{success_rate * 100:.1f}%"
results["avg_latency_ms"] = round(avg_latency, 2)
return results
async def test_binance(self, session: aiohttp.ClientSession) -> Dict:
"""Test de l'API Binance"""
results = {
"provider": "Binance",
"tests": [],
"issues": [],
"score": 0
}
test_cases = [
{
"name": "Klines BTC/USDT 1h",
"url": "https://api.binance.com/api/v3/klines",
"params": {"symbol": "BTCUSDT", "interval": "1h", "limit": 100},
"timeout": 5
},
{
"name": "Prix ticker BTCUSDT",
"url": "https://api.binance.com/api/v3/ticker/price",
"params": {"symbol": "BTCUSDT"},
"timeout": 3
},
{
"name": "Depth orderbook BTCUSDT",
"url": "https://api.binance.com/api/v3/depth",
"params": {"symbol": "BTCUSDT", "limit": 20},
"timeout": 3
},
{
"name": "24h ticker BTCUSDT",
"url": "https://api.binance.com/api/v3/ticker/24hr",
"params": {"symbol": "BTCUSDT"},
"timeout": 3
}
]
success_count = 0
total_latency = 0
for test in test_cases:
try:
start = datetime.now()
async with session.get(
test["url"],
params=test["params"],
timeout=aiohttp.ClientTimeout(total=test["timeout"])
) as resp:
latency = (datetime.now() - start).total_seconds() * 1000
total_latency += latency
if resp.status == 200:
data = await resp.json()
success_count += 1
results["tests"].append({
"name": test["name"],
"status": "PASS",
"latency_ms": round(latency, 2),
"data_size": len(json.dumps(data))
})
elif resp.status == -1001 or resp.status == -1010:
results["tests"].append({
"name": test["name"],
"status": "Binance_ERROR",
"latency_ms": round(latency, 2),
"error_code": resp.status
})
results["issues"].append(f"Binance internal error: {resp.status}")
else:
results["tests"].append({
"name": test["name"],
"status": f"HTTP_{resp.status}",
"latency_ms": round(latency, 2)
})
except Exception as e:
results["tests"].append({
"name": test["name"],
"status": "ERROR",
"error": str(e)
})
results["issues"].append(f"Error in {test['name']}: {str(e)}")
success_rate = success_count / len(test_cases)
avg_latency = total_latency / len(test_cases)
latency_score = max(0, 100 - (avg_latency - 30) * 0.5) if avg_latency < 1000 else 0
results["score"] = (success_rate * 60) + (latency_score * 0.4)
results["success_rate"] = f"{success_rate * 100:.1f}%"
results["avg_latency_ms"] = round(avg_latency, 2)
return results
async def test_data_consistency(
self,
session: aiohttp.ClientSession,
asset: str = "bitcoin",
vs_currency: str = "usd"
) -> Dict:
"""Test de cohérence entre différents fournisseurs pour le même actif"""
consistency_results = {
"asset": asset,
"currency": vs_currency,
"checks": [],
"coherence_score": 0
}
prices = {}
# Récupérer prix depuis CoinGecko
try:
async with session.get(
"https://api.coingecko.com/api/v3/simple/price",
params={"ids": asset, "vs_currencies": vs_currency},
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
if resp.status == 200:
data = await resp.json()
if asset in data and vs_currency in data[asset]:
prices["coingecko"] = data[asset][vs_currency]
except Exception as e:
logger.warning(f"CoinGecko consistency check failed: {e}")
# Récupérer prix depuis Binance (si applicable)
symbol = f"{asset.upper()}USDT"
try:
async with session.get(
"https://api.binance.com/api/v3/ticker/price",
params={"symbol": symbol},
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
if resp.status == 200:
data = await resp.json()
if "price" in data:
prices["binance"] = float(data["price"])
except Exception as e:
logger.warning(f"Binance consistency check failed: {e}")
# Calculer les écarts
if len(prices) >= 2:
price_values = list(prices.values())
max_diff_pct = abs(max(price_values) - min(price_values)) / min(price_values) * 100
consistency_results["prices"] = prices
consistency_results["max_difference_percent"] = round(max_diff_pct, 4)
consistency_results["coherence_score"] = max(0, 100 - max_diff_pct * 10)
consistency_results["status"] = "PASS" if max_diff_pct < 1.0 else "WARNING"
if max_diff_pct > 1.0:
consistency_results["checks"].append(
f"Écart important détecté: {max_diff_pct:.2f}%"
)
else:
consistency_results["status"] = "INSUFFICIENT_DATA"
consistency_results["checks"].append("Impossible de comparer - données manquantes")
return consistency_results
async def run_full_test_suite(self) -> Dict:
"""Exécuter la suite complète de tests"""
print("🚀 Démarrage de la suite de tests API Crypto...")
print("=" * 60)
async with aiohttp.ClientSession() as session:
# Tests individuels
print("\n📡 Test CoinGecko...")
coingecko_results = await self.test_coingecko(session)
print(f" Score: {coingecko_results['score']:.1f}/100")
print(f" Taux de réussite: {coingecko_results['success_rate']}")
print(f" Latence moyenne: {coingecko_results['avg_latency_ms']}ms")
print("\n📡 Test Binance...")
binance_results = await self.test_binance(session)
print(f" Score: {binance_results['score']:.1f}/100")
print(f" Taux de réussite: {binance_results['success_rate']}")
print(f" Latence moyenne: {binance_results['avg_latency_ms']}ms")
# Tests de cohérence
print("\n🔍 Test de cohérence des données...")
consistency = await self.test_data_consistency(session, "bitcoin")
print(f" Score de cohérence: {consistency.get('coherence_score', 0):.1f}/100")
print(f" Prix comparés: {consistency.get('prices', {})}")
return {
"timestamp": datetime.now().isoformat(),
"coingecko": coingecko_results,
"binance": binance_results,
"consistency": consistency
}
Exécution des tests
if __name__ == "__main__":
tester = APITester()
full_results = asyncio.run(tester.run_full_test_suite())
print("\n" + "=" * 60)
print("📊 RÉSUMÉ DES TESTS")
print("=" * 60)
for provider in ["coingecko", "binance"]:
if provider in full_results:
r = full_results[provider]
print(f"\n{r['provider']}:")
print(f" Score global: {r['score']:.1f}/100")
print(f" Problèmes identifiés: {len(r['issues'])}")
for issue in r['issues']:
print(f" ⚠️ {issue}")
Erreurs courantes et solutions
Erreur 1 : Rate limiting excessif bloquant les requêtes
Symptôme : Réception d'erreurs HTTP 429 même avec un volume modéré de requêtes. Les requêtes sont systématiquement rejetées pendant plusieurs minutes.
Cause racine : Dépassement des limites de taux imposées par l'API, souvent dû à un manque de gestion des en-têtes de réponse Retry-After ou à une absence de rate limiting côté client.
Solution : Implémenter un client avec backoff exponentiel et respect des limites documentées. Pour CoinGecko, la version gratuite limite à 10-30 requêtes/minute selon le plan. Utiliser les endpoints batch quand disponibles.
# Gestion robuste du rate limiting avec backoff exponentiel
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Optional
import time
class RateLimitedClient:
"""Client HTTP avec gestion intelligente du rate limiting"""
def __init__(self, requests_per_minute: int = 30, requests_per_second: int = 5):
self.rpm_limit = requests_per_minute
self.rps_limit = requests_per_second
self.request_timestamps = []
self.minute_timestamps = []
self.concurrent_requests = 0
self.max_concurrent = 3
self.last_rate_limit_reset = None
async def get_with_retry(
self,
session: aiohttp.ClientSession,
url: str,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
**kwargs
) -> Optional[dict]:
"""Effectuer une requête GET avec retry intelligent"""
for attempt in range(max_retries):
# Vérifier les limites de taux avant d'envoyer
await self._check_rate_limits()
try:
async with session.get(url, **kwargs) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
# Rate limit atteint
retry_after = response.headers.get('Retry-After')
if retry_after:
wait_time = int(retry_after)
else:
# Backoff exponentiel
wait_time = min(base_delay * (2 ** attempt), max_delay)
print(f"⚠️ Rate limit atteint. Attente {wait_time}s (tentative {attempt + 1}/{max_retries})")
await asyncio.sleep(wait_time)
# Mettre à jour le timestamp de reset
self.last_rate_limit_reset = datetime.now() + timedelta(seconds=wait_time)
elif response.status >= 500:
# Erreur serveur, retry
wait_time = base_delay * (2 ** attempt)
print(f"⚠️ Erreur serveur {response.status}. Retry dans {wait_time}s")
await asyncio.sleep(wait_time)
else:
# Erreur client, ne pas retry
return None
except asyncio.TimeoutError:
wait_time = base_delay * (2 ** attempt)
print(f"⏱️ Timeout. Retry dans {wait_time}s")
await asyncio.sleep(wait_time)
except Exception as e:
if attempt == max_retries - 1:
print(f"❌ Échec définitif: {e}")
return None
await asyncio.sleep(base_delay * (2 ** attempt))
return None
async def _check_rate_limits(self):
"""Vérifier et appliquer les limites de taux"""
now = datetime.now()
# Nettoyer les timestamps vieux de plus d'une seconde
self.request_timestamps = [
ts for ts in self.request_timestamps
if (now - ts).total_seconds() < 1.0
]
# Nettoyer les timestamps vieux de plus d'une minute
self.minute_timestamps = [
ts for ts in self.minute_timestamps
if (now - ts).total_seconds() < 60.0
]
# Respecter la limite de requêtes par seconde
if len(self.request_timestamps) >= self.rps_limit:
oldest = self.request_timestamps[0]
wait_time = 1.0 - (now - oldest).total_seconds()
if wait_time > 0:
await asyncio.sleep(wait_time)
# Respecter la limite de requêtes par minute
if len(self.minute_timestamps) >= self.rpm_limit:
oldest = self.minute_timestamps[0]
wait_time = 60.0 - (now - oldest).total_seconds()
if wait_time > 0:
print(f"⏳ Limite RPM atteinte. Attente {wait_time:.1f}s")
await asyncio.sleep(wait_time)
# Respecter le concurrent limit
while self.concurrent_requests >= self.max_concurrent:
await asyncio.sleep(0.1)
# Enregistrer la nouvelle requête
self.concurrent_requests += 1
self.request_timestamps.append(datetime.now())
self.minute_timestamps.append(datetime.now())
def release_request(self):
"""Libérer un slot de requête concurrente"""
self.concurrent_requests = max(0, self.concurrent_requests - 1)
Exemple d'utilisation
async def fetch_crypto_data():
client = RateLimitedClient(requests_per_minute=25, requests_per_second=3)
async with aio