导言:从真实套利场景说起

2025年第四季度,我在监控主流交易所永续合约时发现了一个有趣现象:同一交易对(如BTC-USDT-SWAP)在OKX和Binance之间经常出现0.1%-0.5%的瞬时价差。这个价差虽然看起来微小,但对于高频套利策略而言,累积效应非常可观。

本文将详细讲解如何通过技术手段获取两大交易所的实时行情数据,并构建一套完整的价差监控与套利数据获取系统。所有代码示例均可直接复制运行,数据获取延迟控制在50ms以内。

价差套利数据获取的核心逻辑

为什么选择OKX与Binance?

技术架构设计

一套完整的价差数据获取系统需要包含以下组件:OKX行情获取器(REST)、Binance行情获取器(REST)、跨交易所价差计算引擎,以及基于WebSocket的实时监控模块。下文按此顺序逐一实现。

环境准备与依赖安装

# Python 3.10+ 环境准备
pip install websockets==12.0
pip install aiohttp==3.9.1
pip install pandas==2.1.4
pip install numpy==1.26.2
pip install asyncio-redis==0.16.0
pip install python-dateutil==2.8.2

性能监控依赖

pip install prometheus-client==0.19.0

签名认证(仅OKX私有接口需要;下文示例使用标准库 hmac/hashlib 生成签名,cryptography 为可选依赖)

pip install cryptography==41.0.7

OKX永续合约数据获取

REST API基础接入

import aiohttp
import asyncio
import time
import hashlib
import hmac
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PerpetualTicker:
    """Top-of-book snapshot for a single perpetual swap contract.

    Attributes:
        symbol: Instrument identifier, e.g. ``BTC-USDT-SWAP``.
        last_price: Last traded price.
        bid1_price: Best bid price.
        bid1_size: Size available at the best bid.
        ask1_price: Best ask price.
        ask1_size: Size available at the best ask.
        volume_24h: Trading volume over the last 24 hours.
        timestamp: Snapshot time in milliseconds.
        exchange: Name of the exchange the snapshot came from.
    """

    symbol: str
    last_price: float
    bid1_price: float
    bid1_size: float
    ask1_price: float
    ask1_size: float
    volume_24h: float
    timestamp: int
    exchange: str

class OKXDataFetcher:
    """Async REST client for OKX perpetual-swap market data.

    Intended to be used as an async context manager so the underlying
    ``aiohttp`` session is always closed::

        async with OKXDataFetcher() as fetcher:
            tickers = await fetcher.get_perpetual_tickers("BTC")

    The two fetch methods call public market-data endpoints; the API
    credentials are only consumed by ``_sign``.
    """

    BASE_URL = "https://www.okx.com"

    def __init__(self, api_key: str = "", api_secret: str = "", passphrase: str = ""):
        self.api_key = api_key
        self.api_secret = api_secret
        self.passphrase = passphrase
        # Created in __aenter__; stays None until the context manager is entered.
        self.session: Optional[aiohttp.ClientSession] = None
        self._request_count = 0
        self._last_request_time = 0

    async def __aenter__(self):
        """Open the HTTP session (10 s total timeout) and return ``self``."""
        headers = {"Content-Type": "application/json"}
        # Do not send an empty credential header on anonymous (public) calls.
        if self.api_key:
            headers["OK-ACCESS-KEY"] = self.api_key
        self.session = aiohttp.ClientSession(
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=10),
        )
        return self

    async def __aexit__(self, *args):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    def _sign(self, timestamp: str, method: str, path: str, body: str = "") -> str:
        """Build the request signature for a private OKX endpoint.

        The pre-hash string is ``timestamp + method + path + body``; it is
        signed with HMAC-SHA256 using the API secret and the raw digest is
        Base64-encoded (a hex digest is rejected by the API).

        Args:
            timestamp: Request timestamp exactly as sent in the header.
            method: Upper-case HTTP method.
            path: Request path including any query string.
            body: Raw request body, empty for requests without one.

        Returns:
            The Base64-encoded signature as ASCII text.
        """
        import base64  # local import: keeps this block self-contained

        message = timestamp + method + path + body
        mac = hmac.new(
            self.api_secret.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        )
        return base64.b64encode(mac.digest()).decode('ascii')

    @staticmethod
    def _tickers_params(inst_family: str) -> Dict[str, str]:
        """Build the query for the tickers endpoint.

        ``instType`` is mandatory and the family is the underlying pair
        (``BTC-USDT``), not an instrument id. A value that already contains
        a dash is treated as a complete family and passed through unchanged.
        """
        family = inst_family if "-" in inst_family else f"{inst_family}-USDT"
        return {"instType": "SWAP", "instFamily": family}

    @staticmethod
    def _parse_levels(levels) -> List[List[float]]:
        """Convert raw book levels to ``[price, size]`` float pairs.

        Only the first two elements of each level are used, so levels that
        carry extra trailing fields are accepted.
        """
        return [[float(level[0]), float(level[1])] for level in levels]

    async def get_perpetual_tickers(self, inst_family: str = "BTC") -> List["PerpetualTicker"]:
        """Fetch tickers for the perpetual swaps of one instrument family.

        Args:
            inst_family: Coin (``BTC``, ``ETH``, ...) or a full family such
                as ``BTC-USDT``. A bare coin is paired with USDT.

        Returns:
            One ``PerpetualTicker`` per instrument in the response.

        Raises:
            ValueError: If the response ``code`` is not ``"0"``.
        """
        url = f"{self.BASE_URL}/api/v5/market/tickers"
        params = self._tickers_params(inst_family)

        start_time = time.perf_counter()

        async with self.session.get(url, params=params) as response:
            data = await response.json()
            latency_ms = (time.perf_counter() - start_time) * 1000

            print(f"[OKX] Tickers API延迟: {latency_ms:.2f}ms | 状态: {response.status}")

        if data.get("code") != "0":
            raise ValueError(f"OKX API错误: {data.get('msg')}")

        return [
            PerpetualTicker(
                symbol=item["instId"],
                last_price=float(item["last"]),
                bid1_price=float(item["bidPx"]),
                bid1_size=float(item["bidSz"]),
                ask1_price=float(item["askPx"]),
                ask1_size=float(item["askSz"]),
                volume_24h=float(item["vol24h"]),
                timestamp=int(item["ts"]),
                exchange="OKX"
            )
            for item in data.get("data", [])
        ]

    async def get_orderbook(self, inst_id: str, depth: int = 20) -> Dict:
        """Fetch an order-book snapshot for one instrument.

        Args:
            inst_id: Instrument id, e.g. ``BTC-USDT-SWAP``.
            depth: Number of levels requested per side.

        Returns:
            Dict with ``symbol``, ``latency_ms``, ``timestamp``, ``bids`` and
            ``asks`` (lists of ``[price, size]`` floats) and ``exchange``.
            An empty ``data`` list yields empty sides and timestamp 0.

        Raises:
            ValueError: If the response ``code`` is not ``"0"``.
        """
        url = f"{self.BASE_URL}/api/v5/market/books"
        params = {"instId": inst_id, "sz": str(depth)}

        start_time = time.perf_counter()

        async with self.session.get(url, params=params) as response:
            data = await response.json()
            latency_ms = (time.perf_counter() - start_time) * 1000

        if data.get("code") != "0":
            raise ValueError(f"OKX订单簿获取失败: {data.get('msg')}")

        # `or` also covers a present-but-empty list, which [0] would crash on.
        books = (data.get("data") or [{}])[0]

        return {
            "symbol": inst_id,
            "latency_ms": latency_ms,
            "timestamp": int(books.get("ts", 0)),
            "bids": self._parse_levels(books.get("bids", [])),
            "asks": self._parse_levels(books.get("asks", [])),
            "exchange": "OKX"
        }

