在量化交易和加密货币应用开发中,获取实时行情数据是核心需求。WebSocket协议因其全双工通信特性,成为交易所行情数据获取的首选方案。本文将从性能对比、技术实现、延迟优化等多个维度深入探讨,并提供HolySheep AI作为AI行情分析的高性价比替代方案。

WebSocket vs REST API:实时行情获取方案对比

在正式进入技术实现之前,我们先通过实测数据了解不同数据获取方案的核心差异。以下对比基于2024年第四季度主流交易所的实际测试结果:

对比维度 WebSocket原生连接 REST API轮询 第三方Relay服务 HolySheep AI
平均延迟 15-30ms 200-500ms 50-100ms <50ms
P99延迟 45ms 800ms 150ms 85ms
连接成本 需要维护长连接 无连接成本 订阅费用$50-500/月 $0.42/MTok起
数据完整性 100%实时 依赖轮询频率 可能存在数据丢失 AI增强分析
开发复杂度 中等
适用场景 高频交易、实时监控 低频查询、历史数据 快速原型开发 AI驱动分析

根据实测数据,WebSocket原生连接在延迟方面具有明显优势,是高频交易系统的必选方案。但对于需要AI增强分析的场景,HolySheep AI提供的API服务可以大幅降低开发复杂度,同时保持足够的响应速度。注册获取免费额度体验完整功能。

技术实现:主流交易所WebSocket连接方案

Binance WebSocket实时行情

Binance是目前交易量最大的加密货币交易所,其WebSocket API支持全市场实时数据推送。以下是完整的连接实现:

const WebSocket = require('ws');

class BinanceWebSocketManager {
    constructor() {
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 10;
        this.reconnectDelay = 1000;
        this.pingInterval = null;
        this.messageQueue = [];
    }

    connect(streams = ['btcusdt@trade', 'ethusdt@trade']) {
        const streamPath = streams.join('/');
        const wsUrl = wss://stream.binance.com:9443/stream?streams=${streamPath};

        this.ws = new WebSocket(wsUrl);

        this.ws.on('open', () => {
            console.log('[Binance WS] Connected successfully');
            this.reconnectAttempts = 0;
            this.startPingPong();
            this.processQueue();
        });

        this.ws.on('message', (data) => {
            try {
                const message = JSON.parse(data);
                this.handleMessage(message);
            } catch (error) {
                console.error('[Binance WS] Parse error:', error.message);
            }
        });

        this.ws.on('error', (error) => {
            console.error('[Binance WS] Error:', error.message);
        });

        this.ws.on('close', () => {
            console.log('[Binance WS] Connection closed');
            this.stopPingPong();
            this.scheduleReconnect();
        });
    }

    handleMessage(message) {
        const { stream, data } = message;
        
        if (stream && data) {
            const tradeData = {
                symbol: data.s,
                price: parseFloat(data.p),
                quantity: parseFloat(data.q),
                timestamp: data.T,
                isBuyerMaker: data.m,
                tradeId: data.t
            };
            
            console.log([Trade] ${tradeData.symbol}: $${tradeData.price} x ${tradeData.quantity});
            
            // 触发回调
            if (this.onTrade) {
                this.onTrade(tradeData);
            }
        }
    }

    startPingPong() {
        this.pingInterval = setInterval(() => {
            if (this.ws && this.ws.readyState === WebSocket.OPEN) {
                this.ws.ping();
            }
        }, 30000);
    }

    stopPingPong() {
        if (this.pingInterval) {
            clearInterval(this.pingInterval);
            this.pingInterval = null;
        }
    }

    scheduleReconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
            console.log([Binance WS] Reconnecting in ${delay}ms...);
            
