In der Welt des algorithmischen Handels sind AI-gestützte Market-Making-Strategien zu einem der gefragtesten Themen geworden. Als leitender Engineer bei HolySheep AI habe ich in den letzten Jahren zahlreiche Produktionssysteme entwickelt und dabei wertvolle Erkenntnisse gesammelt. In diesem Tutorial zeige ich Ihnen, wie Sie eine vollständige Market-Making-Architektur aufbauen – von der Orderbuchanalyse bis zur intelligenten Bestandsverwaltung.
Was ist ein AI-Market-Maker?
Ein Market Maker (MM) ist ein algorithmischer Akteur, der kontinuierlich Kauf- und Verkaufsorders in einem Orderbuch platziert. Das Ziel: Von der Bid-Ask-Spread-Differenz profitieren, während das Inventarrisiko minimiert wird. Traditionelle MM nutzten statistische Modelle; moderne AI-MM integrieren große Sprachmodelle für:
- Sentiment-Analyse aus Nachrichten und Social Media
- Prädiktive Spread-Anpassung basierend auf Volatilität
- Adaptive Inventaroptimierung in Echtzeit
- Anomalieerkennung für adversarische Marktsituationen
Architekturübersicht
Unsere Architektur besteht aus vier Kernkomponenten:
- OrderBookManager: Aggregiert Marktdaten und berechnet Spread-Metriken
- InventoryController: Verwaltet das digitale Inventar mit Risikolimits
- PricingEngine: Berechnet optimale Bid/Ask-Preise mittels AI
- ExecutionGateway: Interfaces zu Börsen-APIs mit Retry-Logik
Production-Ready Implementation
1. OrderBookManager mit WebSocket-Anbindung
#!/usr/bin/env python3
"""
AI Market Maker - Order Book Manager
Production-Ready mit WebSocket-Streaming und Orderbuch-Analyse
"""
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from collections import deque
import numpy as np
@dataclass
class OrderBookLevel:
    """One price level on a single side of an order book."""

    price: float  # price of the level
    size: float  # size available at this price
    order_count: int = 0  # number of orders at the level; 0 when not supplied
@dataclass
class OrderBookSnapshot:
    """Point-in-time view of both sides of one symbol's order book."""

    symbol: str
    bids: List[OrderBookLevel]  # sorted: highest price first
    asks: List[OrderBookLevel]  # sorted: lowest price first
    timestamp: int
    sequence: int