使用示例

async def demo_okx_fetcher():
    """Print BTC perpetual tickers from OKX plus the first contract's book depth."""
    async with OKXDataFetcher() as fetcher:
        # Fetch the BTC perpetual tickers.
        tickers = await fetcher.get_perpetual_tickers("BTC")
        print(f"获取到 {len(tickers)} 个BTC永续合约行情")

        if tickers:
            ticker = tickers[0]
            print(f"主力合约: {ticker.symbol}")
            print(f"最新价: ${ticker.last_price:,.2f}")
            print(f"买卖价差: {(ticker.ask1_price - ticker.bid1_price) / ticker.bid1_price * 100:.4f}%")

            # Fetch the detailed order book for that contract.
            orderbook = await fetcher.get_orderbook(ticker.symbol)
            print(f"订单簿深度: 买盘 {len(orderbook['bids'])} 档, 卖盘 {len(orderbook['asks'])} 档")


# Guarded so importing this module does not fire network requests.
if __name__ == "__main__":
    asyncio.run(demo_okx_fetcher())

Binance永续合约数据获取

Binance USDM永续合约API接入

import aiohttp
import asyncio
import time
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
import hashlib
import struct

@dataclass
class BinancePerpetualTicker:
    """24-hour statistics and best quote for one Binance perpetual contract.

    Attributes:
        symbol: Contract symbol.
        last_price: Last traded price.
        price_change: Absolute price change reported by the exchange.
        price_change_percent: Price change in percent.
        bid1_price: Best bid price.
        ask1_price: Best ask price.
        volume_24h: Base-asset volume.
        quote_volume_24h: Quote-asset volume.
        high_24h: Highest price in the window.
        low_24h: Lowest price in the window.
        timestamp: Timestamp supplied with the statistics.
        exchange: Exchange label; defaults to ``"Binance"``.
    """

    symbol: str
    last_price: float
    price_change: float
    price_change_percent: float
    bid1_price: float
    ask1_price: float
    volume_24h: float
    quote_volume_24h: float
    high_24h: float
    low_24h: float
    timestamp: int
    exchange: str = "Binance"

