Cuối tháng 3/2026, mình đã triển khai một hệ thống arbitrage futures perpetual trên 3 sàn (Binance, Bybit, OKX) và gặp một vấn đề nan giải: làm sao để backtest chiến lược dựa trên funding rate và order book liquidity một cách chính xác nhất? Sau 2 tuần thử nghiệm với Tardis (dữ liệu high-frequency) và Binance API, mình đã xây dựng được workflow hoàn chỉnh mà bài viết này sẽ chia sẻ chi tiết.
Bối Cảnh Và Tại Sao Cần Kết Hợp Funding Rate + Order Book
Trong thị trường crypto futures perpetual, có hai nguồn alpha chính mà mình đã nghiên cứu sâu:
- Funding Rate Arbitrage: Chênh lệch funding rate giữa các sàn. Khi Binance funding 0.01%/8h trong khi Bybit 0.03%/8h, gap này có thể khai thác nếu spread hợp lý.
- Order Book Liquidity Arbitrage: Khác biệt về bid-ask spread và depth tại các mức giá khác nhau. Mình từng thấy BTC/USDT perpetual có spread chênh 0.02% giữa Binance và Bybit trong 200ms — đủ để arbitrage nếu leverage đủ cao.
Vấn đề là mỗi nguồn dữ liệu có đặc thù riêng. Binance cung cấp funding rate history qua public endpoint miễn phí, nhưng chỉ ở granularity 8h. Tardis cung cấp tick-by-tick order book data với độ trễ thực tế dưới 50ms, nhưng chi phí $299/tháng cho full market data. Mình đã kết hợp cả hai để tạo signal engine hiệu quả hơn.
Kiến Trúc Hệ Thống Backtesting
Trước khi đi vào code, mình muốn chia sẻ architecture mà mình đã xây dựng sau nhiều lần thất bại:
┌─────────────────────────────────────────────────────────────┐
│ ARBITRAGE BACKTESTING STACK │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Binance │ │ Tardis │ │ Bybit │ │
│ │ Funding API │ │ Order Book │ │ Funding API │ │
│ │ (Free) │ │ ($299/mo) │ │ (Free) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Data Lake │ │
│ │ (Parquet) │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Signal Engine │ │
│ │ (HolySheep AI) │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Backtest Runner │ │
│ │ (VectorBT Pro) │ │
│ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Data Collection: Funding Rate Từ Binance
Binance cung cấp historical funding rate qua endpoint /fapi/v1/premiumIndexKline. Mình đã viết script collect data với error handling đầy đủ:
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
from pathlib import Path
# ============================================================
# BINANCE FUNDING RATE DATA COLLECTION
# HolySheep AI: https://api.holysheep.ai/v1
# ============================================================
class BinanceFundingCollector:
    """Collect historical funding rates from Binance USD-M Futures.

    Data is fetched from the public funding-rate history endpoint and
    normalised into a DataFrame with the columns listed in ``COLUMNS``.
    """

    BASE_URL = "https://fapi.binance.com"
    # Funding-rate history endpoint; returns a JSON list of objects with the
    # fields ``symbol``, ``fundingTime``, ``fundingRate`` and ``markPrice``.
    FUNDING_ENDPOINT = "/fapi/v1/fundingRate"
    COLUMNS = ['timestamp', 'funding_rate', 'mark_price', 'index_price']

    def __init__(self, symbols: list = None):
        """Create a collector.

        Args:
            symbols: Trading pairs to collect; a default basket of eight
                USDT perpetuals is used when omitted or empty.
        """
        self.symbols = symbols or [
            "BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT",
            "XRPUSDT", "ADAUSDT", "DOGEUSDT", "AVAXUSDT"
        ]
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "ArbitrageBacktest/1.0"
        })

    @staticmethod
    def _parse_funding_records(records) -> pd.DataFrame:
        """Convert the raw funding-rate JSON payload into a DataFrame.

        Args:
            records: List of dicts as returned by the funding-rate endpoint.

        Returns:
            DataFrame with columns timestamp, funding_rate, mark_price and
            index_price. An empty DataFrame is returned for an empty payload.

        Raises:
            KeyError: If a required field is missing from the payload.
            ValueError: If a field cannot be converted.
        """
        if not records:
            return pd.DataFrame()

        raw = pd.DataFrame(records)
        parsed = pd.DataFrame({
            'timestamp': pd.to_datetime(raw['fundingTime'], unit='ms'),
            'funding_rate': raw['fundingRate'].astype(float),
        })
        # markPrice can be absent or an empty string; coerce those to NaN
        # instead of failing the whole batch.
        if 'markPrice' in raw.columns:
            parsed['mark_price'] = pd.to_numeric(raw['markPrice'], errors='coerce')
        else:
            parsed['mark_price'] = float('nan')
        # The funding-rate endpoint does not report an index price. The column
        # is kept (as NaN) so the documented output schema stays stable.
        parsed['index_price'] = float('nan')
        return parsed[BinanceFundingCollector.COLUMNS]

    def get_funding_rate_history(
        self,
        symbol: str,
        start_time: int,
        end_time: int,
        limit: int = 1000
    ) -> pd.DataFrame:
        """Fetch historical funding rates for one symbol.

        Args:
            symbol: Trading pair (e.g., "BTCUSDT")
            start_time: Start timestamp in milliseconds
            end_time: End timestamp in milliseconds
            limit: Max records per request (default 1000)

        Returns:
            DataFrame with columns: timestamp, funding_rate, mark_price,
            index_price (index_price is always NaN). An empty DataFrame is
            returned when the request fails, the payload cannot be parsed, or
            no data exists for the range.
        """
        params = {
            "symbol": symbol,
            "startTime": start_time,
            "endTime": end_time,
            "limit": limit
        }
        try:
            response = self.session.get(
                f"{self.BASE_URL}{self.FUNDING_ENDPOINT}",
                params=params,
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            if not data:
                print(f"[WARNING] No funding data for {symbol}")
                return pd.DataFrame()
            return self._parse_funding_records(data)
        except requests.exceptions.RequestException as e:
            print(f"[ERROR] Request failed for {symbol}: {e}")
            return pd.DataFrame()
        except (KeyError, ValueError, TypeError) as e:
            print(f"[ERROR] Parse error for {symbol}: {e}")
            return pd.DataFrame()

    def collect_all_symbols(
        self,
        days_back: int = 90
    ) -> pd.DataFrame:
        """Collect funding rates for all configured symbols.

        Args:
            days_back: Number of days to look back from now.

        Returns:
            Combined DataFrame for all symbols, sorted by symbol and
            timestamp, with an added ``symbol`` column. Empty when nothing
            could be collected.
        """
        # Take a single "now" so start and end describe the same window.
        now = datetime.now()
        end_time = int(now.timestamp() * 1000)
        start_time = int((now - timedelta(days=days_back)).timestamp() * 1000)

        all_data = []
        for symbol in self.symbols:
            print(f"[INFO] Collecting funding rate for {symbol}...")
            df = self.get_funding_rate_history(
                symbol=symbol,
                start_time=start_time,
                end_time=end_time
            )
            if not df.empty:
                df['symbol'] = symbol
                all_data.append(df)
                print(f"[SUCCESS] Collected {len(df)} records for {symbol}")
            # Pause 50ms between requests to avoid HTTP 429 responses.
            time.sleep(0.05)

        if not all_data:
            return pd.DataFrame()
        combined = pd.concat(all_data, ignore_index=True)
        return combined.sort_values(['symbol', 'timestamp'])

    def save_to_parquet(self, df: pd.DataFrame, filepath: str = "data/funding_rates.parquet"):
        """Save collected data to a snappy-compressed Parquet file.

        The parent directory is created when it does not exist.
        """
        Path(filepath).parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(filepath, engine='pyarrow', compression='snappy')
        print(f"[INFO] Saved {len(df)} records to {filepath}")
# ============================================================
# HOLYSHEEP AI SIGNAL GENERATION
# ============================================================
class HolySheepSignalEngine:
    """Ask the HolySheep chat-completions API to assess arbitrage setups.

    API: https://api.holysheep.ai/v1
    Key: YOUR_HOLYSHEEP_API_KEY
    """

    BASE_URL = "https://api.holysheep.ai/v1"
    _PARSE_ERROR = "Could not parse AI response"

    def __init__(self, api_key: str):
        """Store the API key and prepare the request headers.

        Args:
            api_key: Bearer token sent in the Authorization header.
        """
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.model = "deepseek-v3.2"

    @staticmethod
    def _extract_json(content: str) -> dict:
        """Pull the JSON object embedded in a model reply.

        Returns the decoded object, or ``{"error": ...}`` when the reply has
        no brace-delimited span or that span is not valid JSON.
        """
        # Imported locally: this script does not import json/re at file level.
        import json
        import re

        # Greedy match from the first "{" to the last "}" so nested objects
        # stay intact.
        json_match = re.search(r'\{.*\}', content, re.DOTALL)
        if not json_match:
            return {"error": HolySheepSignalEngine._PARSE_ERROR}
        try:
            return json.loads(json_match.group())
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return {"error": HolySheepSignalEngine._PARSE_ERROR}

    def analyze_arbitrage_opportunity(
        self,
        binance_rate: float,
        bybit_rate: float,
        binance_depth: dict,
        bybit_depth: dict
    ) -> dict:
        """Ask the model whether a funding-rate + order-book gap is tradeable.

        Args:
            binance_rate: Binance funding rate as a fraction per 8h.
            bybit_rate: Bybit funding rate as a fraction per 8h.
            binance_depth: Dict optionally holding 'bid_depth_1000' and
                'ask_depth_1000'.
            bybit_depth: Same shape as ``binance_depth``.

        Returns:
            The JSON object decoded from the model reply, or a dict with a
            single ``error`` key when the request fails or the reply cannot
            be parsed.
        """
        prompt = f"""Analyze this cross-exchange arbitrage opportunity:
BINANCE:
- Funding Rate: {binance_rate*100:.4f}% per 8h
- Bid Depth ($1000): {binance_depth.get('bid_depth_1000', 'N/A')}
- Ask Depth ($1000): {binance_depth.get('ask_depth_1000', 'N/A')}
BYBIT:
- Funding Rate: {bybit_rate*100:.4f}% per 8h
- Bid Depth ($1000): {bybit_depth.get('bid_depth_1000', 'N/A')}
- Ask Depth ($1000): {bybit_depth.get('ask_depth_1000', 'N/A')}
Calculate:
1. Net funding arbitrage (annualized %)
2. Estimated slippage based on depth
3. Risk score (1-10)
4. Trade recommendation: EXECUTE / SKIP / MONITOR
Return JSON with keys: net_annualized, slippage_est, risk_score, recommendation, reasoning
"""
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a quantitative trading analyst specializing in crypto arbitrage."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": 0.1,  # Low temp for consistent analysis
            "max_tokens": 500
        }
        try:
            response = requests.post(
                f"{self.BASE_URL}/chat/completions",
                headers=self.headers,
                json=payload,
                timeout=5
            )
            response.raise_for_status()
            result = response.json()
            content = result['choices'][0]['message']['content']
        except requests.exceptions.RequestException as e:
            print(f"[ERROR] HolySheep API failed: {e}")
            return {"error": str(e)}
        except (KeyError, IndexError, TypeError, ValueError):
            # The response body was not in the expected chat-completions shape.
            return {"error": self._PARSE_ERROR}

        if not isinstance(content, str):
            return {"error": self._PARSE_ERROR}
        return self._extract_json(content)
