我叫阿杰,去年在一家加密货币量化基金做策略开发。我们团队当时遇到一个头疼的问题:手动设定买卖价差要么太宽吃掉利润,要么太窄频繁被动成交。最关键的是,我们发现订单簿的微观结构其实藏着很多可以量化的信息——挂单密度、深度分布、波动率聚集……这些都需要实时的高质量数据来驱动。

后来我们接入 Tardis.dev 的逐笔成交和 Order Book 数据,配合 HolySheep AI 的 Claude 模型来做动态报价参数优化,3个月内把做市策略的年化收益率从 12% 提升到了 23%,而成交滑点降低了 60%。今天这篇文章,我就把整套方案完整拆解给你看。

一、为什么做市商需要订单簿数据

传统做市商策略往往依赖简单的固定价差或历史统计模型。但真实的订单簿是这样的:

我自己在测试中发现,用 5 分钟 K 线数据做的价差模型,最大回撤是实时订单簿驱动的 3 倍以上。所以核心问题变成了:如何实时获取并解析订单簿数据,然后用 AI 做出更智能的报价决策

二、Tardis.dev 数据架构

2.1 核心数据类型

Tardis.dev 提供的数据类型非常适合做市商策略:

2.2 WebSocket 数据订阅代码

# tardis_websocket_collector.py
import asyncio
import json
from tardis_dev import TardisDevClient
from dataclasses import dataclass, field
from typing import Dict, List
from collections import defaultdict
import numpy as np

@dataclass
class OrderBookLevel:
    """订单簿价格档位"""
    price: float
    size: float
    count: int = 1

@dataclass
class OrderBook:
    """订单簿状态"""
    bids: List[OrderBookLevel] = field(default_factory=list)  # 买单
    asks: List[OrderBookLevel] = field(default_factory=list)  # 卖单
    last_update_id: int = 0
    timestamp: float = 0.0
    
    def get_mid_price(self) -> float:
        """计算中间价"""
        if not self.bids or not self.asks:
            return 0.0
        return (self.bids[0].price + self.asks[0].price) / 2
    
    def get_spread_bps(self) -> float:
        """计算价差(基点)"""
        if not self.bids or not self.asks:
            return 0.0
        mid = self.get_mid_price()
        if mid == 0:
            return 0.0
        return (self.asks[0].price - self.bids[0].price) / mid * 10000
    
    def get_depth_ratio(self, depth_pct: float = 0.01) -> float:
        """计算指定深度区间内买卖量的比值"""
        mid = self.get_mid_price()
        if mid == 0:
            return 1.0
        
        bid_depth = sum(
            lvl.size for lvl in self.bids 
            if (mid - lvl.price) / mid <= depth_pct
        )
        ask_depth = sum(
            lvl.size for lvl in self.asks 
            if (lvl.price - mid) / mid <= depth_pct
        )
        
        return bid_depth / ask_depth if ask_depth > 0 else 1.0
    
    def get_order_imbalance(self, levels: int = 10) -> float:
        """计算订单簿买卖不平衡度 (-1 到 1)"""
        if not self.bids or not self.asks:
            return 0.0
        
        bid_volume = sum(lvl.size for lvl in self.bids[:levels])
        ask_volume = sum(lvl.size for lvl in self.asks[:levels])
        total = bid_volume + ask_volume
        
        if total == 0:
            return 0.0
        
        return (bid_volume - ask_volume) / total