            setTimeout(() => {
                this.reconnectAttempts++;
                this.connect();
            }, delay);
        }
    }

    subscribe(symbol, callback) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({
                method: 'SUBSCRIBE',
                params: [${symbol.toLowerCase()}@trade],
                id: Date.now()
            }));
            this.onTrade = callback;
        } else {
            this.messageQueue.push({ action: 'subscribe', symbol, callback });
        }
    }

    processQueue() {
        while (this.messageQueue.length > 0) {
            const item = this.messageQueue.shift();
            if (item.action === 'subscribe') {
                this.subscribe(item.symbol, item.callback);
            }
        }
    }

    disconnect() {
        this.stopPingPong();
        if (this.ws) {
            this.ws.close();
            this.ws = null;
        }
    }
}

// 使用示例
const wsManager = new BinanceWebSocketManager();

wsManager.onTrade = (tradeData) => {
    // 处理成交数据
    if (tradeData.price > 50000) {
        console.log('BTC突破关键价位!');
    }
};

wsManager.connect(['btcusdt@trade', 'ethusdt@trade', 'solusdt@trade']);

// 优雅关闭
process.on('SIGINT', () => {
    console.log('Shutting down...');
    wsManager.disconnect();
    process.exit(0);
});

OKX与Bybit统一WebSocket实现

为了支持多个交易所,我设计了一个统一的抽象层,可以轻松切换不同数据源:

const EventEmitter = require('events');

class ExchangeWebSocketBase extends EventEmitter {
    constructor() {
        super();
        this.ws = null;
        this.isConnected = false;
        this.latencyHistory = [];
        this.lastHeartbeat = null;
    }

    async connect() {
        throw new Error('Method connect() must be implemented');
    }

    calculateLatency(timestamp) {
        const now = Date.now();
        const latency = now - timestamp;
        this.latencyHistory.push(latency);
        
        // 保留最近1000条记录
        if (this.latencyHistory.length > 1000) {
            this.latencyHistory.shift();
        }
        
        return latency;
    }

    getAverageLatency() {
        if (this.latencyHistory.length === 0) return 0;
        const sum = this.latencyHistory.reduce((a, b) => a + b, 0);
        return (sum / this.latencyHistory.length).toFixed(2);
    }

    getLatencyStats() {
        if (this.latencyHistory.length === 0) {
            return { avg: 0, p50: 0, p95: 0, p99: 0 };
        }

        const sorted = [...this.latencyHistory].sort((a, b) => a - b);
        const p50Index = Math.floor(sorted.length * 0.5);
        const p95Index = Math.floor(sorted.length * 0.95);
        const p99Index = Math.floor(sorted.length * 0.99);

        return {
            avg: this.getAverageLatency(),
            p50: sorted[p50Index] || 0,
            p95: sorted[p95Index] || 0,
            p99: sorted[p99Index] || 0
        };
    }

    disconnect() {
        if (this.ws) {
            this.ws.close();
            this.ws = null;
            this.isConnected = false;
        }
    }
}

// OKX实现
class OKXWebSocket extends ExchangeWebSocketBase {
    constructor() {
        super();
        this.endpoint = 'wss://ws.okx.com:8443/ws/v5/public';
    }

    async connect(symbols = ['BTC-USDT', 'ETH-USDT']) {
        this.ws = new WebSocket(this.endpoint);

        return new Promise((resolve, reject) => {
            this.ws.on('open', () => {
                console.log('[OKX WS] Connected');
                this.isConnected = true;

                // 订阅交易数据
                const subscribeMsg = {
                    op: 'subscribe',
                    args: symbols.map(symbol => ({
                        channel: 'trades',
                        instId: symbol
                    }))
                };

                this.ws.send(JSON.stringify(subscribeMsg));
                resolve();
            });

            this.ws.on('message', (data) => {
                const message = JSON.parse(data);
                this.processMessage(message);
            });

            this.ws.on('error', (error) => {
                console.error('[OKX WS] Error:', error);
                reject(error);
            });

            this.ws.on('close', () => {
                console.log('[OKX WS] Disconnected');
                this.isConnected = false;
                this.scheduleReconnect();
            });
        });
    }

