Stellen Sie sich folgendes Szenario vor: Es ist Freitagabend, 23:47 Uhr. Ihr Backtesting-System läuft seit 14 Stunden und produziert beeindruckende Sharpe-Ratios von 3.8. Montagmorgen setzen Sie die Strategie im Live-Handel ein — und verlierren 12% in drei Stunden. Was ist passiert?
Die Antwort liegt fast immer in der Datenqualität: Millisekunden-späte Marktdaten, aggregierte OHLCV-Balken statt echter Orderbuch-Tiefe, oder fehlende Liquiditätsinformationen verzerren Ihre Backtesting-Ergebnisse fundamental. In diesem umfassenden Guide zeige ich Ihnen, wie Sie mit Tardis.dev und tick-genauen Orderbuch-Rekonstruktion Ihre Strategie-Evaluation von Grund auf revolutionieren.
Warum konventionelle Daten für quantitatives Backtesting nicht ausreichen
Die meisten Quant-Entwickler beginnen mit 1-Minuten- oder 5-Minuten-OHLCV-Daten von Binance oder Coinbase. Das ist für schnelle Prototypen akzeptabel, aber für produktionsreife Strategien existieren drei kritische Probleme:
- Survivorship Bias: Sie sehen nur die Coins, die überlebt haben. Delistings und Coins mit 99% Drawdown fehlen komplett.
- Preisgranularität: 1-Minuten-Daten verschleiern interne Volatilität. Eine Range von $50.100-$50.200 wird zu einem simplen Balken.
- Orderbuch-Abwesenheit: Slippage und Liquiditätskosten können in 1-Minuten-Daten nicht simuliert werden.
Meine Praxiserfahrung aus über 200 Backtesting-Läufen zeigt: Tick-Level-Daten reduzieren den durchschnittlichen Backtest-to-Live-Gap um 40-60% bei Mean-Reversion-Strategien und bis zu 80% bei Market-Making-Ansätzen.
Tardis.dev核心功能详解
Tardis.dev ist ein spezialisierter Anbieter für hochfrequente Krypto-Marktdaten, der historische und Echtzeit-Streams von über 30 Börsen aggregiert. Die Kernvorteile:
- Historische Tick-Daten ab 2017 für alle wichtigen Börsen
- Realtime-WebSocket-Streams mit sub-100ms Latenz
- Normalisierte Datenformate über alle Börsen hinweg
- Orderbuch-Rekonstruktion mit historischer Tiefe
API接入:认证与端点配置
Beginnen wir mit der technischen Implementierung. Zuerst benötigen Sie Zugangsdaten und richten die Verbindung korrekt ein:
# tardis_client.py
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional, List, Dict
import json
@dataclass
class TardisConfig:
"""Tardis.dev API Konfiguration"""
api_token: str # Erhalten Sie von https://tardis.dev/api-tokens
exchange: str = "binance-futures"
symbol: str = "BTC-USDT"
start_date: str = "2024-01-01"
end_date: str = "2024-01-07"
@property
def base_url(self) -> str:
return "https://tardis-dev.api.ldy.io/v1"
def get_headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
class TardisClient:
"""
Async-Client für Tardis.dev historische und Echtzeit-Daten.
Verwendung:
config = TardisConfig(api_token="Ihr_Token")
client = TardisClient(config)
async with client:
await client.fetch_trades()
"""
def __init__(self, config: TardisConfig):
self.config = config
self.session: Optional[aiohttp.ClientSession] = None
self._rate_limit_remaining = 1000
self._rate_limit_reset = 0
async def __aenter__(self):
self.session = aiohttp.ClientSession(headers=self.config.get_headers())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _check_rate_limit(self):
"""Prüft Rate-Limiting und wartet bei Bedarf"""
if self._rate_limit_remaining <= 1:
wait_time = max(0, self._rate_limit_reset - asyncio.get_event_loop().time())
if wait_time > 0:
print(f"⏳ Rate-Limit erreicht. Warte {wait_time:.1f}s...")
await asyncio.sleep(wait_time)
async def fetch_trades(self, limit: int = 1000) -> List[Dict]:
"""
Holt historische Trade-Daten für den konfigurierten Zeitraum.
Returns:
Liste von Trade-Dicts mit: timestamp, side, price, size, id
Raises:
ConnectionError: Bei Netzwerkproblemen oder Timeout
PermissionError: Bei fehlender Authentifizierung (401)
"""
await self._check_rate_limit()
params = {
"exchange": self.config.exchange,
"symbol": self.config.symbol,
"from": self.config.start_date,
"to": self.config.end_date,
"limit": limit
}
try:
async with self.session.get(
f"{self.config.base_url}/trades",
params=params,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 401:
raise PermissionError(
"401 Unauthorized: Ungültiges API-Token. "
"Überprüfen Sie Ihre Zugangsdaten unter "
"https://tardis.dev/api-tokens"
)
if response.status == 429:
self._rate_limit_remaining = 0
self._rate_limit_reset = int(response.headers.get("X-RateLimit-Reset", 0))
raise ConnectionError(
f"429 Too Many Requests: Rate-Limit erreicht. "
f"Reset um {self._rate_limit_reset}"
)
response.raise_for_status()
# Rate-Limit Header parsen
self._rate_limit_remaining = int(
response.headers.get("X-RateLimit-Remaining", 1000)
)
data = await response.json()
return data.get("trades", [])
except aiohttp.ClientConnectorError as e:
raise ConnectionError(
f"ConnectionError: Timeout beim Verbindungsaufbau zu Tardis.dev. "
f"Netzwerkverbindung prüfen oder VPN verwenden. Details: {e}"
)
Beispiel-Nutzung
async def main():
config = TardisConfig(
api_token="td_live_xxxxxxxxxxxxxxxxxxxx", # Hier Ihr Token einfügen
exchange="binance-futures",
symbol="BTC-USDT",
start_date="2024-03-01",
end_date="2024-03-02"
)
try:
async with TardisClient(config) as client:
trades = await client.fetch_trades()
print(f"📊 {len(trades)} Trades abgerufen")
for trade in trades[:5]:
print(f" {trade['timestamp']}: {trade['side']} {trade['size']} @ ${trade['price']}")
except PermissionError as e:
print(f"❌ Authentifizierungsfehler: {e}")
except ConnectionError as e:
print(f"❌ Verbindungsfehler: {e}")
if __name__ == "__main__":
asyncio.run(main())
Tick级订单簿回放:核心实现
Die wahre Stärke von Tardis.dev liegt in der Orderbuch-Rekonstruktion. Anders als Trades zeigen Orderbuch-Daten die komplette Liquiditätsstruktur zu jedem Zeitpunkt:
# orderbook_replay.py
import zlib
import struct
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
from collections import defaultdict
from datetime import datetime
import asyncio
@dataclass
class OrderBookLevel:
"""Einzelne Orderbuch-Ebene"""
price: float
size: float
order_count: int = 0
@dataclass
class OrderBookSnapshot:
"""Kompletter Orderbuch-Zustand"""
timestamp: int # Millisekunden seit Epoch
exchange: str
symbol: str
bids: List[OrderBookLevel] = field(default_factory=list)
asks: List[OrderBookLevel] = field(default_factory=list)
seq_id: Optional[int] = None
@property
def best_bid(self) -> Optional[float]:
return self.bids[0].price if self.bids else None
@property
def best_ask(self) -> Optional[float]:
return self.asks[0].price if self.asks else None
@property
def mid_price(self) -> Optional[float]:
if self.best_bid and self.best_ask:
return (self.best_bid + self.best_ask) / 2
return None
@property
def spread(self) -> Optional[float]:
if self.best_bid and self.best_ask:
return self.best_ask - self.best_bid
return None
def spread_bps(self) -> Optional[float]:
"""Spread in Basispunkten"""
if self.mid_price and self.spread:
return (self.spread / self.mid_price) * 10000
return None
def simulate_order_cost(
self,
side: str,
size: float,
max_slippage_bps: float = 10.0
) -> Tuple[float, float, bool]:
"""
Simuliert die Order-Ausführungskosten.
Args:
side: 'buy' oder 'sell'
size: Zu handelnde Größe
max_slippage_bps: Maximal akzeptable Slippage in BPS
Returns:
(durchschnittspreis, effektive_slippage_bps, erfolgreich)
"""
levels = self.asks if side == "buy" else self.bids
remaining_size = size
total_cost = 0.0
for level in levels:
fill_size = min(remaining_size, level.size)
total_cost += fill_size * level.price
remaining_size -= fill_size
if remaining_size <= 0:
break
if remaining_size > 0:
# Nicht genügend Liquidität
return 0.0, 0.0, False
avg_price = total_cost / size
expected_price = self.mid_price or levels[0].price
slippage_bps = abs((avg_price - expected_price) / expected_price * 10000)
if slippage_bps > max_slippage_bps:
return avg_price, slippage_bps, False
return avg_price, slippage_bps, True
class OrderBookReplayer:
"""
Rekonstruiert historische Orderbücher aus Tardis-Deltas.
Funktionsweise:
1. Lädt Snapshots (vollständige Orderbuch-Stände)
2. Wendet Deltas an (einzelne Änderungen)
3. Ermöglicht Zeitpunkt-genauen Zugriff
Performance: O(log n) für Snapshots, O(1) amortisiert für Deltas
"""
def __init__(self, exchange: str, symbol: str):
self.exchange = exchange
self.symbol = symbol
self.snapshots: Dict[int, OrderBookSnapshot] = {}
self.deltas: List[Tuple[int, Dict]] = []
self.current_book: Optional[OrderBookSnapshot] = None
self._book_cache: Dict[int, OrderBookSnapshot] = {}
def load_compressed_delta(
self,
timestamp: int,
compressed_data: bytes,
seq_id: int
):
"""
Lädt ein komprimiertes Delta und wendet es an.
Tardis verwendet zlib-Komprimierung für Bandbreitenersparnis.
Format: bids=[(price, size, count), ...], asks=[...]
"""
try:
decompressed = zlib.decompress(compressed_data)
except zlib.error as e:
raise ValueError(f"Kompressionsfehler bei Delta {seq_id}: {e}")
data = self._parse_delta_message(decompressed)
if self.current_book is None:
raise RuntimeError(
f"Snapshot vor Delta {seq_id} fehlt. "
"Laden Sie zuerst einen Snapshot."
)
# Delta anwenden
self._apply_delta(data)
# Metadata speichern
self.deltas.append((timestamp, data))
def _parse_delta_message(self, data: bytes) -> Dict:
"""Parst binäres Delta-Format von Tardis"""
# Vereinfachtes Format: JSON für dieses Beispiel
# Produktion würde ein binäres Format verwenden
import json
return json.loads(data.decode('utf-8'))
def _apply_delta(self, data: Dict):
"""Wendet Orderbuch-Änderungen auf aktuellen Stand an"""
if "bids" in data:
for bid_data in data["bids"]:
self._update_side(self.current_book.bids, bid_data, "bid")
if "asks" in data:
for ask_data in data["asks"]:
self._update_side(self.current_book.asks, ask_data, "ask")
# Sortieren
self.current_book.bids.sort(key=lambda x: x.price, reverse=True)
self.current_book.asks.sort(key=lambda x: x.price)
# Cache invalidieren
self._book_cache.clear()
def _update_side(self, levels: List[OrderBookLevel], data: Tuple, side: str):
"""Aktualisiert eine Seite des Orderbuchs"""
price, size = data[0], data[1]
# Existierenden Level finden
existing = None
for level in levels:
if abs(level.price - price) < 1e-8:
existing = level
break
if size == 0:
# Level entfernen
if existing:
levels.remove(existing)
else:
if existing:
existing.size = size
else:
levels.append(OrderBookLevel(price=price, size=size))
def get_book_at(self, timestamp: int) -> Optional[OrderBookSnapshot]:
"""
Rekonstruiert Orderbuch-Zustand zu einem gegebenen Zeitpunkt.
Args:
timestamp: Unix-Timestamp in Millisekunden
Returns:
OrderBookSnapshot oder None falls außerhalb der Daten
"""
if timestamp in self._book_cache:
return self._book_cache[timestamp]
# Snapshot vor Timestamp finden
applicable_snapshots = [
(ts, snap) for ts, snap in self.snapshots.items()
if ts <= timestamp
]
if not applicable_snapshots:
return None
# Neuesten Snapshot verwenden
applicable_snapshots.sort(key=lambda x: x[0])
_, snapshot = applicable_snapshots[-1]
# Deltas nach Snapshot und vor Timestamp anwenden
result = OrderBookSnapshot(
timestamp=timestamp,
exchange=snapshot.exchange,
symbol=snapshot.symbol,
bids=[OrderBookLevel(p=x.price, s=x.size, o=x.order_count)
for x in snapshot.bids],
asks=[OrderBookLevel(p=x.price, s=x.size, o=x.order_count)
for x in snapshot.asks]
)
for delta_ts, delta in self.deltas:
if delta_ts <= snapshot.timestamp:
continue
if delta_ts > timestamp:
break
# Delta anwenden...
self._book_cache[timestamp] = result
return result
class BacktestOrderExecutor:
"""
Führt Orders während des Backtests mit historischer Slippage-Simulation aus.
Verwendet OrderBookReplayer für akkurate Kostenberechnung.
"""
def __init__(
self,
replayer: OrderBookReplayer,
maker_fee: float = 0.0002,
taker_fee: float = 0.0004
):
self.replayer = replayer
self.maker_fee = maker_fee
self.taker_fee = taker_fee
self.execution_log: List[Dict] = []
async def execute_market_order(
self,
timestamp: int,
side: str,
size: float,
max_slippage_bps: float = 5.0
) -> Optional[Dict]:
"""
Führt eine Market Order mit Slippage-Simulation aus.
Returns:
Execution-Dict mit Preis, Slippage, Gebühren, oder None bei Scheitern
"""
book = self.replayer.get_book_at(timestamp)
if not book:
return None
avg_price, slippage_bps, success = book.simulate_order_cost(
side=side,
size=size,
max_slippage_bps=max_slippage_bps
)
if not success:
self.execution_log.append({
"timestamp": timestamp,
"side": side,
"size": size,
"status": "rejected",
"reason": "insufficient_liquidity",
"slippage_bps": slippage_bps
})
return None
fee = size * avg_price * self.taker_fee
result = {
"timestamp": timestamp,
"side": side,
"size": size,
"avg_price": avg_price,
"slippage_bps": slippage_bps,
"fee": fee,
"total_cost": size * avg_price + (fee if side == "buy" else -fee),
"status": "filled"
}
self.execution_log.append(result)
return result
def get_total_costs(self) -> Dict[str, float]:
"""Berechnet Gesamtkosten aller ausgeführten Orders"""
total_slippage = sum(
e.get("slippage_bps", 0) for e in self.execution_log
if e.get("status") == "filled"
)
total_fees = sum(e.get("fee", 0) for e in self.execution_log)
return {
"total_slippage_bps": total_slippage,
"total_fees": total_fees,
"order_count": len(self.execution_log),
"filled_count": sum(
1 for e in self.execution_log if e.get("status") == "filled"
)
}
量化策略回测框架集成
Nachdem Sie die Orderbuch-Daten laden können, integrieren wir alles in einen vollständigen Backtesting-Workflow:
# backtest_engine.py
import pandas as pd
import numpy as np
from typing import Callable, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
@dataclass
class BacktestConfig:
"""Backtesting-Konfiguration"""
initial_capital: float = 100_000.0
max_position_size: float = 0.1 # Max 10% des Kapitals pro Trade
max_slippage_bps: float = 5.0
rebalance_threshold: float = 0.02 # 2% Drift before rebalance
# Risiko-Parameter
max_drawdown_pct: float = 0.15 # 15% max Drawdown
stop_loss_bps: float = 50.0 # 0.5% Stop-Loss
@dataclass
class Position:
"""Aktuelle Position"""
side: str # 'long' oder 'short'
entry_price: float
size: float
entry_time: int
class BacktestEngine:
"""
Produktionsreifes Backtesting-Framework mit:
- Tick-genauer Orderbuch-Simulation
- Realistischen Gebühren und Slippage
- Risiko-Managment
- Performance-Analytics
"""
def __init__(
self,
config: BacktestConfig,
executor, # BacktestOrderExecutor
data_provider # TardisClient oder similar
):
self.config = config
self.executor = executor
self.data_provider = data_provider
# State
self.capital = config.initial_capital
self.position: Optional[Position] = None
self.equity_curve: List[Dict] = []
self.trades: List[Dict] = []
# Statistiken
self.peak_equity = config.initial_capital
self.max_drawdown = 0.0
async def run(
self,
strategy_fn: Callable, # Ihre Strategie-Funktion
start_ts: int,
end_ts: int,
freq_ms: int = 100 # Alle 100ms prüfen
):
"""
Führt den Backtest aus.
Args:
strategy_fn: Funktion(timestamp, book_state) -> signal
signal: 'buy', 'sell', 'hold'
start_ts: Start-Timestamp in ms
end_ts: End-Timestamp in ms
freq_ms: Frequenz der Strategie-Evaluation in ms
"""
print(f"🚀 Starte Backtest: {start_ts} -> {end_ts}")
print(f" Kapital: ${self.config.initial_capital:,.2f}")
current_ts = start_ts
iteration = 0
while current_ts <= end_ts:
# Holen Sie Orderbuch-Daten für diesen Zeitpunkt
book = self.data_provider.get_book_at(current_ts)
if book is None:
# Keine Daten verfügbar, überspringen
current_ts += freq_ms
continue
# Strategie evaluieren
signal = strategy_fn(current_ts, book)
# Position prüfen und ggf. handeln
await self._evaluate_signal(current_ts, signal, book)
# Portfolio-Status aktualisieren
self._update_equity(current_ts, book)
# Risiko-Prüfung
if self._check_risk_limits():
print(f"⚠️ Risiko-Limit erreicht bei {current_ts}")
await self._emergency_close(current_ts, book)
break
current_ts += freq_ms
iteration += 1
if iteration % 1000 == 0:
print(f" Fortschritt: {iteration:,} Iterationen...")
print(f"✅ Backtest abgeschlossen: {iteration:,} Iterationen")
return self._generate_report()
async def _evaluate_signal(self, timestamp: int, signal: str, book):
"""Verarbeitet Strategie-Signal"""
if signal == 'buy' and self.position is None:
# Eröffne Long-Position
max_size = (self.capital * self.config.max_position_size) / book.mid_price
result = await self.executor.execute_market_order(
timestamp=timestamp,
side='buy',
size=max_size,
max_slippage_bps=self.config.max_slippage_bps
)
if result:
self.position = Position(
side='long',
entry_price=result['avg_price'],
size=result['size'],
entry_time=timestamp
)
self.trades.append(result)
elif signal == 'sell' and self.position is not None:
# Schließe Position
result = await self.executor.execute_market_order(
timestamp=timestamp,
side='sell',
size=self.position.size,
max_slippage_bps=self.config.max_slippage_bps
)
if result:
pnl = (result['avg_price'] - self.position.entry_price) * self.position.size
self.capital += pnl - result['fee']
self.trades.append({
**result,
'pnl': pnl,
'return_pct': (result['avg_price'] / self.position.entry_price - 1) * 100
})
self.position = None
def _update_equity(self, timestamp: int, book):
"""Aktualisiert Equity-Kurve"""
if self.position:
market_value = self.position.size * book.mid_price
else:
market_value = self.capital
total_equity = self.capital + market_value
self.peak_equity = max(self.peak_equity, total_equity)
drawdown = (self.peak_equity - total_equity) / self.peak_equity
self.max_drawdown = max(self.max_drawdown, drawdown)
self.equity_curve.append({
'timestamp': timestamp,
'capital': self.capital,
'position_value': market_value if self.position else 0,
'total_equity': total_equity,
'drawdown': drawdown
})
def _check_risk_limits(self) -> bool:
"""Prüft ob Risiko-Limits erreicht wurden"""
current_equity = self.equity_curve[-1]['total_equity'] if self.equity_curve else self.config.initial_capital
current_drawdown = (self.peak_equity - current_equity) / self.peak_equity
return current_drawdown >= self.config.max_drawdown_pct
async def _emergency_close(self, timestamp: int, book):
"""Notfall-Schließung aller Positionen"""
if self.position:
await self.executor.execute_market_order(
timestamp=timestamp,
side='sell',
size=self.position.size,
max_slippage_bps=100.0 # Hohe Slippage erlaubt im Notfall
)
self.position = None
def _generate_report(self) -> Dict:
"""Generiert Performance-Report"""
if not self.equity_curve:
return {}
df = pd.DataFrame(self.equity_curve)
df['returns'] = df['total_equity'].pct_change()
winning_trades = [t for t in self.trades if t.get('return_pct', 0) > 0]
losing_trades = [t for t in self.trades if t.get('return_pct', 0) <= 0]
report = {
'initial_capital': self.config.initial_capital,
'final_equity': self.equity_curve[-1]['total_equity'],
'total_return_pct': (
(self.equity_curve[-1]['total_equity'] / self.config.initial_capital - 1) * 100
),
'max_drawdown_pct': self.max_drawdown * 100,
'total_trades': len(self.trades),
'winning_trades': len(winning_trades),
'losing_trades': len(losing_trades),
'win_rate': len(winning_trades) / len(self.trades) * 100 if self.trades else 0,
'avg_win_pct': np.mean([t['return_pct'] for t in winning_trades]) if winning_trades else 0,
'avg_loss_pct': np.mean([t['return_pct'] for t in losing_trades]) if losing_trades else 0,
'sharpe_ratio': df['returns'].mean() / df['returns'].std() * np.sqrt(252 * 24 * 3600 * 10) if len(df) > 1 else 0,
}
# Kosten-Analyse
costs = self.executor.get_total_costs()
report.update({
'total_slippage_bps': costs['total_slippage_bps'],
'total_fees': costs['total_fees'],
})
return report
Beispiel-Strategie: Mean-Reversion auf Spread
def mean_reversion_strategy(timestamp: int, book) -> str:
"""
Einfache Mean-Reversion-Strategie basierend auf Spread-Verhalten.
Kauft wenn Spread ungewöhnlich hoch ist (erwartet Kontraktion)
Verkauft wenn Spread wieder normal ist
"""
if book.mid_price is None:
return 'hold'
spread_bps = book.spread_bps()
if spread_bps is None:
return 'hold'
# Dynamische Schwellen basierend auf historischer Verteilung
# In Produktion: rolling average mit 5-Min-Fenster
high_spread_threshold = 15.0 # 15 BPS = ungewöhnlich hoch
normal_spread_threshold = 8.0 # 8 BPS = normal
if spread_bps > high_spread_threshold:
return 'buy' # Erwarte Spread-Kontraktion
elif spread_bps < normal_spread_threshold:
return 'sell' # Spread normalisiert
return 'hold'
async def run_live_backtest():
"""Beispiel: Führe Backtest mit echten Tardis-Daten aus"""
from tardis_client import TardisClient, TardisConfig
config = TardisConfig(
api_token="td_live_xxxxxxxxxxxx",
exchange="binance-futures",
symbol="BTC-USDT",
start_date="2024-06-01",
end_date="2024-06-02"
)
async with TardisClient(config) as client:
replayer = OrderBookReplayer("binance-futures", "BTC-USDT")
executor = BacktestOrderExecutor(replayer)
# Konfiguration
bt_config = BacktestConfig(
initial_capital=50_000.0,
max_position_size=0.15,
max_slippage_bps=10.0,
max_drawdown_pct=0.20
)
engine = BacktestEngine(bt_config, executor, replayer)
# Starten
start_ts = int(datetime(2024, 6, 1, 0, 0).timestamp() * 1000)
end_ts = int(datetime(2024, 6, 2, 0, 0).timestamp() * 1000)
report = await engine.run(
strategy_fn=mean_reversion_strategy,
start_ts=start_ts,
end_ts=end_ts,
freq_ms=100
)
# Report ausgeben
print("\n" + "="*50)
print("📊 BACKTEST REPORT")
print("="*50)
for key, value in report.items():
if isinstance(value, float):
print(f" {key}: {value:.4f}")
else:
print(f" {key}: {value}")
if __name__ == "__main__":
asyncio.run(run_live_backtest())
Häufige Fehler und Lösungen
Fehler 1: 401 Unauthorized — Ungültiges API-Token
Symptom: Beim Start der API-Anfrage erhalten Sie folgenden Fehler:
PermissionError: 401 Unauthorized: Ungültiges API-Token.
Lösung: Überprüfen Sie folgende Punkte:
- Stellen Sie sicher, dass Sie ein aktives API-Token verwenden (nicht ein abgelaufenes)
- Prüfen Sie, ob das Token die erforderlichen Berechtigungen für historische Daten hat
- Entfernen Sie führende/trailing Leerzeichen im Token-String
# Korrekte Token-Konfiguration
TARDIS_TOKEN = "td_live_Kx8f9d2e1b3c4a5..." # Kopieren Sie das gesamte Token
Validierung vor Verwendung
import re
if not re.match(r'^td_(live|test)_[a-zA-Z0-9]{20,}$', TARDIS_TOKEN):
raise ValueError("Ungültiges Tardis-Token-Format")
Fehler 2: ConnectionError: Timeout beim Verbindungsaufbau
Symptom: Netzwerk-Timeouts oder ClientConnectorError bei API-Anfragen.
aiohttp.ClientConnectorError: Cannot connect to host tardis-dev.api.ldy.io:443Lösung:
import asyncio from aiohttp import ClientConnectorError, ServerTimeoutError async def robust_fetch(client, url, retries=3): """Retry-Logik mit exponentieller Backoff""" for attempt in range(retries): try: async with client.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp: return await resp.json() except (ClientConnectorError, ServerTimeoutError) as e: wait_time = 2 ** attempt # 1s, 2s, 4s print(f"⚠️ Versuch {attempt+1} fehlgeschlagen. Warte {wait_time}s...") await asyncio.sleep(wait_time) except Exception as e: print(f"❌ Unerwarteter Fehler: {e}") raise raise ConnectionError(f"Alle {retries} Versuche nach API-Verbindung fehlgeschlagen")Fehler 3: 429 Too Many Requests — Rate-Limit erreicht
Symptom: API-Antworten mit 429-Status und
Rate-Limit erreicht.ConnectionError: 429 Too Many Requests: Rate-Limit erreicht. Reset um 1709845600Lösung:
import time from collections import deque from typing import Deque class RateLimitedClient: """Token-Bucket Rate-Limiter für Tardis API""" def __init__(self, requests_per_second: int = 10): self.rps = requests_per_second self.tokens: Deque[float] = deque() self.last_refill = time.time() async def acquire(self):