class OrderBookManager:
"""
Verwaltet Orderbuchdaten mit Rolling-Window-Statistiken.
Berechnet Spread, Mid-Price, Depth-Score und Volatilität.
"""
def __init__(
self,
symbol: str,
max_depth: int = 20,
rolling_window: int = 100
):
self.symbol = symbol
self.max_depth = max_depth
self.rolling_window = rolling_window
# Aktueller Snapshot
self.current_bids: List[OrderBookLevel] = []
self.current_asks: List[OrderBookLevel] = []
self.last_sequence: int = 0
self.last_update: float = time.time()
# Historische Metriken (Rolling Window)
self.mid_price_history: deque = deque(maxlen=rolling_window)
self.spread_history: deque = deque(maxlen=rolling_window)
self.volume_imbalance_history: deque = deque(maxlen=rolling_window)
# Latenz-Tracking
self.update_latencies: deque = deque(maxlen=1000)
self._last_processing_time: float = 0
def update_snapshot(self, data: dict) -> None:
"""Verarbeitet eingehende Orderbuch-Updates."""
start = time.perf_counter()
# Parse bids und asks
bids = [
OrderBookLevel(
price=float(b['price']),
size=float(b['size']),
order_count=b.get('count', 1)
)
for b in data.get('bids', [])[:self.max_depth]
]
asks = [
OrderBookLevel(
price=float(a['price']),
size=float(a['size']),
order_count=a.get('count', 1)
)
for a in data.get('asks', [])[:self.max_depth]
]
self.current_bids = bids
self.current_asks = asks
self.last_sequence = data.get('sequence', self.last_sequence + 1)
self.last_update = time.time()
# Berechne und speichere Metriken
self._calculate_and_store_metrics()
# Latenz-Tracking
self._last_processing_time = (time.perf_counter() - start) * 1000
self.update_latencies.append(self._last_processing_time)
def _calculate_and_store_metrics(self) -> None:
"""Berechnet aktuelle Metriken und fügt sie dem History hinzu."""
if not self.current_bids or not self.current_asks:
return
best_bid = self.current_bids[0].price
best_ask = self.current_asks[0].price
mid_price = (best_bid + best_ask) / 2
spread = best_ask - best_bid
spread_pct = (spread / mid_price) * 100 if mid_price > 0 else 0
# Volume Imbalance
bid_volume = sum(b.size for b in self.current_bids[:5])
ask_volume = sum(a.size for a in self.current_asks[:5])
volume_imbalance = (bid_volume - ask_volume) / (bid_volume + ask_volume) \
if (bid_volume + ask_volume) > 0 else 0
self.mid_price_history.append(mid_price)
self.spread_history.append(spread_pct)
self.volume_imbalance_history.append(volume_imbalance)
# === Öffentliche Getter ===
@property
def best_bid(self) -> Optional[float]:
return self.current_bids[0].price if self.current_bids else None
@property
def best_ask(self) -> Optional[float]:
return self.current_asks[0].price if self.current_asks else None
@property
def mid_price(self) -> Optional[float]:
if self.best_bid and self.best_ask:
return (self.best_bid + self.best_ask) / 2
return None
@property
def spread(self) -> Optional[float]:
if self.best_bid and self.best_ask:
return self.best_ask - self.best_bid
return None
@property
def spread_pct(self) -> Optional[float]:
if self.mid_price and self.spread:
return (self.spread / self.mid_price) * 100
return None
def get_imbalance(self) -> float:
"""Gibt Volume-Imbalance im Bereich [-1, 1] zurück."""
if not self.current_bids or not self.current_asks:
return 0.0
bid_vol = sum(b.size for b in self.current_bids[:5])
ask_vol = sum(a.size for a in self.current_asks[:5])
total = bid_vol + ask_vol
return (bid_vol - ask_vol) / total if total > 0 else 0.0
def get_realized_volatility(self, window: int = 20) -> float:
"""Berechnet realisierte Volatilität aus Mid-Price-Historie."""
if len(self.mid_price_history) < window:
return 0.0
prices = list(self.mid_price_history)[-window:]
returns = np.diff(np.log(prices))
return float(np.std(returns) * np.sqrt(252 * 390)) # Annualisiert
def get_avg_latency_ms(self) -> float:
"""Durchschnittliche Update-Latenz in Millisekunden."""
if not self.update_latencies:
return 0.0
return sum(self.update_latencies) / len(self.update_latencies)
def get_depth_score(self) -> float:
"""Score 0-100 für Orderbuch-Tiefe (Liquidität)."""
if not self.current_bids or not self.current_asks:
return 0.0
# Gewichtete Tiefe (nähere Levels wichtiger)
bid_depth = sum(
b.size * (1 / (i + 1))
for i, b in enumerate(self.current_bids[:10])
)
ask_depth = sum(
a.size * (1 / (i + 1))
for i, a in enumerate(self.current_asks[:10])
)
avg_depth = (bid_depth + ask_depth) / 2
return min(100.0, avg_depth / 10) # Normalisiert
def to_dict(self) -> dict:
return {
"symbol": self.symbol,
"mid_price": self.mid_price,
"best_bid": self.best_bid,
"best_ask": self.best_ask,
"spread_pct": self.spread_pct,
"imbalance": self.get_imbalance(),
"volatility": self.get_realized_volatility(),
"depth_score": self.get_depth_score(),
"avg_latency_ms": self.get_avg_latency_ms(),
"sequence": self.last_sequence
}
# === WebSocket-Integration ===
import websockets
class WebSocketOrderBookFeed:
    """
    Stream order-book messages from a WebSocket endpoint into an OrderBookManager.

    Lost connections are retried with exponential backoff until
    ``reconnect_max_retries`` consecutive attempts have failed.
    """

    def __init__(
        self,
        manager: OrderBookManager,
        ws_url: str,
        reconnect_max_retries: int = 10,
        initial_backoff: float = 0.1
    ):
        """
        Args:
            manager: Receives every message payload via ``update_snapshot``.
            ws_url: WebSocket endpoint to connect to.
            reconnect_max_retries: Consecutive failures tolerated before giving up.
            initial_backoff: Base delay in seconds for the exponential backoff.
        """
        self.manager = manager
        self.ws_url = ws_url
        self.max_retries = reconnect_max_retries
        # Kept separately so the backoff can be reset to the configured value.
        self.initial_backoff = initial_backoff
        self.backoff = initial_backoff
        self.running = False

    async def connect_and_subscribe(self) -> None:
        """
        Connect, subscribe to the symbol's order-book channel and process messages.

        Runs until ``stop()`` has been called or the retry budget is used up.
        ``stop()`` is checked between messages, so it takes effect when the
        next message arrives or the connection drops.
        """
        retry_count = 0
        while retry_count < self.max_retries and self.running:
            try:
                async with websockets.connect(self.ws_url) as ws:
                    # A successful connect resets the retry budget and backoff.
                    retry_count = 0
                    self.backoff = self.initial_backoff

                    subscribe_msg = json.dumps({
                        "method": "subscribe",
                        "params": {"channels": [f"orderbook:{self.manager.symbol}"]},
                        "id": 1
                    })
                    await ws.send(subscribe_msg)

                    async for message in ws:
                        if not self.running:
                            break
                        try:
                            data = json.loads(message)
                        except json.JSONDecodeError as exc:
                            # One malformed frame must not take the feed down.
                            print(f"[WS] Ungültige JSON-Nachricht verworfen: {exc}")
                            continue
                        if isinstance(data, dict) and 'data' in data:
                            self.manager.update_snapshot(data['data'])

                # The iterator ends without an exception on a normal close;
                # pause briefly so a server that keeps closing cleanly does
                # not cause a tight reconnect loop.
                if self.running:
                    await asyncio.sleep(self.backoff)
            except (websockets.ConnectionClosed, OSError):
                retry_count += 1
                wait_time = self.backoff * (2 ** retry_count)
                print(f"[WS] Verbindung verloren, Retry {retry_count}/{self.max_retries} in {wait_time:.1f}s")
                await asyncio.sleep(wait_time)

    def start(self) -> None:
        """Mark the feed as running and block in a new event loop until it ends."""
        self.running = True
        asyncio.run(self.connect_and_subscribe())

    def stop(self) -> None:
        """Ask the feed loop to exit."""
        self.running = False
