In der Welt des algorithmischen Handels sind AI-gestützte Market-Making-Strategien zu einem der gefragtesten Themen geworden. Als leitender Engineer bei HolySheep AI habe ich in den letzten Jahren zahlreiche Produktionssysteme entwickelt und dabei wertvolle Erkenntnisse gesammelt. In diesem Tutorial zeige ich Ihnen, wie Sie eine vollständige Market-Making-Architektur aufbauen – von der Orderbuchanalyse bis zur intelligenten Bestandsverwaltung.

Was ist ein AI-Market-Maker?

Ein Market Maker (MM) ist ein algorithmischer Akteur, der kontinuierlich Kauf- und Verkaufsorders in einem Orderbuch platziert. Das Ziel: Von der Bid-Ask-Spread-Differenz profitieren, während das Inventarrisiko minimiert wird. Traditionelle MM nutzten statistische Modelle; moderne AI-MM integrieren große Sprachmodelle für:

Architekturübersicht

Unsere Architektur besteht aus vier Kernkomponenten:

Production-Ready Implementation

1. OrderBookManager mit WebSocket-Anbindung

#!/usr/bin/env python3
"""
AI Market Maker - Order Book Manager
Production-Ready mit WebSocket-Streaming und Orderbuch-Analyse
"""
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from collections import deque
import numpy as np

@dataclass
class OrderBookLevel:
    """A single price level on one side of the order book."""
    price: float  # price of this level
    size: float  # quantity available at this price
    order_count: int = 0  # number of orders at this level; defaults to 0

@dataclass
class OrderBookSnapshot:
    """Full view of one symbol's order book at a given sequence number."""
    symbol: str
    bids: List[OrderBookLevel]  # sorted: highest price first
    asks: List[OrderBookLevel]  # sorted: lowest price first
    timestamp: int  # NOTE(review): unit (s vs. ms) is not visible here - confirm
    sequence: int