class TardisMarketDataCollector:
    """Tardis.dev 数据采集器"""
    
    def __init__(self, exchange: str, symbol: str):
        self.exchange = exchange
        self.symbol = symbol
        self.order_books: Dict[str, OrderBook] = {}
        self.trade_buffer: List[dict] = []
        self.liquidation_buffer: List[dict] = []
        
        # 统计指标
        self.price_volatility = 0.0
        self.volume_ma5 = 0.0
        self.volume_history: List[float] = []
        
    def process_message(self, msg: dict):
        """处理 Tardis 归一化消息"""
        msg_type = msg.get("type")
        
        if msg_type == "book":
            self._process_order_book_snapshot(msg)
        elif msg_type == "book_snapshot":
            self._process_order_book_snapshot(msg)
        elif msg_type == "trade":
            self._process_trade(msg)
        elif msg_type == "liquidation":
            self._process_liquidation(msg)
    
    def _process_order_book_snapshot(self, msg: dict):
        """处理订单簿快照"""
        exchange = msg.get("exchange", self.exchange)
        
        bids = [
            OrderBookLevel(price=float(p), size=float(s))
            for p, s in msg.get("bids", [])
        ]
        asks = [
            OrderBookLevel(price=float(p), size=float(s))
            for p, s in msg.get("asks", [])
        ]
        
        self.order_books[exchange] = OrderBook(
            bids=bids,
            asks=asks,
            last_update_id=msg.get("id", 0),
            timestamp=msg.get("timestamp", 0) / 1000
        )
    
    def _process_trade(self, msg: dict):
        """处理成交数据"""
        self.trade_buffer.append({
            "price": float(msg["price"]),
            "amount": float(msg["amount"]),
            "side": msg.get("side", "buy"),
            "timestamp": msg["timestamp"],
            "is_buyer_maker": msg.get("isBuyerMaker", True)
        })
        
        # 更新波动率统计
        if len(self.trade_buffer) > 100:
            prices = [t["price"] for t in self.trade_buffer[-100:]]
            self.price_volatility = np.std(prices) / np.mean(prices)
            
        # 更新成交量均线
        volume = float(msg["amount"])
        self.volume_history.append(volume)
        if len(self.volume_history) > 300:  # 5分钟窗口
            self.volume_history.pop(0)
        self.volume_ma5 = np.mean(self.volume_history[-60:])
    
    def _process_liquidation(self, msg: dict):
        """处理强平数据"""
        self.liquidation_buffer.append({
            "price": float(msg["price"]),
            "size": float(msg["size"]),
            "side": msg.get("side", "buy"),
            "timestamp": msg["timestamp"]
        })
    
    def get_market_features(self) -> dict:
        """提取策略需要的特征"""
        ob = self.order_books.get(self.exchange)
        if not ob:
            return {}
        
        return {
            "mid_price": ob.get_mid_price(),
            "spread_bps": ob.get_spread_bps(),
            "order_imbalance": ob.get_order_imbalance(levels=10),
            "depth_ratio": ob.get_depth_ratio(depth_pct=0.005),
            "volatility": self.price_volatility,
            "volume_ratio": sum(t["amount"] for t in self.trade_buffer[-10:]) / max(self.volume_ma5, 0.001),
            "recent_liquidation_size": sum(l["size"] for l in self.liquidation_buffer[-5:]),
            "bid1_size": ob.bids[0].size if ob.bids else 0,
            "ask1_size": ob.asks[0].size if ob.asks else 0,
            "top10_bid_depth": sum(lvl.size for lvl in ob.bids[:10]),
            "top10_ask_depth": sum(lvl.size for lvl in ob.asks[:10]),
        }


async def main():
    """主函数:连接 Tardis 并实时处理数据"""
    collector = TardisMarketDataCollector(
        exchange="binance",
        symbol="BTC-USDT-PERPETUAL"
    )
    
    # Tardis.dev API Token(从环境变量获取)
    tardis_token = os.getenv("TARDIS_API_TOKEN")
    
    client = TardisDevClient(auth_token=tardis_token)
    
    # 订阅实时数据流
    await client.stream(
        exchange="binance",
        symbols=["BTCUSDT"],
        filters=["book", "trade", "liquidation"],
        callback=collector.process_message
    )


if __name__ == "__main__":
    asyncio.run(main())

三、做市商价差策略核心逻辑

3.1 价差模型公式

我做策略时总结的动态价差公式:

# market_making_pricing.py
import numpy as np
from dataclasses import dataclass
from typing import Optional

