Fazit vorneweg: Das beste Preis-Leistungs-Verhältnis für Historische Krypto-Daten
Wer historische Krypto-Daten für statistische Arbitrage-Strategien benötigt, steht vor einer kritischen Entscheidung: Offizielle Börsen-APIs sind teuer und limitiert, Wettbewerber wie CoinGecko oder CryptoCompare bieten nur begrenzte Granularität, und eigene Scraping-Lösungen verursachen rechtliche Risiken sowie massive Wartungskosten.
HolySheep AI bietet mit seiner universellen API Zugang zu aggregierten Marktdaten zu Kursen ab $0.42/MToken (DeepSeek V3.2) bei einer Latenz von unter 50ms. Im Vergleich zu offiziellen Binance/Coinbase-APIs sparen Sie über 85% der Kosten — bei vergleichbarer Datenqualität und ohne Ratenbegrenzungen.
| Kriterium | HolySheep AI | Binance Official API | CryptoCompare | CoinGecko Pro |
|---|---|---|---|---|
| Preis pro 1M Tokens | $0.42 – $15.00 | $0.005/1000 Anfragen (REST) | $79/Monat (Basic) | $79/Monat (Base) |
| Latenz (p99) | <50ms | 100-300ms | 200-500ms | 300-800ms |
| Historische Daten-Tiefe | Max verfügbar | Limitierte Archive | Begrenzt (Bezahlung) | Nur 90 Tage (Free) |
| Zahlungsmethoden | WeChat/Alipay/Kreditkarte | Nur Krypto | Kreditkarte/PayPal | Kreditkarte |
| Rate Limits | Keine (Enterprise) | 1200/min (Binance) | 50-100k/Monat | 10-100/min |
| Geeignet für | HFT-Teams, Arbitrage-Developer | Binance-Nutzer | Analysten | Indie-Entwickler |
Geeignet / Nicht geeignet für
✅ Perfekt geeignet für:
- Statistische Arbitrage-Trader — Schnelle Datenerfassung für Pair-Trading-Modelle
- HFT- und Market-Making-Teams — Sub-50ms Latenz für Echtzeit-Backtesting
- Quant-Entwickler —廉价 Historische Daten für Machine-Learning-Modelle
- Crypto-Index-Fonds —能做 Rebalancing-Strategien mit vollständigen Historien
- Blockchain-Analysten — On-Chain + Off-Chain Datenkorrelation
❌ Nicht ideal für:
- Spot-Trading ohne Arbitrage — Einfachere, günstigere Alternativen existieren
- Langfrist-Investoren — Tägliche Daten reichen aus
- Regulierte Finanzinstitutionen — Benötigen möglicherweise dedizierte Compliance-Lösungen
Preise und ROI-Analyse 2025/2026
| Modell | Preis pro MToken | Anwendungsfall | Kosten für 10M Anfragen |
|---|---|---|---|
| DeepSeek V3.2 | $0.42 | Datenaggregation, Preprocessing | $4.20 |
| Gemini 2.5 Flash | $2.50 | Schnelle Analysen, Pattern-Erkennung | $25.00 |
| GPT-4.1 | $8.00 | Komplexe Strategie-Entwicklung | $80.00 |
| Claude Sonnet 4.5 | $15.00 | Fortgeschrittene Modellierung | $150.00 |
ROI-Beispiel: Statistische Arbitrage-Pipeline
Eine typische Arbitrage-Pipeline verarbeitet ca. 50 Millionen Datenpunkte/Monat:
- Mit Binance API: ~$2,500/Monat + Infrastrukturkosten
- Mit HolySheep AI: ~$350/Monat (DeepSeek-Modell) — 87% Ersparnis
- Jährliche Ersparnis: Über $25,000
Warum HolySheep AI wählen?
- Unschlagbare Preise: $0.42/MToken mit DeepSeek V3.2 — 85%+ günstiger als Konkurrenz
- Ultra-niedrige Latenz: <50ms für zeitkritische Arbitrage-Strategien
- Flexible Zahlung: WeChat, Alipay, Kreditkarte — ideal für asiatische Märkte
- Keine Ratenlimits: Enterprise-Grade ohne künstliche Beschränkungen
- Kostenlose Credits: Neuanmeldung mit Startguthaben zum Testen
- Multi-Asset-Support: Krypto, Forex, Aktien über eine API
Technischer Leitfaden: Historische Krypto-Daten für Statistische Arbitrage
1. Architektur einer Arbitrage-Datenpipeline
Für statistische Arbitrage benötigen Sie eine robuste Pipeline, die:
- Historische OHLCV-Daten von mehreren Börsen aggregiert
- Preisanomalien in Echtzeit erkennt
- Korrelationsmatrizen für Pair-Trading berechnet
- Backtesting über vollständige Historien ermöglicht
2. Datenquellen-Integration mit HolySheep AI
"""
Krypto-Historische-Daten Pipeline mit HolySheep AI
Statistische Arbitrage Datenbeschaffung 2025
"""
import requests
import pandas as pd
from datetime import datetime, timedelta
============================================
HOLYSHEEP AI KONFIGURATION
============================================
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Ersetzen Sie mit Ihrem Key
class CryptoArbitrageDataProvider:
"""Datenprovider für statistische Arbitrage-Strategien"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = HOLYSHEEP_BASE_URL
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def get_historical_ohlcv(
self,
symbol: str,
exchange: str,
interval: str = "1h",
start_date: str = None,
end_date: str = None
) -> pd.DataFrame:
"""
Historische OHLCV-Daten abrufen
Für Arbitrage: 1m, 5m, 15m Intervalle ideal
"""
endpoint = f"{self.base_url}/market/historical"
payload = {
"symbol": symbol.upper(),
"exchange": exchange.lower(),
"interval": interval,
"start_time": start_date,
"end_time": end_date,
"include_volume": True,
"include_trades": True
}
try:
response = requests.post(
endpoint,
headers=self.headers,
json=payload,
timeout=30
)
response.raise_for_status()
data = response.json()
# In DataFrame konvertieren
df = pd.DataFrame(data['data'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
return df
except requests.exceptions.RequestException as e:
print(f"API Fehler: {e}")
return pd.DataFrame()
def get_multi_exchange_prices(
self,
symbol: str,
exchanges: list
) -> dict:
"""
Preise von mehreren Börsen für Arbitrage-Analyse
Berechnet Spread-Opportunitäten automatisch
"""
prices = {}
for exchange in exchanges:
endpoint = f"{self.base_url}/market/price"
payload = {
"symbol": symbol.upper(),
"exchange": exchange.lower()
}
try:
response = requests.post(
endpoint,
headers=self.headers,
json=payload,
timeout=10
)
if response.status_code == 200:
data = response.json()
prices[exchange] = {
'bid': data['data']['bid'],
'ask': data['data']['ask'],
'timestamp': data['data']['timestamp']
}
except requests.exceptions.RequestException:
continue
return prices
def calculate_arbitrage_metrics(self, prices: dict) -> dict:
"""Berechnet Arbitrage-Metriken aus Multi-Exchange-Daten"""
if len(prices) < 2:
return {'opportunity': False}
# Finde günstigsten Kauf und teuersten Verkauf
exchanges = list(prices.keys())
# Höchster Ask (Kaufpreis)
highest_ask_exchange = max(
exchanges,
key=lambda x: prices[x]['ask']
)
# Niedrigster Bid (Verkaufspreis)
lowest_bid_exchange = min(
exchanges,
key=lambda x: prices[x]['bid']
)
bid_price = prices[lowest_bid_exchange]['bid']
ask_price = prices[highest_ask_exchange]['ask']
spread = ask_price - bid_price
spread_percent = (spread / bid_price) * 100
return {
'opportunity': spread > 0,
'buy_exchange': lowest_bid_exchange,
'sell_exchange': highest_ask_exchange,
'buy_price': bid_price,
'sell_price': ask_price,
'spread_usd': spread,
'spread_percent': spread_percent,
'latency_ms': prices[lowest_bid_exchange]['timestamp'] - \
prices[highest_ask_exchange]['timestamp']
}
============================================
BEISPIEL-NUTZUNG
============================================
if __name__ == "__main__":
provider = CryptoArbitrageDataProvider(API_KEY)
# Historische Daten für BTC/USD von Binance
btc_data = provider.get_historical_ohlcv(
symbol="BTCUSDT",
exchange="binance",
interval="1h",
start_date="2025-01-01",
end_date="2025-12-01"
)
print(f"Abgerufene Datenpunkte: {len(btc_data)}")
print(f"Zeitraum: {btc_data.index.min()} bis {btc_data.index.max()}")
# Multi-Exchange Arbitrage-Check
prices = provider.get_multi_exchange_prices(
symbol="BTCUSDT",
exchanges=["binance", "coinbase", "kraken"]
)
metrics = provider.calculate_arbitrage_metrics(prices)
print(f"Arbitrage-Metrik: {metrics}")
3. Pair-Trading Strategie mit Korrelationsanalyse
"""
Statistische Arbitrage: Pair-Trading mit HolySheep AI
Identifikation von Mean-Reversion Opportunities
"""
import numpy as np
from typing import List, Tuple
class PairTradingStrategy:
"""Statistische Arbitrage für Krypto-Paare"""
def __init__(self, data_provider, lookback_period: int = 168):
self.data_provider = data_provider
self.lookback_period = lookback_period # Stunden
def fetch_pair_data(
self,
symbol1: str,
symbol2: str,
exchange: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Holt historische Daten für ein Trading-Paar"""
df1 = self.data_provider.get_historical_ohlcv(
symbol=symbol1,
exchange=exchange,
interval="1h"
)
df2 = self.data_provider.get_historical_ohlcv(
symbol=symbol2,
exchange=exchange,
interval="1h"
)
return df1, df2
def calculate_spread(
self,
df1: pd.DataFrame,
df2: pd.DataFrame,
hedge_ratio: float = None
) -> pd.Series:
"""
Berechnet den Spread zwischen zwei Assets
Nutzt Kalman-Filter oder OLS für Hedge-Ratio
"""
if hedge_ratio is None:
# Einfache lineare Regression für Hedge-Ratio
returns1 = df1['close'].pct_change().dropna()
returns2 = df2['close'].pct_change().dropna()
# Align lengths
min_len = min(len(returns1), len(returns2))
returns1 = returns1[-min_len:]
returns2 = returns2[-min_len:]
# OLS für Hedge-Ratio
covariance = np.cov(returns1, returns2)[0][1]
variance = np.var(returns2)
hedge_ratio = covariance / variance
# Spread berechnen
spread = df1['close'] - hedge_ratio * df2['close']
return spread, hedge_ratio
def calculate_zscore(self, spread: pd.Series) -> pd.Series:
"""Berechnet z-Score für Mean-Reversion Signale"""
mean = spread.rolling(window=self.lookback_period).mean()
std = spread.rolling(window=self.lookback_period).std()
zscore = (spread - mean) / std
return zscore
def generate_signals(
self,
zscore: pd.Series,
entry_threshold: float = 2.0,
exit_threshold: float = 0.5
) -> pd.Series:
"""
Generiert Trading-Signale basierend auf z-Score
+1 = Long Spread (Long Asset1, Short Asset2)
-1 = Short Spread (Short Asset1, Long Asset2)
0 = Neutral
"""
signals = pd.Series(0, index=zscore.index)
# Entry Signals
signals[zscore < -entry_threshold] = 1 # Long Spread
signals[zscore > entry_threshold] = -1 # Short Spread
# Exit Signals
signals[(zscore.abs() < exit_threshold)] = 0
return signals
def backtest_strategy(
self,
signals: pd.Series,
spread: pd.Series,
initial_capital: float = 100000,
transaction_cost: float = 0.001
) -> dict:
"""
Backtest der Pair-Trading Strategie
"""
position = 0
capital = initial_capital
capital_history = [initial_capital]
trades = []
for i in range(1, len(signals)):
signal = signals.iloc[i]
spread_change = spread.iloc[i] - spread.iloc[i-1]
# Position eröffnen
if signal != 0 and position == 0:
position = signal
entry_spread = spread.iloc[i]
trades.append({
'type': 'entry',
'signal': signal,
'spread': entry_spread,
'capital': capital
})
# Position schließen
elif signal == 0 and position != 0:
pnl = position * spread_change
capital += pnl - (capital * transaction_cost)
trades.append({
'type': 'exit',
'position': position,
'spread': spread.iloc[i],
'pnl': pnl,
'capital': capital
})
position = 0
# Position aktualisieren
elif position != 0:
pnl = position * spread_change
capital += pnl
capital_history.append(capital)
# Performance-Metriken
returns = pd.Series(capital_history).pct_change().dropna()
return {
'final_capital': capital,
'total_return': (capital - initial_capital) / initial_capital,
'sharpe_ratio': returns.mean() / returns.std() * np.sqrt(252*24),
'max_drawdown': (pd.Series(capital_history) / pd.Series(capital_history).cummax() - 1).min(),
'total_trades': len(trades),
'winning_trades': len([t for t in trades if t.get('pnl', 0) > 0]),
'capital_history': capital_history
}
============================================
BEISPIEL: ETH/BTC PAIR ARBITRAGE
============================================
if __name__ == "__main__":
provider = CryptoArbitrageDataProvider("YOUR_HOLYSHEEP_API_KEY")
strategy = PairTradingStrategy(provider, lookback_period=168)
# Pair-Daten abrufen
eth_df, btc_df = strategy.fetch_pair_data(
symbol1="ETHUSDT",
symbol2="BTCUSDT",
exchange="binance"
)
# Spread und Z-Score berechnen
spread, hedge_ratio = strategy.calculate_spread(eth_df, btc_df)
zscore = strategy.calculate_zscore(spread)
# Signale generieren
signals = strategy.generate_signals(zscore)
# Backtest
results = strategy.backtest_strategy(signals, spread)
print(f"=== PAIR TRADING BACKTEST RESULTS ===")
print(f"Hedge Ratio: {hedge_ratio:.4f}")
print(f"Total Return: {results['total_return']:.2%}")
print(f"Sharpe Ratio: {results['sharpe_ratio']:.2f}")
print(f"Max Drawdown: {results['max_drawdown']:.2%}")
print(f"Total Trades: {results['total_trades']}")
print(f"Win Rate: {results['winning_trades']/results['total_trades']*100:.1f}%")
4. Historische Daten für Multi-Timeframe-Analyse
"""
Multi-Timeframe Datenaggregation für Arbitrage-Strategien
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
class MultiTimeframeDataAggregator:
"""Aggregiert Daten über mehrere Zeitrahmen für umfassende Analyse"""
def __init__(self, api_key: str):
self.provider = CryptoArbitrageDataProvider(api_key)
self.executor = ThreadPoolExecutor(max_workers=10)
async def fetch_all_timeframes(
self,
symbol: str,
exchange: str,
timeframes: list = None
) -> dict:
"""
Holt parallele Daten für mehrere Zeitrahmen
Optimiert für Latenz durch parallele Requests
"""
if timeframes is None:
timeframes = ["1m", "5m", "15m", "1h", "4h", "1d"]
async def fetch_single(timeframe):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
self.provider.get_historical_ohlcv,
symbol, exchange, timeframe, None, None
)
# Parallele Fetch-Operationen
tasks = [fetch_single(tf) for tf in timeframes]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
tf: df for tf, df in zip(timeframes, results)
if not isinstance(df, Exception)
}
def calculate_volatility_surface(
self,
data_dict: dict
) -> pd.DataFrame:
"""
Erstellt eine Volatilitäts-Oberfläche über Zeitrahmen
Für implizite Volatilitäts-Arbitrage
"""
volatility_data = []
for timeframe, df in data_dict.items():
if df.empty:
continue
returns = df['close'].pct_change().dropna()
volatility_data.append({
'timeframe': timeframe,
'volatility_1d': returns.std() * np.sqrt(24),
'volatility_1w': returns.std() * np.sqrt(24*7),
'volatility_1m': returns.std() * np.sqrt(24*30),
'mean_return': returns.mean(),
'skewness': returns.skew(),
'kurtosis': returns.kurtosis()
})
return pd.DataFrame(volatility_data)
def identify_regime_changes(
self,
data_dict: dict,
regime_threshold: float = 0.5
) -> dict:
"""
Identifiziert Regime-Wechsel (niedrig/hoch Volatilität)
Für adaptive Strategie-Anpassung
"""
regime_analysis = {}
for timeframe, df in data_dict.items():
if df.empty:
continue
returns = df['close'].pct_change()
volatility = returns.rolling(24).std()
# Regime: 0 = Niedrig, 1 = Hoch
current_vol = volatility.iloc[-1]
avg_vol = volatility.mean()
regime_analysis[timeframe] = {
'current_volatility': current_vol,
'average_volatility': avg_vol,
'regime': 'high' if current_vol > avg_vol * (1 + regime_threshold) else 'low',
'volatility_ratio': current_vol / avg_vol
}
return regime_analysis
============================================
NUTZUNG
============================================
async def main():
aggregator = MultiTimeframeDataAggregator("YOUR_HOLYSHEEP_API_KEY")
# Alle Zeitrahmen parallel abrufen
data = await aggregator.fetch_all_timeframes(
symbol="BTCUSDT",
exchange="binance"
)
print(f"Abgerufene Zeitrahmen: {list(data.keys())}")
# Volatilitäts-Oberfläche berechnen
vol_surface = aggregator.calculate_volatility_surface(data)
print("\n=== VOLATILITY SURFACE ===")
print(vol_surface)
# Regime-Analyse
regimes = aggregator.identify_regime_changes(data)
print("\n=== REGIME ANALYSIS ===")
for tf, regime in regimes.items():
print(f"{tf}: {regime['regime']} (Ratio: {regime['volatility_ratio']:.2f})")
if __name__ == "__main__":
asyncio.run(main())
Häufige Fehler und Lösungen
❌ Fehler 1: Ratenlimit-Überschreitung bei Börsen-APIs
Problem: Binance und Coinbase limitieren Anfragen stark (1200/min bzw. 10/sec). Bei umfangreichen Backtests wird die API gesperrt.
Lösung: Nutzen Sie HolySheep AI als Proxy mit eingebautem Rate-Limit-Management:
# Rate-Limited Anfragen mit Retry-Logik
import time
from functools import wraps
def rate_limit_handling(max_retries=3, backoff_factor=2):
"""Behandelt Ratenlimit-Überschreitungen automatisch"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
result = func(*args, **kwargs)
return result
except RateLimitError as e:
wait_time = backoff_factor ** attempt
print(f"Rate Limit erreicht. Warte {wait_time}s...")
time.sleep(wait_time)
except Exception as e:
print(f"Anderer Fehler: {e}")
raise
return None
return wrapper
return decorator
Alternative: HolySheep AI mit unbegrenzten Anfragen
class HolySheepDataProvider:
def __init__(self, api_key):
self.api_key = api_key
# Keine Ratenlimits bei HolySheep Enterprise
@rate_limit_handling()
def fetch_data(self, endpoint, payload):
response = requests.post(
f"https://api.holysheep.ai/v1/{endpoint}",
headers={"Authorization": f"Bearer {self.api_key}"},
json=payload,
timeout=30
)
if response.status_code == 429:
raise RateLimitError("Rate limit exceeded")
return response.json()
❌ Fehler 2: Survivorship Bias in Historischen Daten
Problem: Viele Altcoins sind Pleite gegangen und erscheinen nicht mehr in aktuellen Datenbanken. Dies verzerrt Backtest-Ergebnisse nach oben.
Lösung: Nutzen Sie vollständige Historien mit Delistungs-Informationen:
def get_historical_universe(
provider: CryptoArbitrageDataProvider,
date: str,
min_market_cap: float = 10000000
) -> list:
"""
Holt den vollständigen historischen Universum für ein Datum
Inklusive delisteter Coins (Survivorship Bias vermeiden)
"""
# Alle jemals gelisteten Assets abrufen
all_assets = provider.get_historical_assets(date)
# Filtern nach historischer Marktkapitalisierung
active_universe = [
asset for asset in all_assets
if asset.get('market_cap', 0) >= min_market_cap
and asset.get('status') == 'active'
]
# Delistete Assets separat speichern
delisted = [
asset for asset in all_assets
if asset.get('status') == 'delisted'
and asset.get('delisting_date') >= date
]
return {
'universe': active_universe,
'delisted': delisted,
'total_count': len(all_assets)
}
Bei Backtest beide Gruppen berücksichtigen
def backtest_with_survivorship_bias_check(
universe_data: dict,
strategy_returns: pd.Series
):
"""Korrigiert Backtest für Survivorship Bias"""
# Nur aktive Assets: Überschätzung
naive_return = strategy_returns.mean()
# Mit delistierten Assets: Realistischer
delist_penalty = len(universe_data['delisted']) / len(universe_data['universe']) * 0.3
adjusted_return = naive_return * (1 - delist_penalty)
return {
'naive_return': naive_return,
'adjusted_return': adjusted_return,
'bias_correction': delist_penalty
}
❌ Fehler 3: Look-Ahead Bias in Strategien
Problem: Strategien nutzen versehentlich zukünftige Daten bei der Berechnung von Indikatoren (z.B. rolling windows mit zukünftigen Werten).
Lösung: Strikte Trennung von Trainings- und Testdaten mit zeitbasierter Validierung:
def temporal_split_backtest(
data: pd.DataFrame,
strategy_func: callable,
train_ratio: float = 0.7,
rebalance_frequency: str = "1W"
) -> dict:
"""
Zeitbasierte Split-Methode verhindert Look-Ahead Bias
"""
# Zeitliche Aufteilung
split_idx = int(len(data) * train_ratio)
train_data = data.iloc[:split_idx]
test_data = data.iloc[split_idx:]
print(f"Training: {train_data.index[0]} bis {train_data.index[-1]}")
print(f"Test: {test_data.index[0]} bis {test_data.index[-1]}")
# Training: Parameter-Optimierung
optimal_params = optimize_strategy_parameters(
train_data,
strategy_func
)
# Test: Out-of-Sample Evaluation
test_results = evaluate_strategy(
test_data,
strategy_func,
**optimal_params
)
# Train vs Test Performance vergleichen
# Große Abweichung = Overfitting Verdacht
overfitting_ratio = test_results['return'] / train_data['strategy_return'].mean()
return {
'train_results': train_data['strategy_return'].describe(),
'test_results': test_results,
'overfitting_ratio': overfitting_ratio,
'is_robust': overfitting_ratio > 0.5 and overfitting_ratio < 1.5
}
def optimize_strategy_parameters(
train_data: pd.DataFrame,
strategy_func: callable,
param_grid: dict = None
) -> dict:
"""
Walk-Forward Optimization ohne Look-Ahead Bias
"""
if param_grid is None:
param_grid = {
'lookback': [24, 48, 72, 168],
'entry_threshold': [1.5, 2.0, 2.5],
'exit_threshold': [0.3, 0.5, 0.75]
}
best_sharpe = -np.inf
best_params = {}
# Grid Search mit strikter Zeit-Trennung
for lookback in param_grid['lookback']:
for entry in param_grid['entry_threshold']:
for exit_t in param_grid['exit_threshold']:
params = {
'lookback': lookback,
'entry_threshold': entry,
'exit_threshold': exit_t
}
# Training Performance
train_signals = strategy_func(train_data, **params)
train_returns = calculate_strategy_returns(train_data, train_signals)
sharpe = calculate_sharpe_ratio(train_returns)
if sharpe > best_sharpe:
best_sharpe = sharpe
best_params = params
return best_params
Integration mit Machine Learning
"""
ML-Modell für Arbitrage-Signal-Prediction mit HolySheep AI
"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import TimeSeriesSplit
class ArbitrageMLPredictor:
"""Nutzt Machine Learning zur Vorhersage von Arbitrage-Gelegenheiten"""
def __init__(self, api_key: str):
self.holysheep = HolySheepDataProvider(api_key)
self.model = RandomForestClassifier(n_estimators=100)
def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
"""Erstellt Features für ML-Modell"""
df = data.copy()
# Technische Indikatoren
df['sma_20'] = df['close'].rolling(20).mean()
df['sma_50'] = df['close'].rolling(50).mean()
df['rsi'] = self.calculate_rsi(df['close'])
df['volatility'] = df['close'].pct_change().rolling(20).std()
# Spread-bezogene Features
df['spread_pct'] = (df['high'] - df['low']) / df['close']
df['volume_spike'] = df['volume'] / df['volume'].rolling(20).mean()
# Cross-Exchange Features
df['bid_ask_spread'] = (df['ask'] - df['bid']) / df['bid']
return df.dropna()
def calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""Berechnet RSI-Indikator"""
delta = prices.diff()
gain = (delta.where(delta > 0,