# ============================================================
# MAIN EXECUTION
# ============================================================
if __name__ == "__main__":
    # Script entry point: pull 30 days of BTCUSDT/ETHUSDT funding history,
    # persist it, then print per-symbol statistics.
    collector = BinanceFundingCollector(symbols=["BTCUSDT", "ETHUSDT"])
    funding_df = collector.collect_all_symbols(days_back=30)
    if not funding_df.empty:
        # Parquet copy is what the backtest script reads later.
        collector.save_to_parquet(funding_df)
        print("\n=== FUNDING RATE SUMMARY ===")
        rates_by_symbol = funding_df.groupby('symbol')['funding_rate']
        print(rates_by_symbol.agg(['mean', 'std', 'min', 'max']))
Order Book Data Với Tardis API
Tardis cung cấp historical order book data với độ phân giải cao. Mình sử dụng cho việc backtest slippage và liquidity analysis. Tardis có free tier 1 triệu messages/tháng — đủ để test trước khi mua plan.
import asyncio
import aiohttp
from typing import List, Dict, Tuple
import pandas as pd
from dataclasses import dataclass
from datetime import datetime
import json
# ============================================================
# TARDIS ORDER BOOK DATA COLLECTION
# Documentation: https://docs.tardis.dev/
# ============================================================
@dataclass
class OrderBookSnapshot:
    """A single order book snapshot for one symbol on one exchange."""

    timestamp: datetime
    exchange: str
    symbol: str
    bids: List[Tuple[float, float]]  # [(price, quantity), ...]; index 0 is read as best bid
    asks: List[Tuple[float, float]]  # [(price, quantity), ...]; index 0 is read as best ask
    spread: float
    mid_price: float

    @property
    def best_bid(self) -> float:
        """Price of the first bid level, or 0.0 when there are no bids."""
        return self.bids[0][0] if self.bids else 0.0

    @property
    def best_ask(self) -> float:
        """Price of the first ask level, or 0.0 when there are no asks."""
        return self.asks[0][0] if self.asks else 0.0

    @property
    def spread_bps(self) -> float:
        """Top-of-book spread in basis points of the mid price.

        Returns 0.0 when the mid price is zero or either side of the book is
        empty, since no meaningful spread exists in that case.
        """
        if self.mid_price == 0 or not self.bids or not self.asks:
            return 0.0
        # Use the best_* properties; the dataclass has no `ask`/`bid` fields.
        return (self.best_ask - self.best_bid) / self.mid_price * 10000

    def get_depth(self, levels: int = 10) -> Dict[str, float]:
        """Calculate cumulative quantity on each side up to N levels.

        Args:
            levels: Number of levels to sum from the top of each side.

        Returns:
            Dict with 'bid_depth', 'ask_depth' and 'imbalance', where
            imbalance is (bid - ask) / (bid + ask), or 0 for an empty book.
        """
        bid_depth = sum(qty for _, qty in self.bids[:levels])
        ask_depth = sum(qty for _, qty in self.asks[:levels])
        total = bid_depth + ask_depth
        return {
            'bid_depth': bid_depth,
            'ask_depth': ask_depth,
            'imbalance': (bid_depth - ask_depth) / total if total > 0 else 0
        }
