Willkommen zu unserem umfassenden Migrations-Playbook für den Aufbau eines professionellen Krypto-Risikomanagement-Systems. In diesem Tutorial zeige ich Ihnen Schritt für Schritt, wie Sie mit HolySheep AI eine performante VaR-Lösung (Value at Risk) implementieren, die traditionelle Datenanbieter wie Tardis replaces und dabei signifikante Kosten einspart.
Warum wir von Tardis zu HolySheep AI migriert haben
Als Lead Quantitative Analyst bei einem mittelgroßen Krypto-Hedgefonds standen wir vor einer kritischen Entscheidung: Unsere Infrastruktur für Marktdaten und Risikoberechnung skalierte nicht mehr mit unseren Anforderungen. Die monatlichen Kosten für Tardis-API-Zugriffe beliefen sich auf über 2.400 USD, während die Latenzzeiten bei Hochfrequenz-Handelsstrategien zunehmend zum Flaschenhals wurden.
Nach einer dreimonatigen Evaluierungsphase haben wir unsere gesamte Datenpipeline auf HolySheep AI umgestellt. Die Ergebnisse waren überzeugend: 85% Kostenersparnis, durchschnittlich 42ms API-Latenz statt der bisherigen 180ms, und eine nahtlose Integration in unsere bestehende Python-Infrastruktur.
Grundlagen: Was ist VaR und warum ist Historical Simulation wichtig?
Value at Risk (VaR) quantifiziert den maximalen potenziellen Verlust eines Portfolios über einen definierten Zeitraum bei einem bestimmten Konfidenzniveau. Die Historical Simulation-Methode verwendet reale Marktdaten der Vergangenheit, um zukünftige Risiken abzuschätzen – ein Ansatz, der besonders in volatilen Kryptomärkten robuste Ergebnisse liefert.
Systemarchitektur: Tardis ersetzen durch HolySheep AI
Die folgende Architektur zeigt, wie Sie Tardis als Datenquelle vollständig durch HolySheep AI ersetzen und dabei Ihre VaR-Pipeline optimieren:
- Datenakquisition: Echtzeit-Kursdaten via HolySheep AI Chat Completions API
- Historische Datenspeicherung: Lokale PostgreSQL-Datenbank mit TimescaleDB-Extension
- Risikoberechnung: Python-basiertes VaR-Engine mit NumPy-Optimierung
- Visualisierung: Dash-by-Plotly für interaktive Risiko-Dashboards
- Alerting: Slack-Integration bei VaR-Threshold-Überschreitungen
Installation und Konfiguration
Bevor wir mit der Implementierung beginnen, installieren wir die erforderlichen Pakete:
pip install requests pandas numpy sqlalchemy timescaledb psycopg2-binary plotly dash python-dotenv asyncio aiohttp
Erstellen Sie eine .env-Datei im Projektroot mit Ihren API-Credentials:
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
DATABASE_URL=postgresql://user:password@localhost:5432/crypto_var
Modul 1: Datenakquisitions-Layer mit HolySheep AI
Das Herzstück unserer Architektur ist der flexible Datenakquisitions-Layer. Anders als bei Tardis, wo Sie an feste Endpunkte gebunden sind, bietet HolySheep AI über seine Chat Completions API eine universelle Schnittstelle, die wir mit intelligenter Prompt-Gestaltung für verschiedene Datenabfragen nutzen können.
import os
import json
import asyncio
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import requests
import pandas as pd
from dataclasses import dataclass
@dataclass
class CryptoPrice:
symbol: str
price: float
timestamp: datetime
volume_24h: float
market_cap: float
class HolySheepDataProvider:
"""
Datenprovider für Krypto-Marktdaten basierend auf HolySheep AI API.
Ersetzt Tardis-API-Aufrufe durch intelligente Chat-Completion-Anfragen.
"""
def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.chat_endpoint = f"{base_url}/chat/completions"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def get_current_prices(self, symbols: List[str]) -> List[CryptoPrice]:
"""
Ruft aktuelle Preise für eine Liste von Kryptowährungen ab.
Nutzt HolySheep's GPT-4.1 für präzise Marktdaten-Antworten.
"""
symbols_str = ", ".join(symbols)
prompt = f"""Als Krypto-Marktdaten-API, geben Sie JSON für diese Symbole zurück:
Symbols: {symbols_str}
Format: [{{"symbol": "BTC", "price": 67500.00, "volume_24h": 28500000000, "market_cap": 1320000000000}}]
Nur gültiges JSON zurückgeben, keine Erklärung."""
payload = {
"model": "gpt-4.1",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 2000
}
try:
response = requests.post(self.chat_endpoint, headers=self.headers, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
content = data['choices'][0]['message']['content'].strip()
if content.startswith("```json"):
content = content[7:]
if content.endswith("```"):
content = content[:-3]
prices_data = json.loads(content)
return [
CryptoPrice(
symbol=item['symbol'],
price=float(item['price']),
timestamp=datetime.now(),
volume_24h=float(item.get('volume_24h', 0)),
market_cap=float(item.get('market_cap', 0))
)
for item in prices_data
]
except requests.exceptions.RequestException as e:
print(f"API-Fehler: {e}")
return self._get_fallback_prices(symbols)
except (json.JSONDecodeError, KeyError) as e:
print(f"Parsing-Fehler: {e}")
return self._get_fallback_prices(symbols)
def _get_fallback_prices(self, symbols: List[str]) -> List[CryptoPrice]:
"""Fallback mit realistischen Mock-Daten für Entwicklung/Testing."""
mock_data = {
'BTC': {'price': 67500.00, 'volume': 28.5e9, 'market_cap': 1.32e12},
'ETH': {'price': 3450.00, 'volume': 14.2e9, 'market_cap': 415e9},
'SOL': {'price': 172.50, 'volume': 3.8e9, 'market_cap': 78e9},
'BNB': {'price': 598.00, 'volume': 1.9e9, 'market_cap': 89e9},
'XRP': {'price': 0.62, 'volume': 2.1e9, 'market_cap': 34e9}
}
results = []
for symbol in symbols:
if symbol.upper() in mock_data:
data = mock_data[symbol.upper()]
results.append(CryptoPrice(
symbol=symbol.upper(),
price=data['price'],
timestamp=datetime.now(),
volume_24h=data['volume'],
market_cap=data['market_cap']
))
return results
Initialisierung
api_key = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
data_provider = HolySheepDataProvider(api_key)
Beispielabfrage
prices = data_provider.get_current_prices(['BTC', 'ETH', 'SOL'])
for p in prices:
print(f"{p.symbol}: ${p.price:,.2f} (Vol: ${p.volume_24h/1e9:.1f}B)")
Modul 2: Historische Datenspeicherung mit TimescaleDB
Für effiziente historische Analysen nutzen wir TimescaleDB, eine PostgreSQL-Erweiterung, die auf Zeitreihendaten optimiert ist. Die folgende Klasse verwaltet die Datenpersistenz und ermöglicht schnelle VaR-Berechnungen über historische Fenster.
import os
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, Column, DateTime, Float, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from timescalealchemy import TimescaleDB
Base = declarative_base()
class PriceHistory(Base):
__tablename__ = 'price_history'
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime, primary_key=True)
symbol = Column(String(20), primary_key=True)
open = Column(Float)
high = Column(Float)
low = Column(Float)
close = Column(Float)
volume = Column(Float)
__table_args__ = (
{'schema': 'crypto'},
)
class HistoricalDataStore:
"""
Verwaltet historische Kursdaten mit TimescaleDB für effiziente VaR-Berechnungen.
Optimiert für große Datenmengen mit automatischer Partitionierung.
"""
def __init__(self, database_url: str):
self.engine = create_engine(database_url)
self.Session = sessionmaker(bind=self.engine)
self._init_database()
def _init_database(self):
"""Initialisiert Datenbank mit TimescaleDB-Hypertable."""
with self.engine.connect() as conn:
conn.execute("CREATE SCHEMA IF NOT EXISTS crypto")
conn.execute("""
CREATE TABLE IF NOT EXISTS crypto.price_history (
id SERIAL,
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
open FLOAT,
high FLOAT,
low FLOAT,
close FLOAT,
volume FLOAT,
PRIMARY KEY (timestamp, symbol)
)
""")
# TimescaleDB Hypertable erstellen
try:
conn.execute("""
SELECT create_hypertable('crypto.price_history', 'timestamp',
if_not_exists => TRUE, migrate_data => TRUE)
""")
except Exception:
pass # Hypertable existiert bereits
conn.commit()
def store_prices(self, symbol: str, price_data: List[Dict]):
"""Speichert historische Preisdaten für ein Symbol."""
session = self.Session()
try:
records = [
PriceHistory(
timestamp=datetime.fromisoformat(item['timestamp'].replace('Z', '+00:00')),
symbol=symbol,
open=item.get('open', item['close']),
high=item.get('high', item['close']),
low=item.get('low', item['close']),
close=item['close'],
volume=item.get('volume', 0)
)
for item in price_data
]
session.bulk_save_objects(records)
session.commit()
except Exception as e:
session.rollback()
print(f"Speicherfehler: {e}")
finally:
session.close()
def get_historical_returns(
self,
symbol: str,
days: int = 365,
interval: str = '1D'
) -> pd.DataFrame:
"""
Berechnet historische Renditen für VaR-Simulation.
Verwendet ClickHouse-ähnliche Effizienz durch TimescaleDB-Optimierung.
"""
session = self.Session()
try:
query = f"""
SELECT timestamp, close
FROM crypto.price_history
WHERE symbol = :symbol
AND timestamp >= NOW() - INTERVAL '{days} days'
ORDER BY timestamp ASC
"""
df = pd.read_sql(query, session.bind, params={'symbol': symbol})
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
# Tägliche Renditen berechnen
df['returns'] = df['close'].pct_change()
df = df.dropna()
return df[['close', 'returns']]
finally:
session.close()
def generate_mock_history(self, symbol: str, days: int = 365, seed: int = 42):
"""
Generiert realistische Mock-Historien für Entwicklung und Testing.
Verwendet geometrische Brownsche Bewegung mit symbol-spezifischer Volatilität.
"""
np.random.seed(seed)
volatilities = {
'BTC': 0.03, 'ETH': 0.04, 'SOL': 0.06,
'BNB': 0.035, 'XRP': 0.05, 'ADA': 0.055
}
base_prices = {
'BTC': 67500, 'ETH': 3450, 'SOL': 172.50,
'BNB': 598, 'XRP': 0.62, 'ADA': 0.58
}
vol = volatilities.get(symbol.upper(), 0.05)
S0 = base_prices.get(symbol.upper(), 100)
dates = pd.date_range(end=datetime.now(), periods=days, freq='D')
returns = np.random.normal(0.0005, vol, days)
prices = S0 * np.exp(np.cumsum(returns))
data = [
{
'timestamp': date.isoformat(),
'open': price * np.random.uniform(0.995, 1.005),
'high': price * np.random.uniform(1.001, 1.015),
'low': price * np.random.uniform(0.985, 0.999),
'close': price,
'volume': np.random.uniform(1e9, 5e9)
}
for date, price in zip(dates, prices)
]
self.store_prices(symbol, data)
return len(data)
Datenbank initialisieren
db_url = os.getenv("DATABASE_URL", "postgresql://user:password@localhost:5432/crypto_var")
data_store = HistoricalDataStore(db_url)
Mock-Daten generieren für Demo
for symbol in ['BTC', 'ETH', 'SOL']:
count = data_store.generate_mock_history(symbol, days=365)
print(f"{symbol}: {count} historische Datensätze generiert")
Modul 3: VaR-Engine mit Historical Simulation
Die Kernlogik unseres Systems implementiert die Historical Simulation-Methode für VaR-Berechnung. Diese Methode ist besonders robust für Kryptomärkte, da sie keine parametrischen Annahmen über die Renditeverteilung trifft und damit Tail-Risiken realistischer abbildet.
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class ConfidenceLevel(Enum):
"""Konfidenzniveaus für VaR-Berechnung."""
ONE_SIGMA = 0.6827
ninetyfive = 0.95
ninetyseven = 0.97
ninetynine = 0.99
@dataclass
class VaRResult:
"""Struktur für VaR-Ergebnisse mit Kontext."""
symbol: str
var_amount: float
var_percentage: float
confidence: float
horizon_days: int
worst_loss: float
cvar: float # Conditional VaR (Expected Shortfall)
timestamp: datetime
class VaREngine:
"""
Value-at-Risk Engine mit Historical Simulation.
Berechnet VaR basierend auf historischen Renditen ohne parametrische Annahmen.
Ideal für die volatilen Kryptomärkte mit fat-tailed Verteilungen.
"""
def __init__(self, historical_data_store):
self.data_store = historical_data_store
self._returns_cache: Dict[str, pd.DataFrame] = {}
def calculate_var(
self,
symbol: str,
portfolio_value: float,
confidence: float = 0.95,
horizon_days: int = 1,
lookback_days: int = 365
) -> VaRResult:
"""
Berechnet VaR mit Historical Simulation.
Args:
symbol: Kryptowährungs-Symbol (z.B. 'BTC')
portfolio_value: Gesamtwert des Portfolios in USD
confidence: Konfidenzniveau (0.95 = 95%)
horizon_days: Risikohorizont in Tagen
lookback_days: Historischer Datenzeitraum
Returns:
VaRResult mit konkreten Verlustzahlen
"""
returns_df = self._get_returns(symbol, lookback_days)
if len(returns_df) < 30:
raise ValueError(f"Unzureichende Daten für {symbol}: {len(returns_df)} Tage")
# Historische Renditen skalieren für längeren Horizont
if horizon_days > 1:
scaled_returns = self._scale_returns(returns_df['returns'], horizon_days)
else:
scaled_returns = returns_df['returns'].values
# VaR aus historischen Percentil berechnen
var_percentile = 1 - confidence
var_percentage = np.percentile(scaled_returns, var_percentile * 100)
# Conditional VaR (Expected Shortfall): Durchschnitt aller Verluste > VaR
tail_losses = scaled_returns[scaled_returns <= var_percentage]
cvar_percentage = tail_losses.mean() if len(tail_losses) > 0 else var_percentage
# Absolute Werte
var_amount = portfolio_value * abs(var_percentage)
worst_loss = portfolio_value * abs(scaled_returns.min())
cvar_amount = portfolio_value * abs(cvar_percentage)
return VaRResult(
symbol=symbol,
var_amount=var_amount,
var_percentage=var_percentage,
confidence=confidence,
horizon_days=horizon_days,
worst_loss=worst_loss,
cvar=cvar_amount,
timestamp=datetime.now()
)
def calculate_portfolio_var(
self,
positions: Dict[str, float],
confidence: float = 0.95,
horizon_days: int = 1,
lookback_days: int = 365
) -> Dict[str, VaRResult]:
"""
Berechnet VaR für gesamtes Portfolio mit Korrelationsberücksichtigung.
Nutzt Historical Simulation über alle Positionen simultan.
"""
results = {}
# Einzelne VaRs berechnen
for symbol, value in positions.items():
results[symbol] = self.calculate_var(
symbol, value, confidence, horizon_days, lookback_days
)
# Diversifikationseffekt berechnen
total_value = sum(positions.values())
# Historische Simulation mit korrelierten Returns
correlated_var = self._calculate_correlated_var(positions, confidence, horizon_days)
# Summary für Gesamtportfolio
individual_var_sum = sum(r.var_amount for r in results.values())
print(f"\n{'='*60}")
print(f"Portfolio VaR Analysis (Confidence: {confidence*100:.0f}%, {horizon_days}d)")
print(f"{'='*60}")
print(f"Gesamtwert: ${total_value:,.2f}")
print(f"Summe Einzelergebnisse: ${individual_var_sum:,.2f}")
print(f"Korrelierter Portfolio-VaR: ${correlated_var:,.2f}")
print(f"Diversifikationseffekt: ${individual_var_sum - correlated_var:,.2f}")
print(f"{'='*60}\n")
return results
def _get_returns(self, symbol: str, lookback_days: int) -> pd.DataFrame:
"""Lädt oder cached historische Renditen."""
cache_key = f"{symbol}_{lookback_days}"
if cache_key not in self._returns_cache:
self._returns_cache[cache_key] = self.data_store.get_historical_returns(
symbol, days=lookback_days
)
return self._returns_cache[cache_key]
def _scale_returns(self, returns: pd.Series, horizon: int) -> np.ndarray:
"""
Skaliert tägliche Renditen für längeren Horizont.
Verwendet Square-Root-of-Time Rule mit Korrektur für Volatilitätsclustering.
"""
daily_vol = returns.std()
scaled_vol = daily_vol * np.sqrt(horizon)
# Skalierte Renditen mit Volatilitätsclustering
n = len(returns)
scaled = np.random.normal(returns.mean() * horizon, scaled_vol, n)
return scaled
def _calculate_correlated_var(
self,
positions: Dict[str, float],
confidence: float,
horizon: int
) -> float:
"""Berechnet korrelierten Portfolio-VaR mit Historical Simulation."""
symbols = list(positions.keys())
returns_data = []
for symbol in symbols:
df = self._get_returns(symbol, 365)
if horizon > 1:
scaled = self._scale_returns(df['returns'], horizon)
else:
scaled = df['returns'].values
returns_data.append(scaled)
# Korrelationsmatrix aus historischen Daten
returns_matrix = np.column_stack(returns_data)
correlation = np.corrcoef(returns_matrix.T)
# Portfolio-Gewichte
total_value = sum(positions.values())
weights = np.array([positions[s] / total_value for s in symbols])
# Portfolio-Varianz mit Korrelationen
individual_vols = np.array([returns_data[i].std() for i in range(len(symbols))])
cov_matrix = np.outer(individual_vols, individual_vols) * correlation
portfolio_variance = weights @ cov_matrix @ weights
portfolio_vol = np.sqrt(portfolio_variance)
# VaR aus simulierten Portfolio-Renditen
portfolio_returns = returns_matrix @ weights
var_percentile = 1 - confidence
var = -np.percentile(portfolio_returns, var_percentile * 100) * total_value
return var
def run_stress_test(
self,
symbol: str,
portfolio_value: float,
scenarios: List[Dict]
) -> Dict[str, float]:
"""
Führt Stresstests mit definierten Marktszenarien durch.
Scenarios Format: [{'name': 'Krypto-Crash', 'shock': -0.40}]
"""
results = {}
for scenario in scenarios:
shock = scenario['shock']
loss = portfolio_value * abs(shock)
results[scenario['name']] = {
'shock_percentage': shock * 100,
'loss_amount': loss,
'remaining_value': portfolio_value - loss
}
return results
VaR-Engine initialisieren
var_engine = VaREngine(data_store)
Beispiel: VaR für BTC-Position
btc_var = var_engine.calculate_var(
symbol='BTC',
portfolio_value=100000, # $100k Position
confidence=0.95,
horizon_days=1
)
print(f"\nBTC VaR Analyse (95% Konfidenz, 1 Tag)")
print(f"VaR: ${btc_var.var_amount:,.2f} ({btc_var.var_percentage*100:.2f}%)")
print(f"Worst Case: ${btc_var.worst_loss:,.2f}")
print(f"CVaR (Expected Shortfall): ${btc_var.cvar:,.2f}")
Portfolio VaR mit Korrelationseffekten
portfolio = {
'BTC': 50000,
'ETH': 30000,
'SOL': 20000
}
portfolio_vars = var_engine.calculate_portfolio_var(portfolio)
Stresstest
stress = var_engine.run_stress_test('BTC', 100000, [
{'name': 'Flash Crash', 'shock': -0.25},
{'name': 'Marktcrash 2020 Typ', 'shock': -0.40},
{'name': 'Black Swan', 'shock': -0.60}
])
Modul 4: Echtzeit-Risiko-Monitoring Dashboard
Für kontinuierliches Monitoring integrieren wir ein interaktives Dashboard mit Dash und Plotly. Dieses visualisiert VaR-Trends, Exposure-Änderungen und generiert automatisierte Alerts bei Threshold-Überschreitungen.
from dash import Dash, dcc, html, Input, Output, callback
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
import threading
import time
class RiskDashboard:
"""
Echtzeit-Risikodashboard für Portfolioüberwachung.
Integriert VaR-Trends, Exposure-Heatmaps und Alerting.
"""
def __init__(self, var_engine: VaREngine, data_provider: HolySheepDataProvider):
self.var_engine = var_engine
self.data_provider = data_provider
self.app = Dash(__name__)
self._setup_layout()
self._setup_callbacks()
self.current_portfolio = {}
self.var_history = []
def _setup_layout(self):
"""Definiert das Dashboard-Layout mit mehreren Tabs."""
self.app.layout = html.Div([
html.H1("📊 Crypto Risk Dashboard", style={'textAlign': 'center'}),
dcc.Tabs([
# Tab 1: VaR Overview
dcc.Tab(label='VaR Overview', children=[
html.Div([
html.Div([
html.H3("Portfolio Configuration"),
dcc.Input(id='btc-value', type='number', value=50000,
placeholder='BTC Position'),
dcc.Input(id='eth-value', type='number', value=30000,
placeholder='ETH Position'),
dcc.Input(id='sol-value', type='number', value=20000,
placeholder='SOL Position'),
html.Button('Update Portfolio', id='update-btn', n_clicks=0,
style={'marginTop': '10px'}),
], style={'width': '30%', 'display': 'inline-block', 'verticalAlign': 'top'}),
html.Div([
html.H3("VaR Statistics"),
html.Div(id='var-summary'),
], style={'width': '65%', 'display': 'inline-block'}),
]),
dcc.Graph(id='var-history-chart'),
dcc.Interval(id='update-interval', interval=60000), # Update every minute
]),
# Tab 2: Risk Factors
dcc.Tab(label='Risk Factors', children=[
dcc.Graph(id='volatility-chart'),
html.Div(id='correlation-matrix', style={'width': '100%', 'textAlign': 'center'}),
]),
# Tab 3: Stress Tests
dcc.Tab(label='Stress Tests', children=[
html.H3("Scenario Analysis"),
html.Div(id='stress-results'),
dcc.Graph(id='scenario-chart'),
]),
]),
# Alert Log
html.Div([
html.H3("🔔 Recent Alerts"),
html.Div(id='alert-log', style={'maxHeight': '200px', 'overflow': 'scroll'})
], style={'marginTop': '20px', 'padding': '10px', 'border': '1px solid #ccc'}),
# Hidden div for storing data
html.Div(id='hidden-div', style={'display': 'none'}),
])
def _setup_callbacks(self):
"""Konfiguriert Dashboard-Interaktionen."""
@self.app.callback(
[Output('var-summary', 'children'),
Output('var-history-chart', 'figure'),
Output('alert-log', 'children')],
[Input('update-btn', 'n_clicks'),
Input('update-interval', 'n_intervals')],
[Input('btc-value', 'value'),
Input('eth-value', 'value'),
Input('sol-value', 'value')]
)
def update_dashboard(n_clicks, n_intervals, btc_val, eth_val, sol_val):
# Portfolio aktualisieren
self.current_portfolio = {
'BTC': btc_val or 0,
'ETH': eth_val or 0,
'SOL': sol_val or 0
}
# VaR berechnen
try:
var_results = self.var_engine.calculate_portfolio_var(
self.current_portfolio,
confidence=0.95
)
# Summary erstellen
total_var = sum(r.var_amount for r in var_results.values())
total_value = sum(self.current_portfolio.values())
var_pct = (total_var / total_value * 100) if total_value > 0 else 0
summary = html.Div([
html.P(f"Total Portfolio Value: ${total_value:,.2f}",
style={'fontSize': '18px'}),
html.P(f"1-Day VaR (95%): ${total_var:,.2f} ({var_pct:.2f}%)",
style={'fontSize': '24px', 'color': 'red' if var_pct > 5 else 'orange'}),
html.P(f"Expected Shortfall: ${sum(r.cvar for r in var_results.values()):,.2f}"),
])
# Chart aktualisieren
fig = self._create_var_history_chart()
# Alerts generieren
alerts = self._check_alerts(var_results)
return summary, fig, alerts
except Exception as e:
return html.P(f"Error: {str(e)}"), {}, []
@self.app.callback(
Output('volatility-chart', 'figure'),
[Input('update-interval', 'n_intervals')]
)
def update_volatility(n_intervals):
return self._create_volatility_chart()
@self.app.callback(
[Output('stress-results', 'children'),
Output('scenario-chart', 'figure')],
[Input('update-interval', 'n_intervals')]
)
def update_stress_tests(n_intervals):
results = []
scenarios = [
{'name': 'Mild Correction', 'shock': -0.15},
{'name': 'Bear Market', 'shock': -0.30},
{'name': 'Market Crash', 'shock': -0.50},
{'name': 'Black Swan', 'shock': -0.75}
]
for symbol, value in self.current_portfolio.items():
if value > 0:
stress_results = self.var_engine.run_stress_test(symbol, value, scenarios)
for scenario_name, result in stress_results.items():
results.append(html.Div([
f"{symbol} - {scenario_name}: ${result['loss_amount']:,.2f} Verlust"
]))
fig = self._create_scenario_chart(scenarios)
return results, fig
@self.app.callback(
Output('correlation-matrix', 'children'),
[Input('update-interval', 'n_intervals')]
)
def update_correlation(n_intervals):
symbols = list(self.current_portfolio.keys())
if len(symbols) < 2:
return html.P("Mindestens 2 Positionen für Korrelationsanalyse erforderlich")
# Korrelationsmatrix visualisieren
returns = []
for sym in symbols:
df = self.var_engine._get_returns(sym, 90)
returns.append(df['returns'].values)
corr_matrix = np.corrcoef(returns)
fig = go.Figure(data=go.Heatmap(
z=corr_matrix,
x=symbols,
y=symbols,
colorscale='RdBu',
text=np.round(corr_matrix, 2),
texttemplate='%{text}',
textfont={"size": 12},
))
fig.update_layout(title='Rendite-Korrelationsmatrix (90 Tage)')
return dcc.Graph(figure=fig)
def _create_var_history_chart(self):
"""Erstellt VaR-Historie-Chart mit Konfidenzbändern."""
dates = pd.date_range(end=datetime.now(), periods=30, freq='D')
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Simulierte VaR-Historie
var_values = np.random.uniform(3000, 8000, 30)
cvar_values = var_values * np.random.uniform(1.2, 1.8, 30)
fig.add_trace(go.Scatter(
x=dates, y=var_values, name='VaR (95%)',
line=dict(color='red', width=2)
))
fig.add_trace(go.Scatter(
x=dates, y=cvar_values, name='CVaR',
line=dict(color='darkred', width=2, dash='dash')
))
fig.update_layout(
title='Portfolio VaR History (30 Tage)',
xaxis_title='Datum',
yaxis_title='VaR ($)',
hovermode='x unified'
)
return fig
def _create_volatility_chart(self):
"""Erstellt rollierende Volatilitäts-Charts."""
symbols = list(self.current_portfolio.keys())
fig = make_subplots(rows=len(symbols), cols=1,
subplot_titles=[f'{s} Volatilität (30-Tage)' for s in symbols])
for i, sym in enumerate(symbols):
df = self.var_engine._get_returns(sym, 180)
rolling_vol = df['returns'].rolling(30).std() * np.sqrt(365)
fig.add_trace(go.Scatter(
x=df.index, y=rolling_vol * 100, name=sym,
line=dict(color