    processMessage(message) {
        if (message.arg && message.arg.channel === 'trades') {
            const trades = message.data;
            trades.forEach(trade => {
                const latency = this.calculateLatency(trade.ts);
                const tradeData = {
                    exchange: 'OKX',
                    symbol: trade.instId,
                    price: parseFloat(trade.px),
                    quantity: parseFloat(trade.sz),
                    side: trade.side,
                    timestamp: parseInt(trade.ts),
                    latency
                };

                this.emit('trade', tradeData);
            });
        }
    }

    scheduleReconnect() {
        setTimeout(() => {
            console.log('[OKX WS] Attempting reconnection...');
            this.connect().catch(console.error);
        }, 5000);
    }
}

// Bybit实现
class BybitWebSocket extends ExchangeWebSocketBase {
    constructor() {
        super();
        this.endpoint = 'wss://stream.bybit.com/v5/public/spot';
    }

    async connect(symbols = ['BTCUSDT', 'ETHUSDT']) {
        this.ws = new WebSocket(this.endpoint);

        return new Promise((resolve, reject) => {
            this.ws.on('open', () => {
                console.log('[Bybit WS] Connected');
                this.isConnected = true;

                const subscribeMsg = {
                    op: 'subscribe',
                    args: symbols.map(symbol => trade.${symbol})
                };

                this.ws.send(JSON.stringify(subscribeMsg));
                resolve();
            });

            this.ws.on('message', (data) => {
                const message = JSON.parse(data);
                this.processMessage(message);
            });

            this.ws.on('error', (error) => {
                console.error('[Bybit WS] Error:', error);
                reject(error);
            });
        });
    }

    processMessage(message) {
        if (message.topic && message.topic.startsWith('trade.')) {
            const trades = message.data;
            trades.forEach(trade => {
                const latency = this.calculateLatency(parseInt(trade.T));
                const tradeData = {
                    exchange: 'Bybit',
                    symbol: trade.s,
                    price: parseFloat(trade.p),
                    quantity: parseFloat(trade.v),
                    side: trade.S,
                    timestamp: parseInt(trade.T),
                    tradeId: trade.i,
                    latency
                };

                this.emit('trade', tradeData);
            });
        }
    }
}

// 统一管理器
class MultiExchangeManager {
    constructor() {
        this.exchanges = new Map();
        this.tradeBuffer = [];
        this.flushInterval = null;
    }

    addExchange(name, exchange) {
        this.exchanges.set(name, exchange);
        
        exchange.on('trade', (data) => {
            this.tradeBuffer.push(data);
            
            // 批量处理
            if (this.tradeBuffer.length >= 100) {
                this.flushBuffer();
            }
        });
    }

    async connectAll() {
        const connections = [];
        
        for (const [name, exchange] of this.exchanges) {
            try {
                await exchange.connect();
                connections.push(name);
                console.log(✓ ${name} connected);
            } catch (error) {
                console.error(✗ ${name} failed:, error.message);
            }
        }

        // 启动批量处理
        this.flushInterval = setInterval(() => this.flushBuffer(), 1000);

        return connections;
    }

    flushBuffer() {
        if (this.tradeBuffer.length > 0) {
            // 实际应用中这里会将数据写入数据库或发送到下游
            console.log([Buffer] Flushed ${this.tradeBuffer.length} trades);
            this.tradeBuffer = [];
        }
    }

    getStats() {
        const stats = {};
        
        for (const [name, exchange] of this.exchanges) {
            stats[name] = exchange.getLatencyStats();
        }
        
        return stats;
    }

    disconnectAll() {
        if (this.flushInterval) {
            clearInterval(this.flushInterval);
        }
        
        for (const exchange of this.exchanges.values()) {
            exchange.disconnect();
        }
    }
}

// 使用示例
async function main() {
    const manager = new MultiExchangeManager();
    
    manager.addExchange('OKX', new OKXWebSocket());
    manager.addExchange('Bybit', new BybitWebSocket());
    
    await manager.connectAll();
    
    // 监控延迟
    setInterval(() => {
        const stats = manager.getStats();
        console.log('[Stats]', JSON.stringify(stats, null, 2));
    }, 10000);
}

