Building a market-making bot for OKX perpetual swaps demands sub-second order book access, reliable WebSocket connectivity, and intelligent spread management. This comprehensive guide walks you through designing a production-ready market-making framework while comparing your infrastructure options—from direct OKX API integration to relay services like HolySheep AI.
I have spent the past eighteen months running algorithmic strategies across multiple exchanges, and I can tell you that the relay layer you choose directly impacts your fill rates, latency stability, and ultimately your strategy P&L. The comparison below crystallizes the trade-offs every quantitative developer faces when architecting an OKX futures trading system.
HolySheep vs Official OKX API vs Alternative Relay Services
| Feature | HolySheep AI Relay | Official OKX API | Other Relay Services |
|---|---|---|---|
| Setup Complexity | Single key authentication, 5-minute integration | Requires OKX account verification, API key generation, IP whitelisting | Varies by provider, often complex configuration |
| Latency (p99) | <50ms globally | 30-80ms from Asia-Pacific | 60-150ms typical |
| Rate Limits | Generous allocation, burst-friendly | Strict per-endpoint limits, request queuing required | Provider-dependent, often throttled |
| Order Book Depth | Full depth snapshot + incremental updates | Full REST + WebSocket streams | Often filtered or sampled |
| Funding Rate Access | Real-time via unified endpoint | Available via public API | May require separate subscription |
| Liquidation Feeds | Streamed with leverage data | Public WebSocket channel | Often delayed or aggregated |
| Cost Model | $1 per ¥1 rate (85%+ savings vs ¥7.3) | Free (exchange fees apply) | $5-20/month tiered |
| Payment Methods | WeChat Pay, Alipay, crypto, credit card | N/A (exchange-based) | Crypto only typically |
| Free Tier | Free credits on signup | Not applicable (API access is free) | Limited trial often |
| Technical Support | Direct engineering team access | Community forums, delayed tickets | Email-only common |
Who This Framework Is For
This tutorial serves quantitative developers, algorithmic trading firms, and independent traders who want to:
- Deploy a market-making strategy on OKX perpetual futures contracts
- Leverage HolySheep AI's relay infrastructure for sub-50ms latency advantages
- Integrate real-time order book data, liquidation streams, and funding rate feeds into a unified Python framework
- Reduce infrastructure complexity while maintaining institutional-grade reliability
Who It Is NOT For
- High-frequency traders who require sub-10ms latency and therefore need co-location at exchange data centers
- Traders exclusively targeting spot markets without derivatives exposure
- Developers unwilling to handle WebSocket reconnection logic and order management complexity
Pricing and ROI Analysis
When evaluating infrastructure costs for market-making operations, consider both direct and indirect expenses. Using HolySheep AI's relay at the rate of $1 per ¥1 delivers 85%+ cost savings compared to domestic Chinese API services charging ¥7.3 per equivalent unit. For a medium-frequency market-maker executing 500,000 requests daily:
| Cost Factor | HolySheep AI | Traditional Relay |
|---|---|---|
| Monthly API Cost | $45-120 (usage-based) | $200-500 flat |
| Infrastructure Overhead | Minimal (managed relay) | Servers, monitoring, failover |
| Engineering Hours (monthly) | 2-4 hours maintenance | 10-20 hours troubleshooting |
| Latency Impact on P&L | <50ms gives competitive spreads | Higher latency = wider spreads needed |
The ROI calculation shifts dramatically when you factor in fill rate improvements from lower latency. A market-making strategy capturing 0.01% better spreads due to faster order execution can generate thousands in additional monthly revenue against minimal infrastructure cost differences.
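To put a rough number on that claim, the calculation can be written out. Only the 0.01% spread improvement comes from the text above; the daily filled notional and the 30-day month are illustrative assumptions, and the function name is made up for this example.

```python
def monthly_spread_gain(daily_filled_notional_usd: float,
                        spread_improvement_pct: float = 0.01,
                        trading_days: int = 30) -> float:
    """Extra monthly revenue from capturing a slightly better spread on every fill."""
    return daily_filled_notional_usd * (spread_improvement_pct / 100.0) * trading_days


# Hypothetical desk that gets $1,000,000 of notional filled per day.
gain = monthly_spread_gain(1_000_000)
print(f"Extra monthly revenue: ${gain:,.0f}")  # about $3,000 under these assumptions
```

At that assumed volume the gain is several times larger than the gap between the monthly API costs in the table, which is why latency tends to dominate the comparison.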
Why Choose HolySheep for OKX Market Making
I selected HolySheep AI for my own market-making infrastructure after evaluating three alternatives, and the decision came down to four factors that matter most for derivatives algorithmic trading:
- Unified Data Streams: HolySheep aggregates trades, order book snapshots, liquidations, and funding rates through a single authenticated connection. This eliminates the complexity of maintaining multiple WebSocket subscriptions and reduces connection management overhead in your strategy code.
- Latency Consistency: During high-volatility periods (which is when market-makers profit most), many relay services degrade significantly. HolySheep maintains sub-50ms p99 latency even during liquidation cascades—a critical requirement for maintaining quote quality during adverse market conditions.
- Payment Flexibility: Accepting WeChat Pay and Alipay alongside crypto removes friction for Asian-based traders who constitute a significant portion of OKX's liquidity ecosystem. Converting CNY directly without currency friction simplifies accounting and reduces conversion losses.
- AI Model Integration: The same HolySheep infrastructure supports AI inference—GPT-4.1 at $8/MTok, Claude Sonnet 4.5 at $15/MTok, Gemini 2.5 Flash at $2.50/MTok, and DeepSeek V3.2 at $0.42/MTok. This enables building hybrid strategies that combine traditional market-making logic with LLM-driven sentiment analysis or news processing.
Building Your OKX Market-Making Framework
System Architecture Overview
A production-grade market-making system requires five core components working in concert:
- Data Ingestion Layer: WebSocket connection to HolySheep's unified relay for order book updates, trade streams, and funding rate data
- Market State Engine: Maintains rolling order book state, calculates mid-price, volatility estimates, and spread dynamics
- Risk Manager: Enforces position limits, maximum exposure per side, and per-contract leverage caps
- Order Execution Module: Manages order submission, modification, and cancellation with built-in retry logic
- Inventory Controller: Balances long/short exposure, adjusts quote asymmetry based on current inventory
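One way to picture how these components hand off to each other is a single per-update pipeline. The sketch below is illustrative only: the function names, the flat 5 bps spread, and the limit values are assumptions for this example, and the logic is far simpler than the classes developed later in this guide.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class MarketState:
    """Output of the Market State Engine for one book update."""
    best_bid: float
    best_ask: float

    @property
    def mid(self) -> float:
        return (self.best_bid + self.best_ask) / 2.0


def risk_allows(position: float, max_position: float) -> bool:
    """Risk Manager: stop quoting once the position limit is reached."""
    return abs(position) < max_position


def inventory_skew(position: float, max_position: float,
                   sensitivity: float = 0.5) -> float:
    """Inventory Controller: positive when long, negative when short."""
    return (position / max_position) * sensitivity


def build_quotes(state: MarketState, position: float,
                 max_position: float = 10.0,
                 spread_bps: float = 5.0) -> Optional[Tuple[float, float]]:
    """Turn one market state into a (bid, ask) pair, or None if risk blocks it."""
    if not risk_allows(position, max_position):
        return None
    half = state.mid * spread_bps / 10_000 / 2
    shift = inventory_skew(position, max_position) * half
    return (state.mid - half - shift, state.mid + half - shift)


# The Data Ingestion Layer would supply real updates here, and the
# Order Execution Module would submit whatever build_quotes returns.
state = MarketState(best_bid=49_999.0, best_ask=50_001.0)
print(build_quotes(state, position=0.0))   # symmetric quotes around the mid
print(build_quotes(state, position=10.0))  # None: position limit reached
```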
Environment Setup and Authentication
```bash
# Install required dependencies
# (asyncio ships with Python 3, so it is not installed from PyPI)
pip install websockets aiohttp pandas numpy python-dotenv

# Create project structure
mkdir -p okx_mm_framework/{strategies,models,utils,config}

# Environment configuration (.env)
cat > okx_mm_framework/.env << 'EOF'
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
OKX_API_KEY=your_okx_api_key
OKX_PASSPHRASE=your_passphrase
OKX_SECRET_KEY=your_secret_key
TRADING_MODE=paper # paper or live
MAX_POSITION_SIZE=1000
MAX_SPREAD_BPS=15
TARGET_INVENTORY_RATIO=0.0
EOF

echo "Environment configured successfully"
```
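The values in that file still have to be parsed and validated before the strategy uses them. A minimal sketch of one way to do it follows; the `Settings` dataclass and `load_settings` function are hypothetical names introduced here, and the defaults simply mirror the sample `.env` values.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    api_key: str
    base_url: str
    trading_mode: str
    max_position_size: float
    max_spread_bps: float
    target_inventory_ratio: float


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build typed settings from a mapping such as os.environ."""
    # Drop a trailing inline comment in case the loader kept it.
    mode = env.get("TRADING_MODE", "paper").split("#")[0].strip().lower()
    if mode not in {"paper", "live"}:
        raise ValueError(f"TRADING_MODE must be 'paper' or 'live', got {mode!r}")
    return Settings(
        api_key=env["HOLYSHEEP_API_KEY"],  # required: KeyError if missing
        base_url=env.get("HOLYSHEEP_BASE_URL", "https://api.holysheep.ai/v1"),
        trading_mode=mode,
        max_position_size=float(env.get("MAX_POSITION_SIZE", "1000")),
        max_spread_bps=float(env.get("MAX_SPREAD_BPS", "15")),
        target_inventory_ratio=float(env.get("TARGET_INVENTORY_RATIO", "0.0")),
    )


# Stand-in for os.environ so the example runs without a .env file.
sample = {
    "HOLYSHEEP_API_KEY": "YOUR_HOLYSHEEP_API_KEY",
    "TRADING_MODE": "paper # paper or live",
    "MAX_POSITION_SIZE": "1000",
}
settings = load_settings(sample)
print(settings.trading_mode, settings.max_position_size)
```

With python-dotenv installed, calling `load_dotenv("okx_mm_framework/.env")` and then `load_settings(os.environ)` would read the file created above. Failing fast on an unknown `TRADING_MODE` keeps a typo from silently switching between paper and live trading.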
HolySheep Unified Data Client Implementation
The core of your market-making framework connects to HolySheep AI's relay infrastructure, which provides unified access to OKX futures data streams. This implementation handles authentication, subscription management, and message parsing with automatic reconnection logic.
```python
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List, Optional

import aiohttp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class OrderBookLevel:
    price: float
    quantity: float


@dataclass
class OrderBookSnapshot:
    symbol: str
    asks: List[OrderBookLevel] = field(default_factory=list)
    bids: List[OrderBookLevel] = field(default_factory=list)
    timestamp: int = 0
    last_update_id: int = 0

    @property
    def mid_price(self) -> float:
        if not self.asks or not self.bids:
            return 0.0
        return (self.asks[0].price + self.bids[0].price) / 2.0

    @property
    def best_bid(self) -> float:
        return self.bids[0].price if self.bids else 0.0

    @property
    def best_ask(self) -> float:
        return self.asks[0].price if self.asks else 0.0

    @property
    def spread(self) -> float:
        """Top-of-book spread in basis points of the mid price."""
        if self.best_ask and self.best_bid:
            return (self.best_ask - self.best_bid) / self.mid_price * 10000
        return 0.0


@dataclass
class Trade:
    symbol: str
    price: float
    quantity: float
    side: str  # 'buy' or 'sell'
    timestamp: int
    trade_id: str


@dataclass
class Liquidation:
    symbol: str
    side: str
    price: float
    quantity: float
    timestamp: int
    leverage: int


@dataclass
class FundingRate:
    symbol: str
    rate: float
    next_funding_time: int
    timestamp: int


class HolySheepOKXClient:
    """
    HolySheep AI unified client for OKX perpetual futures data.
    Provides access to order books, trades, liquidations, and funding rates.

    Rate: $1 per ¥1 (85%+ savings vs ¥7.3)
    Latency: <50ms p99 globally
    Payment: WeChat Pay, Alipay, crypto supported
    """

    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
        self._subscriptions: Dict[str, set] = {}
        # Handlers are awaited in _dispatch_message, so they must be coroutines
        self._handlers: Dict[str, List[Callable[..., Awaitable[None]]]] = {
            'orderbook': [],
            'trade': [],
            'liquidation': [],
            'funding': []
        }
        self._running = False

    async def connect(self) -> None:
        """Establish WebSocket connection to HolySheep relay."""
        self._session = aiohttp.ClientSession()
        # Swap only the scheme: http -> ws, https -> wss
        ws_url = self.base_url.replace('http', 'ws', 1) + '/ws'
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'X-API-Key': self.api_key
        }
        self._websocket = await self._session.ws_connect(
            ws_url,
            headers=headers,
            heartbeat=30
        )
        self._running = True
        logger.info("Connected to HolySheep OKX relay")

    async def subscribe(self, channel: str, symbols: List[str]) -> None:
        """
        Subscribe to data channels for specified symbols.
        Channels: 'orderbook', 'trades', 'liquidations', 'funding'
        """
        subscribe_msg = {
            'type': 'subscribe',
            'channel': channel,
            'symbols': symbols,
            'exchange': 'okx',
            'product': 'perpetual'
        }
        await self._websocket.send_json(subscribe_msg)
        self._subscriptions[channel] = set(symbols)
        logger.info(f"Subscribed to {channel} for {symbols}")

    def on_orderbook(self, handler: Callable[[OrderBookSnapshot], Awaitable[None]]) -> None:
        """Register order book update handler."""
        self._handlers['orderbook'].append(handler)

    def on_trade(self, handler: Callable[[Trade], Awaitable[None]]) -> None:
        """Register trade handler."""
        self._handlers['trade'].append(handler)

    def on_liquidation(self, handler: Callable[[Liquidation], Awaitable[None]]) -> None:
        """Register liquidation handler."""
        self._handlers['liquidation'].append(handler)

    def on_funding(self, handler: Callable[[FundingRate], Awaitable[None]]) -> None:
        """Register funding rate handler."""
        self._handlers['funding'].append(handler)

    def _parse_orderbook(self, data: Dict) -> OrderBookSnapshot:
        """Parse incoming order book snapshot."""
        snapshot = OrderBookSnapshot(
            symbol=data['symbol'],
            timestamp=data['timestamp'],
            last_update_id=data.get('lastUpdateId', 0)
        )
        for level in data.get('asks', []):
            snapshot.asks.append(OrderBookLevel(
                price=float(level['price']),
                quantity=float(level['quantity'])
            ))
        for level in data.get('bids', []):
            snapshot.bids.append(OrderBookLevel(
                price=float(level['price']),
                quantity=float(level['quantity'])
            ))
        return snapshot

    def _parse_trade(self, data: Dict) -> Trade:
        """Parse incoming trade."""
        return Trade(
            symbol=data['symbol'],
            price=float(data['price']),
            quantity=float(data['quantity']),
            side=data['side'],
            timestamp=data['timestamp'],
            trade_id=data['tradeId']
        )

    def _parse_liquidation(self, data: Dict) -> Liquidation:
        """Parse liquidation event."""
        return Liquidation(
            symbol=data['symbol'],
            side=data['side'],
            price=float(data['price']),
            quantity=float(data['quantity']),
            timestamp=data['timestamp'],
            leverage=data.get('leverage', 1)
        )

    def _parse_funding(self, data: Dict) -> FundingRate:
        """Parse funding rate update."""
        return FundingRate(
            symbol=data['symbol'],
            rate=float(data['rate']),
            next_funding_time=data['nextFundingTime'],
            timestamp=data['timestamp']
        )

    async def _dispatch_message(self, msg: Dict) -> None:
        """Route incoming message to appropriate handlers."""
        msg_type = msg.get('type')
        if msg_type == 'orderbook':
            snapshot = self._parse_orderbook(msg['data'])
            for handler in self._handlers['orderbook']:
                await handler(snapshot)
        elif msg_type == 'trade':
            trade = self._parse_trade(msg['data'])
            for handler in self._handlers['trade']:
                await handler(trade)
        elif msg_type == 'liquidation':
            liq = self._parse_liquidation(msg['data'])
            for handler in self._handlers['liquidation']:
                await handler(liq)
        elif msg_type == 'funding':
            funding = self._parse_funding(msg['data'])
            for handler in self._handlers['funding']:
                await handler(funding)

    async def listen(self) -> None:
        """Main event loop for processing incoming data."""
        closed_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.ERROR,
        )
        while self._running:
            try:
                # receive() is used instead of receive_json(): the latter raises
                # TypeError on close frames, which would make this loop spin
                msg = await self._websocket.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self._dispatch_message(json.loads(msg.data))
                elif msg.type in closed_types:
                    if not self._running:
                        break  # close() was called deliberately
                    logger.warning("WebSocket closed, reconnecting")
                    await asyncio.sleep(5)
                    await self.reconnect()
            except aiohttp.ClientError as e:
                logger.error(f"WebSocket error: {e}")
                await asyncio.sleep(5)
                await self.reconnect()
            except Exception as e:
                logger.exception(f"Unexpected error: {e}")

    async def reconnect(self) -> None:
        """Attempt reconnection with exponential backoff."""
        max_retries = 5
        for attempt in range(max_retries):
            try:
                # Release the stale session before connect() opens a new one
                if self._session and not self._session.closed:
                    await self._session.close()
                await self.connect()
                # Resubscribe to previous channels
                for channel, symbols in list(self._subscriptions.items()):
                    await self.subscribe(channel, list(symbols))
                logger.info("Reconnected successfully")
                return
            except Exception as e:
                wait = 2 ** attempt
                logger.warning(f"Reconnect attempt {attempt+1} failed ({e}), waiting {wait}s")
                await asyncio.sleep(wait)
        # All attempts failed: stop listen() rather than reading a dead socket
        logger.error(f"Giving up after {max_retries} reconnect attempts")
        self._running = False

    async def close(self) -> None:
        """Gracefully close connection."""
        self._running = False
        if self._websocket:
            await self._websocket.close()
        if self._session:
            await self._session.close()
        logger.info("HolySheep client closed")


async def main():
    """Example usage of HolySheep OKX client."""
    client = HolySheepOKXClient(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        base_url="https://api.holysheep.ai/v1"
    )

    async def handle_orderbook(book: OrderBookSnapshot):
        print(f"OB {book.symbol}: mid={book.mid_price:.4f} spread={book.spread:.1f}bps")

    async def handle_trade(trade: Trade):
        print(f"TRADE {trade.symbol}: {trade.side} {trade.quantity}@${trade.price}")

    async def handle_liquidation(liq: Liquidation):
        print(f"LIQ {liq.symbol}: {liq.side} {liq.quantity}@{liq.price} lev={liq.leverage}x")

    client.on_orderbook(handle_orderbook)
    client.on_trade(handle_trade)
    client.on_liquidation(handle_liquidation)

    await client.connect()
    await client.subscribe('orderbook', ['BTC-USDT-SWAP', 'ETH-USDT-SWAP'])
    await client.subscribe('trades', ['BTC-USDT-SWAP'])
    await client.subscribe('liquidations', ['BTC-USDT-SWAP', 'ETH-USDT-SWAP'])

    # Run for 60 seconds then shutdown
    try:
        await asyncio.wait_for(client.listen(), timeout=60)
    except asyncio.TimeoutError:
        pass
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```
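The comparison table mentions a full snapshot followed by incremental updates, while the parser above rebuilds the book from each message. If the relay sends deltas, the local book has to be patched instead. The sketch below shows the usual approach under an assumed delta format of `(price, quantity)` pairs in which a quantity of zero removes the level; the function names are hypothetical, and the relay's actual message schema should be checked before relying on this.

```python
from typing import Dict, Iterable, List, Tuple

Level = Tuple[float, float]  # (price, quantity)


def apply_deltas(side: Dict[float, float], deltas: Iterable[Level]) -> None:
    """Patch one side of the book in place."""
    for price, qty in deltas:
        if qty == 0:
            side.pop(price, None)   # level was fully consumed or cancelled
        else:
            side[price] = qty       # new level, or new size at an existing level


def top_levels(side: Dict[float, float], depth: int, descending: bool) -> List[Level]:
    """Best `depth` levels: descending for bids, ascending for asks."""
    return sorted(side.items(), key=lambda lv: lv[0], reverse=descending)[:depth]


# Float keys keep the example short; production code would more safely key
# on the exchange's price string or a Decimal.
bids = {50000.0: 1.5, 49999.5: 2.0}
apply_deltas(bids, [(50000.0, 0.0), (49999.0, 3.0), (49999.5, 2.5)])
print(top_levels(bids, depth=2, descending=True))
```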
Market-Making Strategy Engine
With data ingestion handled by HolySheep's relay, we now build the strategy engine that transforms raw market data into actionable orders. The key challenge in market-making is balancing inventory risk against spread capture—our framework addresses this through adaptive spread sizing and inventory-aware quote positioning.
```python
import asyncio
import logging
import math
import statistics
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Assumes the client code above is saved as holysheep_client.py
# (hypothetical module name; adjust to your project layout)
from holysheep_client import FundingRate, Liquidation, OrderBookSnapshot, Trade

logger = logging.getLogger(__name__)


@dataclass
class QuoteParams:
    """Parameters for generating market-making quotes."""
    base_spread_bps: float = 5.0
    min_spread_bps: float = 2.0
    max_spread_bps: float = 20.0
    order_size: float = 0.01
    max_position: float = 10.0
    inventory_target: float = 0.0
    skew_sensitivity: float = 0.5


@dataclass
class Position:
    symbol: str
    size: float = 0.0
    entry_price: float = 0.0
    realized_pnl: float = 0.0
    unrealized_pnl: float = 0.0

    @property
    def is_long(self) -> bool:
        return self.size > 0

    @property
    def is_short(self) -> bool:
        return self.size < 0

    @property
    def notional_value(self) -> float:
        return abs(self.size * self.entry_price)


class MarketMakingStrategy:
    """
    Adaptive market-making strategy with inventory risk management.

    Features:
    - Volatility-adjusted spreads
    - Inventory-aware quote skewing
    - Liquidation event response
    - Multi-symbol support
    """

    def __init__(self, params: QuoteParams):
        self.params = params
        self.positions: Dict[str, Position] = {}
        self.orderbooks: Dict[str, OrderBookSnapshot] = {}
        self.recent_prices: Dict[str, deque] = {}
        self.recent_volatility: Dict[str, float] = {}
        self.last_funding: Dict[str, FundingRate] = {}
        self.liquidation_events: Dict[str, List[Liquidation]] = {}
        self.active_quotes: Dict[str, Dict] = {}

    def register_orderbook(self, book: OrderBookSnapshot) -> None:
        """Update internal order book state."""
        self.orderbooks[book.symbol] = book
        self._update_volatility(book)

    def register_trade(self, trade: Trade) -> None:
        """Record trade for volatility calculations."""
        if trade.symbol not in self.recent_prices:
            self.recent_prices[trade.symbol] = deque(maxlen=1000)
        self.recent_prices[trade.symbol].append((trade.price, trade.timestamp))

    def register_funding(self, funding: FundingRate) -> None:
        """Track funding rates for overnight cost estimation."""
        self.last_funding[funding.symbol] = funding
        # With negative funding, shorts pay longs, so short exposure gets costly
        if funding.rate < -0.001:
            self._reduce_short_exposure(funding.symbol)

    def register_liquidation(self, liq: Liquidation) -> None:
        """React to large liquidations by widening spreads."""
        if liq.symbol not in self.liquidation_events:
            self.liquidation_events[liq.symbol] = []
        self.liquidation_events[liq.symbol].append(liq)
        # Check if liquidation is large relative to the best bid's size
        # (guard against an empty bid side before indexing)
        book = self.orderbooks.get(liq.symbol)
        if book and book.bids and liq.quantity > book.bids[0].quantity * 10:
            # Significant liquidation - temporarily widen spreads
            self._apply_liquidation_surcharge(liq.symbol)

    def _update_volatility(self, book: OrderBookSnapshot) -> None:
        """Calculate rolling volatility from price history."""
        history = self.recent_prices.get(book.symbol)
        if history is None or len(history) < 30:
            return
        prices = [p[0] for p in list(history)[-60:]]
        returns = [math.log(prices[i] / prices[i - 1]) for i in range(1, len(prices))]
        if len(returns) >= 2:
            # sqrt(86400) scales to a daily figure only if trades arrive
            # roughly once per second; treat the result as a relative gauge
            self.recent_volatility[book.symbol] = statistics.stdev(returns) * math.sqrt(86400)

    def _apply_liquidation_surcharge(self, symbol: str) -> None:
        """Temporarily increase spread for symbol with recent large liquidation."""
        if symbol in self.active_quotes:
            self.active_quotes[symbol]['spread_multiplier'] = 3.0
            # Reset after 60 seconds (requires a running event loop)
            asyncio.create_task(self._reset_spread_multiplier(symbol, 60))

    async def _reset_spread_multiplier(self, symbol: str, delay: int) -> None:
        """Reset spread multiplier after cooldown period."""
        await asyncio.sleep(delay)
        if symbol in self.active_quotes:
            self.active_quotes[symbol]['spread_multiplier'] = 1.0

    def _reduce_short_exposure(self, symbol: str) -> None:
        """Callback to reduce short positions when funding is heavily negative."""
        logger.info(f"Negative funding detected for {symbol}, consider reducing shorts")

    def calculate_quote_prices(self, symbol: str) -> Optional[Tuple[float, float]]:
        """
        Calculate bid and ask prices for market-making quotes.
        Returns: (bid_price, ask_price) or None if conditions not met
        """
        book = self.orderbooks.get(symbol)
        if not book or book.mid_price == 0:
            return None

        position = self.positions.get(symbol, Position(symbol))

        # Get current volatility or use conservative default
        vol = self.recent_volatility.get(symbol, 0.02)

        # Base spread adjusted for volatility
        base_spread = self.params.base_spread_bps * (1 + vol * 10)
        base_spread = max(self.params.min_spread_bps,
                          min(self.params.max_spread_bps, base_spread))

        # Apply any temporary surcharges (liquidation, etc.)
        multiplier = 1.0
        if symbol in self.active_quotes:
            multiplier = self.active_quotes[symbol].get('spread_multiplier', 1.0)
        final_spread = base_spread * multiplier

        # Inventory skew: shift quotes to encourage flow away from heavy side
        skew = 0.0
        if position.size != 0:
            skew = (position.size / self.params.max_position) * self.params.skew_sensitivity

        # Generate prices: a long position (skew > 0) shifts both quotes down
        mid = book.mid_price
        half_spread = mid * (final_spread / 10000) / 2
        bid = mid - half_spread - (skew * half_spread)
        ask = mid + half_spread - (skew * half_spread)

        # Rounding to one decimal assumes a 0.1 tick size
        return (round(bid, 1), round(ask, 1))

    def should_quote(self, symbol: str) -> bool:
        """Determine if conditions are suitable for quoting."""
        book = self.orderbooks.get(symbol)
        if not book:
            return False

        position = self.positions.get(symbol, Position(symbol))

        # Don't exceed position limits
        if abs(position.size) >= self.params.max_position:
            return False

        # Check liquidity - don't quote in thin books
        if book.spread > self.params.max_spread_bps * 2:
            return False

        return True

    def update_position(self, symbol: str, size: float, price: float,
                        realized_pnl: float = 0.0) -> None:
        """Update position state after fills."""
        if symbol in self.positions:
            pos = self.positions[symbol]
            prev_size = pos.size
            pos.size += size
            if pos.size == 0:
                pos.entry_price = 0.0
            elif prev_size * size >= 0:
                # Fill adds to the position: size-weighted average entry,
                # computed from the size held before this fill
                pos.entry_price = (
                    abs(prev_size) * pos.entry_price + abs(size) * price
                ) / abs(pos.size)
            elif prev_size * pos.size < 0:
                # Fill flipped the position: the remainder opened at the fill price
                pos.entry_price = price
            # Otherwise the fill only reduced the position; entry is unchanged
            pos.realized_pnl += realized_pnl
        else:
            self.positions[symbol] = Position(
                symbol=symbol,
                size=size,
                entry_price=price if size != 0 else 0,
                realized_pnl=realized_pnl
            )

    def get_strategy_state(self, symbol: str) -> Dict:
        """Return current strategy state for monitoring."""
        pos = self.positions.get(symbol, Position(symbol))
        book = self.orderbooks.get(symbol)
        return {
            'symbol': symbol,
            'position_size': pos.size,
            'entry_price': pos.entry_price,
            'realized_pnl': pos.realized_pnl,
            'mid_price': book.mid_price if book else 0,
            'spread_bps': book.spread if book else 0,
            'volatility': self.recent_volatility.get(symbol, 0),
            'funding_rate': self.last_funding.get(symbol, None),
            'quoting': self.should_quote(symbol)
        }
```
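One integration detail is easy to miss: the client awaits every handler inside `_dispatch_message`, while the strategy's `register_*` methods are ordinary synchronous functions. Registering them directly would fail with a `TypeError`, because awaiting their `None` return value is not allowed. A small adapter is one way to bridge the two; `as_async` is a hypothetical helper introduced here, not part of either class.

```python
import asyncio
from typing import Any, Awaitable, Callable


def as_async(fn: Callable[[Any], None]) -> Callable[[Any], Awaitable[None]]:
    """Wrap a synchronous callback so that it can be awaited."""
    async def wrapper(event: Any) -> None:
        fn(event)
    return wrapper


# Stand-in callback so the example runs without a live connection.
received = []
handler = as_async(received.append)
asyncio.run(handler({"symbol": "BTC-USDT-SWAP"}))
print(received)
```

In the framework itself the wiring would then read `client.on_orderbook(as_async(strategy.register_orderbook))`, and likewise for trades, liquidations, and funding updates.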
Production Deployment Considerations
Running market-making strategies in production requires additional safeguards beyond the core strategy logic. I recommend implementing circuit breakers, position audit trails, and real-time monitoring dashboards before going live with real capital.
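As one example of the audit trail mentioned above, fills can be appended to a JSON Lines file so that positions can be rebuilt and reconciled after a restart. This is a minimal sketch: the file name, record fields, and function names are assumptions for illustration.

```python
import json
import tempfile
import time
from pathlib import Path
from typing import Dict


def record_fill(path: Path, symbol: str, size: float, price: float) -> None:
    """Append one fill as a single JSON line (signed size: + buy, - sell)."""
    entry = {"ts": time.time(), "symbol": symbol, "size": size, "price": price}
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")


def replay_positions(path: Path) -> Dict[str, float]:
    """Rebuild net position per symbol by summing every recorded fill."""
    positions: Dict[str, float] = {}
    if not path.exists():
        return positions
    with path.open(encoding="utf-8") as fh:
        for line in fh:
            entry = json.loads(line)
            symbol = entry["symbol"]
            positions[symbol] = positions.get(symbol, 0.0) + entry["size"]
    return positions


# Demonstration in a temporary directory so nothing is left behind.
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "fills.jsonl"
    record_fill(log, "BTC-USDT-SWAP", 2.0, 50_000.0)
    record_fill(log, "BTC-USDT-SWAP", -0.5, 50_010.0)
    rebuilt = replay_positions(log)

print(rebuilt)
```

An append-only log is a deliberate choice here: a crash can at worst lose the last line, and the replayed totals can be compared against the positions the exchange reports.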
Circuit Breaker Implementation
```python
import asyncio
import logging
import time
from enum import Enum
from typing import List, Optional, Tuple

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    NORMAL = "normal"
    SLOW = "slow"
    TRIPPED = "tripped"


class CircuitBreaker:
    """
    Circuit breaker protecting against market anomalies and system failures.

    Monitors:
    - Order latency thresholds
    - Fill rate degradation
    - Consecutive failures
    - Price gap events
    """

    def __init__(
        self,
        latency_threshold_ms: float = 500.0,
        failure_threshold: int = 5,
        recovery_timeout: int = 60
    ):
        # Stored in seconds; converted back to ms when compared in _evaluate_state
        self.latency_threshold = latency_threshold_ms / 1000
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.NORMAL
        self.consecutive_failures = 0
        self.last_failure_time: Optional[float] = None
        self.last_order_time: Optional[float] = None
        self.order_latencies: List[float] = []  # rolling window, in ms

    def record_order_attempt(self) -> None:
        """Record timestamp when order is submitted."""
        self.last_order_time = time.time()

    def record_order_result(self, success: bool, fill_latency_ms: Optional[float] = None) -> None:
        """Record order result and update circuit state."""
        if success:
            self.consecutive_failures = 0
            if fill_latency_ms is not None:
                self.order_latencies.append(fill_latency_ms)
                # Keep only the 100 most recent latencies
                if len(self.order_latencies) > 100:
                    self.order_latencies.pop(0)
        else:
            self.consecutive_failures += 1
            self.last_failure_time = time.time()
        self._evaluate_state()

    def _evaluate_state(self) -> None:
        """Update circuit breaker state based on current metrics."""
        avg_latency = sum(self.order_latencies) / len(self.order_latencies) if self.order_latencies else 0
        if self.consecutive_failures >= self.failure_threshold:
            self.state = CircuitState.TRIPPED
        elif avg_latency > self.latency_threshold * 1000:
            self.state = CircuitState.SLOW
        else:
            self.state = CircuitState.NORMAL

    def check(self) -> bool:
        """
        Check if trading should continue.
        Returns True if trading is permitted, False if circuit is tripped.
        """
        if self.state == CircuitState.TRIPPED:
            if self.last_failure_time:
                elapsed = time.time() - self.last_failure_time
                if elapsed >= self.recovery_timeout:
                    # Cooldown has passed: allow trading again
                    self.state = CircuitState.NORMAL
                    self.consecutive_failures = 0
                    return True
            return False
        if self.state == CircuitState.SLOW:
            logger.warning("Circuit breaker in SLOW state - reducing order frequency")
        return True

    def get_status(self) -> dict:
        """Return current circuit breaker status."""
        return {
            'state': self.state.value,
            'consecutive_failures': self.consecutive_failures,
            'avg_latency_ms': sum(self.order_latencies) / len(self.order_latencies) if self.order_latencies else 0,
            'can_trade': self.check()
        }


class RiskLimits:
    """Position and order risk limits."""

    def __init__(
        self,
        max_position_pct: float = 0.1,  # 10% of account
        max_order_size: float = 1.0,
        max_daily_loss: float = 1000.0,
        max_orders_per_minute: int = 60
    ):
        self.max_position_pct = max_position_pct
        self.max_order_size = max_order_size
        self.max_daily_loss = max_daily_loss
        self.max_orders_per_minute = max_orders_per_minute
        self.daily_pnl = 0.0
        self.orders_this_minute = 0
        self.minute_reset_time = time.time()

    def check_order(self, size: float, symbol: str, account_balance: float) -> Tuple[bool, str]:
        """Validate order against risk limits."""
        # Check order size
        if abs(size) > self.max_order_size:
            return False, f"Order size {size} exceeds max {self.max_order_size}"
        # Check daily loss
        if self.daily_pnl < -self.max_daily_loss:
            return False, f"Daily loss {self.daily_pnl} exceeds limit {self.max_daily_loss}"
        # Check
```