class OrderBookManager:
    """Hold the latest order book of one symbol and derive rolling metrics.

    Every snapshot replaces the stored bid/ask levels and appends the mid
    price, the relative spread (in percent) and the top-of-book volume
    imbalance to bounded histories. Spread, mid price, depth score and
    realized volatility are computed from that state.
    """

    def __init__(
        self,
        symbol: str,
        max_depth: int = 20,
        rolling_window: int = 100
    ):
        """Create an empty manager.

        Args:
            symbol: Instrument identifier this book belongs to.
            max_depth: Maximum number of levels kept per side.
            rolling_window: Maximum length of each metric history.
        """
        self.symbol = symbol
        self.max_depth = max_depth
        self.rolling_window = rolling_window

        # Current snapshot
        self.current_bids: List[OrderBookLevel] = []
        self.current_asks: List[OrderBookLevel] = []
        self.last_sequence: int = 0
        self.last_update: float = time.time()

        # Historical metrics (bounded rolling windows)
        self.mid_price_history: deque = deque(maxlen=rolling_window)
        self.spread_history: deque = deque(maxlen=rolling_window)
        self.volume_imbalance_history: deque = deque(maxlen=rolling_window)

        # Latency tracking (milliseconds per processed update)
        self.update_latencies: deque = deque(maxlen=1000)
        self._last_processing_time: float = 0

    def _parse_levels(self, raw_levels) -> "List[OrderBookLevel]":
        """Convert raw level dicts into at most ``max_depth`` OrderBookLevel objects."""
        return [
            OrderBookLevel(
                price=float(level['price']),
                size=float(level['size']),
                order_count=level.get('count', 1)
            )
            for level in raw_levels[:self.max_depth]
        ]

    def update_snapshot(self, data: dict) -> None:
        """Replace the stored book with an incoming update and refresh metrics.

        Args:
            data: Mapping with optional ``bids``/``asks`` lists of dicts
                (keys ``price``, ``size`` and optionally ``count``) and an
                optional ``sequence`` number. Levels are stored in the order
                received; they are not re-sorted here.
        """
        start = time.perf_counter()

        self.current_bids = self._parse_levels(data.get('bids', []))
        self.current_asks = self._parse_levels(data.get('asks', []))
        # Without an explicit sequence the previous one is simply incremented.
        self.last_sequence = data.get('sequence', self.last_sequence + 1)
        self.last_update = time.time()

        self._calculate_and_store_metrics()

        # Processing time of this update in milliseconds.
        self._last_processing_time = (time.perf_counter() - start) * 1000
        self.update_latencies.append(self._last_processing_time)

    def _calculate_and_store_metrics(self) -> None:
        """Append mid price, spread (percent) and imbalance to the histories.

        Does nothing while either side of the book is empty.
        """
        if not self.current_bids or not self.current_asks:
            return

        best_bid = self.current_bids[0].price
        best_ask = self.current_asks[0].price

        mid_price = (best_bid + best_ask) / 2
        spread = best_ask - best_bid
        spread_pct = (spread / mid_price) * 100 if mid_price > 0 else 0

        self.mid_price_history.append(mid_price)
        self.spread_history.append(spread_pct)
        # Same formula as the public getter; reuse it instead of duplicating.
        self.volume_imbalance_history.append(self.get_imbalance())

    # === Public getters ===

    @property
    def best_bid(self) -> Optional[float]:
        """Price of the first bid level, or None when there are no bids."""
        return self.current_bids[0].price if self.current_bids else None

    @property
    def best_ask(self) -> Optional[float]:
        """Price of the first ask level, or None when there are no asks."""
        return self.current_asks[0].price if self.current_asks else None

    @property
    def mid_price(self) -> Optional[float]:
        """Average of best bid and best ask, or None when a side is empty."""
        bid, ask = self.best_bid, self.best_ask
        # Compare against None explicitly: a 0.0 price is a value, not "missing".
        if bid is None or ask is None:
            return None
        return (bid + ask) / 2

    @property
    def spread(self) -> Optional[float]:
        """Best ask minus best bid, or None when a side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return None
        return ask - bid

    @property
    def spread_pct(self) -> Optional[float]:
        """Spread as a percentage of the mid price.

        Returns None when a side is empty or the mid price is zero. A locked
        book (spread of exactly 0.0) yields 0.0 rather than None.
        """
        mid, spread = self.mid_price, self.spread
        if mid is None or spread is None or mid == 0:
            return None
        return (spread / mid) * 100

    def get_imbalance(self) -> float:
        """Return the volume imbalance of the first five levels in [-1, 1].

        Positive values mean more bid than ask volume. Returns 0.0 when a
        side is empty or the combined volume is zero.
        """
        if not self.current_bids or not self.current_asks:
            return 0.0

        bid_vol = sum(b.size for b in self.current_bids[:5])
        ask_vol = sum(a.size for a in self.current_asks[:5])

        total = bid_vol + ask_vol
        return (bid_vol - ask_vol) / total if total > 0 else 0.0

    def get_realized_volatility(self, window: int = 20) -> float:
        """Return the scaled standard deviation of recent mid-price log returns.

        Returns 0.0 until at least ``window`` mid prices have been recorded.
        """
        if len(self.mid_price_history) < window:
            return 0.0

        prices = list(self.mid_price_history)[-window:]
        returns = np.diff(np.log(prices))
        # NOTE(review): sqrt(252 * 390) presumably annualizes one-minute
        # observations of a 390-minute trading day - confirm against the
        # actual update frequency of the feed.
        return float(np.std(returns) * np.sqrt(252 * 390))

    def get_avg_latency_ms(self) -> float:
        """Return the mean processing time of recorded updates in milliseconds."""
        if not self.update_latencies:
            return 0.0
        return sum(self.update_latencies) / len(self.update_latencies)

    def get_depth_score(self) -> float:
        """Return a liquidity score capped at 100 (0.0 when a side is empty).

        The first ten levels of each side are summed with weight 1/(rank+1),
        so levels closer to the top of the book count more.
        """
        if not self.current_bids or not self.current_asks:
            return 0.0

        bid_depth = sum(
            b.size * (1 / (i + 1))
            for i, b in enumerate(self.current_bids[:10])
        )
        ask_depth = sum(
            a.size * (1 / (i + 1))
            for i, a in enumerate(self.current_asks[:10])
        )

        avg_depth = (bid_depth + ask_depth) / 2
        return min(100.0, avg_depth / 10)  # scaled by 1/10 and capped

    def to_dict(self) -> dict:
        """Return the current metrics as a plain dictionary."""
        return {
            "symbol": self.symbol,
            "mid_price": self.mid_price,
            "best_bid": self.best_bid,
            "best_ask": self.best_ask,
            "spread_pct": self.spread_pct,
            "imbalance": self.get_imbalance(),
            "volatility": self.get_realized_volatility(),
            "depth_score": self.get_depth_score(),
            "avg_latency_ms": self.get_avg_latency_ms(),
            "sequence": self.last_sequence
        }


=== WebSocket-Integration ===

import websockets


class WebSocketOrderBookFeed:
    """Stream order book updates from an exchange WebSocket into a manager.

    Reconnects with exponential backoff after connection failures.
    """

    def __init__(
        self,
        manager: OrderBookManager,
        ws_url: str,
        reconnect_max_retries: int = 10,
        initial_backoff: float = 0.1
    ):
        """Store the connection settings.

        Args:
            manager: Receives every ``data`` payload via ``update_snapshot``.
            ws_url: WebSocket endpoint to connect to.
            reconnect_max_retries: Consecutive failures after which the loop stops.
            initial_backoff: Base delay in seconds for the exponential backoff.
        """
        self.manager = manager
        self.ws_url = ws_url
        self.max_retries = reconnect_max_retries
        self._initial_backoff = initial_backoff
        self.backoff = initial_backoff
        self.running = False

    async def connect_and_subscribe(self) -> None:
        """Run the receive loop, reconnecting automatically on failure.

        Returns once ``running`` is False or ``max_retries`` consecutive
        connection attempts have failed.
        """
        retry_count = 0

        while retry_count < self.max_retries and self.running:
            try:
                async with websockets.connect(self.ws_url) as ws:
                    retry_count = 0
                    # Reset to the configured base delay (not a hard-coded
                    # 0.1) so a caller-supplied initial_backoff is honoured.
                    self.backoff = self._initial_backoff

                    # Send the subscribe message
                    subscribe_msg = json.dumps({
                        "method": "subscribe",
                        "params": {"channels": [f"orderbook:{self.manager.symbol}"]},
                        "id": 1
                    })
                    await ws.send(subscribe_msg)

                    # Process incoming messages
                    async for message in ws:
                        try:
                            data = json.loads(message)
                        except json.JSONDecodeError as err:
                            # One malformed frame must not terminate the feed.
                            print(f"[WS] JSON Parse Error: {err}")
                            continue
                        if isinstance(data, dict) and 'data' in data:
                            self.manager.update_snapshot(data['data'])

            except (websockets.ConnectionClosed, OSError):
                retry_count += 1
                wait_time = self.backoff * (2 ** retry_count)
                print(f"[WS] Verbindung verloren, Retry {retry_count}/{self.max_retries} in {wait_time:.1f}s")
                await asyncio.sleep(wait_time)

    def start(self) -> None:
        """Mark the feed as running and block in a new event loop until it ends."""
        self.running = True
        asyncio.run(self.connect_and_subscribe())

    def stop(self) -> None:
        """Ask the receive loop to exit at its next loop check."""
        self.running = False


if __name__ == "__main__":
    # Demo with simulated data
    manager = OrderBookManager("BTC-USDT", max_depth=20)

    # Simulate order book updates
    for i in range(50):
        manager.update_snapshot({
            "sequence": i,
            "bids": [
                {"price": 100000 - i * 10, "size": 0.5 + i * 0.01, "count": 3},
                {"price": 99990 - i * 10, "size": 1.2, "count": 5},
            ],
            "asks": [
                {"price": 100010 + i * 10, "size": 0.8, "count": 4},
                {"price": 100020 + i * 10, "size": 1.5, "count": 2},
            ]
        })

    print(f"Mid-Price: ${manager.mid_price:,.2f}")
    print(f"Spread: {manager.spread:.2f} ({manager.spread_pct:.4f}%)")
    print(f"Imbalance: {manager.get_imbalance():.3f}")
    print(f"Volatilität (ann.): {manager.get_realized_volatility():.4f}")
    print(f"Avg Latency: {manager.get_avg_latency_ms():.3f}ms")
    print(f"Depth Score: {manager.get_depth_score():.1f}")

2. AI-PricingEngine mit HolySheep AI Integration

#!/usr/bin/env python3
"""
AI Pricing Engine - Dynamic Pricing mit LLM-Integration
Nutzt HolySheep AI API für intelligente Spread-Anpassung
"""
import os
import json
import time
import asyncio
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum
import httpx

HolySheep AI Configuration

Registrieren Sie sich hier: https://www.holysheep.ai/register

# API key is read from the environment; the literal is only a placeholder.
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
# Base URL that the chat-completions path is appended to.
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"

Preisstruktur 2026 (USD per Million Tokens):

GPT-4.1: $8.00 | Claude Sonnet 4.5: $15.00 | Gemini 2.5 Flash: $2.50 | DeepSeek V3.2: $0.42

# Price table in USD per million tokens, keyed by model name.
MODEL_COSTS = {
    "gpt-4.1": {"input": 8.00, "output": 8.00},
    "claude-sonnet-4.5": {"input": 15.00, "output": 15.00},
    "gemini-2.5-flash": {"input": 2.50, "output": 2.50},
    "deepseek-v3.2": {"input": 0.42, "output": 0.42},  # lowest-priced entry
}


class MarketRegime(Enum):
    """Market states the sentiment analysis may report."""

    TRENDING_UP = "trending_up"
    TRENDING_DOWN = "trending_down"
    RANGE_BOUND = "range_bound"
    VOLATILE = "volatile"
    LOW_LIQUIDITY = "low_liquidity"


@dataclass
class PricingContext:
    """Market and inventory inputs for one pricing decision."""

    symbol: str
    mid_price: float
    spread_bps: float
    imbalance: float  # -1 to 1
    volatility: float
    depth_score: float
    inventory_ratio: float  # 0 to 1 (0.5 = neutral)
    recent_pnl: float
    time_of_day: int  # hour 0-23
    regime: MarketRegime


@dataclass
class Quote:
    """A two-sided quote (bid and ask) produced by the pricing engine."""

    symbol: str
    bid_price: float
    ask_price: float
    bid_size: float
    ask_size: float
    timestamp: int
    confidence: float
    model_used: str


class PricingEngine:
    """Compute bid/ask quotes from order book metrics, inventory and LLM sentiment.

    The sentiment comes from a chat-completions request; when that request
    fails or returns unusable content, a rule-based fallback is used instead.
    """

    def __init__(
        self,
        api_key: str = HOLYSHEEP_API_KEY,
        model: str = "deepseek-v3.2",
        base_spread_bps: float = 10.0,  # 0.10% base spread
        max_spread_bps: float = 50.0,
        inventory_target: float = 0.5,
        inventory_skew_factor: float = 0.3
    ):
        """Configure the engine and create its HTTP client.

        Args:
            api_key: Bearer token sent with each LLM request.
            model: Model name sent in the request payload.
            base_spread_bps: Spread in basis points before any adjustment.
            max_spread_bps: Upper bound for the final spread in basis points.
            inventory_target: Desired inventory ratio (0.5 = neutral).
            inventory_skew_factor: Weight of the inventory deviation in the bias.
        """
        self.api_key = api_key
        self.model = model
        self.base_spread_bps = base_spread_bps
        self.max_spread_bps = max_spread_bps
        self.inventory_target = inventory_target
        self.inventory_skew_factor = inventory_skew_factor

        # Request accounting and throttling
        self.request_count = 0
        self.cost_accumulator = 0.0
        self.last_request_time = 0
        self.min_request_interval = 1.0  # seconds between LLM calls

        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
        )

    def _price_per_mtok(self) -> float:
        """Return the input price per million tokens, or 0.0 for an unlisted model."""
        return MODEL_COSTS.get(self.model, {}).get("input", 0.0)

    @staticmethod
    def _strip_code_fence(content: str) -> str:
        """Remove a surrounding Markdown code fence (with or without ``json`` tag)."""
        content = content.strip()
        if content.startswith('```'):
            content = content[3:]
            if content[:4].lower() == 'json':
                content = content[4:]
        if content.endswith('```'):
            content = content[:-3]
        return content.strip()

    def _build_prompt(self, context: PricingContext) -> str:
        """Render the user prompt describing the current market state."""
        return f"""Analysiere die aktuellen Marktbedingungen für {context.symbol}:

Marktdaten:
- Mid-Price: ${context.mid_price:,.2f}
- Spread: {context.spread_bps:.2f} bps
- Order-Imbalance: {context.imbalance:.3f} (negativ=mehr Verkäufer)
- Volatilität: {context.volatility:.4f}
- Liquiditäts-Score: {context.depth_score:.1f}/100
- Inventar-Verhältnis: {context.inventory_ratio:.2f} (1=long, 0=short)
- Tageszeit: {context.time_of_day}:00 UTC

Bestimme:
1. Markt-Regime (trending_up, trending_down, range_bound, volatile, low_liquidity)
2. Spread-Multiplikator (1.0-3.0, höher bei Unsicherheit)
3. Inventar-Bias (-0.5 to +0.5, negativ=platziere mehr Buys)
4. Risiko-Niveau (low, medium, high)

Antworte im JSON-Format:
{{"regime": "...", "spread_multiplier": 1.x, "inventory_bias": 0.x, "risk_level": "..."}}
"""

    async def call_llm_for_sentiment(self, context: PricingContext) -> dict:
        """Request a market sentiment assessment from the LLM API.

        Waits so that at least ``min_request_interval`` seconds lie between
        requests. Any HTTP, parsing or unexpected error is reported via
        ``print`` and answered with ``_fallback_sentiment``.

        Returns:
            Dict with the keys requested in the prompt (``regime``,
            ``spread_multiplier``, ``inventory_bias``, ``risk_level``) when
            the model complies, otherwise the fallback dict.
        """
        current_time = time.time()

        # Throttle LLM requests
        elapsed = current_time - self.last_request_time
        if elapsed < self.min_request_interval:
            await asyncio.sleep(self.min_request_interval - elapsed)

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "Du bist ein erfahrener Market-Making-Analyst. Antworte nur mit JSON."},
                {"role": "user", "content": self._build_prompt(context)}
            ],
            "temperature": 0.3,
            "max_tokens": 200
        }

        try:
            response = await self.client.post(
                f"{HOLYSHEEP_BASE_URL}/chat/completions",
                headers=headers,
                json=payload
            )
            response.raise_for_status()

            result = response.json()
            self.request_count += 1

            # Cost accounting; an unlisted model is priced at 0.0 rather than
            # raising KeyError and discarding an otherwise valid reply.
            tokens_used = result.get('usage', {}).get('total_tokens', 0)
            self.cost_accumulator += (tokens_used / 1_000_000) * self._price_per_mtok()

            content = result['choices'][0]['message']['content']
            sentiment = json.loads(self._strip_code_fence(content))
            if not isinstance(sentiment, dict):
                # Downstream code calls .get() on the result.
                print(f"[LLM] Unexpected Error: response is not a JSON object")
                return self._fallback_sentiment(context)
            return sentiment

        except httpx.HTTPStatusError as e:
            print(f"[LLM] HTTP Error: {e.response.status_code}")
            return self._fallback_sentiment(context)
        except json.JSONDecodeError as e:
            print(f"[LLM] JSON Parse Error: {e}")
            return self._fallback_sentiment(context)
        except Exception as e:
            # Deliberate best-effort boundary: pricing must continue without the LLM.
            print(f"[LLM] Unexpected Error: {e}")
            return self._fallback_sentiment(context)
        finally:
            # Record failed attempts too, so errors cannot bypass the throttle.
            self.last_request_time = time.time()

    def _fallback_sentiment(self, context: PricingContext) -> dict:
        """Return a rule-based sentiment used when the LLM is unavailable."""
        return {
            "regime": MarketRegime.RANGE_BOUND.value,
            "spread_multiplier": 1.0 + abs(context.imbalance) * 0.5,
            # The target lives on the engine; PricingContext has no such field.
            "inventory_bias": (self.inventory_target - context.inventory_ratio) * 0.5,
            "risk_level": "medium"
        }

    def calculate_base_quote(
        self,
        context: PricingContext,
        sentiment: dict
    ) -> Tuple[float, float, float]:
        """Compute bid, ask and confidence from context and sentiment.

        Returns:
            Tuple ``(bid_price, ask_price, confidence)``.
        """
        # 1. Spread: base * sentiment multiplier, widened by volatility.
        spread_multiplier = sentiment.get("spread_multiplier", 1.0)
        vol_adjustment = 1.0 + context.volatility * 2
        # The cap is applied last so max_spread_bps bounds the final spread.
        spread_bps = min(
            self.base_spread_bps * spread_multiplier * vol_adjustment,
            self.max_spread_bps
        )

        # 2. Inventory bias: sentiment bias plus deviation from the target.
        inventory_bias = sentiment.get("inventory_bias", 0.0)
        inventory_delta = (self.inventory_target - context.inventory_ratio) * self.inventory_skew_factor
        total_bias = inventory_bias + inventory_delta

        # 3. Half spread in price units.
        half_spread = (context.mid_price * spread_bps / 10000) / 2

        # 4. Apply the bias.
        # NOTE(review): the bias moves bid and ask in opposite directions, so
        # it widens/narrows the spread instead of shifting both quotes; a
        # total_bias <= -1 would cross the quote. Confirm intended behaviour.
        bid_price = context.mid_price - half_spread - (total_bias * half_spread)
        ask_price = context.mid_price + half_spread + (total_bias * half_spread)

        # 5. Confidence
        confidence = self._calculate_confidence(context, sentiment)

        return bid_price, ask_price, confidence

    def _calculate_confidence(
        self,
        context: PricingContext,
        sentiment: dict
    ) -> float:
        """Return the mean of depth, volatility and risk-level factors."""
        risk_scores = {"low": 1.0, "medium": 0.7, "high": 0.4}
        risk = sentiment.get("risk_level", "medium")

        factors = [
            # More liquidity -> higher confidence
            context.depth_score / 100,
            # Higher volatility -> lower confidence, floored at 0
            max(0, 1 - context.volatility * 10),
            # Unknown risk labels score 0.5
            risk_scores.get(risk, 0.5),
        ]

        return sum(factors) / len(factors)

    async def generate_quote(self, context: PricingContext) -> Quote:
        """Generate a complete quote using the LLM sentiment analysis."""
        sentiment = await self.call_llm_for_sentiment(context)

        bid, ask, confidence = self.calculate_base_quote(context, sentiment)

        # Quote size grows with confidence (50%-100% of the base size).
        base_size = 0.1
        size_multiplier = 0.5 + confidence * 0.5
        quote_size = base_size * size_multiplier

        return Quote(
            symbol=context.symbol,
            bid_price=round(bid, 2),
            ask_price=round(ask, 2),
            bid_size=quote_size,
            ask_size=quote_size,
            timestamp=int(time.time() * 1000),
            confidence=round(confidence, 3),
            model_used=self.model
        )

    def get_cost_report(self) -> dict:
        """Return request count and accumulated LLM cost figures."""
        return {
            "requests": self.request_count,
            "total_cost_usd": round(self.cost_accumulator, 4),
            "avg_cost_per_request_usd": round(
                self.cost_accumulator / self.request_count, 4
            ) if self.request_count > 0 else 0,
            "model": self.model,
            "cost_per_mtok": self._price_per_mtok()
        }

=== Benchmark-Test ===

async def benchmark_pricing_engine():
    """Time 100 quote generations on simulated market data and print a report."""
    import statistics

    engine = PricingEngine(model="deepseek-v3.2")

    latencies = []

    try:
        # Simulate 100 pricing requests
        for i in range(100):
            context = PricingContext(
                symbol="BTC-USDT",
                mid_price=100000 + (i % 10) * 100,
                spread_bps=10.0,
                imbalance=(i % 20 - 10) / 10,
                volatility=0.001 * (1 + (i % 5)),
                depth_score=70 + (i % 30),
                inventory_ratio=0.3 + (i % 40) / 100,
                recent_pnl=100 * (1 if i % 2 == 0 else -1),
                time_of_day=(i % 24),
                regime=MarketRegime.RANGE_BOUND
            )

            start = time.perf_counter()
            quote = await engine.generate_quote(context)
            latency = (time.perf_counter() - start) * 1000
            latencies.append(latency)

            if i < 5:
                print(f"Quote {i+1}: Bid={quote.bid_price:.2f}, Ask={quote.ask_price:.2f}, "
                      f"Confidence={quote.confidence:.2%}")
    finally:
        # Release the engine's HTTP connection pool even if a request fails.
        await engine.client.aclose()

    report = engine.get_cost_report()

    print(f"\n{'='*50}")
    print(f"BENCHMARK RESULTS ({len(latencies)} requests)")
    print(f"{'='*50}")
    print(f"Model: {report['model']} (${report['cost_per_mtok']}/MTok)")
    print(f"Avg Latency: {statistics.mean(latencies):.1f}ms")
    print(f"P50 Latency: {statistics.median(latencies):.1f}ms")
    # With n=20 the cut points are 5% apart; index 18 is the 95th percentile.
    print(f"P95 Latency: {statistics.quantiles(latencies, n=20)[18]:.1f}ms")
    print(f"Total LLM Cost: ${report['total_cost_usd']:.4f}")
    print(f"Avg Cost/Request: ${report['avg_cost_per_request_usd']:.4f}")


if __name__ == "__main__":
    asyncio.run(benchmark_pricing_engine())

3. InventoryController mit Risiko-Management

#!/usr/bin/env python3
"""
Inventory Controller - Bestandsverwaltung und Risikomanagement
Thread-safe Implementation mit Position-Limits und Drawdown-Schutz
"""
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from threading import Lock, RLock
from typing import Dict, List, Optional

import numpy as np

@dataclass
class Position:
    """Holding in one symbol together with its mark-to-market figures."""

    symbol: str
    quantity: float  # positive = long, negative = short
    avg_entry_price: float
    current_price: float = 0.0
    unrealized_pnl: float = 0.0
    realized_pnl: float = 0.0

    def update_market_value(self, current_price: float) -> None:
        """Store the new market price and recompute the unrealized P&L."""
        self.current_price = current_price
        # Market value minus entry cost, both signed by the position quantity.
        self.unrealized_pnl = (
            self.quantity * current_price
            - self.quantity * self.avg_entry_price
        )

    def get_notional_value(self) -> float:
        """Return the absolute position value.

        Uses the current price, or the average entry price while the current
        price is still unset (0.0).
        """
        if self.current_price:
            reference_price = self.current_price
        else:
            reference_price = self.avg_entry_price
        return abs(self.quantity * reference_price)

@dataclass
class RiskLimits:
    """Thresholds used by the pre-trade risk checks."""
    max_position_size: float = 10.0  # max units per position
    max_notional_exposure: float = 100000.0  # max USD exposure
    max_daily_loss: float = 1000.0  # max daily drawdown
    max_position_concentration: float = 0.3  # max share of the portfolio per asset
    inventory_imbalance_limit: float = 0.4  # max deviation from the neutral ratio

@dataclass
class TradeRecord:
    """Record of a single executed trade."""
    timestamp: float
    symbol: str
    side: str  # "buy" or "sell"
    price: float
    quantity: float
    pnl_impact: float

class InventoryController:
    """
    Verwaltet Positionsbestände mit umfassendem Risikomanagement.
    Thread-safe für den Einsatz in Produktionsumgebungen.
    """
    
    def __init__(
        self,
        risk_limits: Optional[RiskLimits] = None,
        portfolio_value: float = 100000.0
    ):
        """Create a controller with no open positions.

        Args:
            risk_limits: Limits applied by the pre-trade checks; default
                ``RiskLimits()`` values are used when omitted.
            portfolio_value: Portfolio value used by the concentration check.
        """
        self.risk_limits = risk_limits or RiskLimits()
        self.portfolio_value = portfolio_value

        # Re-entrant lock: execute_trade() calls can_trade() while already
        # holding it, which would deadlock with a plain Lock.
        self._lock = RLock()

        # Positions keyed by symbol
        self.positions: Dict[str, Position] = {}

        # Trade history for P&L tracking
        self.trade_history: deque = deque(maxlen=10000)
        self.daily_trades: List[TradeRecord] = []
        self.daily_start_pnl: float = 0.0
        self.last_reset_date: str = ""

        # Statistics
        self.total_realized_pnl: float = 0.0
        self.total_trades: int = 0

        # Inventory metrics
        self.inventory_history: deque = deque(maxlen=100)
    
    def _check_daily_reset(self) -> None:
        """Start a new daily P&L window when the calendar date has changed."""
        today = time.strftime("%Y-%m-%d")
        if today == self.last_reset_date:
            return

        # New day: remember the date, take the realized P&L so far as the
        # baseline and clear the per-day trade list.
        self.last_reset_date = today
        self.daily_start_pnl = self.total_realized_pnl
        self.daily_trades = []
    
    def _get_inventory_ratio(self, symbol: str) -> float:
        """Map the position in ``symbol`` onto a ratio where 0.5 is neutral.

        A long position of ``max_position_size`` maps to 1.0 and a short one
        of the same size to 0.0; without a position the result is 0.5.
        """
        held = self.positions.get(symbol)
        if held:
            fill_fraction = held.quantity / self.risk_limits.max_position_size
            return 0.5 + fill_fraction * 0.5
        return 0.5  # neutral
    
    def can_trade(
        self,
        symbol: str,
        side: str,
        quantity: float,
        price: float
    ) -> tuple[bool, str]:
        """Check a prospective trade against the configured risk limits.

        Args:
            symbol: Instrument to trade.
            side: ``"buy"``; any other value is treated as a sell.
            quantity: Unsigned trade quantity.
            price: Price used to value the resulting position.

        Returns:
            ``(allowed, reason)`` where ``reason`` is ``"OK"`` or names the
            first limit that was violated.
        """
        with self._lock:
            self._check_daily_reset()
            limits = self.risk_limits

            # 1. Position size after the trade (signed: sells reduce it).
            held = self.positions[symbol].quantity if symbol in self.positions else 0.0
            signed_quantity = quantity if side == "buy" else -quantity
            new_quantity = held + signed_quantity

            if abs(new_quantity) > limits.max_position_size:
                return False, f"Position Size Limit: {abs(new_quantity)} > {limits.max_position_size}"

            # 2. Notional exposure of the resulting position
            notional = abs(new_quantity * price)
            if notional > limits.max_notional_exposure:
                return False, f"Notional Limit: ${notional:,.0f} > ${limits.max_notional_exposure:,.0f}"

            # 3. Concentration: resulting position relative to portfolio value
            concentration = notional / self.portfolio_value
            if concentration > limits.max_position_concentration:
                return False, f"Concentration Limit: {concentration:.1%} > {limits.max_position_concentration:.1%}"

            # 4. Realized loss since the last daily reset
            daily_pnl = self.total_realized_pnl - self.daily_start_pnl
            if daily_pnl < -limits.max_daily_loss:
                return False, f"Daily Loss Limit: ${daily_pnl:,.2f} < -${limits.max_daily_loss:,.2f}"

            # 5. Inventory imbalance, based on the position BEFORE this trade
            inventory_ratio = self._get_inventory_ratio(symbol)

            if side == "buy" and inventory_ratio > (0.5 + limits.inventory_imbalance_limit):
                return False, f"Inventory Too Long: {inventory_ratio:.2f}"

            if side == "sell" and inventory_ratio < (0.5 - limits.inventory_imbalance_limit):
                return False, f"Inventory Too Short: {inventory_ratio:.2f}"

            return True, "OK"
    
    def execute_trade(
        self,
        symbol: str,
        side: str,
        price: float,
        quantity: float
    ) -> tuple[bool, Position, str]:
        """
        Führt Trade aus und aktualisiert Position.
        Returns: (success, position, message)
        """
        with self._lock:
            # Trade-Validierung
            allowed, reason = self.can_trade(symbol, side, quantity, price)
            if not allowed:
                return False, self.positions.get(symbol), reason
            
            # Trade Record erstellen
            trade = TradeRecord(
                timestamp=time.time(),
                symbol=symbol,
                side=side,
                price=price,
                quantity=quantity,
                pnl_impact=0.0
            )
            
            # Position aktualisieren
            if symbol not in self.positions:
                self.positions[symbol] = Position(
                    symbol=symbol,
                    quantity=0,