class BinanceDataFetcher:
    """Async REST client for Binance USD-M futures market data.

    Use as an async context manager so the pooled ``aiohttp`` session is
    closed on exit. Every request goes through ``_get_json``, which raises
    ``ValueError`` when Binance answers with an error payload instead of
    letting callers index into it blindly.
    """

    BASE_URL = "https://fapi.binance.com"

    def __init__(self):
        # Created in __aenter__; stays None until the context manager is entered.
        self.session: Optional[aiohttp.ClientSession] = None
        self._cache: Dict[str, dict] = {}
        self._cache_ttl: int = 1000  # cache lifetime in ms (not consulted by the methods below)

    async def __aenter__(self):
        """Open a keep-alive session (10 connections per host, 10 s timeout)."""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=10,
            keepalive_timeout=30
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=10)
        )
        return self

    async def __aexit__(self, *args):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    @staticmethod
    def _is_error_payload(payload) -> bool:
        """Return True when *payload* is an error object with ``code`` and ``msg``."""
        return isinstance(payload, dict) and "code" in payload and "msg" in payload

    @staticmethod
    def _best_bid_ask(item: dict, book: Optional[dict] = None) -> tuple:
        """Return ``(bid, ask)`` for one symbol.

        Quotes embedded in *item* win when present; otherwise they are taken
        from the matching book-ticker entry. Missing quotes become ``0.0``.
        """
        if "bidPrice" in item and "askPrice" in item:
            source = item
        else:
            source = book or {}
        return float(source.get("bidPrice", 0.0)), float(source.get("askPrice", 0.0))

    async def _get_json(self, path: str, params: Optional[dict] = None):
        """GET ``BASE_URL + path`` and return the decoded JSON body.

        Raises:
            ValueError: If the body is an error payload.
        """
        async with self.session.get(f"{self.BASE_URL}{path}", params=params) as response:
            payload = await response.json()
        if self._is_error_payload(payload):
            raise ValueError(f"Binance API错误: {payload['msg']} (code={payload['code']})")
        return payload

    async def get_all_perpetual_tickers(self) -> List["BinancePerpetualTicker"]:
        """Fetch 24-hour statistics for every symbol ending in ``USDT``.

        The futures 24hr statistics carry no best bid/ask, so the book-ticker
        endpoint is requested concurrently and merged in by symbol.

        Returns:
            One ``BinancePerpetualTicker`` per retained symbol.

        Raises:
            ValueError: If either endpoint returns an error payload.
        """
        start_time = time.perf_counter()

        data, books = await asyncio.gather(
            self._get_json("/fapi/v1/ticker/24hr"),
            self._get_json("/fapi/v1/ticker/bookTicker"),
        )
        latency_ms = (time.perf_counter() - start_time) * 1000

        print(f"[Binance] 24hr Tickers延迟: {latency_ms:.2f}ms | 数据量: {len(data)}")

        quotes = {entry["symbol"]: entry for entry in books}

        tickers = []
        for item in data:
            # Original intent: perpetuals only. The filter itself keeps every
            # symbol whose name ends in "USDT".
            if not item["symbol"].endswith("USDT"):
                continue

            bid, ask = self._best_bid_ask(item, quotes.get(item["symbol"]))
            tickers.append(BinancePerpetualTicker(
                symbol=item["symbol"],
                last_price=float(item["lastPrice"]),
                price_change=float(item["priceChange"]),
                price_change_percent=float(item["priceChangePercent"]),
                bid1_price=bid,
                ask1_price=ask,
                volume_24h=float(item["volume"]),
                quote_volume_24h=float(item["quoteVolume"]),
                high_24h=float(item["highPrice"]),
                low_24h=float(item["lowPrice"]),
                timestamp=int(item["closeTime"]),
                exchange="Binance"
            ))

        return tickers

    async def get_symbol_ticker(self, symbol: str) -> Dict:
        """Fetch the latest price of one contract.

        Args:
            symbol: Contract symbol, e.g. ``BTCUSDT`` (upper-cased before sending).

        Returns:
            Dict with ``symbol``, ``price``, ``time``, ``latency_ms``, ``exchange``.

        Raises:
            ValueError: If Binance returns an error payload.
        """
        start_time = time.perf_counter()
        data = await self._get_json("/fapi/v1/ticker/price", {"symbol": symbol.upper()})
        latency_ms = (time.perf_counter() - start_time) * 1000

        return {
            "symbol": data["symbol"],
            "price": float(data["price"]),
            "time": int(data["time"]),
            "latency_ms": latency_ms,
            "exchange": "Binance"
        }

    async def get_orderbook(self, symbol: str, limit: int = 20) -> Dict:
        """Fetch an order-book snapshot for one contract.

        Args:
            symbol: Contract symbol, e.g. ``BTCUSDT`` (upper-cased before sending).
            limit: Levels per side; the original notes list 5, 10, 20, 50,
                100, 500 and 1000 as supported values.

        Returns:
            Dict with ``symbol`` (as passed in), ``latency_ms``,
            ``lastUpdateId``, ``bids`` and ``asks`` (``[price, qty]`` floats)
            and ``exchange``.

        Raises:
            ValueError: If Binance returns an error payload.
        """
        start_time = time.perf_counter()
        data = await self._get_json(
            "/fapi/v1/depth", {"symbol": symbol.upper(), "limit": limit}
        )
        latency_ms = (time.perf_counter() - start_time) * 1000

        return {
            "symbol": symbol,
            "latency_ms": latency_ms,
            "lastUpdateId": data["lastUpdateId"],
            "bids": [[float(p), float(q)] for p, q in data["bids"]],
            "asks": [[float(p), float(q)] for p, q in data["asks"]],
            "exchange": "Binance"
        }

    async def get_premium_index(self, symbol: str) -> Dict:
        """Fetch mark price, index price and funding data for one contract.

        Args:
            symbol: Contract symbol (upper-cased before sending).

        Returns:
            Dict with ``symbol``, ``mark_price``, ``index_price``,
            ``estimated_upcoming_funding_rate`` (the response's
            ``lastFundingRate`` multiplied by 100, i.e. in percent),
            ``next_funding_time`` and ``exchange``.

        Raises:
            ValueError: If Binance returns an error payload.
        """
        data = await self._get_json("/fapi/v1/premiumIndex", {"symbol": symbol.upper()})

        return {
            "symbol": data["symbol"],
            "mark_price": float(data["markPrice"]),
            "index_price": float(data["indexPrice"]),
            "estimated_upcoming_funding_rate": float(data["lastFundingRate"]) * 100,
            "next_funding_time": int(data["nextFundingTime"]),
            "exchange": "Binance"
        }