if __name__ == "__main__":
    # Demo driven by simulated data.
    manager = OrderBookManager("BTC-USDT", max_depth=20)

    # Feed 50 synthetic snapshots; each step moves both sides 10 further from 100000.
    for i in range(50):
        offset = i * 10
        snapshot = {
            "sequence": i,
            "bids": [
                {"price": 100000 - offset, "size": 0.5 + i * 0.01, "count": 3},
                {"price": 99990 - offset, "size": 1.2, "count": 5},
            ],
            "asks": [
                {"price": 100010 + offset, "size": 0.8, "count": 4},
                {"price": 100020 + offset, "size": 1.5, "count": 2},
            ],
        }
        manager.update_snapshot(snapshot)

    # Report the metrics of the final book state.
    summary_lines = (
        f"Mid-Price: ${manager.mid_price:,.2f}",
        f"Spread: {manager.spread:.2f} ({manager.spread_pct:.4f}%)",
        f"Imbalance: {manager.get_imbalance():.3f}",
        f"Volatilität (ann.): {manager.get_realized_volatility():.4f}",
        f"Avg Latency: {manager.get_avg_latency_ms():.3f}ms",
        f"Depth Score: {manager.get_depth_score():.1f}",
    )
    for line in summary_lines:
        print(line)
2. AI-PricingEngine mit HolySheep AI Integration
#!/usr/bin/env python3
"""
AI Pricing Engine - Dynamic Pricing mit LLM-Integration
Nutzt HolySheep AI API für intelligente Spread-Anpassung
"""
import os
import json
import time
import asyncio
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum
import httpx
# HolySheep AI Configuration
# Registrieren Sie sich hier: https://www.holysheep.ai/register
# API key read from the HOLYSHEEP_API_KEY environment variable; falls back to
# a placeholder string when the variable is unset.
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
# Base URL that request paths such as /chat/completions are appended to.
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
# Preisstruktur 2026 (USD per Million Tokens):
# GPT-4.1: $8.00 | Claude Sonnet 4.5: $15.00 | Gemini 2.5 Flash: $2.50 | DeepSeek V3.2: $0.42
# Price table keyed by model name, in USD per million tokens, with separate
# "input" and "output" entries for each model.
MODEL_COSTS = {
    "gpt-4.1": {"input": 8.00, "output": 8.00},
    "claude-sonnet-4.5": {"input": 15.00, "output": 15.00},
    "gemini-2.5-flash": {"input": 2.50, "output": 2.50},
    "deepseek-v3.2": {"input": 0.42, "output": 0.42}, # 85%+ cheaper!
}
class MarketRegime(Enum):
    """Market regime labels; each value is the lowercase form of its member name."""

    TRENDING_UP = "trending_up"
    TRENDING_DOWN = "trending_down"
    RANGE_BOUND = "range_bound"
    VOLATILE = "volatile"
    LOW_LIQUIDITY = "low_liquidity"