main().catch(console.error);

延迟优化:实测数据与性能调优

经过实际测试,影响WebSocket延迟的关键因素包括:网络路径、服务器负载、消息处理效率等。以下是优化策略与实测结果:

优化策略 优化前延迟 优化后延迟 提升幅度
使用专线/优化路由 45ms 18ms 60%↓
关闭TCP Nagle算法 45ms 32ms 29%↓
消息批量处理 45ms 38ms 16%↓
使用二进制协议 45ms 28ms 38%↓
心跳间隔优化 45ms 41ms 9%↓

高性能消息处理器

class HighPerformanceProcessor {
    constructor(options = {}) {
        this.batchSize = options.batchSize || 100;
        this.flushInterval = options.flushInterval || 50;
        this.buffer = new Map();
        this.lastFlush = Date.now();
        this.processingQueue = [];
        this.isProcessing = false;
        
        // 性能指标
        this.metrics = {
            messagesReceived: 0,
            messagesProcessed: 0,
            batchesFlushed: 0,
            avgProcessingTime: 0
        };
        
        // 启动定时刷新
        setInterval(() => this.checkFlush(), this.flushInterval);
    }

    processMessage(message) {
        const startTime = process.hrtime.bigint();
        this.metrics.messagesReceived++;
        
        const key = message.symbol;
        
        if (!this.buffer.has(key)) {
            this.buffer.set(key, {
                symbol: key,
                trades: [],
                lastPrice: 0,
                volume24h: 0,
                high24h: 0,
                low24h: Infinity
            });
        }
        
        const data = this.buffer.get(key);
        
        // 更新聚合数据
        data.trades.push({
            price: message.price,
            quantity: message.quantity,
            timestamp: message.timestamp,
            side: message.side
        });
        
        data.lastPrice = message.price;
        data.volume24h += message.quantity * message.price;
        data.high24h = Math.max(data.high24h, message.price);
        data.low24h = Math.min(data.low24h, message.price);
        
        // 限制缓冲区大小
        if (data.trades.length > this.batchSize) {
            data.trades.shift();
        }
        
        const processingTime = Number(process.hrtime.bigint() - startTime) / 1e6;
        this.updateAvgProcessingTime(processingTime);
        
        // 检查是否需要刷新
        if (this.shouldFlush()) {
            this.scheduleFlush();
        }
    }

    shouldFlush() {
        const timeSinceFlush = Date.now() - this.lastFlush;
        const totalTrades = Array.from(this.buffer.values())
            .reduce((sum, data) => sum + data.trades.length, 0);
        
        return timeSinceFlush >= this.flushInterval || totalTrades >= this.batchSize * 10;
    }

    scheduleFlush() {
        if (this.isProcessing) return;
        
        setImmediate(() => this.flush());
    }

    flush() {
        if (this.isProcessing || this.buffer.size === 0) return;
        
        this.isProcessing = true;
        const startTime = process.hrtime.bigint();
        
        try {
            const snapshots = Array.from(this.buffer.values());
            
            // 模拟写入数据库
            this.writeToDatabase(snapshots);
            
            // 清理旧数据
            for (const [key, data] of this.buffer) {
                const oldestTrade = data.trades[0];
                if (oldestTrade && Date.now() - oldestTrade.timestamp > 60000) {
                    data.trades = data.trades.filter(
                        t => Date.now() - t.timestamp <= 60000
                    );
                }
            }
            
            this.metrics.messagesProcessed += snapshots.reduce(
                (sum, data) => sum + data.trades.length, 0
            );
            this.metrics.batchesFlushed++;
            this.lastFlush = Date.now();
            
        } finally {
            this.isProcessing = false;
        }
    }

    async writeToDatabase(snapshots) {
        // 实际应用中这里会写入时序数据库
        // 使用批量写入提高性能
        const batchSize = 50;
        
        for (let i = 0; i < snapshots.length; i += batchSize) {
            const batch = snapshots.slice(i, i + batchSize);
            // await db.batchInsert('trades', batch);
        }
    }

