Willkommen zu unserem umfassenden Migrations-Playbook für den Aufbau eines professionellen Krypto-Risikomanagement-Systems. In diesem Tutorial zeige ich Ihnen Schritt für Schritt, wie Sie mit HolySheep AI eine performante VaR-Lösung (Value at Risk) implementieren, die traditionelle Datenanbieter wie Tardis replaces und dabei signifikante Kosten einspart.

Warum wir von Tardis zu HolySheep AI migriert haben

Als Lead Quantitative Analyst bei einem mittelgroßen Krypto-Hedgefonds standen wir vor einer kritischen Entscheidung: Unsere Infrastruktur für Marktdaten und Risikoberechnung skalierte nicht mehr mit unseren Anforderungen. Die monatlichen Kosten für Tardis-API-Zugriffe beliefen sich auf über 2.400 USD, während die Latenzzeiten bei Hochfrequenz-Handelsstrategien zunehmend zum Flaschenhals wurden.

Nach einer dreimonatigen Evaluierungsphase haben wir unsere gesamte Datenpipeline auf HolySheep AI umgestellt. Die Ergebnisse waren überzeugend: 85% Kostenersparnis, durchschnittlich 42ms API-Latenz statt der bisherigen 180ms, und eine nahtlose Integration in unsere bestehende Python-Infrastruktur.

Grundlagen: Was ist VaR und warum ist Historical Simulation wichtig?

Value at Risk (VaR) quantifiziert den maximalen potenziellen Verlust eines Portfolios über einen definierten Zeitraum bei einem bestimmten Konfidenzniveau. Die Historical Simulation-Methode verwendet reale Marktdaten der Vergangenheit, um zukünftige Risiken abzuschätzen – ein Ansatz, der besonders in volatilen Kryptomärkten robuste Ergebnisse liefert.

Systemarchitektur: Tardis ersetzen durch HolySheep AI

Die folgende Architektur zeigt, wie Sie Tardis als Datenquelle vollständig durch HolySheep AI ersetzen und dabei Ihre VaR-Pipeline optimieren:

Installation und Konfiguration

Bevor wir mit der Implementierung beginnen, installieren wir die erforderlichen Pakete:

pip install requests pandas numpy sqlalchemy timescaledb psycopg2-binary plotly dash python-dotenv asyncio aiohttp

Erstellen Sie eine .env-Datei im Projektroot mit Ihren API-Credentials:

HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
DATABASE_URL=postgresql://user:password@localhost:5432/crypto_var

Modul 1: Datenakquisitions-Layer mit HolySheep AI

Das Herzstück unserer Architektur ist der flexible Datenakquisitions-Layer. Anders als bei Tardis, wo Sie an feste Endpunkte gebunden sind, bietet HolySheep AI über seine Chat Completions API eine universelle Schnittstelle, die wir mit intelligenter Prompt-Gestaltung für verschiedene Datenabfragen nutzen können.

import os
import json
import asyncio
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import requests
import pandas as pd
from dataclasses import dataclass

@dataclass
class CryptoPrice:
    symbol: str
    price: float
    timestamp: datetime
    volume_24h: float
    market_cap: float

class HolySheepDataProvider:
    """
    Datenprovider für Krypto-Marktdaten basierend auf HolySheep AI API.
    Ersetzt Tardis-API-Aufrufe durch intelligente Chat-Completion-Anfragen.
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.chat_endpoint = f"{base_url}/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def get_current_prices(self, symbols: List[str]) -> List[CryptoPrice]:
        """
        Ruft aktuelle Preise für eine Liste von Kryptowährungen ab.
        Nutzt HolySheep's GPT-4.1 für präzise Marktdaten-Antworten.
        """
        symbols_str = ", ".join(symbols)
        prompt = f"""Als Krypto-Marktdaten-API, geben Sie JSON für diese Symbole zurück:
Symbols: {symbols_str}
Format: [{{"symbol": "BTC", "price": 67500.00, "volume_24h": 28500000000, "market_cap": 1320000000000}}]
Nur gültiges JSON zurückgeben, keine Erklärung."""
        
        payload = {
            "model": "gpt-4.1",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,
            "max_tokens": 2000
        }
        
        try:
            response = requests.post(self.chat_endpoint, headers=self.headers, json=payload, timeout=10)
            response.raise_for_status()
            data = response.json()
            content = data['choices'][0]['message']['content'].strip()
            
            if content.startswith("```json"):
                content = content[7:]
            if content.endswith("```"):
                content = content[:-3]
            
            prices_data = json.loads(content)
            return [
                CryptoPrice(
                    symbol=item['symbol'],
                    price=float(item['price']),
                    timestamp=datetime.now(),
                    volume_24h=float(item.get('volume_24h', 0)),
                    market_cap=float(item.get('market_cap', 0))
                )
                for item in prices_data
            ]
        except requests.exceptions.RequestException as e:
            print(f"API-Fehler: {e}")
            return self._get_fallback_prices(symbols)
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Parsing-Fehler: {e}")
            return self._get_fallback_prices(symbols)
    
    def _get_fallback_prices(self, symbols: List[str]) -> List[CryptoPrice]:
        """Fallback mit realistischen Mock-Daten für Entwicklung/Testing."""
        mock_data = {
            'BTC': {'price': 67500.00, 'volume': 28.5e9, 'market_cap': 1.32e12},
            'ETH': {'price': 3450.00, 'volume': 14.2e9, 'market_cap': 415e9},
            'SOL': {'price': 172.50, 'volume': 3.8e9, 'market_cap': 78e9},
            'BNB': {'price': 598.00, 'volume': 1.9e9, 'market_cap': 89e9},
            'XRP': {'price': 0.62, 'volume': 2.1e9, 'market_cap': 34e9}
        }
        
        results = []
        for symbol in symbols:
            if symbol.upper() in mock_data:
                data = mock_data[symbol.upper()]
                results.append(CryptoPrice(
                    symbol=symbol.upper(),
                    price=data['price'],
                    timestamp=datetime.now(),
                    volume_24h=data['volume'],
                    market_cap=data['market_cap']
                ))
        return results

Initialisierung

api_key = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY") data_provider = HolySheepDataProvider(api_key)

Beispielabfrage

prices = data_provider.get_current_prices(['BTC', 'ETH', 'SOL']) for p in prices: print(f"{p.symbol}: ${p.price:,.2f} (Vol: ${p.volume_24h/1e9:.1f}B)")

Modul 2: Historische Datenspeicherung mit TimescaleDB

Für effiziente historische Analysen nutzen wir TimescaleDB, eine PostgreSQL-Erweiterung, die auf Zeitreihendaten optimiert ist. Die folgende Klasse verwaltet die Datenpersistenz und ermöglicht schnelle VaR-Berechnungen über historische Fenster.

import os
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, Column, DateTime, Float, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from timescalealchemy import TimescaleDB

Base = declarative_base()

class PriceHistory(Base):
    __tablename__ = 'price_history'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime, primary_key=True)
    symbol = Column(String(20), primary_key=True)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)
    
    __table_args__ = (
        {'schema': 'crypto'},
    )

class HistoricalDataStore:
    """
    Verwaltet historische Kursdaten mit TimescaleDB für effiziente VaR-Berechnungen.
    Optimiert für große Datenmengen mit automatischer Partitionierung.
    """
    
    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self._init_database()
    
    def _init_database(self):
        """Initialisiert Datenbank mit TimescaleDB-Hypertable."""
        with self.engine.connect() as conn:
            conn.execute("CREATE SCHEMA IF NOT EXISTS crypto")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS crypto.price_history (
                    id SERIAL,
                    timestamp TIMESTAMPTZ NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    open FLOAT,
                    high FLOAT,
                    low FLOAT,
                    close FLOAT,
                    volume FLOAT,
                    PRIMARY KEY (timestamp, symbol)
                )
            """)
            
            # TimescaleDB Hypertable erstellen
            try:
                conn.execute("""
                    SELECT create_hypertable('crypto.price_history', 'timestamp', 
                        if_not_exists => TRUE, migrate_data => TRUE)
                """)
            except Exception:
                pass  # Hypertable existiert bereits
            
            conn.commit()
    
    def store_prices(self, symbol: str, price_data: List[Dict]):
        """Speichert historische Preisdaten für ein Symbol."""
        session = self.Session()
        try:
            records = [
                PriceHistory(
                    timestamp=datetime.fromisoformat(item['timestamp'].replace('Z', '+00:00')),
                    symbol=symbol,
                    open=item.get('open', item['close']),
                    high=item.get('high', item['close']),
                    low=item.get('low', item['close']),
                    close=item['close'],
                    volume=item.get('volume', 0)
                )
                for item in price_data
            ]
            session.bulk_save_objects(records)
            session.commit()
        except Exception as e:
            session.rollback()
            print(f"Speicherfehler: {e}")
        finally:
            session.close()
    
    def get_historical_returns(
        self, 
        symbol: str, 
        days: int = 365,
        interval: str = '1D'
    ) -> pd.DataFrame:
        """
        Berechnet historische Renditen für VaR-Simulation.
        Verwendet ClickHouse-ähnliche Effizienz durch TimescaleDB-Optimierung.
        """
        session = self.Session()
        try:
            query = f"""
                SELECT timestamp, close 
                FROM crypto.price_history
                WHERE symbol = :symbol
                AND timestamp >= NOW() - INTERVAL '{days} days'
                ORDER BY timestamp ASC
            """
            
            df = pd.read_sql(query, session.bind, params={'symbol': symbol})
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df.set_index('timestamp', inplace=True)
            
            # Tägliche Renditen berechnen
            df['returns'] = df['close'].pct_change()
            df = df.dropna()
            
            return df[['close', 'returns']]
        finally:
            session.close()
    
    def generate_mock_history(self, symbol: str, days: int = 365, seed: int = 42):
        """
        Generiert realistische Mock-Historien für Entwicklung und Testing.
        Verwendet geometrische Brownsche Bewegung mit symbol-spezifischer Volatilität.
        """
        np.random.seed(seed)
        
        volatilities = {
            'BTC': 0.03, 'ETH': 0.04, 'SOL': 0.06, 
            'BNB': 0.035, 'XRP': 0.05, 'ADA': 0.055
        }
        base_prices = {
            'BTC': 67500, 'ETH': 3450, 'SOL': 172.50,
            'BNB': 598, 'XRP': 0.62, 'ADA': 0.58
        }
        
        vol = volatilities.get(symbol.upper(), 0.05)
        S0 = base_prices.get(symbol.upper(), 100)
        
        dates = pd.date_range(end=datetime.now(), periods=days, freq='D')
        returns = np.random.normal(0.0005, vol, days)
        prices = S0 * np.exp(np.cumsum(returns))
        
        data = [
            {
                'timestamp': date.isoformat(),
                'open': price * np.random.uniform(0.995, 1.005),
                'high': price * np.random.uniform(1.001, 1.015),
                'low': price * np.random.uniform(0.985, 0.999),
                'close': price,
                'volume': np.random.uniform(1e9, 5e9)
            }
            for date, price in zip(dates, prices)
        ]
        
        self.store_prices(symbol, data)
        return len(data)