使用示例

async def demo_binance_fetcher():
    """Print the bid/ask spread in bps for up to three BTC contracts on Binance."""
    async with BinanceDataFetcher() as fetcher:
        # Fetch statistics for every perpetual contract.
        all_tickers = await fetcher.get_all_perpetual_tickers()

        # Keep the BTC-related, USDT-quoted contracts.
        btc_tickers = [t for t in all_tickers if "BTC" in t.symbol and "USDT" == t.symbol[-4:]]
        print(f"\n获取到 {len(btc_tickers)} 个BTC永续合约")

        for ticker in btc_tickers[:3]:
            # A missing quote would make the relative spread undefined.
            if ticker.bid1_price <= 0:
                continue
            spread_bps = (ticker.ask1_price - ticker.bid1_price) / ticker.bid1_price * 10000
            print(f" {ticker.symbol}: ${ticker.last_price:,.2f} | 买卖价差: {spread_bps:.1f}bps")


# Guarded so importing this module does not fire network requests.
if __name__ == "__main__":
    asyncio.run(demo_binance_fetcher())

跨交易所价差计算引擎

实时价差监控与套利机会识别

import asyncio
import aiohttp
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ArbitrageSignal(Enum):
    """Kinds of signal the arbitrage engine can attach to an opportunity.

    Each member's value is the human-readable label printed in reports.
    """

    # Nothing actionable.
    NONE = "无信号"
    # Buy on OKX, sell on Binance.
    OKX_BUY_BINANCE_SELL = "OKX买入 → Binance卖出"
    # Buy on Binance, sell on OKX.
    BINANCE_BUY_OKX_SELL = "Binance买入 → OKX卖出"
    # Abnormal-spread warning.
    SPREAD_ALERT = "价差异常预警"
    # Confirmed arbitrage opportunity.
    OPPORTUNITY = "套利机会确认"

@dataclass
class ArbitrageOpportunity:
    """Result of comparing one contract's quotes across OKX and Binance.

    Attributes:
        symbol_pair: Normalised pair name, e.g. ``BTC-USDT``.
        signal: Signal type assigned by the engine.
        okx_price: Current OKX price.
        okx_bid: Best bid on OKX.
        okx_ask: Best ask on OKX.
        binance_price: Current Binance price.
        binance_bid: Best bid on Binance.
        binance_ask: Best ask on Binance.
        spread_absolute: Absolute price difference.
        spread_bps: Price difference in basis points.
        spread_percent: Price difference in percent.
        estimated_fee: Estimated trading fees.
        net_spread_bps: Spread left after fees, in basis points.
        volume_okx: OKX 24-hour volume.
        volume_binance: Binance 24-hour volume.
        confidence: Signal confidence between 0 and 1.
        latency_ms: Data latency in milliseconds.
        timestamp: Time the opportunity was detected.
        recommendation: Suggested action; empty by default.
    """

    symbol_pair: str
    signal: ArbitrageSignal

    okx_price: float
    okx_bid: float
    okx_ask: float

    binance_price: float
    binance_bid: float
    binance_ask: float

    spread_absolute: float
    spread_bps: float
    spread_percent: float

    estimated_fee: float
    net_spread_bps: float

    volume_okx: float
    volume_binance: float

    confidence: float
    latency_ms: float
    timestamp: datetime

    recommendation: str = ""

