Die Aggregation von Kryptowährungs-Historiendaten über mehrere Börsen hinweg stellt Entwickler vor erhebliche Herausforderungen. Die Integration von Daten aus Binance, Coinbase, Kraken und anderen Plattformen erfordert unterschiedliche API-Formate, Authentifizierungsmethoden und Datenstrukturen. Dieser Leitfaden zeigt Ihnen, wie Sie mit HolySheep AI eine einheitliche Krypto-Daten-API implementieren, die historische Preisdaten, Orderbuch-Informationen und Handelsvolumina aus mehreren Quellen konsolidiert.
Warum Multi-Exchange Datenaggregation?
Bei der Entwicklung von Trading-Bots, Portfolio-Trackern und Marktanalysen stoßen Entwickler schnell auf ein fundamentales Problem: Jede Kryptobörse bietet ihre eigene API mit unterschiedlichen Endpunkten, Parametern und Rate-Limits. Eine einheitliche Schnittstelle eliminiert nicht nur den Wartungsaufwand, sondern ermöglicht auch präzisere Marktabschätzungen durch Datenkorrelation.
Praxiserfahrung: In unseren eigenen Projekten bei HolySheep haben wir festgestellt, dass die manuelle Verwaltung von 5+ Börsen-APIs zu über 60% des gesamten Entwicklungsaufwands für Dateninfrastruktur führt. Durch den Einsatz einer aggregierten Lösung konnten wir die Time-to-Market um 75% reduzieren und gleichzeitig die Datenkonsistenz verbessern.
2026 KI-Modell Kostenvergleich für Datenverarbeitung
Bevor wir in die technische Implementierung eintauchen, ein wichtiger Überblick über die aktuellen Kosten für KI-gestützte Datenanalyse und -verarbeitung. Bei HolySheep profitieren Sie von unserem günstigen Wechselkurs: ¥1 = $1 USD, was Ihnen über 85% Ersparnis gegenüber anderen Anbietern sichert.
| KI-Modell | Preis pro Million Token | Kosten für 10M Token/Monat | Latenz |
|---|---|---|---|
| GPT-4.1 | $8,00 | $80,00 | <2s |
| Claude Sonnet 4.5 | $15,00 | $150,00 | <3s |
| Gemini 2.5 Flash | $2,50 | $25,00 | <500ms |
| DeepSeek V3.2 | $0,42 | $4,20 | <200ms |
| HolySheep DeepSeek V3.2 | ¥0,42 (~$0,42) | ¥4,20 (~$4,20) | <50ms |
Stand: Januar 2026 | Wechselkurs: ¥1 = $1 USD
Architektur der Multi-Exchange Datenaggregation
Systemübersicht
Eine robuste Krypto-Datenaggregationslösung besteht aus mehreren Kernkomponenten: dem API-Gateway für Request-Routing, dem Daten-Normalisierer für einheitliche Formate, einem Cache-Layer für Performance-Optimierung und dem Analyse-Modul für KI-gestützte Insights.
┌─────────────────────────────────────────────────────────────────┐
│ Multi-Exchange Data Aggregator │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Binance │ │ Coinbase │ │ Kraken │ ... │
│ │ API │ │ API │ │ API │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Data Normalization Layer │ │
│ │ - Standardisierte Candlestick-Formate │ │
│ │ - Einheitliche Zeitstempel (Unix/ISO8601) │ │
│ │ - Konsistente Währungspaar-Kodierung │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Cache Layer (Redis) │ │
│ │ - Historisches OHLCV mit TTL │ │
│ │ - Orderbook-Snapshots │ │
│ │ - Aggregierte Marktindikatoren │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ HolySheep AI Integration │ │
│ │ - Preistrend-Analyse mit DeepSeek V3.2 │ │
│ │ - Sentiment-Analyse für Nachrichten │ │
│ │ - Risikobewertung mit Gemini 2.5 Flash │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Implementierung: Vollständiger Python-Client
Der folgende Code zeigt eine produktionsreife Implementierung eines Multi-Exchange Datenaggregators, der HolySheep AI für erweiterte Analysen nutzt:
import requests
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
class CryptoDataAggregator:
"""
Multi-Exchange Kryptowährungs-Datenaggregator mit HolySheep AI Integration.
Konsolidiert historische Daten von Binance, Coinbase und anderen Börsen.
"""
def __init__(self, api_key: str, holysheep_key: str):
self.api_key = api_key
self.holysheep_key = holysheep_key
self.base_url = "https://api.holysheep.ai/v1"
self.cache = {}
self.exchanges = {
'binance': 'https://api.binance.com',
'coinbase': 'https://api.coinbase.com',
'kraken': 'https://api.kraken.com'
}
def get_historical_ohlcv(
self,
symbol: str,
timeframe: str = '1h',
start_time: Optional[int] = None,
end_time: Optional[int] = None,
exchanges: List[str] = None
) -> Dict[str, List[Dict]]:
"""
Ruft historische OHLCV-Daten von einer oder mehreren Börsen ab.
Args:
symbol: Trading-Paar (z.B. 'BTC-USD', 'ETH-USDT')
timeframe: Zeiteinheit ('1m', '5m', '1h', '4h', '1d')
start_time: Unix-Timestamp für Startzeitpunkt
end_time: Unix-Timestamp für Endzeitpunkt
exchanges: Liste der Börsen (default: alle verfügbaren)
"""
if exchanges is None:
exchanges = list(self.exchanges.keys())
if end_time is None:
end_time = int(time.time() * 1000)
if start_time is None:
start_time = int((time.time() - 86400 * 30) * 1000) # 30 Tage default
results = {}
for exchange in exchanges:
try:
data = self._fetch_from_exchange(exchange, symbol, timeframe, start_time, end_time)
normalized = self._normalize_ohlcv(data, exchange, symbol)
results[exchange] = normalized
print(f"✓ {exchange}: {len(normalized)} Datensätze für {symbol}")
except Exception as e:
print(f"✗ {exchange}: Fehler - {str(e)}")
results[exchange] = []
return results
def _fetch_from_exchange(
self,
exchange: str,
symbol: str,
timeframe: str,
start: int,
end: int
) -> Dict:
"""Ruft Rohdaten von der spezifizierten Börse ab."""
endpoints = {
'binance': '/api/v3/klines',
'coinbase': '/v2/products/{symbol}/candles',
'kraken': '/0/public/OHLC'
}
params_map = {
'binance': {
'symbol': symbol.replace('-', ''),
'interval': timeframe,
'startTime': start,
'endTime': end,
'limit': 1000
},
'coinbase': {
'start': datetime.fromtimestamp(start/1000).isoformat(),
'end': datetime.fromtimestamp(end/1000).isoformat(),
'granularity': self._timeframe_to_seconds(timeframe)
},
'kraken': {
'pair': symbol.replace('-', '').replace('USD', 'USD'),
'interval': self._kraken_interval(timeframe)
}
}
response = requests.get(
f"{self.exchanges[exchange]}{endpoints[exchange]}",
params=params_map[exchange],
timeout=30
)
response.raise_for_status()
return response.json()
def _normalize_ohlcv(
self,
raw_data: Dict,
exchange: str,
symbol: str
) -> List[Dict]:
"""Normalisiert Rohdaten in ein einheitliches Format."""
normalized = []
candle_map = {
'binance': [0, 1, 2, 3, 4, 5], # open_time, open, high, low, close, volume
'coinbase': [0, 1, 2, 3, 4, 5], # time, low, high, open, close, volume
'kraken': [1, 2, 3, 4, 5, 6] # time, open, high, low, close, volume
}
indices = candle_map[exchange]
for candle in raw_data:
normalized.append({
'timestamp': candle[indices[0]],
'open': float(candle[indices[1]]),
'high': float(candle[indices[2]]),
'low': float(candle[indices[3]]),
'close': float(candle[indices[4]]),
'volume': float(candle[indices[5]]),
'exchange': exchange,
'symbol': symbol,
'normalized_at': datetime.utcnow().isoformat()
})
return normalized
def _timeframe_to_seconds(self, timeframe: str) -> int:
"""Konvertiert Zeitrahmen-String in Sekunden."""
mapping = {
'1m': 60, '5m': 300, '15m': 900,
'1h': 3600, '4h': 14400, '1d': 86400
}
return mapping.get(timeframe, 3600)
def _kraken_interval(self, timeframe: str) -> int:
"""Konvertiert Zeitrahmen für Kraken-API."""
mapping = {
'1m': 1, '5m': 5, '15m': 15,
'1h': 60, '4h': 240, '1d': 1440
}
return mapping.get(timeframe, 60)
def analyze_with_holysheep(
self,
aggregated_data: Dict[str, List[Dict]],
analysis_type: str = 'trend'
) -> Dict:
"""
Analysiert aggregierte Daten mit HolySheep AI.
Nutzt DeepSeek V3.2 für präzise Trendanalyse mit <50ms Latenz.
"""
# Daten für KI-Analyse aufbereiten
combined_data = []
for exchange, candles in aggregated_data.items():
combined_data.extend(candles[:100]) # Letzte 100 Candles pro Börse
# Prompt für HolySheep erstellen
prompt = self._create_analysis_prompt(combined_data, analysis_type)
payload = {
"model": "deepseek-v3.2",
"messages": [
{"role": "system", "content": "Du bist ein Krypto-Marktanalyst mit Fokus auf technische Analyse."},
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"max_tokens": 2000
}
headers = {
"Authorization": f"Bearer {self.holysheep_key}",
"Content-Type": "application/json"
}
response = requests.post(
f"{self.base_url}/chat/completions",
json=payload,
headers=headers,
timeout=10
)
if response.status_code == 200:
result = response.json()
return {
'analysis': result['choices'][0]['message']['content'],
'model_used': 'deepseek-v3.2',
'latency_ms': response.elapsed.total_seconds() * 1000,
'cost_estimate': self._estimate_cost(prompt, result)
}
else:
raise Exception(f"HolySheep API Fehler: {response.status_code} - {response.text}")
def _create_analysis_prompt(self, data: List[Dict], analysis_type: str) -> str:
"""Erstellt einen optimierten Prompt für die KI-Analyse."""
# Letzte Candles für Analyse extrahieren
recent = data[-20:] if len(data) > 20 else data
formatted = []
for candle in recent:
formatted.append(
f"{datetime.fromtimestamp(candle['timestamp']/1000).strftime('%Y-%m-%d %H:%M')}: "
f"O={candle['open']:.2f} H={candle['high']:.2f} "
f"L={candle['low']:.2f} C={candle['close']:.2f} V={candle['volume']:.2f}"
)
return f"""Analysiere die folgenden OHLCV-Daten von mehreren Kryptobörsen:
{chr(10).join(formatted)}
Führe eine {analysis_type}-Analyse durch und gib zurück:
1. Trendrichtung (bullish/bearish/neutral)
2. Schlüssel-Unterstützungswiderstände
3. Volumenanalyse
4. Empfehlung (Kauf/Verkauf/Halten) mit Begründung"""
def _estimate_cost(self, prompt: str, response: Dict) -> Dict:
"""Schätzt die Kosten für die API-Nutzung."""
input_tokens = len(prompt) // 4 # Grobabschätzung
output_tokens = response.get('usage', {}).get('completion_tokens', 0)
# HolySheep DeepSeek V3.2 Preise (Januar 2026)
price_per_mtok = 0.42 # USD
total_cost = ((input_tokens + output_tokens) / 1_000_000) * price_per_mtok
return {
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'estimated_cost_usd': round(total_cost, 4)
}
Verwendung
aggregator = CryptoDataAggregator(
api_key="YOUR_EXCHANGE_API_KEY",
holysheep_key="YOUR_HOLYSHEEP_API_KEY"
)
Historische Daten von allen Börsen abrufen
data = aggregator.get_historical_ohlcv(
symbol='BTC-USD',
timeframe='1h',
exchanges=['binance', 'coinbase', 'kraken']
)
KI-gestützte Analyse mit HolySheep
analysis = aggregator.analyze_with_holysheep(data, 'trend')
print(f"Analyse: {analysis['analysis']}")
print(f"Latenz: {analysis['latency_ms']:.0f}ms | Kosten: ${analysis['cost_estimate']['estimated_cost_usd']}")
Fortgeschrittene Features: Orderbook-Aggregation und Arbitrage-Erkennung
import asyncio
import aiohttp
from collections import defaultdict
import statistics
class AdvancedCryptoAggregator:
"""
Erweiterter Aggregator mit Orderbook-Synchronisierung und Arbitrage-Erkennung.
"""
def __init__(self, holysheep_key: str):
self.holysheep_key = holysheep_key
self.base_url = "https://api.holysheep.ai/v1"
self.orderbooks = defaultdict(dict)
self.last_update = defaultdict(float)
async def fetch_orderbooks_async(
self,
symbols: List[str],
exchanges: List[str] = None
) -> Dict[str, Dict]:
"""
Asynchrones Abrufen von Orderbüchern von allen Börsen.
Berechnet automatisch Arbitrage-Möglichkeiten und_depth divergences.
"""
if exchanges is None:
exchanges = ['binance', 'coinbase', 'kraken', 'bybit', 'okx']
async with aiohttp.ClientSession() as session:
tasks = []
for symbol in symbols:
for exchange in exchanges:
task = self._fetch_orderbook_async(
session, exchange, symbol
)
tasks.append((symbol, exchange, task))
results = await asyncio.gather(*[t[2] for t in tasks], return_exceptions=True)
aggregated = defaultdict(dict)
for i, (symbol, exchange, _) in enumerate(tasks):
if not isinstance(results[i], Exception):
aggregated[symbol][exchange] = results[i]
self.orderbooks[symbol][exchange] = results[i]
self.last_update[symbol] = time.time()
return dict(aggregated)
async def _fetch_orderbook_async(
self,
session: aiohttp.ClientSession,
exchange: str,
symbol: str
) -> Dict:
"""Asynchrones Abrufen eines einzelnen Orderbuchs."""
endpoints = {
'binance': f"https://api.binance.com/api/v3/depth?symbol={symbol.replace('-', '')}&limit=20",
'coinbase': f"https://api.coinbase.com/v2/products/{symbol}/book?level=2",
'kraken': f"https://api.kraken.com/0/public/Depth?pair={symbol.replace('-', '')}",
'bybit': f"https://api.bybit.com/v5/market/orderbook?category=spot&symbol={symbol}&limit=20",
'okx': f"https://www.okx.com/api/v5/market/books?instId={symbol}"
}
async with session.get(endpoints[exchange], timeout=aiohttp.ClientTimeout(total=5)) as response:
data = await response.json()
return self._normalize_orderbook(data, exchange)
def _normalize_orderbook(self, raw: Dict, exchange: str) -> Dict:
"""Normalisiert Orderbuch-Daten in ein einheitliches Format."""
normalized = {
'bids': [], # [price, quantity]
'asks': [],
'exchange': exchange,
'timestamp': time.time()
}
if exchange == 'binance':
normalized['bids'] = [[float(p), float(q)] for p, q in raw.get('bids', [])]
normalized['asks'] = [[float(p), float(q)] for p, q in raw.get('asks', [])]
elif exchange == 'coinbase':
data = raw.get('data', {})
normalized['bids'] = [[float(p), float(q)] for p, q in data.get('bids', [])]
normalized['asks'] = [[float(p), float(q)] for p, q in data.get('asks', [])]
elif exchange == 'kraken':
pairs = raw.get('result', {})
if pairs:
pair_data = list(pairs.values())[0]
normalized['bids'] = [[float(p), float(q)] for p, q in pair_data.get('bids', [])]
normalized['asks'] = [[float(p), float(q)] for p, q in pair_data.get('asks', [])]
return normalized
def detect_arbitrage(self, symbol: str) -> List[Dict]:
"""
Erkennt Arbitrage-Möglichkeiten basierend auf aktuellen Orderbüchern.
Returned Liste von Arbitrage-Gelegenheiten mit geschätztem Profit.
"""
if symbol not in self.orderbooks:
return []
orderbooks = self.orderbooks[symbol]
opportunities = []
exchanges = list(orderbooks.keys())
for i, buy_exchange in enumerate(exchanges):
for sell_exchange in exchanges[i+1:]:
if buy_exchange == sell_exchange:
continue
buy_book = orderbooks[buy_exchange]
sell_book = orderbooks[sell_exchange]
if not buy_book['asks'] or not sell_book['bids']:
continue
# Beste Kauf- und Verkaufspreise
best_buy_price = buy_book['asks'][0][0] # Niedrigster Ask
best_sell_price = sell_book['bids'][0][0] # Höchster Bid
spread = ((best_sell_price - best_buy_price) / best_buy_price) * 100
if spread > 0.5: # Mehr als 0.5% Spread = potenzielle Arbitrage
opportunities.append({
'symbol': symbol,
'buy_exchange': buy_exchange,
'sell_exchange': sell_exchange,
'buy_price': best_buy_price,
'sell_price': best_sell_price,
'spread_percent': round(spread, 3),
'estimated_profit_per_unit': round(best_sell_price - best_buy_price, 2),
'volume_available': min(
buy_book['asks'][0][1],
sell_book['bids'][0][1]
),
'max_trade_size': min(
buy_book['asks'][0][1],
sell_book['bids'][0][1]
)
})
# Sortiere nach höchstem Spread
return sorted(opportunities, key=lambda x: x['spread_percent'], reverse=True)
async def analyze_market_depth(self, symbol: str) -> Dict:
"""
Analysiert Markttiefe über alle Börsen mit HolySheep AI.
Nutzt Gemini 2.5 Flash für schnelle Risikobewertung (<500ms Latenz).
"""
orderbooks = self.orderbooks.get(symbol, {})
# Markttiefe aggregieren
all_bids = []
all_asks = []
for exchange, book in orderbooks.items():
all_bids.extend([(p, q, exchange) for p, q in book['bids'][:10]])
all_asks.extend([(p, q, exchange) for p, q in book['asks'][:10]])
# Nach Preis sortieren
all_bids.sort(key=lambda x: x[0], reverse=True)
all_asks.sort(key=lambda x: x[0])
# Top 10 Bids und Asks formatieren
bid_str = '\n'.join([f"{e}: ${p:.2f} × {q}" for p, q, e in all_bids[:10]])
ask_str = '\n'.join([f"{e}: ${p:.2f} × {q}" for p, q, e in all_asks[:10]])
prompt = f"""Analysiere die Markttiefe für {symbol}:
Top Bids (Kaufaufträge):
{bid_str}
Top Asks (Verkaufsaufträge):
{ask_str}
Gib zurück:
1. Liquiditätsanalyse: Wo ist die meiste Liquidität konzentriert?
2. Preisstabilität: Ist der Markt gut balanciert?
3. Slippage-Risiko für große Orders (100.000 USD)
4. Empfehlung für Orderausführung (welche Börse für große Orders)"""
payload = {
"model": "gemini-2.5-flash",
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.2,
"max_tokens": 1500
}
headers = {
"Authorization": f"Bearer {self.holysheep_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
json=payload,
headers=headers
) as response:
result = await response.json()
return {
'analysis': result['choices'][0]['message']['content'],
'model': 'gemini-2.5-flash',
'latency_ms': response.headers.get('X-Response-Time', 'N/A'),
'spread_summary': self._calculate_spread_summary(orderbooks)
}
def _calculate_spread_summary(self, orderbooks: Dict) -> Dict:
"""Berechnet Spread-Zusammenfassung über alle Börsen."""
spreads = []
for exchange, book in orderbooks.items():
if book['asks'] and book['bids']:
spread = book['asks'][0][0] - book['bids'][0][0]
spread_pct = (spread / book['asks'][0][0]) * 100
spreads.append({
'exchange': exchange,
'spread_usd': round(spread, 4),
'spread_percent': round(spread_pct, 4)
})
return {
'individual_spreads': spreads,
'avg_spread_percent': round(
statistics.mean([s['spread_percent'] for s in spreads]), 4
) if spreads else 0,
'best_exchange': min(spreads, key=lambda x: x['spread_percent'])['exchange'] if spreads else None
}
Beispiel-Nutzung
async def main():
aggregator = AdvancedCryptoAggregator(
holysheep_key="YOUR_HOLYSHEEP_API_KEY"
)
# Orderbücher asynchron abrufen
books = await aggregator.fetch_orderbooks_async(
symbols=['BTC-USD', 'ETH-USD'],
exchanges=['binance', 'coinbase', 'kraken']
)
# Arbitrage-Möglichkeiten prüfen
arb_opportunities = aggregator.detect_arbitrage('BTC-USD')
if arb_opportunities:
print("🚨 Arbitrage-Möglichkeiten gefunden:")
for arb in arb_opportunities:
print(f" {arb['buy_exchange']} → {arb['sell_exchange']}: "
f"{arb['spread_percent']}% Spread")
# Markttiefen-Analyse mit KI
analysis = await aggregator.analyze_market_depth('BTC-USD')
print(f"\n📊 KI-Analyse: {analysis['analysis']}")
print(f"💰 Modell: {analysis['model']} | Latenz: {analysis['latency_ms']}ms")
asyncio.run(main())
Häufige Fehler und Lösungen
1. Rate-Limit-Überschreitung bei Multi-Exchange Requests
Problem: Bei gleichzeitigem Abruf von mehreren Börsen treten 429-Fehler auf, besonders bei Binance (1200 Requests/Minute Limit) und Coinbase (10 Requests/Sekunde).
# FEHLERHAFT: Keine Rate-Limit-Handhabung
def fetch_all_prices(symbols):
results = {}
for symbol in symbols:
for exchange in ALL_EXCHANGES:
results[f"{exchange}_{symbol}"] = requests.get(
f"{exchange}/price/{symbol}"
).json()
return results # Rate-Limit garantiert!
LÖSUNG: Implementierung mit Exponential Backoff
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
class RateLimitedClient:
def __init__(self):
self.rate_limits = {
'binance': {'requests': 1200, 'window': 60}, # pro Minute
'coinbase': {'requests': 10, 'window': 1}, # pro Sekunde
'kraken': {'requests': 60, 'window': 60} # pro Minute
}
self.request_history = defaultdict(list)
async def throttled_request(
self,
session: aiohttp.ClientSession,
exchange: str,
url: str
) -> Optional[Dict]:
"""Request mit intelligenter Rate-Limit-Handhabung."""
limits = self.rate_limits.get(exchange, {'requests': 100, 'window': 60})
now = time.time()
# Alte Requests aus History entfernen
self.request_history[exchange] = [
t for t in self.request_history[exchange]
if now - t < limits['window']
]
# Prüfen, ob Limit erreicht
if len(self.request_history[exchange]) >= limits['requests']:
wait_time = limits['window'] - (now - self.request_history[exchange][0])
if wait_time > 0:
await asyncio.sleep(wait_time)
# Request durchführen mit Retry-Logik
for attempt in range(3):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 429:
# Rate-Limit erreicht: Exponential Backoff
retry_after = int(response.headers.get('Retry-After', 60))
await asyncio.sleep(retry_after)
continue
self.request_history[exchange].append(time.time())
return await response.json()
except aiohttp.ClientError as e:
if attempt == 2:
raise
await asyncio.sleep(2 ** attempt) # Exponential Backoff
return None
2. Dateninkonsistenz durch unterschiedliche Zeitzonen
Problem: Binance liefert Unix-Timestamps in Millisekunden, Coinbase in Sekunden, und Kraken verwendet eigene Formate. Dies führt zu fehlerhaften Alignment bei der Aggregation.
# LÖSUNG: Universelle Zeitkonvertierung
from datetime import datetime, timezone
from typing import Union
def normalize_timestamp(
timestamp: Union[int, float, str],
source_exchange: str
) -> datetime:
"""
Normalisiert Timestamps von allen Börsen zu UTC datetime.
Behandelt:
- Binance: Millisekunden (1577836800000)
- Coinbase: Sekunden (1577836800)
- Kraken: Sekunden als Float (1577836800.1234)
- Strings: ISO8601 Format
"""
if isinstance(timestamp, str):
# ISO8601 Format
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
return dt.astimezone(timezone.utc)
# Numerischer Timestamp
ts = float(timestamp)
# Binance: Millisekunden → Sekunden
if ts > 1_000_000_000_000: # Nach Jahr 2001 in ms
ts = ts / 1000
# In UTC datetime konvertieren
return datetime.fromtimestamp(ts, tz=timezone.utc)
def create_aligned_candles(
binance_data: List,
coinbase_data: List,
timeframes: List[str] = ['1h']
) -> Dict[str, List]:
"""
Erstellt timeframe-aligned Candles aus verschiedenen Quellen.
"""
from collections import defaultdict
aligned = defaultdict(list)
# Alle Daten in normalisierte Struktur konvertieren
all_candles = []
for candle in binance_data:
all_candles.append({
'timestamp': normalize_timestamp(candle[0], 'binance'),
'open': float(candle[1]),
'high': float(candle[2]),
'low': float(candle[3]),
'close': float(candle[4]),
'volume': float(candle[5]),
'source': 'binance'
})
for candle in coinbase_data:
all_candles.append({
'timestamp': normalize_timestamp(candle[0], 'coinbase'),
'open': float(candle[3]), # Coinbase: low, high, open, close
'high': float(candle[2]),
'low': float(candle[1]),
'close': float(candle[4]),
'volume': float(candle[5]),
'source': 'coinbase'
})
# Nach Timestamp sortieren
all_candles.sort(key=lambda x: x['timestamp'])
# In Zeitrahmen gruppieren
for timeframe in timeframes:
seconds = {
'1m': 60, '5m': 300, '15m': 900,
'1h': 3600, '4h': 14400, '1d': 86400
}[timeframe]
grouped = defaultdict(list)
for candle in all_candles:
period_start = int(candle['timestamp'].timestamp() // seconds * seconds)
grouped[period_start].append(candle)
# Aggregierte Candles erstellen
for period_ts, candles in grouped.items():
if len(candles) < 2:
continue
prices = [c['close'] for c in candles]
volumes = [c['volume'] for c in candles]
aligned[timeframe].append({
'timestamp': datetime.fromtimestamp(period_ts, tz=timezone.utc),
'open': candles[0]['open'],
'high': max(c['high'] for c in candles),
'low': min(c['low'] for c in candles),
'close': candles[-1]['close'],
'volume': sum(volumes),
'price_std': statistics.stdev(prices) if len(prices) > 1 else 0,
'sources':