class TardisOrderBookCollector:
    """Collect historical order book data from Tardis and score arbitrage.

    Free tier: 1M messages/month
    Paid: $299/month for full market data
    """

    TARDIS_WS_URL = "wss://stream.tardis.dev/v1/stream"
    TARDIS_REST_URL = "https://api.tardis.dev/v1"

    def __init__(self, api_key: str):
        """Store the Tardis API key.

        Args:
            api_key: Tardis API key (not yet sent by the demo fetch below).
        """
        self.api_key = api_key
        self.session = None

    async def fetch_historical_book(
        self,
        exchange: str,
        symbol: str,
        start_date: str,  # "2026-03-01"
        end_date: str,    # "2026-03-15"
        channels: List[str] = None
    ) -> List["OrderBookSnapshot"]:
        """Demonstration stub for fetching historical order books.

        Only symbol metadata is requested and printed; no snapshots are
        built yet, so the result is always an empty list. ``channels`` is
        accepted for interface stability but is not used by this stub.

        NOTE(review): the metadata URL shape is taken as-is from the original
        code — confirm it against the Tardis HTTP API documentation.

        Returns:
            An empty list.
        """
        print(f"[INFO] Fetching {exchange}:{symbol} from {start_date} to {end_date}")
        meta_url = f"{self.TARDIS_REST_URL}/exchanges/{exchange}/symbols"
        async with aiohttp.ClientSession() as session:
            async with session.get(meta_url, params={"symbol": symbol}) as resp:
                if resp.status == 200:
                    meta = await resp.json()
                    print(f"[INFO] Symbol metadata: {meta}")
        return []

    def calculate_arbitrage_metrics(
        self,
        binance_book: "OrderBookSnapshot",
        bybit_book: "OrderBookSnapshot",
        funding_diff: float
    ) -> Dict:
        """Calculate key metrics for a cross-exchange arbitrage decision.

        Args:
            binance_book: Binance snapshot exposing best_bid, best_ask,
                mid_price and get_depth().
            bybit_book: Bybit snapshot with the same attributes.
            funding_diff: Funding-rate difference as a fraction per 8h.

        Returns:
            Dict with keys spread_profit_bps, buy_exchange, sell_exchange,
            daily_funding (percent), annualized_funding (percent),
            net_profit_bps, fees_bps, slippage_bps, price_divergence_pct,
            max_position_size and is_viable.
        """
        metrics = {}

        # 1. Spread arbitrage: buy on Bybit only when its ask is below the
        #    Binance bid; otherwise evaluate the opposite direction.
        if bybit_book.best_ask < binance_book.best_bid:
            buy_exchange, sell_exchange = "bybit", "binance"
            buy_price = bybit_book.best_ask
            sell_price = binance_book.best_bid
        else:
            buy_exchange, sell_exchange = "binance", "bybit"
            buy_price = binance_book.best_ask
            sell_price = bybit_book.best_bid

        spread_profit_pct = (sell_price - buy_price) / buy_price * 100
        spread_profit_bps = spread_profit_pct * 100
        metrics['spread_profit_bps'] = spread_profit_bps
        metrics['buy_exchange'] = buy_exchange
        metrics['sell_exchange'] = sell_exchange

        # 2. Funding arbitrage: three 8h funding intervals per day, in percent.
        daily_funding = funding_diff * 3 * 100
        metrics['daily_funding'] = daily_funding
        metrics['annualized_funding'] = daily_funding * 365

        # 3. Net profit = spread - fees - slippage, all expressed in bps.
        #    Fees and slippage are fractions (0.0002 = 2 bps) and must be
        #    converted before being subtracted from the spread.
        maker_fee = 0.0002
        taker_fee = 0.0004
        slippage_estimate = 0.0001  # 1 bps default
        fees_bps = (maker_fee + taker_fee) * 10000
        slippage_bps = slippage_estimate * 10000
        metrics['net_profit_bps'] = spread_profit_bps - fees_bps - slippage_bps
        metrics['fees_bps'] = fees_bps
        metrics['slippage_bps'] = slippage_bps

        # 4. Risk: relative gap between the two mid prices.
        price_divergence = (
            abs(binance_book.mid_price - bybit_book.mid_price) / binance_book.mid_price
        )
        metrics['price_divergence_pct'] = price_divergence * 100

        # Liquidity: the thinner of the two opposing-side pairings, halved.
        binance_depth = binance_book.get_depth(20)
        bybit_depth = bybit_book.get_depth(20)
        min_depth = min(
            binance_depth['bid_depth'] + bybit_depth['ask_depth'],
            bybit_depth['bid_depth'] + binance_depth['ask_depth']
        ) / 2
        metrics['max_position_size'] = min_depth * binance_book.mid_price  # USD value

        # 5. Viable only with positive net edge and at least $1000 of size.
        metrics['is_viable'] = (
            metrics['net_profit_bps'] > 0 and
            metrics['max_position_size'] > 1000
        )
        return metrics