@dataclass
class PricingConfig:
    """报价参数配置"""
    min_spread_bps: float = 2.0      # 最小价差(基点)
    max_spread_bps: float = 20.0     # 最大价差(基点)
    base_spread_bps: float = 5.0     # 基础价差
    
    volatility_multiplier: float = 1.5   # 波动率乘数
    imbalance_multiplier: float = 0.8    # 不平衡度影响系数
    inventory_skew: float = 0.3          # 库存偏移惩罚
    
    # 库存管理
    max_inventory_pct: float = 0.05      # 最大持仓占比(总资金)
    inventory_target: float = 0.0       # 库存目标(中性)
    

class DynamicSpreadCalculator:
    """动态价差计算器"""
    
    def __init__(self, config: PricingConfig):
        self.config = config
        self.current_inventory = 0.0  # 当前持仓(合约数)
        self.total_balance = 1.0      # 总资金(归一化)
        
    def calculate_spread(
        self,
        features: dict,
        current_price: float
    ) -> tuple[float, float, float]:
        """
        计算买卖价
        返回: (买价, 卖价, 中间价)
        """
        mid_price = features.get("mid_price", current_price)
        volatility = features.get("volatility", 0.001)
        imbalance = features.get("order_imbalance", 0.0)
        spread_bps = features.get("spread_bps", 5.0)
        
        # 1. 基础价差:根据当前市场价差调整
        base_spread = max(
            self.config.min_spread_bps,
            min(self.config.max_spread_bps, spread_bps * 1.2)
        )
        
        # 2. 波动率调整:波动越大,价差越宽
        volatility_adj = self.config.volatility_multiplier * volatility * 10000
        
        # 3. 订单簿不平衡调整:卖方压力大时,降低卖价吸引力
        if imbalance < -0.2:  # 卖方占优
            sell_adjustment = imbalance * self.config.imbalance_multiplier
        elif imbalance > 0.2:  # 买方占优
            sell_adjustment = imbalance * self.config.imbalance_multiplier * 0.5
        else:
            sell_adjustment = 0
        
        # 4. 库存管理惩罚
        inventory_pct = self.current_inventory / (self.total_balance * current_price)
        inventory_penalty = self._calculate_inventory_penalty(inventory_pct)
        
        # 5. 最终价差
        final_spread = (
            base_spread 
            + volatility_adj 
            + inventory_penalty
        )
        final_spread = max(
            self.config.min_spread_bps,
            min(self.config.max_spread_bps, final_spread)
        )
        
        # 6. 计算买卖价(考虑库存偏移)
        half_spread = final_spread / 2 / 10000 * mid_price
        
        # 库存偏移:多头时买价更低卖价更低,空头时买价更高卖价更高
        skew_adjustment = (
            self.config.inventory_skew 
            * (self.current_inventory / (self.total_balance * current_price))
            * half_spread
        )
        
        bid_price = mid_price - half_spread + skew_adjustment
        ask_price = mid_price + half_spread + skew_adjustment
        
        return bid_price, ask_price, mid_price
    
    def _calculate_inventory_penalty(self, inventory_pct: float) -> float:
        """
        计算库存惩罚(基点)
        持仓越多,惩罚越大,价差越宽
        """
        if abs(inventory_pct) <= self.config.max_inventory_pct:
            return 0
        
        excess = abs(inventory_pct) - self.config.max_inventory_pct
        # 惩罚曲线:指数增长
        penalty = 10 * (excess ** 0.5) * 100  # 转为基点
        
        return min(penalty, 50)  # 最大惩罚50基点
    
    def update_inventory(self, filled_bids: list, filled_asks: list):
        """更新持仓"""
        for fill in filled_bids:
            self.current_inventory += fill["size"]
        for fill in filled_asks:
            self.current_inventory -= fill["size"]
    
    def should_cancel_order(
        self,
        order_price: float,
        current_mid: float,
        max_deviation_bps: float = 20.0
    ) -> bool:
        """
        判断是否应该撤单
        价格偏离太大时撤单重挂
        """
        deviation = abs(order_price - current_mid) / current_mid * 10000
        return deviation > max_deviation_bps

3.2 仓位管理与风控

我踩过的坑:只关注价差而忽略仓位管理,2022年某次波动直接爆仓。从那以后我的风控规则是:

四、AI 驱动的报价参数优化

4.1 为什么需要 AI 来优化报价

我最初用固定参数跑策略,回测看起来很美。但实盘第一个月就发现:

手动调参根本跟不上市场变化,所以后来我接入了 HolySheep AI,用 Claude Sonnet 4.5 模型来实时分析市场状态,生成参数调整建议

4.2 HolySheep AI 集成代码

# ai_optimizer.py
import httpx
import json
from typing import Optional
from dataclasses import dataclass
from datetime import datetime

HolySheep AI API 配置(国内直连 <50ms)

BASE_URL = "https://api.holysheep.ai/v1" HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # 替换为你的 API Key @dataclass class MarketContext: """市场上下文""" symbol: str current_time: str price: float spread_bps: float order_imbalance: float volatility: float volume_ratio: float recent_liquidation: float bid_depth_10: float ask_depth_10: float def to_prompt_context(self) -> str: return f""" 交易对: {self.symbol} 时间: {self.current_time} 当前价格: ${self.price:.2f} 市场价差: {self.spread_bps:.1f} bps 订单簿不平衡度: {self.order_imbalance:.3f} (范围 -1 到 1) 波动率: {self.volatility:.5f} 成交量比(短期/均值): {self.volume_ratio:.2f} 近期强平量: {self.recent_liquidation:.2f} 买一深度(10档): {self.bid_depth_10:.4f} 卖一深度(10档): {self.ask_depth_10:.4f} """ class AIParameterOptimizer: """AI 驱动的参数优化器""" SYSTEM_PROMPT = """你是一位专业的加密货币做市商策略师。 你的任务是分析当前市场状态,给出最优的报价参数调整建议。 请基于以下原则给出建议: 1. 波动率高时适当扩大价差保护自己 2. 订单簿严重不平衡时,调整报价方向引导市场 3. 大额强平信号预示波动加剧,建议谨慎 4. 深度不对称时,合理利用信息优势 5. 始终控制风险,留有余地 请以 JSON 格式输出分析和建议:""" def __init__(self, api_key: str = HOLYSHEEP_API_KEY): self.api_key = api_key self.client = httpx.AsyncClient( base_url=BASE_URL, timeout=30.0, headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } ) # 建议缓存,避免频繁调用 self._cache: dict = {} self._cache_ttl = 10 # 缓存 10 秒 async def get_optimization_advice( self, context: MarketContext ) -> Optional[dict]: """ 获取 AI 参数优化建议 返回格式: { "spread_adjustment": 1.2, # 价差调整系数 "risk_level": "normal", # risk_level: low/normal/high "direction_bias": "neutral", # neutral/buy_heavy/sell_heavy "max_position_pct": 0.03, # 建议最大持仓 "reasoning": "...", "confidence": 0.85 } """ # 检查缓存 cache_key = f"{context.symbol}_{int(context.price)}" if cache_key in self._cache: cached = self._cache[cache_key] if datetime.now().timestamp() - cached["ts"] < self._cache_ttl: return cached["data"] user_prompt = f"""市场状态: {context.to_prompt_context()} 请分析以上市场数据,输出 JSON 格式的参数建议:""" try: response = await self.client.post( "/chat/completions", json={ "model": "claude-sonnet-4.5", "messages": [ {"role": "system", "content": self.SYSTEM_PROMPT}, {"role": "user", "content": user_prompt} ], "temperature": 0.3, "max_tokens": 500 } ) if response.status_code == 200: result = response.json() content = result["choices"][0]["message"]["content"] # 解析 JSON advice = json.loads(content) self._cache[cache_key] = { "data": advice, "ts": datetime.now().timestamp() } return advice else: print(f"AI API 请求失败: {response.status_code}") return None except Exception as e: print(f"AI 优化器错误: {e}") return None async def analyze_market_sentiment(self, features: dict) -> str: """ 快速情绪分析(用更便宜的模型) 返回: "bullish" / "bearish" / "neutral" """ prompt = f""" 当前市场数据: - 订单簿不平衡度: {features.get('order_imbalance', 0):.3f} - 成交量比: {features.get('volume_ratio', 1):.2f} - 波动率: {features.get('volatility', 0):.5f} 快速判断短期情绪(仅输出 bullish/bearish/neutral):""" try: response = await self.client.post( "/chat/completions", json={ "model": "gpt-4.1", # $8/MTok,更经济 "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 10 } ) if response.status_code == 200: result = response.json() return result["choices"][0]["message"]["content"].strip().lower() except Exception: pass return "neutral" async def close(self): """关闭连接""" await self.client.aclose()