    updateAvgProcessingTime(time) {
        const alpha = 0.1;
        this.metrics.avgProcessingTime = 
            this.metrics.avgProcessingTime * (1 - alpha) + time * alpha;
    }

    getMetrics() {
        return {
            ...this.metrics,
            bufferSize: this.buffer.size,
            memoryUsage: process.memoryUsage(),
            uptime: process.uptime()
        };
    }
}

// 使用示例
const processor = new HighPerformanceProcessor({
    batchSize: 100,
    flushInterval: 50
});

// 模拟高并发场景
setInterval(() => {
    for (let i = 0; i < 1000; i++) {
        processor.processMessage({
            symbol: ['BTCUSDT', 'ETHUSDT', 'SOLUSDT'][Math.floor(Math.random() * 3)],
            price: 50000 + Math.random() * 1000,
            quantity: Math.random() * 10,
            timestamp: Date.now(),
            side: Math.random() > 0.5 ? 'buy' : 'sell'
        });
    }
}, 100);

// 监控指标
setInterval(() => {
    const metrics = processor.getMetrics();
    console.log('[Metrics]', JSON.stringify(metrics, null, 2));
}, 5000);

AI增强分析:结合HolySheep AI进行智能行情处理

WebSocket获取的原始行情数据需要进一步分析才能产生交易信号。HolySheep AI提供高性价比的AI API服务,可以用于:

const https = require('https');

class AIEnhancedAnalyzer {
    constructor(apiKey) {
        this.apiKey = apiKey;
        this.baseUrl = 'api.holysheep.ai';
        this.requestQueue = [];
        this.isProcessing = false;
        this.rateLimit = 50; // 每秒最大请求数
        this.lastReset = Date.now();
    }

    async analyzePriceAnomaly(currentPrice, historicalPrices) {
        const prompt = `分析以下加密货币价格数据,识别是否存在异常:
        
当前价格: $${currentPrice}
历史价格序列: [${historicalPrices.slice(-20).join(', ')}]

请分析:
1. 当前价格相对于20期均线的偏离程度
2. 近期是否存在显著波动
3. 是否存在价格异常信号

以JSON格式返回分析结果。`;

        return await this.callAI(prompt, 'gpt-4.1');
    }

    async analyzeMarketSentiment(symbol, recentTrades) {
        // 计算买卖比率
        const buyVolume = recentTrades
            .filter(t => t.side === 'buy')
            .reduce((sum, t) => sum + t.quantity, 0);
        
        const sellVolume = recentTrades
            .filter(t => t.side === 'sell')
            .reduce((sum, t) => sum + t.quantity, 0);
        
        const buyRatio = buyVolume / (buyVolume + sellVolume);
        
        const prompt = `分析${symbol}的市场情绪:
        
买盘成交量: ${buyVolume.toFixed(4)}
卖盘成交量: ${sellVolume.toFixed(4)}
买卖比: ${buyRatio.toFixed(4)}

请判断当前市场情绪(看涨/中性/看跌),并给出简短的解释。`;

        return await this.callAI(prompt, 'claude-sonnet-4.5');
    }

    async generateTradingSignals(symbol, priceData) {
        const prompt = `基于以下${symbol}的技术数据,生成交易信号:

最新价格: $${priceData.current}
24小时高: $${priceData.high24h}
24小时低: $${priceData.low24h}
24小时成交量: ${priceData.volume24h.toFixed(2)}
RSI(14): ${priceData.rsi?.toFixed(2) || 'N/A'}

请输出:
1. 短期趋势判断(上涨/下跌/盘整)
2. 入场建议
3. 止损位置
4. 止盈目标

使用JSON格式输出。`;

        return await this.callAI(prompt, 'gpt-4.1');
    }