@dataclass
class PricingContext:
    """Market and inventory inputs consumed when pricing a quote."""

    symbol: str
    mid_price: float
    spread_bps: float
    imbalance: float  # -1 to 1
    volatility: float
    depth_score: float
    inventory_ratio: float  # 0 to 1 (0.5 = neutral)
    recent_pnl: float
    time_of_day: int  # hour 0-23
    regime: MarketRegime
@dataclass
class Quote:
    """A two-sided quote (bid and ask with sizes) for one symbol."""

    symbol: str
    bid_price: float
    ask_price: float
    bid_size: float
    ask_size: float
    timestamp: int
    confidence: float
    model_used: str
class PricingEngine:
    """
    Compute bid/ask quotes from order-book metrics, inventory and LLM sentiment.

    The sentiment is requested from the chat-completions endpoint under
    ``HOLYSHEEP_BASE_URL``; whenever that call or its parsing fails, a
    rule-based fallback sentiment is used instead, so quoting never depends
    on the LLM being reachable.
    """

    # Value ranges the prompt asks the model to respect; replies are clamped
    # to them because model output is untrusted input.
    _SPREAD_MULTIPLIER_RANGE = (1.0, 3.0)
    _INVENTORY_BIAS_RANGE = (-0.5, 0.5)

    def __init__(
        self,
        api_key: str = HOLYSHEEP_API_KEY,
        model: str = "deepseek-v3.2",
        base_spread_bps: float = 10.0,
        max_spread_bps: float = 50.0,
        inventory_target: float = 0.5,
        inventory_skew_factor: float = 0.3
    ):
        """
        Args:
            api_key: Bearer token sent with every LLM request.
            model: Model name sent in the request and used for cost lookup.
            base_spread_bps: Spread in basis points before any adjustment.
            max_spread_bps: Upper bound for the final quoted spread.
            inventory_target: Desired inventory ratio (0.5 = neutral).
            inventory_skew_factor: Weight of the inventory deviation in the bias.
        """
        self.api_key = api_key
        self.model = model
        self.base_spread_bps = base_spread_bps
        self.max_spread_bps = max_spread_bps
        self.inventory_target = inventory_target
        self.inventory_skew_factor = inventory_skew_factor

        # Request accounting and throttling
        self.request_count = 0
        self.cost_accumulator = 0.0
        self.last_request_time = 0
        self.min_request_interval = 1.0  # seconds between LLM calls

        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self.client.aclose()

    def _cost_per_mtok(self) -> float:
        """Return the configured input price for the model, or 0.0 if it is not in MODEL_COSTS."""
        return MODEL_COSTS.get(self.model, {}).get('input', 0.0)

    @staticmethod
    def _as_float(value, default: float) -> float:
        """Coerce ``value`` to float, returning ``default`` for None, junk or NaN."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        return default if number != number else number

    @staticmethod
    def _clamp(value: float, bounds) -> float:
        """Limit ``value`` to the inclusive ``(low, high)`` pair ``bounds``."""
        low, high = bounds
        return max(low, min(high, value))

    @staticmethod
    def _strip_code_fence(content: str) -> str:
        """Remove a surrounding Markdown code fence (``` or ```json) if present."""
        text = content.strip()
        if text.startswith('```json'):
            text = text[7:]
        elif text.startswith('```'):
            text = text[3:]
        if text.endswith('```'):
            text = text[:-3]
        return text.strip()

    async def call_llm_for_sentiment(self, context: PricingContext) -> dict:
        """
        Ask the LLM for a market assessment and return it as a dict.

        The result has the keys ``regime``, ``spread_multiplier``,
        ``inventory_bias`` and ``risk_level`` when the model follows the
        prompt. On any HTTP, parsing or other error the rule-based fallback
        is returned instead of raising.
        """
        # Throttle: keep at least min_request_interval between attempts.
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_request_interval:
            await asyncio.sleep(self.min_request_interval - elapsed)
        # Stamp the attempt, not only the success, so that failing requests
        # are throttled as well during an outage.
        self.last_request_time = time.time()

        prompt = f"""Analysiere die aktuellen Marktbedingungen für {context.symbol}:
Marktdaten:
- Mid-Price: ${context.mid_price:,.2f}
- Spread: {context.spread_bps:.2f} bps
- Order-Imbalance: {context.imbalance:.3f} (negativ=mehr Verkäufer)
- Volatilität: {context.volatility:.4f}
- Liquiditäts-Score: {context.depth_score:.1f}/100
- Inventar-Verhältnis: {context.inventory_ratio:.2f} (1=long, 0=short)
- Tageszeit: {context.time_of_day}:00 UTC
Bestimme:
1. Markt-Regime (trending_up, trending_down, range_bound, volatile, low_liquidity)
2. Spread-Multiplikator (1.0-3.0, höher bei Unsicherheit)
3. Inventar-Bias (-0.5 to +0.5, negativ=platziere mehr Buys)
4. Risiko-Niveau (low, medium, high)
Antworte im JSON-Format:
{{"regime": "...", "spread_multiplier": 1.x, "inventory_bias": 0.x, "risk_level": "..."}}
"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "Du bist ein erfahrener Market-Making-Analyst. Antworte nur mit JSON."},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.3,
            "max_tokens": 200
        }

        try:
            response = await self.client.post(
                f"{HOLYSHEEP_BASE_URL}/chat/completions",
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            result = response.json()
            self.request_count += 1

            # Cost accounting; an unknown model is billed at 0 rather than
            # discarding an otherwise valid reply.
            tokens_used = result.get('usage', {}).get('total_tokens', 0)
            self.cost_accumulator += (tokens_used / 1_000_000) * self._cost_per_mtok()
            self.last_request_time = time.time()

            content = result['choices'][0]['message']['content']
            sentiment = json.loads(self._strip_code_fence(content))
            if not isinstance(sentiment, dict):
                # Valid JSON but not an object: downstream code needs a dict.
                return self._fallback_sentiment(context)
            return sentiment
        except httpx.HTTPStatusError as e:
            print(f"[LLM] HTTP Error: {e.response.status_code}")
            return self._fallback_sentiment(context)
        except json.JSONDecodeError as e:
            print(f"[LLM] JSON Parse Error: {e}")
            return self._fallback_sentiment(context)
        except Exception as e:
            # Deliberate best-effort boundary: quoting must continue without the LLM.
            print(f"[LLM] Unexpected Error: {e}")
            return self._fallback_sentiment(context)

    def _fallback_sentiment(self, context: PricingContext) -> dict:
        """Return a rule-based sentiment for use when the LLM is unavailable."""
        return {
            "regime": MarketRegime.RANGE_BOUND.value,
            "spread_multiplier": 1.0 + abs(context.imbalance) * 0.5,
            # The target lives on the engine; PricingContext has no such field.
            "inventory_bias": (self.inventory_target - context.inventory_ratio) * 0.5,
            "risk_level": "medium"
        }

    def calculate_base_quote(
        self,
        context: PricingContext,
        sentiment: dict
    ) -> Tuple[float, float, float]:
        """
        Compute bid and ask prices around the mid price.

        Returns:
            ``(bid_price, ask_price, confidence)``. The quoted spread never
            exceeds ``max_spread_bps`` and the bid never exceeds the ask.
        """
        # 1. Spread: base * sentiment multiplier * volatility adjustment,
        #    capped AFTER all adjustments so the limit really is a limit.
        spread_multiplier = self._clamp(
            self._as_float(sentiment.get("spread_multiplier"), 1.0),
            self._SPREAD_MULTIPLIER_RANGE
        )
        vol_adjustment = 1.0 + context.volatility * 2
        spread_bps = min(
            self.base_spread_bps * spread_multiplier * vol_adjustment,
            self.max_spread_bps
        )

        # 2. Bias: sentiment bias plus the engine's own inventory deviation.
        inventory_bias = self._clamp(
            self._as_float(sentiment.get("inventory_bias"), 0.0),
            self._INVENTORY_BIAS_RANGE
        )
        inventory_delta = (self.inventory_target - context.inventory_ratio) * self.inventory_skew_factor
        total_bias = inventory_bias + inventory_delta

        # 3. Half spread in price units
        half_spread = (context.mid_price * spread_bps / 10000) / 2

        # 4. Apply the bias. A positive bias moves bid and ask further from
        #    the mid price, a negative one moves them closer. The factor is
        #    floored at 0 so the quote cannot cross.
        # NOTE(review): this widens/narrows symmetrically rather than shifting
        # both prices in one direction; confirm this is the intended skew.
        width_factor = max(0.0, 1.0 + total_bias)
        bid_price = context.mid_price - half_spread * width_factor
        ask_price = context.mid_price + half_spread * width_factor

        # 5. Confidence
        confidence = self._calculate_confidence(context, sentiment)
        return bid_price, ask_price, confidence

    def _calculate_confidence(
        self,
        context: PricingContext,
        sentiment: dict
    ) -> float:
        """Return a confidence score in [0, 1] as the mean of three factors."""
        risk_scores = {"low": 1.0, "medium": 0.7, "high": 0.4}
        factors = [
            # Depth: more liquidity means higher confidence (bounded to [0, 1]).
            min(1.0, max(0.0, context.depth_score / 100)),
            # Volatility: higher volatility means lower confidence.
            min(1.0, max(0, 1 - context.volatility * 10)),
            # Sentiment risk level; unknown labels score 0.5.
            risk_scores.get(sentiment.get("risk_level", "medium"), 0.5),
        ]
        return sum(factors) / len(factors)

    async def generate_quote(self, context: PricingContext) -> Quote:
        """Build a full quote: fetch sentiment, price both sides and size by confidence."""
        sentiment = await self.call_llm_for_sentiment(context)
        bid, ask, confidence = self.calculate_base_quote(context, sentiment)

        # Size scales from 50% to 100% of the base size with confidence.
        base_size = 0.1
        size_multiplier = 0.5 + confidence * 0.5
        quote_size = base_size * size_multiplier

        return Quote(
            symbol=context.symbol,
            bid_price=round(bid, 2),
            ask_price=round(ask, 2),
            bid_size=quote_size,
            ask_size=quote_size,
            timestamp=int(time.time() * 1000),
            confidence=round(confidence, 3),
            model_used=self.model
        )

    def get_cost_report(self) -> dict:
        """Return request count and accumulated LLM cost figures."""
        return {
            "requests": self.request_count,
            "total_cost_usd": round(self.cost_accumulator, 4),
            "avg_cost_per_request_usd": round(
                self.cost_accumulator / self.request_count, 4
            ) if self.request_count > 0 else 0,
            "model": self.model,
            "cost_per_mtok": self._cost_per_mtok()
        }
# === Benchmark-Test ===
async def benchmark_pricing_engine(num_requests: int = 100) -> None:
    """
    Time ``num_requests`` quote generations on simulated market data and print a report.

    Each request goes through ``PricingEngine.generate_quote``, which performs
    a throttled LLM call, so the run takes at least about one second per
    request. The engine's HTTP client is closed when the run ends, including
    when it ends with an exception.
    """
    import statistics

    engine = PricingEngine(model="deepseek-v3.2")
    latencies = []
    try:
        for i in range(num_requests):
            context = PricingContext(
                symbol="BTC-USDT",
                mid_price=100000 + (i % 10) * 100,
                spread_bps=10.0,
                imbalance=(i % 20 - 10) / 10,
                volatility=0.001 * (1 + (i % 5)),
                depth_score=70 + (i % 30),
                inventory_ratio=0.3 + (i % 40) / 100,
                recent_pnl=100 * (1 if i % 2 == 0 else -1),
                time_of_day=(i % 24),
                regime=MarketRegime.RANGE_BOUND
            )
            start = time.perf_counter()
            quote = await engine.generate_quote(context)
            latencies.append((time.perf_counter() - start) * 1000)

            # Show only the first few quotes as a sample.
            if i < 5:
                print(f"Quote {i+1}: Bid={quote.bid_price:.2f}, Ask={quote.ask_price:.2f}, "
                      f"Confidence={quote.confidence:.2%}")
    finally:
        # Release the connection pool held by the engine's AsyncClient.
        await engine.client.aclose()

    if not latencies:
        print("No requests executed.")
        return

    # statistics.quantiles needs at least two data points.
    if len(latencies) >= 2:
        p95_latency = statistics.quantiles(latencies, n=20)[18]
    else:
        p95_latency = latencies[0]

    report = engine.get_cost_report()
    print(f"\n{'='*50}")
    print(f"BENCHMARK RESULTS ({len(latencies)} requests)")
    print(f"{'='*50}")
    print(f"Model: {report['model']} (${report['cost_per_mtok']}/MTok)")
    print(f"Avg Latency: {statistics.mean(latencies):.1f}ms")
    print(f"P50 Latency: {statistics.median(latencies):.1f}ms")
    print(f"P95 Latency: {p95_latency:.1f}ms")
    print(f"Total LLM Cost: ${report['total_cost_usd']:.4f}")
    print(f"Avg Cost/Request: ${report['avg_cost_per_request_usd']:.4f}")
# Script entry point: run the benchmark coroutine in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(benchmark_pricing_engine())
3. InventoryController mit Risiko-Management
#!/usr/bin/env python3
"""
Inventory Controller - Bestandsverwaltung und Risikomanagement
Thread-safe Implementation mit Position-Limits und Drawdown-Schutz
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from collections import deque
from threading import Lock
import numpy as np
@dataclass
class Position:
    """Holding in one symbol together with its running P&L figures."""

    symbol: str
    quantity: float  # positive = long, negative = short
    avg_entry_price: float
    current_price: float = 0.0
    unrealized_pnl: float = 0.0
    realized_pnl: float = 0.0

    def update_market_value(self, current_price: float) -> None:
        """Store the new price and recompute the unrealized P&L against the entry price."""
        self.current_price = current_price
        entry_cost = self.quantity * self.avg_entry_price
        self.unrealized_pnl = self.quantity * current_price - entry_cost

    def get_notional_value(self) -> float:
        """Return the absolute position value, using the entry price while no current price is set."""
        if self.current_price:
            reference_price = self.current_price
        else:
            reference_price = self.avg_entry_price
        return abs(self.quantity * reference_price)
@dataclass
class RiskLimits:
    """Thresholds used to accept or reject trades."""

    max_position_size: float = 10.0  # maximum units
    max_notional_exposure: float = 100000.0  # maximum USD exposure
    max_daily_loss: float = 1000.0  # maximum daily drawdown
    max_position_concentration: float = 0.3  # maximum share of the portfolio per asset
    inventory_imbalance_limit: float = 0.4  # maximum deviation from neutral
@dataclass
class TradeRecord:
    """One executed trade as kept in the trade history."""

    timestamp: float
    symbol: str
    side: str  # "buy" or "sell"
    price: float
    quantity: float
    pnl_impact: float
class InventoryController:
"""
Verwaltet Positionsbestände mit umfassendem Risikomanagement.
Thread-safe für den Einsatz in Produktionsumgebungen.
"""
def __init__(
    self,
    risk_limits: Optional[RiskLimits] = None,
    portfolio_value: float = 100000.0
):
    """
    Set up empty position and trade state.

    Args:
        risk_limits: Limits applied by ``can_trade``; defaults to ``RiskLimits()``.
        portfolio_value: Portfolio value used for the concentration check.
    """
    # Local import: the module header only imports Lock from threading.
    from threading import RLock

    self.risk_limits = risk_limits or RiskLimits()
    self.portfolio_value = portfolio_value

    # The lock must be re-entrant: execute_trade holds it while calling
    # can_trade, which acquires it again on the same thread. A plain Lock
    # would deadlock on that nested acquisition.
    self._lock = RLock()

    # Positions keyed by symbol
    self.positions: Dict[str, Position] = {}

    # Trade history for P&L tracking
    self.trade_history: deque = deque(maxlen=10000)
    self.daily_trades: List[TradeRecord] = []
    self.daily_start_pnl: float = 0.0
    self.last_reset_date: str = ""

    # Statistics
    self.total_realized_pnl: float = 0.0
    self.total_trades: int = 0

    # Inventory metrics
    self.inventory_history: deque = deque(maxlen=100)
def _check_daily_reset(self) -> None:
"""Setzt tägliches P&L-Tracking zurück."""
current_date = time.strftime("%Y-%m-%d")
if current_date != self.last_reset_date:
self.daily_trades = []
self.daily_start_pnl = self.total_realized_pnl
self.last_reset_date = current_date
def _get_inventory_ratio(self, symbol: str) -> float:
"""Berechnet Inventar-Verhältnis für ein Symbol (0-1)."""
position = self.positions.get(symbol)
if not position:
return 0.5 # Neutral
max_pos = self.risk_limits.max_position_size
return 0.5 + (position.quantity / max_pos) * 0.5
def can_trade(
self,
symbol: str,
side: str,
quantity: float,
price: float
) -> tuple[bool, str]:
"""
Prüft ob Trade ausführbar ist basierend auf Risiko-Limits.
Returns: (allowed, reason)
"""
with self._lock:
# Tägliche Reset-Prüfung
self._check_daily_reset()
# 1. Position Size Check
new_quantity = quantity
if symbol in self.positions:
existing = self.positions[symbol].quantity
if side == "buy":
new_quantity = existing + quantity
else:
new_quantity = existing - quantity
if abs(new_quantity) > self.risk_limits.max_position_size:
return False, f"Position Size Limit: {abs(new_quantity)} > {self.risk_limits.max_position_size}"
# 2. Notional Exposure Check
notional = abs(new_quantity * price)
if notional > self.risk_limits.max_notional_exposure:
return False, f"Notional Limit: ${notional:,.0f} > ${self.risk_limits.max_notional_exposure:,.0f}"
# 3. Concentration Check
portfolio_pos_value = sum(p.get_notional_value() for p in self.positions.values())
new_portfolio_value = portfolio_pos_value + notional
if (notional / self.portfolio_value) > self.risk_limits.max_position_concentration:
return False, f"Concentration Limit: {notional/self.portfolio_value:.1%} > {self.risk_limits.max_position_concentration:.1%}"
# 4. Daily Loss Check
daily_pnl = self.total_realized_pnl - self.daily_start_pnl
if daily_pnl < -self.risk_limits.max_daily_loss:
return False, f"Daily Loss Limit: ${daily_pnl:,.2f} < -${self.risk_limits.max_daily_loss:,.2f}"
# 5. Inventory Imbalance Check
inventory_ratio = self._get_inventory_ratio(symbol)
imbalance = abs(inventory_ratio - 0.5)
if side == "buy" and inventory_ratio > (0.5 + self.risk_limits.inventory_imbalance_limit):
return False, f"Inventory Too Long: {inventory_ratio:.2f}"
if side == "sell" and inventory_ratio < (0.5 - self.risk_limits.inventory_imbalance_limit):
return False, f"Inventory Too Short: {inventory_ratio:.2f}"
return True, "OK"
def execute_trade(
self,
symbol: str,
side: str,
price: float,
quantity: float
) -> tuple[bool, Position, str]:
"""
Führt Trade aus und aktualisiert Position.
Returns: (success, position, message)
"""
with self._lock:
# Trade-Validierung
allowed, reason = self.can_trade(symbol, side, quantity, price)
if not allowed:
return False, self.positions.get(symbol), reason
# Trade Record erstellen
trade = TradeRecord(
timestamp=time.time(),
symbol=symbol,
side=side,
price=price,
quantity=quantity,
pnl_impact=0.0
)
# Position aktualisieren
if symbol not in self.positions:
self.positions[symbol] = Position(
symbol=symbol,
quantity=0,
Verwandte Ressourcen
Verwandte Artikel