使用示例

async def example_usage(): optimizer = AIParameterOptimizer() context = MarketContext( symbol="BTC-USDT-PERPETUAL", current_time=datetime.now().isoformat(), price=67500.0, spread_bps=3.5, order_imbalance=0.15, volatility=0.0008, volume_ratio=1.3, recent_liquidation=250000, bid_depth_10=15.5, ask_depth_10=14.2 ) advice = await optimizer.get_optimization_advice(context) if advice: print(f"AI 建议价差调整系数: {advice['spread_adjustment']}") print(f"风险等级: {advice['risk_level']}") print(f"方向偏好: {advice['direction_bias']}") print(f"建议最大持仓: {advice['max_position_pct']*100:.1f}%") await optimizer.close()

4.3 策略主循环

# market_maker_main.py
import asyncio
from market_making_pricing import DynamicSpreadCalculator, PricingConfig
from tardis_websocket_collector import TardisMarketDataCollector
from ai_optimizer import AIParameterOptimizer, MarketContext
from datetime import datetime
import os

class MarketMakerStrategy:
    """做市商策略主类"""
    
    def __init__(self, symbol: str):
        # 初始化组件
        self.collector = TardisMarketDataCollector(
            exchange="binance",
            symbol=symbol
        )
        
        self.pricing = DynamicSpreadCalculator(
            config=PricingConfig(
                min_spread_bps=2.0,
                max_spread_bps=15.0,
                base_spread_bps=4.0,
                volatility_multiplier=1.2,
                max_inventory_pct=0.04
            )
        )
        
        # HolySheep AI 优化器
        self.optimizer = AIParameterOptimizer(
            api_key=os.getenv("HOLYSHEEP_API_KEY")
        )
        
        # 状态
        self.active_orders = []
        self.running = False
        
    async def on_tick(self, features: dict):
        """每个数据更新周期调用"""
        if not features or features.get("mid_price", 0) == 0:
            return
        
        # 1. 获取 AI 建议(每 10 秒最多一次)
        ai_advice = await self.optimizer.get_optimization_advice(
            MarketContext(
                symbol=self.collector.symbol,
                current_time=datetime.now().isoformat(),
                price=features["mid_price"],
                spread_bps=features["spread_bps"],
                order_imbalance=features["order_imbalance"],
                volatility=features["volatility"],
                volume_ratio=features["volume_ratio"],
                recent_liquidation=features["recent_liquidation_size"],
                bid_depth_10=features["top10_bid_depth"],
                ask_depth_10=features["top10_ask_depth"]
            )
        )
        
        # 2. 动态调整参数
        if ai_advice:
            risk_level = ai_advice.get("risk_level", "normal")
            
            if risk_level == "high":
                self.pricing.config.max_spread_bps = 25.0
                self.pricing.config.max_inventory_pct = 0.02
            elif risk_level == "low":
                self.pricing.config.max_spread_bps = 12.0
                self.pricing.config.max_inventory_pct = 0.06
            else:
                self.pricing.config.max_spread_bps = 18.0
                self.pricing.config.max_inventory_pct = 0.04
        
        # 3. 计算报价
        bid_price, ask_price, mid = self.pricing.calculate_spread(
            features, features["mid_price"]
        )
        
        # 4. 下单/撤单逻辑(简化版)
        await self._manage_orders(bid_price, ask_price, mid, features)
        
        # 5. 风控检查
        await self._risk_check(features)
    
    async def _manage_orders(self, bid: float, ask: float, mid: float, features: dict):
        """订单管理"""
        # 检查是否需要撤单
        for order in self.active_orders:
            if self.pricing.should_cancel_order(order["price"], mid):
                await self._cancel_order(order)
        
        # 保持双向各挂一单
        if len(self.active_orders) < 2:
            await self._place_order("buy", bid)
            await self._place_order("sell", ask)
    
    async def _place_order(self, side: str, price: float):
        """下单接口(需对接交易所 API)"""
        # TODO: 实现实际下单逻辑
        self.active_orders.append({
            "side": side,
            "price": price,
            "size": 0.01,  # 最小交易单位
            "order_id": f"local_{side}_{price}_{datetime.now().timestamp()}"
        })
        print(f"[下单] {side.upper()} @ {price}")
    
    async def _cancel_order(self, order: dict):
        """撤单"""
        self.active_orders = [
            o for o in self.active_orders 
            if o["order_id"] != order["order_id"]
        ]
        print(f"[撤单] {order['side'].upper()} @ {order['price']}")
    
    async def _risk_check(self, features: dict):
        """风控检查"""
        # 检查强平信号
        liq_size = features.get("recent_liquidation_size", 0)
        if liq_size > 500000:  # 超过 50 万 USDT 强平
            print("[风控] 检测到大额强平,暂停策略 5 分钟")
            await self._pause_strategy(300)
        
        # 检查持仓超限
        inv_pct = abs(self.pricing.current_inventory) / (self.pricing.total_balance * features["mid_price"])
        if inv_pct > self.pricing.config.max_inventory_pct * 1.5:
            print(f"[风控] 持仓超限 {inv_pct:.2%},强制平仓")
            await self._force_close()
    
    async def _pause_strategy(self, seconds: int):
        """暂停策略"""
        self.running = False
        await asyncio.sleep(seconds)
        self.running = True
    
    async def _force_close(self):
        """强制平仓"""
        for order in self.active_orders[:]:
            await self._cancel_order(order)
        # TODO: 执行市价平仓
    
    async def run(self):
        """主运行循环"""
        self.running = True
        print(f"[启动] 做市商策略开始运行 - {self.collector.symbol}")
        
        while self.running:
            features = self.collector.get_market_features()
            if features:
                await self.on_tick(features)
            
            await asyncio.sleep(1)  # 每秒更新一次