class CrossExchangeArbitrageEngine:
    """Polls OKX and Binance order books and flags cross-exchange spreads.

    Fee rates are given in percent (0.04 means 0.04 %), while every spread
    this class reports is in basis points; ``_round_trip_fee_bps`` does the
    unit conversion in one place.
    """

    # Symbol mapping: OKX instrument id -> Binance symbol.
    # NOTE(review): MATIC appears to have been rebranded to POL on both
    # venues -- confirm the last entry is still listed before relying on it.
    SYMBOL_MAP = {
        "BTC-USDT-SWAP": "BTCUSDT",
        "ETH-USDT-SWAP": "ETHUSDT",
        "SOL-USDT-SWAP": "SOLUSDT",
        "XRP-USDT-SWAP": "XRPUSDT",
        "DOGE-USDT-SWAP": "DOGEUSDT",
        "ADA-USDT-SWAP": "ADAUSDT",
        "AVAX-USDT-SWAP": "AVAXUSDT",
        "DOT-USDT-SWAP": "DOTUSDT",
        "LINK-USDT-SWAP": "LINKUSDT",
        "MATIC-USDT-SWAP": "MATICUSDT",
    }

    # Cap on retained signals so a long-running scan loop cannot grow
    # memory without bound.
    MAX_HISTORY = 10_000

    def __init__(
        self,
        fee_taker: float = 0.04,      # taker fee rate, in percent
        fee_maker: float = 0.02,      # maker fee rate, in percent
        min_spread_bps: float = 1.0,   # minimum net edge (bps) that raises a signal
        min_volume_usd: float = 10000  # minimum volume (USD); stored, not yet applied
    ):
        self.fee_taker = fee_taker
        self.fee_maker = fee_maker
        self.min_spread_bps = min_spread_bps
        self.min_volume_usd = min_volume_usd

        self.okx_fetcher = None
        self.binance_fetcher = None
        self._running = False
        self._history: List["ArbitrageOpportunity"] = []

    async def initialize(self):
        """Create both data fetchers and open their HTTP sessions."""
        from okx_data_fetcher import OKXDataFetcher
        from binance_data_fetcher import BinanceDataFetcher

        self.okx_fetcher = OKXDataFetcher()
        self.binance_fetcher = BinanceDataFetcher()

        await self.okx_fetcher.__aenter__()
        await self.binance_fetcher.__aenter__()

        logger.info("数据获取器初始化完成")

    async def close(self):
        """Close both fetchers; the second is closed even if the first fails."""
        try:
            if self.okx_fetcher:
                await self.okx_fetcher.__aexit__(None, None, None)
        finally:
            if self.binance_fetcher:
                await self.binance_fetcher.__aexit__(None, None, None)
        logger.info("连接已关闭")

    async def fetch_cross_exchange_data(
        self,
        symbol: str,
        okx_symbol: str,
        binance_symbol: str
    ) -> Tuple[dict, dict, float]:
        """Request the 10-level order book from both exchanges concurrently.

        Args:
            symbol: Normalised pair name (not used for the requests).
            okx_symbol: OKX instrument id.
            binance_symbol: Binance symbol.

        Returns:
            ``(okx_data, binance_data, total_latency_ms)`` where the latency
            is the wall time until both responses have arrived.
        """
        start_time = time.perf_counter()

        okx_data, binance_data = await asyncio.gather(
            self.okx_fetcher.get_orderbook(okx_symbol, depth=10),
            self.binance_fetcher.get_orderbook(binance_symbol, limit=10),
        )

        total_latency = (time.perf_counter() - start_time) * 1000

        return okx_data, binance_data, total_latency

    def _round_trip_fee_bps(self) -> float:
        """Return the fee for a full round trip, in basis points.

        Opening and closing on both exchanges is four taker fills;
        ``fee_taker`` is in percent and one percent equals 100 bps.
        """
        return self.fee_taker * 4 * 100

    @staticmethod
    def _executable_edges_bps(
        okx_bid: float,
        okx_ask: float,
        binance_bid: float,
        binance_ask: float
    ) -> Tuple[float, float]:
        """Return the gross edge of each direction at executable prices.

        Returns:
            ``(buy_okx_sell_binance, buy_binance_sell_okx)`` in basis points,
            each measured from the ask paid to the bid received.
        """
        buy_okx = (binance_bid - okx_ask) / okx_ask * 10_000
        buy_binance = (okx_bid - binance_ask) / binance_ask * 10_000
        return buy_okx, buy_binance

    def calculate_arbitrage(
        self,
        okx_data: dict,
        binance_data: dict,
        symbol_pair: str,
        total_latency: float
    ) -> "ArbitrageOpportunity":
        """Evaluate one pair of order books.

        ``spread_absolute``, ``spread_percent`` and ``spread_bps`` describe
        the signed mid-price difference (Binance minus OKX).
        ``net_spread_bps`` is the better of the two executable edges (buy at
        one venue's ask, sell at the other's bid) minus the round-trip taker
        fee, because a mid-to-mid difference cannot actually be captured by
        taker orders.

        Args:
            okx_data: OKX book with ``bids``/``asks`` as ``[price, size]`` lists.
            binance_data: Binance book in the same shape.
            symbol_pair: Normalised pair name used in messages.
            total_latency: Fetch latency in milliseconds.

        Returns:
            The populated ``ArbitrageOpportunity``; its signal is ``NONE``
            when the net edge does not exceed ``min_spread_bps``.

        Raises:
            ValueError: If any side of either book is empty.
        """
        sides = (
            okx_data.get("asks"), okx_data.get("bids"),
            binance_data.get("asks"), binance_data.get("bids"),
        )
        if not all(sides):
            raise ValueError(f"{symbol_pair} 订单簿为空,无法计算价差")

        okx_ask = okx_data["asks"][0][0]    # price paid when buying on OKX
        okx_bid = okx_data["bids"][0][0]    # price received when selling on OKX
        okx_mid = (okx_ask + okx_bid) / 2

        binance_ask = binance_data["asks"][0][0]
        binance_bid = binance_data["bids"][0][0]
        binance_mid = (binance_ask + binance_bid) / 2

        # Signed mid-price spread, kept for reporting.
        spread_absolute = binance_mid - okx_mid
        spread_percent = (spread_absolute / okx_mid) * 100
        spread_bps = spread_percent * 100

        total_fee = self._round_trip_fee_bps()
        edge_buy_okx, edge_buy_binance = self._executable_edges_bps(
            okx_bid, okx_ask, binance_bid, binance_ask
        )
        net_spread_bps = max(edge_buy_okx, edge_buy_binance) - total_fee

        if net_spread_bps > self.min_spread_bps:
            if edge_buy_okx >= edge_buy_binance:
                signal = ArbitrageSignal.OKX_BUY_BINANCE_SELL
                recommendation = (
                    f"在OKX以 ${okx_ask:.2f} 买入 {symbol_pair},"
                    f"在Binance以 ${binance_bid:.2f} 卖出"
                )
            else:
                signal = ArbitrageSignal.BINANCE_BUY_OKX_SELL
                recommendation = (
                    f"在Binance以 ${binance_ask:.2f} 买入 {symbol_pair},"
                    f"在OKX以 ${okx_bid:.2f} 卖出"
                )
        else:
            signal = ArbitrageSignal.NONE
            recommendation = "价差不满足套利条件,建议观望"

        # Confidence falls linearly with latency and reaches 0 at 500 ms.
        confidence = 1.0 - (total_latency / 500)
        confidence = max(0.0, min(1.0, confidence))

        # Volumes need a separate ticker call; left at zero here.
        volume_okx = 0.0
        volume_binance = 0.0

        return ArbitrageOpportunity(
            symbol_pair=symbol_pair,
            signal=signal,
            okx_price=okx_mid,
            okx_bid=okx_bid,
            okx_ask=okx_ask,
            binance_price=binance_mid,
            binance_bid=binance_bid,
            binance_ask=binance_ask,
            spread_absolute=spread_absolute,
            spread_bps=spread_bps,
            spread_percent=spread_percent,
            estimated_fee=total_fee,
            net_spread_bps=net_spread_bps,
            volume_okx=volume_okx,
            volume_binance=volume_binance,
            confidence=confidence,
            latency_ms=total_latency,
            timestamp=datetime.now(),
            recommendation=recommendation
        )

    async def scan_opportunities(
        self,
        symbols: Optional[List[str]] = None,
        interval_ms: int = 1000
    ) -> List["ArbitrageOpportunity"]:
        """Scan the given OKX instruments once and collect every signal.

        Args:
            symbols: OKX instrument ids to scan; defaults to every key of
                ``SYMBOL_MAP``. Ids missing from the map are skipped.
            interval_ms: Accepted for compatibility; the pause between
                symbols is a fixed 50 ms.

        Returns:
            Opportunities whose signal is not ``NONE``, best net edge first.
            A failure on one symbol is logged and does not stop the scan.
        """
        if symbols is None:
            symbols = list(self.SYMBOL_MAP.keys())

        opportunities = []

        for okx_symbol in symbols:
            if okx_symbol not in self.SYMBOL_MAP:
                continue

            binance_symbol = self.SYMBOL_MAP[okx_symbol]
            symbol_pair = okx_symbol.split("-")[0] + "-USDT"

            try:
                okx_data, binance_data, latency = await self.fetch_cross_exchange_data(
                    symbol_pair, okx_symbol, binance_symbol
                )

                opp = self.calculate_arbitrage(
                    okx_data, binance_data, symbol_pair, latency
                )

                if opp.signal != ArbitrageSignal.NONE:
                    opportunities.append(opp)
                    self._history.append(opp)
                    # Drop the oldest entries once the cap is exceeded.
                    overflow = len(self._history) - self.MAX_HISTORY
                    if overflow > 0:
                        del self._history[:overflow]

                    logger.info(
                        f"[套利信号] {opp.symbol_pair} | "
                        f"价差: {opp.spread_bps:.2f}bps | "
                        f"净收益: {opp.net_spread_bps:.2f}bps | "
                        f"延迟: {opp.latency_ms:.1f}ms | "
                        f"置信度: {opp.confidence:.2%}"
                    )

            except Exception as e:
                # Best effort: one bad symbol must not abort the whole scan.
                logger.error(f"扫描 {okx_symbol} 时出错: {e}")

            # Throttle so consecutive requests are not sent back to back.
            await asyncio.sleep(0.05)

        opportunities.sort(key=lambda x: x.net_spread_bps, reverse=True)

        return opportunities

    def get_opportunity_report(self, opportunities: List["ArbitrageOpportunity"]) -> str:
        """Render up to five opportunities as a plain-text report.

        Args:
            opportunities: Opportunities to print, in display order.

        Returns:
            The report text, or a fixed "nothing found" line for an empty list.
        """
        if not opportunities:
            return "当前无高置信度套利机会"

        report_lines = [
            f"\n{'='*60}",
            f"跨交易所套利机会报告 | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"{'='*60}",
        ]

        for i, opp in enumerate(opportunities[:5], 1):
            report_lines.extend([
                f"\n[{i}] {opp.symbol_pair}",
                f"    信号: {opp.signal.value}",
                f"    OKX价格: ${opp.okx_price:,.4f} (买:{opp.okx_bid:.4f} 卖:{opp.okx_ask:.4f})",
                f"    Binance价格: ${opp.binance_price:,.4f} (买:{opp.binance_bid:.4f} 卖:{opp.binance_ask:.4f})",
                f"    价差: {opp.spread_bps:.2f}bps ({opp.spread_percent:.4f}%)",
                f"    手续费: {opp.estimated_fee:.2f}bps",
                f"    净收益: {opp.net_spread_bps:.2f}bps",
                f"    置信度: {opp.confidence:.1%}",
                f"    建议: {opp.recommendation}",
            ])

        return "\n".join(report_lines)