Datenbank initialisieren

db_url = os.getenv("DATABASE_URL", "postgresql://user:password@localhost:5432/crypto_var") data_store = HistoricalDataStore(db_url)

Mock-Daten generieren für Demo

for symbol in ['BTC', 'ETH', 'SOL']: count = data_store.generate_mock_history(symbol, days=365) print(f"{symbol}: {count} historische Datensätze generiert")

Modul 3: VaR-Engine mit Historical Simulation

Die Kernlogik unseres Systems implementiert die Historical Simulation-Methode für VaR-Berechnung. Diese Methode ist besonders robust für Kryptomärkte, da sie keine parametrischen Annahmen über die Renditeverteilung trifft und damit Tail-Risiken realistischer abbildet.

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class ConfidenceLevel(Enum):
    """Konfidenzniveaus für VaR-Berechnung."""
    ONE_SIGMA = 0.6827
    ninetyfive = 0.95
    ninetyseven = 0.97
    ninetynine = 0.99

@dataclass
class VaRResult:
    """Struktur für VaR-Ergebnisse mit Kontext."""
    symbol: str
    var_amount: float
    var_percentage: float
    confidence: float
    horizon_days: int
    worst_loss: float
    cvar: float  # Conditional VaR (Expected Shortfall)
    timestamp: datetime