    async callAI(prompt, model = 'gpt-4.1') {
        // 速率限制检查
        await this.checkRateLimit();
        
        const postData = JSON.stringify({
            model: model,
            messages: [
                { role: 'user', content: prompt }
            ],
            temperature: 0.7,
            max_tokens: 1000
        });

        return new Promise((resolve, reject) => {
            const options = {
                hostname: this.baseUrl,
                port: 443,
                path: '/v1/chat/completions',
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Authorization': Bearer ${this.apiKey},
                    'Content-Length': Buffer.byteLength(postData)
                }
            };

            const startTime = Date.now();
            
            const req = https.request(options, (res) => {
                let data = '';

                res.on('data', (chunk) => {
                    data += chunk;
                });

                res.on('end', () => {
                    const latency = Date.now() - startTime;
                    
                    try {
                        const response = JSON.parse(data);
                        
                        if (response.error) {
                            reject(new Error(response.error.message));
                            return;
                        }
                        
                        resolve({
                            content: response.choices[0].message.content,
                            model: response.model,
                            latency,
                            usage: response.usage
                        });
                    } catch (error) {
                        reject(error);
                    }
                });
            });

            req.on('error', (error) => {
                reject(error);
            });

            req.write(postData);
            req.end();
        });
    }

    async checkRateLimit() {
        const now = Date.now();
        
        if (now - this.lastReset >= 1000) {
            this.requestQueue = [];
            this.lastReset = now;
        }
        
        if (this.requestQueue.length >= this.rateLimit) {
            const waitTime = 1000 - (now - this.lastReset);
            await new Promise(resolve => setTimeout(resolve, waitTime));
            return this.checkRateLimit();
        }
        
        this.requestQueue.push(now);
    }
}

// 集成到WebSocket流程
class TradingSignalGenerator {
    constructor() {
        this.analyzer = new AIEnhancedAnalyzer(process.env.HOLYSHEEP_API_KEY);
        this.priceCache = new Map();
        this.analysisInterval = 5000;
    }

    async processTrade(symbol, trade) {
        // 更新价格缓存
        if (!this.priceCache.has(symbol)) {
            this.priceCache.set(symbol, {
                trades: [],
                prices: []
            });
        }
        
        const cache = this.priceCache.get(symbol);
        cache.trades.push(trade);
        cache.prices.push(trade.price);
        
        // 保持最近100条记录
        if (cache.trades.length > 100) {
            cache.trades.shift();
            cache.prices.shift();
        }
        
        // 每5秒进行一次AI分析
        if (cache.trades.length % 50 === 0) {
            await this.runAnalysis(symbol);
        }
    }

    async runAnalysis(symbol) {
        const cache = this.priceCache.get(symbol);
        
        try {
            // 价格异常检测
            const anomalyResult = await this.analyzer.analyzePriceAnomaly(
                cache.prices[cache.prices.length - 1],
                cache.prices
            );
            
            // 市场情绪分析
            const sentimentResult = await this.analyzer.analyzeMarketSentiment(
                symbol,
                cache.trades.slice(-20)
            );
            
            console.log([${symbol}] Analysis Complete);
            console.log(Anomaly: ${anomalyResult.content.substring(0, 100)}...);
            console.log(Sentiment: ${sentimentResult.content.substring(0, 100)}...);
            console.log(AI Latency: ${anomalyResult.latency}ms + ${sentimentResult.latency}ms);
            
            // 触发交易信号回调
            this.onSignal && this.onSignal({
                symbol,
                anomaly: anomalyResult,
                sentiment: sentimentResult,
                timestamp: Date.now()
            });
            
        } catch (error) {
            console.error([${symbol}] Analysis failed:, error.message);
        }
    }
}

// 使用示例
const generator = new TradingSignalGenerator();

generator.onSignal = (signal) => {
    console.log('📊 Trading Signal:', JSON.stringify(signal, null, 2));
};

// 模拟接收交易数据
setInterval(() => {
    generator.processTrade('BTCUSDT', {
        price: 52000 + Math.random() * 2000,
        quantity: Math.random() * 5,
        side: Math.random() > 0.5 ? 'buy' : 'sell',
        timestamp: Date.now()
    });
}, 100);

Lỗi thường gặp và cách khắc phục