# ============================================================
# HOLYSHEEP AI PATTERN RECOGNITION
# ============================================================
async def analyze_with_holysheep(
    api_key: str,
    historical_data: pd.DataFrame
) -> dict:
    """Ask the HolySheep chat API to describe funding-rate patterns.

    Args:
        api_key: Bearer token for the HolySheep API.
        historical_data: DataFrame with at least the columns 'symbol',
            'funding_rate' and 'mark_price'.

    Returns:
        The JSON object decoded from the model reply, or
        ``{"raw_analysis": <reply text>}`` when the reply contains no
        decodable JSON object.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an HTTP error
            status.
    """
    import re

    # Per-symbol summary statistics keep the prompt small.
    summary = historical_data.groupby('symbol').agg({
        'funding_rate': ['mean', 'std', 'min', 'max', 'count'],
        'mark_price': ['first', 'last']
    }).to_string()
    prompt = f"""Analyze this historical funding rate data and identify:
1. Which pairs have highest average funding rates (potential long bias)?
2. Which pairs show most volatility in funding rates?
3. Any seasonal patterns (time of day/week)?
4. Correlation between funding rate and price movements?
DATA SUMMARY:
{summary}
Return a JSON object with:
- "high_funding_pairs": list of symbols with avg funding > 0.001
- "volatile_pairs": list of symbols with highest funding std
- "seasonal_patterns": object describing any time-based patterns
- "recommendations": object with trading pair recommendations
"""
    payload = {
        "model": "deepseek-v3.2",
        "messages": [
            {
                "role": "system",
                "content": "You are a quantitative researcher specializing in crypto derivatives."
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.2,
        "max_tokens": 1000
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.holysheep.ai/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json=payload,
            timeout=aiohttp.ClientTimeout(total=10)
        ) as resp:
            # Surface HTTP errors explicitly instead of failing later on a
            # missing 'choices' key in an error payload.
            resp.raise_for_status()
            result = await resp.json()

    content = result['choices'][0]['message']['content']
    # Greedy match from the first "{" to the last "}" in the reply.
    json_match = re.search(r'\{.*\}', content, re.DOTALL)
    if json_match:
        try:
            return json.loads(json_match.group())
        except json.JSONDecodeError:
            # Braces were present but not valid JSON; hand back the raw text.
            pass
    return {"raw_analysis": content}
# ============================================================
# MAIN BACKTEST SIMULATION
# ============================================================
async def run_backtest_simulation(
    funding_df: pd.DataFrame = None,
    funding_path: str = "data/funding_rates.parquet",
    results_path: str = "data/backtest_results.parquet",
) -> pd.DataFrame:
    """Simulate a funding-rate spike strategy on historical data.

    For each symbol a position is opened when the funding rate exceeds 1.5x
    its 3-period rolling mean and closed on the first later row where that
    no longer holds. ``pnl_bps`` is the change in funding rate between entry
    and exit expressed in basis points; it is not a price-based PnL.

    Args:
        funding_df: Funding data with columns 'symbol', 'timestamp' and
            'funding_rate'. When omitted, it is read from ``funding_path``.
        funding_path: Parquet file to load when ``funding_df`` is omitted.
        results_path: Where to write the trades; pass None to skip saving.

    Returns:
        DataFrame with one row per closed trade (empty when there are none).
    """
    if funding_df is None:
        funding_df = pd.read_parquet(funding_path)

    results = []
    for symbol in funding_df['symbol'].unique():
        symbol_data = funding_df[funding_df['symbol'] == symbol].copy()
        symbol_data = symbol_data.sort_values('timestamp')
        # In a real backtest, Bybit/OKX data would be loaded as well.
        symbol_data['funding_ma_3'] = symbol_data['funding_rate'].rolling(3).mean()
        # Rows where the rolling mean is still NaN compare as False.
        symbol_data['funding_signal'] = (
            symbol_data['funding_rate'] > symbol_data['funding_ma_3'] * 1.5
        )

        # At most one position is open at a time; None means flat.
        open_entry = None
        rows = zip(
            symbol_data['timestamp'],
            symbol_data['funding_rate'],
            symbol_data['funding_signal'],
        )
        for timestamp, funding_rate, signal in rows:
            if signal and open_entry is None:
                open_entry = (timestamp, funding_rate)
            elif not signal and open_entry is not None:
                entry_time, entry_rate = open_entry
                results.append({
                    'symbol': symbol,
                    'entry_time': entry_time,
                    'exit_time': timestamp,
                    'entry_rate': entry_rate,
                    'exit_rate': funding_rate,
                    'pnl_bps': (funding_rate - entry_rate) * 10000,
                    'holding_hours': (timestamp - entry_time).total_seconds() / 3600
                })
                open_entry = None
        still_open = 0 if open_entry is None else 1
        print(f"[INFO] {symbol}: {still_open} open positions remaining")

    results_df = pd.DataFrame(results)
    if not results_df.empty:
        print("\n=== BACKTEST RESULTS ===")
        print(f"Total Trades: {len(results_df)}")
        print(f"Win Rate: {(results_df['pnl_bps'] > 0).mean() * 100:.1f}%")
        print(f"Avg PnL: {results_df['pnl_bps'].mean():.2f} bps")
        print(f"Best Trade: {results_df['pnl_bps'].max():.2f} bps")
        print(f"Worst Trade: {results_df['pnl_bps'].min():.2f} bps")
        if results_path is not None:
            results_df.to_parquet(results_path)
    return results_df
if __name__ == "__main__":
    # Entry point: drive the async backtest coroutine to completion.
    backtest = run_backtest_simulation()
    asyncio.run(backtest)
Signal Engine Với HolySheep AI
Điểm mấu chốt trong chiến lược của mình là sử dụng AI để phân tích signals từ dữ liệu funding rate và order book. Mình đã thử nghiệm với nhiều model và kết luận DeepSeek V3.2 tại HolySheep là lựa chọn tối ưu về chi phí và tốc độ.
"""
ARBITRAGE SIGNAL ENGINE
Powered by HolySheep AI - $0.42/MTok, <50ms latency
Register: https://www.holysheep.ai/register
"""
import requests
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import time
@dataclass
class ArbitrageSignal:
    """One evaluated arbitrage opportunity, including the AI verdict.

    Field order is part of the constructor signature and must not change.
    """

    timestamp: datetime
    symbol: str

    # --- funding rates per exchange ---
    binance_funding: float
    bybit_funding: float
    okx_funding: float

    # --- top-of-book quotes ---
    binance_bid: float
    binance_ask: float
    bybit_bid: float
    bybit_ask: float

    # --- derived metrics ---
    funding_spread: float
    order_spread: float
    net_opportunity: float
    confidence: float

    # --- AI analysis output ---
    recommendation: str
    reasoning: str
    risk_score: float
    max_position: float

    # --- cost estimates ---
    estimated_fees: float
    estimated_slippage: float
class HolySheepArbitrageEngine:
"""
Real-time arbitrage signal generation using HolySheep AI
Cost comparison (2026 pricing):
- HolySheep DeepSeek V3.2: $0.42/MTok (RECOMMENDED)
- OpenAI GPT-4.1: $8/MTok
- Anthropic Claude Sonnet 4.5: $15/MTok
- Google Gemini 2.5 Flash: $2.50/MTok
HolySheep is 95%+ cheaper than alternatives!
"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Using DeepSeek V3.2 for cost efficiency
self.model = "deepseek-v3.2"
self.prompt_tokens = 0
self.completion_tokens = 0
def analyze_opportunity(
    self,
    signal_data: Dict,
    leverage: int = 5
) -> ArbitrageSignal:
    """Analyze an arbitrage opportunity using HolySheep AI.

    Args:
        signal_data: Dictionary with funding and order book data
        leverage: Leverage to use for position sizing

    Returns:
        ArbitrageSignal built from the AI analysis. When the request fails
        or the response cannot be parsed, the rule-based fallback signal is
        returned instead.
    """
    prompt = self._build_analysis_prompt(signal_data, leverage)
    start_time = time.time()
    payload = {
        "model": self.model,
        "messages": [
            {
                "role": "system",
                "content": """You are an expert quantitative trader specializing in crypto futures arbitrage.
Analyze funding rate and order book opportunities with extreme precision.
Always consider:
1. Transaction fees (typically 0.02-0.04% per side)
2. Slippage based on order book depth
3. Funding rate convergence risk
4. Liquidation risk at high leverage
5. Counterparty risk between exchanges"""
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.1,
        "max_tokens": 800
    }
    try:
        response = requests.post(
            f"{self.BASE_URL}/chat/completions",
            headers=self.headers,
            json=payload,
            timeout=5
        )
        response.raise_for_status()
        latency_ms = (time.time() - start_time) * 1000
        result = response.json()
        content = result['choices'][0]['message']['content']
        # Track token usage for cost calculation; tolerate a null `usage`.
        usage = result.get('usage') or {}
        self.prompt_tokens += usage.get('prompt_tokens', 0)
        self.completion_tokens += usage.get('completion_tokens', 0)
        analysis = self._parse_ai_response(content)
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] HolySheep API failed: {e}")
        return self._build_fallback_signal(signal_data)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # A 200 response whose body is not in the expected shape (or whose
        # embedded JSON is invalid) must not crash the signal pipeline.
        print(f"[ERROR] HolySheep response could not be parsed: {e}")
        return self._build_fallback_signal(signal_data)

    # Built outside the try so errors in signal_data itself are not masked.
    return self._build_signal(signal_data, analysis, latency_ms)
def _build_analysis_prompt(
self,
data: Dict,
leverage: int
) -> str:
"""Build prompt for AI analysis"""
return f"""ANALYZE THIS ARBITRAGE OPPORTUNITY:
EXCHANGE FUNDING RATES (per 8h):
- Binance: {data['binance_funding']*100:.4f}%
- Bybit: {data['bybit_funding']*100:.4f}%
- OKX: {data['okx_funding']*100:.4f}%
ORDER BOOK SNAPSHOT:
BINANCE {data['symbol']}:
Bid: {data['binance_bid']:.2f} | Ask: {data['binance_ask']:.2f}
Bid Depth (10 levels): {data['binance_depth']:.2f} contracts
Ask Depth (10 levels): {data['binance_ask_depth']:.2f} contracts
BYBIT {data['symbol']}:
Bid: {data['bybit_bid']:.2f} | Ask: {data['bybit_ask']:.2f}
Bid Depth (10 levels): {data['bybit_depth']:.2f} contracts
Ask Depth (10 levels): {data['bybit_ask_depth']:.2f} contracts
TRADING PARAMETERS:
- Leverage: {leverage}x
- Position Size: $10,000 notional
CALCULATE AND RETURN JSON:
{{
"funding_spread_bps": (highest - lowest funding) in basis points,
"order_spread_bps": cross-exchange bid-ask spread in bps,
"gross_profit_bps": estimated gross profit per round trip,
"fees_bps": total fees (maker+taker both sides) in bps,
"slippage_bps": estimated slippage in bps,
"net_profit_bps": gross - fees - slippage,
"risk_score": 1-10 scale (10 = highest risk),
"confidence": 0.0-1.0 scale,
"recommendation": "EXECUTE" / "SKIP" / "MONITOR",
"reasoning": "2-3 sentence explanation",
"max_safe_position": dollar amount to risk,
"exit_strategy": "close when funding converges" / "hold 1 period" / "intraday only"
}}
Return ONLY valid JSON, no other text."""
def _parse_ai_response(self, content: str) -> Dict:
"""Parse JSON from AI response"""
import re
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {}
def _build_signal(
    self,
    data: Dict,
    analysis: Dict,
    latency_ms: float
) -> ArbitrageSignal:
    """Combine market inputs and the AI analysis into an ArbitrageSignal.

    Values missing from ``analysis`` fall back to fixed defaults. Basis-point
    figures from the analysis are converted to fractions. ``latency_ms`` is
    accepted but not stored on the signal.
    """
    def as_fraction(key, default_bps):
        # bps -> fraction, using the default when the model omitted the key.
        return analysis.get(key, default_bps) / 10000

    binance_rate = data['binance_funding']
    bybit_rate = data['bybit_funding']
    okx_rate = data['okx_funding']
    all_rates = [binance_rate, bybit_rate, okx_rate]
    widest_gap = max(all_rates) - min(all_rates)

    return ArbitrageSignal(
        timestamp=datetime.now(),
        symbol=data['symbol'],
        binance_funding=binance_rate,
        bybit_funding=bybit_rate,
        okx_funding=okx_rate,
        binance_bid=data['binance_bid'],
        binance_ask=data['binance_ask'],
        bybit_bid=data['bybit_bid'],
        bybit_ask=data['bybit_ask'],
        funding_spread=widest_gap,
        order_spread=as_fraction('order_spread_bps', 0),
        net_opportunity=as_fraction('net_profit_bps', 0),
        confidence=analysis.get('confidence', 0.5),
        recommendation=analysis.get('recommendation', 'MONITOR'),
        reasoning=analysis.get('reasoning', 'Fallback analysis'),
        risk_score=analysis.get('risk_score', 5.0),
        max_position=analysis.get('max_safe_position', 1000),
        estimated_fees=as_fraction('fees_bps', 6),
        estimated_slippage=as_fraction('slippage_bps', 2)
    )
def _build_fallback_signal(self, data: Dict) -> ArbitrageSignal:
"""Build signal with simple rule-based fallback"""
funding_rates = [
data['binance_funding'],
data['bybit_funding'],
data['okx_f