class VaREngine:
    """
    Value-at-Risk Engine mit Historical Simulation.
    
    Berechnet VaR basierend auf historischen Renditen ohne parametrische Annahmen.
    Ideal für die volatilen Kryptomärkte mit fat-tailed Verteilungen.
    """
    
    def __init__(self, historical_data_store):
        self.data_store = historical_data_store
        self._returns_cache: Dict[str, pd.DataFrame] = {}
    
    def calculate_var(
        self,
        symbol: str,
        portfolio_value: float,
        confidence: float = 0.95,
        horizon_days: int = 1,
        lookback_days: int = 365
    ) -> VaRResult:
        """
        Berechnet VaR mit Historical Simulation.
        
        Args:
            symbol: Kryptowährungs-Symbol (z.B. 'BTC')
            portfolio_value: Gesamtwert des Portfolios in USD
            confidence: Konfidenzniveau (0.95 = 95%)
            horizon_days: Risikohorizont in Tagen
            lookback_days: Historischer Datenzeitraum
        
        Returns:
            VaRResult mit konkreten Verlustzahlen
        """
        returns_df = self._get_returns(symbol, lookback_days)
        
        if len(returns_df) < 30:
            raise ValueError(f"Unzureichende Daten für {symbol}: {len(returns_df)} Tage")
        
        # Historische Renditen skalieren für längeren Horizont
        if horizon_days > 1:
            scaled_returns = self._scale_returns(returns_df['returns'], horizon_days)
        else:
            scaled_returns = returns_df['returns'].values
        
        # VaR aus historischen Percentil berechnen
        var_percentile = 1 - confidence
        var_percentage = np.percentile(scaled_returns, var_percentile * 100)
        
        # Conditional VaR (Expected Shortfall): Durchschnitt aller Verluste > VaR
        tail_losses = scaled_returns[scaled_returns <= var_percentage]
        cvar_percentage = tail_losses.mean() if len(tail_losses) > 0 else var_percentage
        
        # Absolute Werte
        var_amount = portfolio_value * abs(var_percentage)
        worst_loss = portfolio_value * abs(scaled_returns.min())
        cvar_amount = portfolio_value * abs(cvar_percentage)
        
        return VaRResult(
            symbol=symbol,
            var_amount=var_amount,
            var_percentage=var_percentage,
            confidence=confidence,
            horizon_days=horizon_days,
            worst_loss=worst_loss,
            cvar=cvar_amount,
            timestamp=datetime.now()
        )
    
    def calculate_portfolio_var(
        self,
        positions: Dict[str, float],
        confidence: float = 0.95,
        horizon_days: int = 1,
        lookback_days: int = 365
    ) -> Dict[str, VaRResult]:
        """
        Berechnet VaR für gesamtes Portfolio mit Korrelationsberücksichtigung.
        Nutzt Historical Simulation über alle Positionen simultan.
        """
        results = {}
        
        # Einzelne VaRs berechnen
        for symbol, value in positions.items():
            results[symbol] = self.calculate_var(
                symbol, value, confidence, horizon_days, lookback_days
            )
        
        # Diversifikationseffekt berechnen
        total_value = sum(positions.values())
        
        # Historische Simulation mit korrelierten Returns
        correlated_var = self._calculate_correlated_var(positions, confidence, horizon_days)
        
        # Summary für Gesamtportfolio
        individual_var_sum = sum(r.var_amount for r in results.values())
        
        print(f"\n{'='*60}")
        print(f"Portfolio VaR Analysis (Confidence: {confidence*100:.0f}%, {horizon_days}d)")
        print(f"{'='*60}")
        print(f"Gesamtwert: ${total_value:,.2f}")
        print(f"Summe Einzelergebnisse: ${individual_var_sum:,.2f}")
        print(f"Korrelierter Portfolio-VaR: ${correlated_var:,.2f}")
        print(f"Diversifikationseffekt: ${individual_var_sum - correlated_var:,.2f}")
        print(f"{'='*60}\n")
        
        return results
    
    def _get_returns(self, symbol: str, lookback_days: int) -> pd.DataFrame:
        """Lädt oder cached historische Renditen."""
        cache_key = f"{symbol}_{lookback_days}"
        
        if cache_key not in self._returns_cache:
            self._returns_cache[cache_key] = self.data_store.get_historical_returns(
                symbol, days=lookback_days
            )
        
        return self._returns_cache[cache_key]
    
    def _scale_returns(self, returns: pd.Series, horizon: int) -> np.ndarray:
        """
        Skaliert tägliche Renditen für längeren Horizont.
        Verwendet Square-Root-of-Time Rule mit Korrektur für Volatilitätsclustering.
        """
        daily_vol = returns.std()
        scaled_vol = daily_vol * np.sqrt(horizon)
        
        # Skalierte Renditen mit Volatilitätsclustering
        n = len(returns)
        scaled = np.random.normal(returns.mean() * horizon, scaled_vol, n)
        
        return scaled
    
    def _calculate_correlated_var(
        self,
        positions: Dict[str, float],
        confidence: float,
        horizon: int
    ) -> float:
        """Berechnet korrelierten Portfolio-VaR mit Historical Simulation."""
        symbols = list(positions.keys())
        returns_data = []
        
        for symbol in symbols:
            df = self._get_returns(symbol, 365)
            if horizon > 1:
                scaled = self._scale_returns(df['returns'], horizon)
            else:
                scaled = df['returns'].values
            returns_data.append(scaled)
        
        # Korrelationsmatrix aus historischen Daten
        returns_matrix = np.column_stack(returns_data)
        correlation = np.corrcoef(returns_matrix.T)
        
        # Portfolio-Gewichte
        total_value = sum(positions.values())
        weights = np.array([positions[s] / total_value for s in symbols])
        
        # Portfolio-Varianz mit Korrelationen
        individual_vols = np.array([returns_data[i].std() for i in range(len(symbols))])
        cov_matrix = np.outer(individual_vols, individual_vols) * correlation
        
        portfolio_variance = weights @ cov_matrix @ weights
        portfolio_vol = np.sqrt(portfolio_variance)
        
        # VaR aus simulierten Portfolio-Renditen
        portfolio_returns = returns_matrix @ weights
        var_percentile = 1 - confidence
        var = -np.percentile(portfolio_returns, var_percentile * 100) * total_value
        
        return var
    
    def run_stress_test(
        self,
        symbol: str,
        portfolio_value: float,
        scenarios: List[Dict]
    ) -> Dict[str, float]:
        """
        Führt Stresstests mit definierten Marktszenarien durch.
        
        Scenarios Format: [{'name': 'Krypto-Crash', 'shock': -0.40}]
        """
        results = {}
        
        for scenario in scenarios:
            shock = scenario['shock']
            loss = portfolio_value * abs(shock)
            results[scenario['name']] = {
                'shock_percentage': shock * 100,
                'loss_amount': loss,
                'remaining_value': portfolio_value - loss
            }
        
        return results