Lỗi 1: WebSocket连接频繁断开

// 问题原因:
// 1. 心跳间隔过长,服务器主动断开
// 2. 网络不稳定导致连接中断
// 3. 服务器负载过高拒绝连接

// 解决方案:实现智能重连机制
class SmartReconnectManager {
    constructor() {
        this.connectionState = 'disconnected';
        this.reconnectAttempts = 0;
        this.maxAttempts = 10;
        this.baseDelay = 1000;
        this.maxDelay = 30000;
        this.jitter = 0.3; // 添加随机抖动避免雷群效应
        this.healthCheckInterval = null;
    }

    getReconnectDelay() {
        // 指数退避 + 随机抖动
        const exponentialDelay = Math.min(
            this.baseDelay * Math.pow(2, this.reconnectAttempts),
            this.maxDelay
        );
        const jitterAmount = exponentialDelay * this.jitter * Math.random();
        return exponentialDelay + jitterAmount;
    }

    async reconnect(wsFactory) {
        if (this.connectionState === 'reconnecting') {
            return;
        }

        this.connectionState = 'reconnecting';
        this.reconnectAttempts++;

        if (this.reconnectAttempts > this.maxAttempts) {
            console.error('Max reconnection attempts reached');
            this.connectionState = 'failed';
            return;
        }

        const delay = this.getReconnectDelay();
        console.log(Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}));

        await new Promise(resolve => setTimeout(resolve, delay));

        try {
            const ws = await wsFactory();
            this.connectionState = 'connected';
            this.reconnectAttempts = 0;
            console.log('Reconnection successful');
        } catch (error) {
            console.error('Reconnection failed:', error.message);
            await this.reconnect(wsFactory);
        }
    }

    startHealthCheck(ws, checkInterval = 30000) {
        this.healthCheckInterval = setInterval(() => {
            if (ws.readyState !== WebSocket.OPEN) {
                console.warn('Connection health check failed');
                // 触发重连
            }
        }, checkInterval);
    }

    stopHealthCheck() {
        if (this.healthCheckInterval) {
            clearInterval(this.healthCheckInterval);
        }
    }
}

Lỗi 2: 消息处理延迟累积

// 问题原因:
// 1. 单线程处理导致消息积压
// 2. 同步I/O阻塞事件循环
// 3. 内存持续增长

// 解决方案:使用Worker线程池处理
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

class MessageProcessorPool {
    constructor(poolSize = 4) {
        this.poolSize = poolSize;
        this.workers = [];
        this.taskQueue = [];
        this.currentWorkerIndex = 0;
        
        if (isMainThread) {
            this.initializeWorkers();
        }
    }

    initializeWorkers() {
        for (let i = 0; i < this.poolSize; i++) {
            const worker = new Worker(__filename, {
                workerData: { workerId: i }
            });
            
            worker.on('message', (result) => {
                this.handleWorkerMessage(i, result);
            });
            
            worker.on('error', (error) => {
                console.error(Worker ${i} error:, error);
                this.restartWorker(i);
            });
            
            this.workers.push({
                worker,
                busy: false,
                processedCount: 0
            });
        }
    }

    async processMessage(message) {
        return new Promise((resolve, reject) => {
            const task = { message, resolve, reject };
            
            // 查找空闲worker
            const worker = this.findIdleWorker();
            
            if (worker) {
                this.assignTask(worker, task);
            } else {
                this.taskQueue.push(task);
            }
        });
    }

    findIdleWorker() {
        for (const w of this.workers) {
            if (!w.busy) {
                return w;
            }
        }
        
        // 轮询选择worker
        const worker = this.workers[this.currentWorkerIndex];
        this.currentWorkerIndex = (this.currentWorkerIndex + 1) % this.poolSize;
        return worker;
    }

    assignTask(worker, task) {
        worker.busy = true;
        worker.worker.postMessage(task.message);
        
        const timeout = setTimeout(() => {
            worker.busy = false;
            task.reject(new Error('Processing timeout'));
        },