主程序

async def main():
    """Run the polling scanner until interrupted, printing a report per productive scan."""
    engine = CrossExchangeArbitrageEngine(
        fee_taker=0.04,        # taker fee of 0.04 %
        min_spread_bps=0.5,    # raise a signal from 0.5 bps
        min_volume_usd=10000   # minimum volume of 10000 USD
    )

    try:
        await engine.initialize()
        print("🚀 跨交易所价差套利数据获取系统启动")
        print("=" * 50)

        # Continuous monitoring loop.
        while True:
            opportunities = await engine.scan_opportunities()

            if opportunities:
                print(engine.get_opportunity_report(opportunities))

            await asyncio.sleep(5)  # scan every 5 seconds

    except KeyboardInterrupt:
        print("\n正在关闭系统...")
    finally:
        # Always release the HTTP sessions, whatever ended the loop.
        await engine.close()

运行测试

# Guarded so importing this module does not start the endless scan loop.
if __name__ == "__main__":
    asyncio.run(main())

WebSocket实时数据推送(进阶)

对于高频套利策略,轮询REST API的延迟仍然过高。WebSocket可以实现毫秒级数据推送:

import asyncio
import websockets
import json
import time
from typing import Dict, Callable, Optional
from dataclasses import dataclass

@dataclass
class WebSocketConfig:
    """Connection settings for the two market-data WebSocket feeds.

    Attributes:
        okx_url: OKX public WebSocket endpoint.
        binance_url: Binance futures combined-stream endpoint; stream names
            are appended by the client as ``?streams=a/b``.
        ping_interval: Heartbeat interval in seconds.
        reconnect_delay: Pause before reconnecting, in seconds.
        max_reconnect: Maximum number of reconnect attempts.
    """

    okx_url: str = "wss://ws.okx.com:8443/ws/v5/public"
    # The combined-stream path is "/stream"; "/wstream" is not an endpoint.
    binance_url: str = "wss://fstream.binance.com/stream"

    ping_interval: int = 20
    reconnect_delay: int = 5
    max_reconnect: int = 10