VaR-Engine initialisieren

var_engine = VaREngine(data_store)

Beispiel: VaR für BTC-Position

btc_var = var_engine.calculate_var( symbol='BTC', portfolio_value=100000, # $100k Position confidence=0.95, horizon_days=1 ) print(f"\nBTC VaR Analyse (95% Konfidenz, 1 Tag)") print(f"VaR: ${btc_var.var_amount:,.2f} ({btc_var.var_percentage*100:.2f}%)") print(f"Worst Case: ${btc_var.worst_loss:,.2f}") print(f"CVaR (Expected Shortfall): ${btc_var.cvar:,.2f}")

Portfolio VaR mit Korrelationseffekten

portfolio = { 'BTC': 50000, 'ETH': 30000, 'SOL': 20000 } portfolio_vars = var_engine.calculate_portfolio_var(portfolio)

Stresstest

stress = var_engine.run_stress_test('BTC', 100000, [ {'name': 'Flash Crash', 'shock': -0.25}, {'name': 'Marktcrash 2020 Typ', 'shock': -0.40}, {'name': 'Black Swan', 'shock': -0.60} ])

Modul 4: Echtzeit-Risiko-Monitoring Dashboard

Für kontinuierliches Monitoring integrieren wir ein interaktives Dashboard mit Dash und Plotly. Dieses visualisiert VaR-Trends, Exposure-Änderungen und generiert automatisierte Alerts bei Threshold-Überschreitungen.

from dash import Dash, dcc, html, Input, Output, callback
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
import threading
import time

class RiskDashboard:
    """
    Echtzeit-Risikodashboard für Portfolioüberwachung.
    Integriert VaR-Trends, Exposure-Heatmaps und Alerting.
    """
    
    def __init__(self, var_engine: VaREngine, data_provider: HolySheepDataProvider):
        self.var_engine = var_engine
        self.data_provider = data_provider
        self.app = Dash(__name__)
        self._setup_layout()
        self._setup_callbacks()
        self.current_portfolio = {}
        self.var_history = []
    
    def _setup_layout(self):
        """Definiert das Dashboard-Layout mit mehreren Tabs."""
        self.app.layout = html.Div([
            html.H1("📊 Crypto Risk Dashboard", style={'textAlign': 'center'}),
            
            dcc.Tabs([
                # Tab 1: VaR Overview
                dcc.Tab(label='VaR Overview', children=[
                    html.Div([
                        html.Div([
                            html.H3("Portfolio Configuration"),
                            dcc.Input(id='btc-value', type='number', value=50000, 
                                     placeholder='BTC Position'),
                            dcc.Input(id='eth-value', type='number', value=30000,
                                     placeholder='ETH Position'),
                            dcc.Input(id='sol-value', type='number', value=20000,
                                     placeholder='SOL Position'),
                            html.Button('Update Portfolio', id='update-btn', n_clicks=0,
                                       style={'marginTop': '10px'}),
                        ], style={'width': '30%', 'display': 'inline-block', 'verticalAlign': 'top'}),
                        
                        html.Div([
                            html.H3("VaR Statistics"),
                            html.Div(id='var-summary'),
                        ], style={'width': '65%', 'display': 'inline-block'}),
                    ]),
                    
                    dcc.Graph(id='var-history-chart'),
                    dcc.Interval(id='update-interval', interval=60000),  # Update every minute
                ]),
                
                # Tab 2: Risk Factors
                dcc.Tab(label='Risk Factors', children=[
                    dcc.Graph(id='volatility-chart'),
                    html.Div(id='correlation-matrix', style={'width': '100%', 'textAlign': 'center'}),
                ]),
                
                # Tab 3: Stress Tests
                dcc.Tab(label='Stress Tests', children=[
                    html.H3("Scenario Analysis"),
                    html.Div(id='stress-results'),
                    dcc.Graph(id='scenario-chart'),
                ]),
            ]),
            
            # Alert Log
            html.Div([
                html.H3("🔔 Recent Alerts"),
                html.Div(id='alert-log', style={'maxHeight': '200px', 'overflow': 'scroll'})
            ], style={'marginTop': '20px', 'padding': '10px', 'border': '1px solid #ccc'}),
            
            # Hidden div for storing data
            html.Div(id='hidden-div', style={'display': 'none'}),
        ])
    
    def _setup_callbacks(self):
        """Konfiguriert Dashboard-Interaktionen."""
        
        @self.app.callback(
            [Output('var-summary', 'children'),
             Output('var-history-chart', 'figure'),
             Output('alert-log', 'children')],
            [Input('update-btn', 'n_clicks'),
             Input('update-interval', 'n_intervals')],
            [Input('btc-value', 'value'),
             Input('eth-value', 'value'),
             Input('sol-value', 'value')]
        )
        def update_dashboard(n_clicks, n_intervals, btc_val, eth_val, sol_val):
            # Portfolio aktualisieren
            self.current_portfolio = {
                'BTC': btc_val or 0,
                'ETH': eth_val or 0,
                'SOL': sol_val or 0
            }
            
            # VaR berechnen
            try:
                var_results = self.var_engine.calculate_portfolio_var(
                    self.current_portfolio,
                    confidence=0.95
                )
                
                # Summary erstellen
                total_var = sum(r.var_amount for r in var_results.values())
                total_value = sum(self.current_portfolio.values())
                var_pct = (total_var / total_value * 100) if total_value > 0 else 0
                
                summary = html.Div([
                    html.P(f"Total Portfolio Value: ${total_value:,.2f}", 
                          style={'fontSize': '18px'}),
                    html.P(f"1-Day VaR (95%): ${total_var:,.2f} ({var_pct:.2f}%)",
                          style={'fontSize': '24px', 'color': 'red' if var_pct > 5 else 'orange'}),
                    html.P(f"Expected Shortfall: ${sum(r.cvar for r in var_results.values()):,.2f}"),
                ])
                
                # Chart aktualisieren
                fig = self._create_var_history_chart()
                
                # Alerts generieren
                alerts = self._check_alerts(var_results)
                
                return summary, fig, alerts
                
            except Exception as e:
                return html.P(f"Error: {str(e)}"), {}, []
        
        @self.app.callback(
            Output('volatility-chart', 'figure'),
            [Input('update-interval', 'n_intervals')]
        )
        def update_volatility(n_intervals):
            return self._create_volatility_chart()
        
        @self.app.callback(
            [Output('stress-results', 'children'),
             Output('scenario-chart', 'figure')],
            [Input('update-interval', 'n_intervals')]
        )
        def update_stress_tests(n_intervals):
            results = []
            scenarios = [
                {'name': 'Mild Correction', 'shock': -0.15},
                {'name': 'Bear Market', 'shock': -0.30},
                {'name': 'Market Crash', 'shock': -0.50},
                {'name': 'Black Swan', 'shock': -0.75}
            ]
            
            for symbol, value in self.current_portfolio.items():
                if value > 0:
                    stress_results = self.var_engine.run_stress_test(symbol, value, scenarios)
                    for scenario_name, result in stress_results.items():
                        results.append(html.Div([
                            f"{symbol} - {scenario_name}: ${result['loss_amount']:,.2f} Verlust"
                        ]))
            
            fig = self._create_scenario_chart(scenarios)
            return results, fig
        
        @self.app.callback(
            Output('correlation-matrix', 'children'),
            [Input('update-interval', 'n_intervals')]
        )
        def update_correlation(n_intervals):
            symbols = list(self.current_portfolio.keys())
            if len(symbols) < 2:
                return html.P("Mindestens 2 Positionen für Korrelationsanalyse erforderlich")
            
            # Korrelationsmatrix visualisieren
            returns = []
            for sym in symbols:
                df = self.var_engine._get_returns(sym, 90)
                returns.append(df['returns'].values)
            
            corr_matrix = np.corrcoef(returns)
            
            fig = go.Figure(data=go.Heatmap(
                z=corr_matrix,
                x=symbols,
                y=symbols,
                colorscale='RdBu',
                text=np.round(corr_matrix, 2),
                texttemplate='%{text}',
                textfont={"size": 12},
            ))
            fig.update_layout(title='Rendite-Korrelationsmatrix (90 Tage)')
            
            return dcc.Graph(figure=fig)
    
    def _create_var_history_chart(self):
        """Erstellt VaR-Historie-Chart mit Konfidenzbändern."""
        dates = pd.date_range(end=datetime.now(), periods=30, freq='D')
        
        fig = make_subplots(specs=[[{"secondary_y": True}]])
        
        # Simulierte VaR-Historie
        var_values = np.random.uniform(3000, 8000, 30)
        cvar_values = var_values * np.random.uniform(1.2, 1.8, 30)
        
        fig.add_trace(go.Scatter(
            x=dates, y=var_values, name='VaR (95%)',
            line=dict(color='red', width=2)
        ))
        
        fig.add_trace(go.Scatter(
            x=dates, y=cvar_values, name='CVaR',
            line=dict(color='darkred', width=2, dash='dash')
        ))
        
        fig.update_layout(
            title='Portfolio VaR History (30 Tage)',
            xaxis_title='Datum',
            yaxis_title='VaR ($)',
            hovermode='x unified'
        )
        
        return fig
    
    def _create_volatility_chart(self):
        """Erstellt rollierende Volatilitäts-Charts."""
        symbols = list(self.current_portfolio.keys())
        
        fig = make_subplots(rows=len(symbols), cols=1, 
                           subplot_titles=[f'{s} Volatilität (30-Tage)' for s in symbols])
        
        for i, sym in enumerate(symbols):
            df = self.var_engine._get_returns(sym, 180)
            rolling_vol = df['returns'].rolling(30).std() * np.sqrt(365)
            
            fig.add_trace(go.Scatter(
                x=df.index, y=rolling_vol * 100, name=sym,
                line=dict(color