引言:做市商的核心技术挑战
在加密货币交易领域,订单簿(Order Book)数据实时处理是做市商(Market Maker)策略的核心技术基础。做市商通过在买卖盘口提供流动性,赚取买卖价差(Spread)来实现盈利。这一过程需要对订单簿数据进行毫秒级甚至微秒级的实时分析,并根据分析结果动态调整报价策略。 本文将深入探讨如何构建一个高效的加密货币做市API系统,涵盖订单簿数据处理、实时分析、订单执行的完整流程。我们将使用HolySheep AI作为后端AI引擎,利用其超低延迟(<50ms)和极具竞争力的价格(GPT-4.1仅$8/MTok,DeepSeek V3.2低至$0.42/MTok)来优化做市策略的AI决策模块。订单簿数据结构与实时获取
订单簿基础概念
订单簿是交易所记录所有未成交买卖订单的数据结构,主要包含以下字段:- bid_price:买方出价(买一价、买二价...买N价)
- bid_quantity:买方数量
- ask_price:卖方出价(卖一价、卖二价...卖N价)
- ask_quantity:卖方数量
- timestamp:数据时间戳
通过WebSocket实时获取订单簿数据
以下是一个完整的Python实现,用于连接主流交易所并实时获取订单簿数据:#!/usr/bin/env python3
"""
订单簿数据实时获取模块
支持 Binance, Coinbase, Kraken 等主流交易所
"""
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import List, Dict, Optional
import aiohttp
import websockets
@dataclass
class OrderBookEntry:
"""订单簿条目"""
price: float
quantity: float
side: str # 'bid' or 'ask'
@dataclass
class OrderBook:
"""完整订单簿数据结构"""
symbol: str
bids: List[OrderBookEntry] = field(default_factory=list)
asks: List[OrderBookEntry] = field(default_factory=list)
last_update: float = field(default_factory=time.time)
exchange: str = ""
@property
def best_bid(self) -> float:
return self.bids[0].price if self.bids else 0.0
@property
def best_ask(self) -> float:
return self.asks[0].price if self.asks else 0.0
@property
def mid_price(self) -> float:
if self.best_bid and self.best_ask:
return (self.best_bid + self.best_ask) / 2
return 0.0
@property
def spread(self) -> float:
if self.best_bid and self.best_ask:
return (self.best_ask - self.best_bid) / self.mid_price * 100
return 0.0
@property
def spread_bps(self) -> float:
"""以基点计算的价差"""
return self.spread * 100
class BinanceOrderBookFetcher:
"""币安订单簿获取器"""
def __init__(self, symbol: str = "BTCUSDT"):
self.symbol = symbol.lower()
self.ws_url = "wss://stream.binance.com:9443/ws"
self.order_book: OrderBook = OrderBook(symbol=symbol, exchange="binance")
self._running = False
self._update_count = 0
async def connect(self):
"""建立WebSocket连接"""
stream_name = f"{self.symbol}@depth20@100ms"
uri = f"{self.ws_url}/{stream_name}"
async with websockets.connect(uri) as ws:
self._running = True
print(f"[Binance] 已连接到 {self.symbol} 订单簿流")
while self._running:
try:
data = await asyncio.wait_for(ws.recv(), timeout=30)
await self._process_update(json.loads(data))
except asyncio.TimeoutError:
print("[警告] WebSocket心跳超时")
async def _process_update(self, data: dict):
"""处理订单簿更新"""
bids = [
OrderBookEntry(price=float(b[0]), quantity=float(b[1]), side='bid')
for b in data.get('bids', [])[:20]
]
asks = [
OrderBookEntry(price=float(a[0]), quantity=float(a[1]), side='ask')
for a in data.get('asks', [])[:20]
]
self.order_book.bids = bids
self.order_book.asks = asks
self.order_book.last_update = time.time()
self._update_count += 1
# 每100次更新打印一次状态
if self._update_count % 100 == 0:
print(f"[更新 #{self._update_count}] "
f"买一:{self.order_book.best_bid:.2f} | "
f"卖一:{self.order_book.best_ask:.2f} | "
f"价差:{self.order_book.spread:.4f}%")
async def main():
"""主函数"""
fetcher = BinanceOrderBookFetcher("BTCUSDT")
# 启动获取任务
fetch_task = asyncio.create_task(fetcher.connect())
# 运行30秒后停止
await asyncio.sleep(30)
fetcher._running = False
await fetch_task
print(f"\n[统计] 共处理 {fetcher._update_count} 次订单簿更新")
if __name__ == "__main__":
asyncio.run(main())
此模块实现了每秒约10次的订单簿更新频率,完全满足大多数做市策略的数据需求。
订单簿分析:价差与流动性计算
核心分析指标
做市商需要实时计算以下关键指标:#!/usr/bin/env python3
"""
订单簿分析与做市信号生成
"""
from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import defaultdict
import statistics
@dataclass
class MarketAnalysis:
"""市场分析结果"""
symbol: str
mid_price: float
spread_bps: float
market_depth_5: float # 5档深度总量
market_depth_10: float # 10档深度总量
volatility_30s: float # 30秒波动率
imbalance_ratio: float # 订单簿不平衡度
vwap: float # 加权平均价
confidence: float # 分析置信度 (0-1)
class OrderBookAnalyzer:
"""订单簿分析器"""
def __init__(self, symbol: str, price_history_size: int = 100):
self.symbol = symbol
self.price_history: List[float] = []
self.history_size = price_history_size
self.volume_by_side: Dict[str, List[float]] = {
'bid': [], 'ask': []
}
def analyze(self, order_book) -> MarketAnalysis:
"""执行完整市场分析"""
# 1. 计算中间价和价差
mid_price = order_book.mid_price
spread_bps = order_book.spread_bps
# 2. 计算市场深度
market_depth_5 = self._calculate_depth(order_book, levels=5)
market_depth_10 = self._calculate_depth(order_book, levels=10)
# 3. 更新价格历史并计算波动率
self._update_price_history(mid_price)
volatility = self._calculate_volatility()
# 4. 计算订单簿不平衡度
imbalance = self._calculate_imbalance(order_book)
# 5. 计算VWAP
vwap = self._calculate_vwap(order_book)
# 6. 评估置信度
confidence = self._evaluate_confidence(order_book, imbalance)
return MarketAnalysis(
symbol=self.symbol,
mid_price=mid_price,
spread_bps=spread_bps,
market_depth_5=market_depth_5,
market_depth_10=market_depth_10,
volatility_30s=volatility,
imbalance_ratio=imbalance,
vwap=vwap,
confidence=confidence
)
def _calculate_depth(self, order_book, levels: int) -> float:
"""计算指定档位的市场深度(以基础货币计)"""
bid_depth = sum(
e.quantity for e in order_book.bids[:levels]
)
ask_depth = sum(
e.quantity for e in order_book.asks[:levels]
)
return (bid_depth + ask_depth) / 2
def _update_price_history(self, price: float):
"""更新价格历史"""
self.price_history.append(price)
if len(self.price_history) > self.history_size:
self.price_history.pop(0)
def _calculate_volatility(self) -> float:
"""计算30秒波动率(标准差百分比)"""
if len(self.price_history) < 10:
return 0.0
returns = []
for i in range(1, len(self.price_history)):
ret = (self.price_history[i] - self.price_history[i-1]) / \
self.price_history[i-1]
returns.append(ret)
if len(returns) < 2:
return 0.0
std_dev = statistics.stdev(returns)
return std_dev * 100 # 转换为百分比
def _calculate_imbalance(self, order_book) -> float:
"""
计算订单簿不平衡度
正值 = 买方压力
负值 = 卖方压力
范围: -1 到 1
"""
total_bid_vol = sum(e.quantity for e in order_book.bids[:10])
total_ask_vol = sum(e.quantity for e in order_book.asks[:10])
total_vol = total_bid_vol + total_ask_vol
if total_vol == 0:
return 0.0
imbalance = (total_bid_vol - total_ask_vol) / total_vol
return max(-1.0, min(1.0, imbalance))
def _calculate_vwap(self, order_book) -> float:
"""计算盘口加权平均价"""
total_value = 0.0
total_quantity = 0.0
for side in ['bids', 'asks']:
for entry in getattr(order_book, side)[:10]:
total_value += entry.price * entry.quantity
total_quantity += entry.quantity
if total_quantity == 0:
return order_book.mid_price
return total_value / total_quantity
def _evaluate_confidence(self, order_book, imbalance: float) -> float:
"""评估当前市场分析的置信度"""
base_confidence = 0.5
# 价差因素:价差越小,置信度越高
if order_book.spread_bps < 5:
base_confidence += 0.2
elif order_book.spread_bps > 20:
base_confidence -= 0.2
# 深度因素:深度越大,置信度越高
depth = self._calculate_depth(order_book, levels=10)
if depth > 100: # 假设基础货币数量
base_confidence += 0.2
elif depth < 10:
base_confidence -= 0.2
# 不平衡度极端值降低置信度
if abs(imbalance) > 0.7:
base_confidence -= 0.15
return max(0.1, min(0.95, base_confidence))
使用示例
def demonstrate_analysis():
"""演示分析功能"""
# 创建模拟订单簿数据(实际使用时从API获取)
from dataclasses import dataclass, field
@dataclass
class MockOrderBook:
bids: List = field(default_factory=list)
asks: List = field(default_factory=list)
@property
def mid_price(self):
return 45000.0
@property
def spread_bps(self):
return 3.5
mock_book = MockOrderBook(
bids=[OrderBookEntry(price=44999, quantity=2.5, side='bid'),
OrderBookEntry(price=44998, quantity=1.8, side='bid'),
OrderBookEntry(price=44997, quantity=3.2, side='bid')],
asks=[OrderBookEntry(price=45001, quantity=2.1, side='ask'),
OrderBookEntry(price=45002, quantity=1.5, side='ask'),
OrderBookEntry(price=45003, quantity=2.8, side='ask')]
)
analyzer = OrderBookAnalyzer("BTCUSDT")
result = analyzer.analyze(mock_book)
print(f"""
=== 市场分析报告 ===
交易对: {result.symbol}
中间价: ${result.mid_price:,.2f}
价差: {result.spread_bps:.2f} bps
5档深度: {result.market_depth_5:.4f}
不平衡度: {result.imbalance_ratio:+.2%}
波动率: {result.volatility_30s:.4f}%
置信度: {result.confidence:.0%}
""")
if __name__ == "__main__":
demonstrate_analysis()
AI驱动的做市策略决策
为什么需要AI来做市决策?
传统的做市策略基于固定规则,难以适应复杂多变的市场环境。通过集成AI能力,做市商可以:- 实时识别市场微观结构变化
- 预测短期价格走势并动态调整报价
- 根据历史模式优化库存管理
- 自动识别潜在的价格操纵行为
使用HolySheep AI进行策略分析
以下代码展示了如何集成HolySheep AI的API来进行做市策略分析。我们选择DeepSeek V3.2作为主要模型,其价格为$0.42/MTok,性价比极高:#!/usr/bin/env python3
"""
HolySheep AI 做市策略分析模块
支持多种AI模型进行市场分析和策略生成
"""
import asyncio
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class AIModel(Enum):
"""支持的AI模型"""
GPT4 = "gpt-4.1"
CLAUDE = "claude-sonnet-4.5"
GEMINI = "gemini-2.5-flash"
DEEPSEEK = "deepseek-v3.2"
@dataclass
class StrategyRecommendation:
"""策略推荐结果"""
action: str # 'bid', 'ask', 'hold'
price: float
quantity: float
reasoning: str
confidence: float
model_used: str
latency_ms: float
cost_usd: float
@dataclass
class ModelPricing:
"""模型定价信息(2026年最新)"""
name: str
price_per_1m_tokens: float
def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""计算API调用成本(美元)"""
total_tokens = input_tokens + output_tokens
return (total_tokens / 1_000_000) * self.price_per_1m_tokens
class HolySheepMarketMaker:
"""
基于HolySheep AI的做市策略分析器
HolySheep优势:
- 超低延迟: <50ms
- 极具竞争力的价格: DeepSeek V3.2 仅 $0.42/MTok
- 支持微信/支付宝付款
- 85%+ 成本节省 vs OpenAI/Anthropic
"""
BASE_URL = "https://api.holysheep.ai/v1"
# 2026年最新定价
MODEL_PRICING = {
AIModel.GPT4: ModelPricing("GPT-4.1", 8.0),
AIModel.CLAUDE: ModelPricing("Claude Sonnet 4.5", 15.0),
AIModel.GEMINI: ModelPricing("Gemini 2.5 Flash", 2.50),
AIModel.DEEPSEEK: ModelPricing("DeepSeek V3.2", 0.42),
}
def __init__(self, api_key: str, model: AIModel = AIModel.DEEPSEEK):
self.api_key = api_key
self.model = model
self.pricing = self.MODEL_PRICING[model]
self.total_cost = 0.0
self.total_calls = 0
self.total_latency_ms = 0.0
async def analyze_market_and_recommend(
self,
market_data: Dict,
portfolio_state: Dict,
risk_limits: Dict
) -> StrategyRecommendation:
"""
分析市场数据并生成做市策略
Args:
market_data: 包含订单簿分析结果的市场数据
portfolio_state: 当前持仓和资金状态
risk_limits: 风险限制参数
"""
prompt = self._build_analysis_prompt(
market_data, portfolio_state, risk_limits
)
start_time = time.time()
try:
response = await self._call_ai_api(prompt)
latency = (time.time() - start_time) * 1000
# 估算token使用量(实际应从API响应中获取)
input_tokens = len(prompt) // 4 # 粗略估算
output_tokens = len(response) // 4
cost = self.pricing.calculate_cost(input_tokens, output_tokens)
self.total_cost += cost
self.total_calls += 1
self.total_latency_ms += latency
return self._parse_strategy_response(
response, latency, cost
)
except Exception as e:
print(f"[错误] AI分析失败: {e}")
# 返回保守策略作为降级方案
return StrategyRecommendation(
action='hold',
price=market_data.get('mid_price', 0),
quantity=0,
reasoning=f"AI调用失败: {str(e)},执行保守策略",
confidence=0.1,
model_used=self.model.value,
latency_ms=(time.time() - start_time) * 1000,
cost_usd=0
)
def _build_analysis_prompt(
self,
market_data: Dict,
portfolio_state: Dict,
risk_limits: Dict
) -> str:
"""构建AI分析提示词"""
return f"""你是一个专业的高频做市商AI助手。请分析以下市场数据并给出交易建议:
【市场数据】
- 交易对: {market_data.get('symbol')}
- 中间价: ${market_data.get('mid_price'):,.2f}
- 价差: {market_data.get('spread_bps'):.2f} bps
- 5档深度: {market_data.get('market_depth_5'):.4f}
- 不平衡度: {market_data.get('imbalance_ratio'):+.2%}
- 波动率: {market_data.get('volatility_30s'):.4f}%
- 置信度: {market_data.get('confidence'):.0%}
【持仓状态】
- 基础货币余额: {portfolio_state.get('base_balance'):.4f}
- 计价货币余额: {portfolio_state.get('quote_balance'):.4f}
- 当前持仓: {portfolio_state.get('position'):.4f}
【风险限制】
- 最大单笔交易量: {risk_limits.get('max_quantity')}
- 最大持仓: {risk_limits.get('max_position')}
- 最大日损失: ${risk_limits.get('max_daily_loss')}
请返回JSON格式的建议:
{{"action": "bid/ask/hold", "price": 数值, "quantity": 数值, "reasoning": "理由", "confidence": 0-1}}
仅返回JSON,不要其他内容。"""
async def _call_ai_api(self, prompt: str) -> str:
"""调用HolySheep AI API"""
import aiohttp
url = f"{self.BASE_URL}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model.value,
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.3, # 低温度确保策略稳定性
"max_tokens": 500
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers) as resp:
if resp.status != 200:
error = await resp.text()
raise Exception(f"API错误 {resp.status}: {error}")
data = await resp.json()
return data['choices'][0]['message']['content']
def _parse_strategy_response(
self,
response: str,
latency: float,
cost: float
) -> StrategyRecommendation:
"""解析AI响应"""
try:
# 尝试提取JSON
json_str = response
if '```json' in response:
json_str = response.split('``json')[1].split('``')[0]
elif '```' in response:
json_str = response.split('``')[1].split('``')[0]
data = json.loads(json_str.strip())
return StrategyRecommendation(
action=data.get('action', 'hold'),
price=float(data.get('price', 0)),
quantity=float(data.get('quantity', 0)),
reasoning=data.get('reasoning', '无'),
confidence=float(data.get('confidence', 0.5)),
model_used=self.model.value,
latency_ms=latency,
cost_usd=cost
)
except json.JSONDecodeError:
# 降级处理
return StrategyRecommendation(
action='hold',
price=0,
quantity=0,
reasoning=f"响应解析失败: {response[:100]}",
confidence=0.1,
model_used=self.model.value,
latency_ms=latency,
cost_usd=cost
)
def get_cost_report(self) -> Dict:
"""获取成本报告"""
avg_latency = self.total_latency_ms / self.total_calls if self.total_calls > 0 else 0
return {
"total_calls": self.total_calls,
"total_cost_usd": self.total_cost,
"avg_latency_ms": avg_latency,
"avg_cost_per_call": self.total_cost / self.total_calls if self.total_calls > 0 else 0,
"model_used": self.model.value,
"model_price_per_1m": self.pricing.price_per_1m_tokens
}
使用示例
async def demo():
"""演示如何使用HolySheep AI进行做市分析"""
# 初始化做市分析器(使用DeepSeek V3.2,性价比最高)
api_key = "YOUR_HOLYSHEEP_API_KEY" # 替换为您的API密钥
maker = HolySheepMarketMaker(api_key, AIModel.DEEPSEEK)
# 模拟市场数据
market_data = {
"symbol": "BTCUSDT",
"mid_price": 45000.0,
"spread_bps": 3.5,
"market_depth_5": 15.5,
"imbalance_ratio": 0.15,
"volatility_30s": 0.02,
"confidence": 0.75
}
portfolio_state = {
"base_balance": 1.5, # 1.5 BTC
"quote_balance": 25000, # 25000 USDT
"position": 0.5 # 净持仓
}
risk_limits = {
"max_quantity": 0.1,
"max_position": 2.0,
"max_daily_loss": 500
}
# 执行分析
recommendation = await maker.analyze_market_and_recommend(
market_data, portfolio_state, risk_limits
)
print(f"""
=== AI做市策略建议 ===
操作: {recommendation.action.upper()}
价格: ${recommendation.price:,.2f}
数量: {recommendation.quantity}
置信度: {recommendation.confidence:.0%}
推理: {recommendation.reasoning}
=== 性能指标 ===
延迟: {recommendation.latency_ms:.1f}ms
成本: ${recommendation.cost_usd:.6f}
模型: {recommendation.model_used}
""")
# 打印成本报告
report = maker.get_cost_report()
print(f"""
=== 成本报告 ===
总调用次数: {report['total_calls']}
总成本: ${report['total_cost_usd']:.4f}
平均延迟: {report['avg_latency_ms']:.1f}ms
""")
return maker
if __name__ == "__main__":
asyncio.run(demo())
2026年AI模型成本对比分析
在构建做市系统时,选择合适的AI模型对成本控制至关重要。以下是2026年主流AI模型的价格对比:| 模型 | 价格 ($/MTok) | 10M Token/月成本 | 适合场景 | 延迟 |
|---|---|---|---|---|
| DeepSeek V3.2 | $0.42 | $4.20 | 高频分析、批量处理 | <50ms |
| Gemini 2.5 Flash | $2.50 | $25.00 | 快速响应、中等复杂度 | <80ms |
| GPT-4.1 | $8.00 | $80.00 | 复杂推理、高精度需求 | <100ms |
| Claude Sonnet 4.5 | $15.00 | $150.00 | 长文本分析、安全敏感 | <120ms |
成本节省分析:选择DeepSeek V3.2而非Claude Sonnet 4.5,每月可节省97%的AI成本。以10M Token/月计算,节省高达$145.80!
Geeignet / Nicht geeignet für
Geeignet für(适合的场景)
- 专业做市商:需要实时订单簿分析和策略优化的量化团队
- 交易所流动性提供商:为多个交易对提供流动性的机构
- 量化开发者:构建基于AI的高频交易系统
- DeFi协议:需要自动做市功能的项目方
- 经纪商:为客户提供增强流动性的服务
Nicht geeignet für(不适合的场景)
- 手动交易者:不依赖程序化交易的个人投资者
- 极低频策略:持仓周期以天或周计的交易方式
- 受监管限制的司法管辖区:当地法律禁止自动做市
- 资金量极小的账户:手续费可能超过潜在收益
Preise und ROI(价格与投资回报)
HolySheep AI定价方案
| 套餐 | 价格 | 包含额度 | 适合规模 | 付款方式 |
|---|---|---|---|---|
| 免费试用 | ¥0 | 初始赠送额度 | 测试/评估 | 微信/支付宝 |
| 标准套餐 | 按量计费 | 无限制 | 中小型做市商 | 微信/支付宝/信用卡 |
| 企业套餐 | 定制 | 专属额度+优先保障 | 大型机构 | 对公转账/微信/支付宝 |
ROI计算示例
假设您的做市系统:- 每分钟进行10次AI分析(DeepSeek V3.2)
- 每次分析消耗500 Token
- 月交易量:43,200次分析
月度AI成本:43,200 × 500 / 1,000,000 × $0.42 = $9.07
潜在收益:假设价差收益0.1%/天,初始资金$50,000,月收益约$1,500
ROI:($1,500 - $9.07) / $9.07 × 100% = 16,436%
Warum HolySheep wählen(为什么选择HolySheep)
HolySheep AI是专为量化交易和做市场景优化的AI服务平台,具备以下核心优势:
- 超低延迟:平均响应时间<50ms,满足高频交易需求
- 极具竞争力的价格:DeepSeek V3.2仅$0.42/MTok,比OpenAI节省85%+
- 多种AI模型:支持GPT-4.1、Claude Sonnet 4.5、Gemini 2.5 Flash、DeepSeek V3.2
- 本地化支付:支持微信支付、支付宝,轻松充值
- 免费试用额度:注册即送初始Credits,无需信用卡
- 稳定可靠:99.9%可用性保障,企业级SLA
👉 立即注册HolySheep AI,开启您的AI做市之旅!
完整做市系统架构
以下是一个完整的做市系统集成示例:#!/usr/bin/env python3
"""
完整做市系统
整合订单簿获取、分析、AI决策、订单执行
"""
import asyncio
import logging
from typing import Dict, Optional
from config import Config
from orderbook_fetcher import BinanceOrderBookFetcher, OrderBook
from analyzer import OrderBookAnalyzer, MarketAnalysis
from holy_sheep_client import HolySheepMarketMaker, AIModel, StrategyRecommendation
from exchange_api import ExchangeAPI # 交易所API封装
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MarketMaker:
"""
完整做市机器人
工作流程:
1. 实时获取订单簿数据
2. 分析市场状态
3. AI生成交易策略
4. 执行订单
5. 风险管理监控
"""
def __init__(self, config: Config):
self.config = config
self.order_book: Optional[OrderBook] = None
# 初始化组件
self.fetcher = BinanceOrderBookFetcher(config.symbol)
self.analyzer = OrderBookAnalyzer(config.symbol)
self.ai_client = HolySheepMarketMaker(
api_key=config.holy_sheep_api_key,
model=AIModel.DEEPSEEK # 使用性价比最高的模型
)
self.exchange = ExchangeAPI(config)
# 状态跟踪
self.position = 0.0
self.last_analysis_time = 0
self.analysis_interval = config.analysis_interval # 分析间隔(秒)
# 风险控制
self.max_position = config.max_position
self.max_daily_loss = config.max_daily_loss
self.daily_pnl = 0.0
async def start(self):
"""启动做市机器人"""
logger.info(f"启动做市机器人: {self.config.symbol}")
# 启动订单簿获取任务
fetch_task = asyncio.create_task(self._run_orderbook_fetcher())
# 主循环
while True:
try:
await self._trading_loop()
except Exception as e:
logger.error(f"交易循环异常: {e}")
await asyncio.sleep(1)
async def _run_orderbook_fetcher(self):
"""运行订单簿获取器"""
try:
await self.fetcher.connect()
except Exception as e:
logger.error(f"订单簿获取器异常: {e}")
raise
async def _trading_loop(self):
"""主交易循环"""
current_time = asyncio.get_event_loop().time()
# 定期执行分析和决策
if current_time - self.last_analysis_time >= self.analysis_interval:
if self.fetcher.order_book:
await self._analyze_and_trade()
self.last_analysis_time = current_time
await asyncio.sleep(0.1) # 避免CPU过度占用
async def _analyze_and_trade(self):
"""分析市场并执行交易"""
order_book = self.fetcher.order_book
# 1. 分析订单簿
analysis = self.analyzer.analyze(order_book)
# 2. 准备数据
market_data = {
"symbol": analysis.symbol,
"mid_price": analysis.mid_price,
"spread_bps": analysis.spread_bps,
"market_depth_5": analysis.market_depth_5,
"imbalance_ratio": analysis.imbalance_ratio,
"volatility_30s": analysis.volatility_30s,
"confidence": analysis.confidence
}
portfolio_state = {
"base_balance