class RealTimeArbitrageMonitor:
    """Streams best bid/ask from OKX and Binance over WebSocket.

    Callers register one coroutine callback per ``(exchange, symbol)`` with
    ``subscribe`` and then await ``start``. Each listener reconnects after a
    drop and gives up after ``config.max_reconnect`` consecutive failures.
    """

    def __init__(self, config: Optional["WebSocketConfig"] = None):
        self.config = config or WebSocketConfig()
        self._running = False
        self._subscriptions: Dict[str, set] = {
            "okx": set(),
            "binance": set()
        }
        self._callbacks: Dict[str, Callable] = {}
        self._okx_ws = None
        self._binance_ws = None

    def subscribe(self, exchange: str, symbol: str, callback: Callable):
        """Register a callback for one symbol on one exchange.

        Args:
            exchange: ``"okx"`` or ``"binance"``.
            symbol: Symbol in that exchange's own notation.
            callback: Coroutine function awaited as ``callback(symbol, data)``.

        Raises:
            KeyError: If *exchange* is not one of the two known keys.
        """
        self._subscriptions[exchange].add(symbol)
        self._callbacks[f"{exchange}:{symbol}"] = callback

    @staticmethod
    def _top_of_book(tick: dict):
        """Return ``(best_bid, best_ask)`` as floats, or None if a side is empty.

        Accepts both the long keys (``bids``/``asks``) and the short keys
        (``b``/``a``); only the price of the first level is read.
        """
        bids = tick.get("bids") or tick.get("b") or []
        asks = tick.get("asks") or tick.get("a") or []
        if not bids or not asks:
            return None
        return float(bids[0][0]), float(asks[0][0])

    async def _backoff_or_give_up(self, label: str, failures: int, reason) -> bool:
        """Report a disconnect and wait before the next attempt.

        Returns:
            False once *failures* has reached ``config.max_reconnect``,
            meaning the listener should stop; True after sleeping otherwise.
        """
        if failures >= self.config.max_reconnect:
            print(f"[{label} WS] 连续重连失败 {failures} 次,停止监听: {reason}")
            return False
        print(f"[{label} WS] 连接断开: {reason},{self.config.reconnect_delay}秒后重连...")
        await asyncio.sleep(self.config.reconnect_delay)
        return True

    async def _okx_websocket_listener(self):
        """Connect to OKX, subscribe to ``books5`` and dispatch updates."""
        failures = 0
        while self._running:
            if not self._subscriptions["okx"]:
                # Nothing to subscribe to yet; avoid sending an empty request.
                await asyncio.sleep(self.config.reconnect_delay)
                continue

            reason = "服务器关闭连接"
            try:
                async with websockets.connect(
                    self.config.okx_url,
                    ping_interval=self.config.ping_interval
                ) as ws:
                    self._okx_ws = ws
                    failures = 0  # a successful connect resets the failure streak

                    subscribe_msg = {
                        "op": "subscribe",
                        "args": [
                            {
                                "channel": "books5",  # 5-level order book
                                "instId": symbol
                            }
                            for symbol in self._subscriptions["okx"]
                        ]
                    }

                    await ws.send(json.dumps(subscribe_msg))
                    print(f"[OKX WS] 已订阅 {len(self._subscriptions['okx'])} 个交易对")

                    async for message in ws:
                        await self._process_okx_message(json.loads(message))
                        if not self._running:
                            break

            except Exception as e:
                failures += 1
                reason = e
            finally:
                self._okx_ws = None

            if not self._running:
                break
            if not await self._backoff_or_give_up("OKX", failures, reason):
                break

    async def _process_okx_message(self, data: dict):
        """Forward one OKX ``books5`` update to the registered callback.

        Subscription acknowledgements, other channels and updates without
        both book sides are ignored.
        """
        if data.get("event") == "subscribe":
            return

        arg = data.get("arg", {})
        if arg.get("channel") != "books5":
            return

        entries = data.get("data") or []
        if not entries:
            return
        tick = entries[0]

        quote = self._top_of_book(tick)
        if quote is None:
            return

        symbol = arg.get("instId")
        callback = self._callbacks.get(f"okx:{symbol}")
        if callback:
            await callback(symbol, {
                "bid1": quote[0],
                "ask1": quote[1],
                "timestamp": int(tick.get("ts", 0)),
                "exchange": "OKX"
            })

    async def _binance_websocket_listener(self):
        """Connect to the Binance combined stream and dispatch depth updates."""
        failures = 0
        while self._running:
            if not self._subscriptions["binance"]:
                # An empty stream list would produce an invalid URL.
                await asyncio.sleep(self.config.reconnect_delay)
                continue

            reason = "服务器关闭连接"
            try:
                # Binance takes the stream names in the URL itself.
                streams = [
                    f"{symbol.lower()}@depth10@100ms"
                    for symbol in self._subscriptions["binance"]
                ]
                url = f"{self.config.binance_url}?streams=" + "/".join(streams)

                async with websockets.connect(
                    url,
                    ping_interval=self.config.ping_interval
                ) as ws:
                    self._binance_ws = ws
                    failures = 0  # a successful connect resets the failure streak
                    print(f"[Binance WS] 已订阅 {len(self._subscriptions['binance'])} 个交易对")

                    async for message in ws:
                        await self._process_binance_message(json.loads(message))
                        if not self._running:
                            break

            except Exception as e:
                failures += 1
                reason = e
            finally:
                self._binance_ws = None

            if not self._running:
                break
            if not await self._backoff_or_give_up("Binance", failures, reason):
                break

    async def _process_binance_message(self, data: dict):
        """Forward one Binance combined-stream depth update to its callback.

        The symbol is the upper-cased part of the stream name before ``@``.
        Messages without ``stream``/``data`` or without both book sides are
        ignored.
        """
        if "stream" not in data or "data" not in data:
            return

        tick = data["data"]
        quote = self._top_of_book(tick)
        if quote is None:
            return

        symbol = data["stream"].split("@")[0].upper()
        callback = self._callbacks.get(f"binance:{symbol}")
        if callback:
            await callback(symbol, {
                "bid1": quote[0],
                "ask1": quote[1],
                "timestamp": tick.get("E", 0),
                "exchange": "Binance"
            })

    async def start(self):
        """Run both listeners until ``stop`` is called or both give up."""
        self._running = True

        await asyncio.gather(
            self._okx_websocket_listener(),
            self._binance_websocket_listener()
        )

    def stop(self):
        """Ask the listeners to exit; each one notices after its next message or wait."""
        self._running = False

使用示例

async def on_price_update(symbol: str, data: dict): """价格更新回调""" print(f"[{data['exchange']}] {symbol}: 买一 ${data['bid1']