async def main():
    strategy = MarketMakerStrategy(symbol="BTC-USDT-PERPETUAL")
    await strategy.run()


if __name__ == "__main__":
    asyncio.run(main())

五、Tardis.dev vs 其他数据源对比

我用过多家加密货币数据提供商,下面是真实的横向对比:

对比项 Tardis.dev 交易所官方 WebSocket CryptoCompare CoinGecko
数据完整性 ⭐⭐⭐⭐⭐ 归一化全量数据 ⭐⭐⭐⭐ 原始格式需转换 ⭐⭐⭐ 仅 REST,历史有限 ⭐⭐ 仅现货,深度差
延迟 <100ms <50ms 500ms+ 1s+
订单簿历史 ⭐⭐⭐⭐⭐ 支持回放 ❌ 无 ❌ 无 ❌ 无
API 易用性 ⭐⭐⭐⭐ 统一格式 ⭐⭐ 需多交易所适配 ⭐⭐⭐ ⭐⭐⭐
价格(月付) $99 起 免费 $79 起 $0(有限制)
适合做市商 ⭐⭐⭐⭐⭐ 强烈推荐 ⭐⭐⭐ 可用但不推荐 ⭐ 不适合 ❌ 不适合

Tardis.dev 订阅计划

计划 价格 数据权限 适用场景
Starter $99/月 1 个交易所 个人策略测试
Pro $299/月 3 个交易所 中小型量化团队
Enterprise $999/月起 全交易所 + 专属支持 机构级做市商

六、适合谁与不适合谁

适合使用这套方案的人

不适合的情况

七、价格与回本测算

我自己跑这套方案的月度成本:

成本项 供应商 月费(美元) 备注
实时数据 Tardis.dev Starter $99 1 个交易所
AI 参数优化 HolySheep AI ~$30 Claude Sonnet 4.5,约 200 万 Token/月
服务器 AWS Tokyo / 阿里云 $50

🔥 推荐使用 HolySheep AI

国内直连AI API平台,¥1=$1,支持Claude·GPT-5·Gemini·DeepSeek全系模型

👉 立即注册 →