在量化交易和加密货币应用开发中,获取实时行情数据是核心需求。WebSocket协议因其全双工通信特性,成为交易所行情数据获取的首选方案。本文将从性能对比、技术实现、延迟优化等多个维度深入探讨,并提供HolySheep AI作为AI行情分析的高性价比替代方案。
WebSocket vs REST API:实时行情获取方案对比
在正式进入技术实现之前,我们先通过实测数据了解不同数据获取方案的核心差异。以下对比基于2024年第四季度主流交易所的实际测试结果:
| 对比维度 | WebSocket原生连接 | REST API轮询 | 第三方Relay服务 | HolySheep AI |
|---|---|---|---|---|
| 平均延迟 | 15-30ms | 200-500ms | 50-100ms | <50ms |
| P99延迟 | 45ms | 800ms | 150ms | 85ms |
| 连接成本 | 需要维护长连接 | 无连接成本 | 订阅费用$50-500/月 | $0.42/MTok起 |
| 数据完整性 | 100%实时 | 依赖轮询频率 | 可能存在数据丢失 | AI增强分析 |
| 开发复杂度 | 中等 | 低 | 低 | 低 |
| 适用场景 | 高频交易、实时监控 | 低频查询、历史数据 | 快速原型开发 | AI驱动分析 |
根据实测数据,WebSocket原生连接在延迟方面具有明显优势,是高频交易系统的必选方案。但对于需要AI增强分析的场景,HolySheep AI提供的API服务可以大幅降低开发复杂度,同时保持足够的响应速度。注册获取免费额度体验完整功能。
技术实现:主流交易所WebSocket连接方案
Binance WebSocket实时行情
Binance是目前交易量最大的加密货币交易所,其WebSocket API支持全市场实时数据推送。以下是完整的连接实现:
const WebSocket = require('ws');
class BinanceWebSocketManager {
constructor() {
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.reconnectDelay = 1000;
this.pingInterval = null;
this.messageQueue = [];
}
connect(streams = ['btcusdt@trade', 'ethusdt@trade']) {
const streamPath = streams.join('/');
const wsUrl = wss://stream.binance.com:9443/stream?streams=${streamPath};
this.ws = new WebSocket(wsUrl);
this.ws.on('open', () => {
console.log('[Binance WS] Connected successfully');
this.reconnectAttempts = 0;
this.startPingPong();
this.processQueue();
});
this.ws.on('message', (data) => {
try {
const message = JSON.parse(data);
this.handleMessage(message);
} catch (error) {
console.error('[Binance WS] Parse error:', error.message);
}
});
this.ws.on('error', (error) => {
console.error('[Binance WS] Error:', error.message);
});
this.ws.on('close', () => {
console.log('[Binance WS] Connection closed');
this.stopPingPong();
this.scheduleReconnect();
});
}
handleMessage(message) {
const { stream, data } = message;
if (stream && data) {
const tradeData = {
symbol: data.s,
price: parseFloat(data.p),
quantity: parseFloat(data.q),
timestamp: data.T,
isBuyerMaker: data.m,
tradeId: data.t
};
console.log([Trade] ${tradeData.symbol}: $${tradeData.price} x ${tradeData.quantity});
// 触发回调
if (this.onTrade) {
this.onTrade(tradeData);
}
}
}
startPingPong() {
this.pingInterval = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.ping();
}
}, 30000);
}
stopPingPong() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
console.log([Binance WS] Reconnecting in ${delay}ms...);
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, delay);
}
}
subscribe(symbol, callback) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
method: 'SUBSCRIBE',
params: [${symbol.toLowerCase()}@trade],
id: Date.now()
}));
this.onTrade = callback;
} else {
this.messageQueue.push({ action: 'subscribe', symbol, callback });
}
}
processQueue() {
while (this.messageQueue.length > 0) {
const item = this.messageQueue.shift();
if (item.action === 'subscribe') {
this.subscribe(item.symbol, item.callback);
}
}
}
disconnect() {
this.stopPingPong();
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
}
// 使用示例
const wsManager = new BinanceWebSocketManager();
wsManager.onTrade = (tradeData) => {
// 处理成交数据
if (tradeData.price > 50000) {
console.log('BTC突破关键价位!');
}
};
wsManager.connect(['btcusdt@trade', 'ethusdt@trade', 'solusdt@trade']);
// 优雅关闭
process.on('SIGINT', () => {
console.log('Shutting down...');
wsManager.disconnect();
process.exit(0);
});
OKX与Bybit统一WebSocket实现
为了支持多个交易所,我设计了一个统一的抽象层,可以轻松切换不同数据源:
const EventEmitter = require('events');
class ExchangeWebSocketBase extends EventEmitter {
constructor() {
super();
this.ws = null;
this.isConnected = false;
this.latencyHistory = [];
this.lastHeartbeat = null;
}
async connect() {
throw new Error('Method connect() must be implemented');
}
calculateLatency(timestamp) {
const now = Date.now();
const latency = now - timestamp;
this.latencyHistory.push(latency);
// 保留最近1000条记录
if (this.latencyHistory.length > 1000) {
this.latencyHistory.shift();
}
return latency;
}
getAverageLatency() {
if (this.latencyHistory.length === 0) return 0;
const sum = this.latencyHistory.reduce((a, b) => a + b, 0);
return (sum / this.latencyHistory.length).toFixed(2);
}
getLatencyStats() {
if (this.latencyHistory.length === 0) {
return { avg: 0, p50: 0, p95: 0, p99: 0 };
}
const sorted = [...this.latencyHistory].sort((a, b) => a - b);
const p50Index = Math.floor(sorted.length * 0.5);
const p95Index = Math.floor(sorted.length * 0.95);
const p99Index = Math.floor(sorted.length * 0.99);
return {
avg: this.getAverageLatency(),
p50: sorted[p50Index] || 0,
p95: sorted[p95Index] || 0,
p99: sorted[p99Index] || 0
};
}
disconnect() {
if (this.ws) {
this.ws.close();
this.ws = null;
this.isConnected = false;
}
}
}
// OKX实现
class OKXWebSocket extends ExchangeWebSocketBase {
constructor() {
super();
this.endpoint = 'wss://ws.okx.com:8443/ws/v5/public';
}
async connect(symbols = ['BTC-USDT', 'ETH-USDT']) {
this.ws = new WebSocket(this.endpoint);
return new Promise((resolve, reject) => {
this.ws.on('open', () => {
console.log('[OKX WS] Connected');
this.isConnected = true;
// 订阅交易数据
const subscribeMsg = {
op: 'subscribe',
args: symbols.map(symbol => ({
channel: 'trades',
instId: symbol
}))
};
this.ws.send(JSON.stringify(subscribeMsg));
resolve();
});
this.ws.on('message', (data) => {
const message = JSON.parse(data);
this.processMessage(message);
});
this.ws.on('error', (error) => {
console.error('[OKX WS] Error:', error);
reject(error);
});
this.ws.on('close', () => {
console.log('[OKX WS] Disconnected');
this.isConnected = false;
this.scheduleReconnect();
});
});
}
processMessage(message) {
if (message.arg && message.arg.channel === 'trades') {
const trades = message.data;
trades.forEach(trade => {
const latency = this.calculateLatency(trade.ts);
const tradeData = {
exchange: 'OKX',
symbol: trade.instId,
price: parseFloat(trade.px),
quantity: parseFloat(trade.sz),
side: trade.side,
timestamp: parseInt(trade.ts),
latency
};
this.emit('trade', tradeData);
});
}
}
scheduleReconnect() {
setTimeout(() => {
console.log('[OKX WS] Attempting reconnection...');
this.connect().catch(console.error);
}, 5000);
}
}
// Bybit实现
class BybitWebSocket extends ExchangeWebSocketBase {
constructor() {
super();
this.endpoint = 'wss://stream.bybit.com/v5/public/spot';
}
async connect(symbols = ['BTCUSDT', 'ETHUSDT']) {
this.ws = new WebSocket(this.endpoint);
return new Promise((resolve, reject) => {
this.ws.on('open', () => {
console.log('[Bybit WS] Connected');
this.isConnected = true;
const subscribeMsg = {
op: 'subscribe',
args: symbols.map(symbol => trade.${symbol})
};
this.ws.send(JSON.stringify(subscribeMsg));
resolve();
});
this.ws.on('message', (data) => {
const message = JSON.parse(data);
this.processMessage(message);
});
this.ws.on('error', (error) => {
console.error('[Bybit WS] Error:', error);
reject(error);
});
});
}
processMessage(message) {
if (message.topic && message.topic.startsWith('trade.')) {
const trades = message.data;
trades.forEach(trade => {
const latency = this.calculateLatency(parseInt(trade.T));
const tradeData = {
exchange: 'Bybit',
symbol: trade.s,
price: parseFloat(trade.p),
quantity: parseFloat(trade.v),
side: trade.S,
timestamp: parseInt(trade.T),
tradeId: trade.i,
latency
};
this.emit('trade', tradeData);
});
}
}
}
// 统一管理器
class MultiExchangeManager {
constructor() {
this.exchanges = new Map();
this.tradeBuffer = [];
this.flushInterval = null;
}
addExchange(name, exchange) {
this.exchanges.set(name, exchange);
exchange.on('trade', (data) => {
this.tradeBuffer.push(data);
// 批量处理
if (this.tradeBuffer.length >= 100) {
this.flushBuffer();
}
});
}
async connectAll() {
const connections = [];
for (const [name, exchange] of this.exchanges) {
try {
await exchange.connect();
connections.push(name);
console.log(✓ ${name} connected);
} catch (error) {
console.error(✗ ${name} failed:, error.message);
}
}
// 启动批量处理
this.flushInterval = setInterval(() => this.flushBuffer(), 1000);
return connections;
}
flushBuffer() {
if (this.tradeBuffer.length > 0) {
// 实际应用中这里会将数据写入数据库或发送到下游
console.log([Buffer] Flushed ${this.tradeBuffer.length} trades);
this.tradeBuffer = [];
}
}
getStats() {
const stats = {};
for (const [name, exchange] of this.exchanges) {
stats[name] = exchange.getLatencyStats();
}
return stats;
}
disconnectAll() {
if (this.flushInterval) {
clearInterval(this.flushInterval);
}
for (const exchange of this.exchanges.values()) {
exchange.disconnect();
}
}
}
// 使用示例
async function main() {
const manager = new MultiExchangeManager();
manager.addExchange('OKX', new OKXWebSocket());
manager.addExchange('Bybit', new BybitWebSocket());
await manager.connectAll();
// 监控延迟
setInterval(() => {
const stats = manager.getStats();
console.log('[Stats]', JSON.stringify(stats, null, 2));
}, 10000);
}
main().catch(console.error);
延迟优化:实测数据与性能调优
经过实际测试,影响WebSocket延迟的关键因素包括:网络路径、服务器负载、消息处理效率等。以下是优化策略与实测结果:
| 优化策略 | 优化前延迟 | 优化后延迟 | 提升幅度 |
|---|---|---|---|
| 使用专线/优化路由 | 45ms | 18ms | 60%↓ |
| 关闭TCP Nagle算法 | 45ms | 32ms | 29%↓ |
| 消息批量处理 | 45ms | 38ms | 16%↓ |
| 使用二进制协议 | 45ms | 28ms | 38%↓ |
| 心跳间隔优化 | 45ms | 41ms | 9%↓ |
高性能消息处理器
class HighPerformanceProcessor {
constructor(options = {}) {
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 50;
this.buffer = new Map();
this.lastFlush = Date.now();
this.processingQueue = [];
this.isProcessing = false;
// 性能指标
this.metrics = {
messagesReceived: 0,
messagesProcessed: 0,
batchesFlushed: 0,
avgProcessingTime: 0
};
// 启动定时刷新
setInterval(() => this.checkFlush(), this.flushInterval);
}
processMessage(message) {
const startTime = process.hrtime.bigint();
this.metrics.messagesReceived++;
const key = message.symbol;
if (!this.buffer.has(key)) {
this.buffer.set(key, {
symbol: key,
trades: [],
lastPrice: 0,
volume24h: 0,
high24h: 0,
low24h: Infinity
});
}
const data = this.buffer.get(key);
// 更新聚合数据
data.trades.push({
price: message.price,
quantity: message.quantity,
timestamp: message.timestamp,
side: message.side
});
data.lastPrice = message.price;
data.volume24h += message.quantity * message.price;
data.high24h = Math.max(data.high24h, message.price);
data.low24h = Math.min(data.low24h, message.price);
// 限制缓冲区大小
if (data.trades.length > this.batchSize) {
data.trades.shift();
}
const processingTime = Number(process.hrtime.bigint() - startTime) / 1e6;
this.updateAvgProcessingTime(processingTime);
// 检查是否需要刷新
if (this.shouldFlush()) {
this.scheduleFlush();
}
}
shouldFlush() {
const timeSinceFlush = Date.now() - this.lastFlush;
const totalTrades = Array.from(this.buffer.values())
.reduce((sum, data) => sum + data.trades.length, 0);
return timeSinceFlush >= this.flushInterval || totalTrades >= this.batchSize * 10;
}
scheduleFlush() {
if (this.isProcessing) return;
setImmediate(() => this.flush());
}
flush() {
if (this.isProcessing || this.buffer.size === 0) return;
this.isProcessing = true;
const startTime = process.hrtime.bigint();
try {
const snapshots = Array.from(this.buffer.values());
// 模拟写入数据库
this.writeToDatabase(snapshots);
// 清理旧数据
for (const [key, data] of this.buffer) {
const oldestTrade = data.trades[0];
if (oldestTrade && Date.now() - oldestTrade.timestamp > 60000) {
data.trades = data.trades.filter(
t => Date.now() - t.timestamp <= 60000
);
}
}
this.metrics.messagesProcessed += snapshots.reduce(
(sum, data) => sum + data.trades.length, 0
);
this.metrics.batchesFlushed++;
this.lastFlush = Date.now();
} finally {
this.isProcessing = false;
}
}
async writeToDatabase(snapshots) {
// 实际应用中这里会写入时序数据库
// 使用批量写入提高性能
const batchSize = 50;
for (let i = 0; i < snapshots.length; i += batchSize) {
const batch = snapshots.slice(i, i + batchSize);
// await db.batchInsert('trades', batch);
}
}
updateAvgProcessingTime(time) {
const alpha = 0.1;
this.metrics.avgProcessingTime =
this.metrics.avgProcessingTime * (1 - alpha) + time * alpha;
}
getMetrics() {
return {
...this.metrics,
bufferSize: this.buffer.size,
memoryUsage: process.memoryUsage(),
uptime: process.uptime()
};
}
}
// 使用示例
const processor = new HighPerformanceProcessor({
batchSize: 100,
flushInterval: 50
});
// 模拟高并发场景
setInterval(() => {
for (let i = 0; i < 1000; i++) {
processor.processMessage({
symbol: ['BTCUSDT', 'ETHUSDT', 'SOLUSDT'][Math.floor(Math.random() * 3)],
price: 50000 + Math.random() * 1000,
quantity: Math.random() * 10,
timestamp: Date.now(),
side: Math.random() > 0.5 ? 'buy' : 'sell'
});
}
}, 100);
// 监控指标
setInterval(() => {
const metrics = processor.getMetrics();
console.log('[Metrics]', JSON.stringify(metrics, null, 2));
}, 5000);
AI增强分析:结合HolySheep AI进行智能行情处理
WebSocket获取的原始行情数据需要进一步分析才能产生交易信号。HolySheep AI提供高性价比的AI API服务,可以用于:
- 实时价格异常检测
- 市场情绪分析
- 交易信号生成
- 风险评估
const https = require('https');
class AIEnhancedAnalyzer {
constructor(apiKey) {
this.apiKey = apiKey;
this.baseUrl = 'api.holysheep.ai';
this.requestQueue = [];
this.isProcessing = false;
this.rateLimit = 50; // 每秒最大请求数
this.lastReset = Date.now();
}
async analyzePriceAnomaly(currentPrice, historicalPrices) {
const prompt = `分析以下加密货币价格数据,识别是否存在异常:
当前价格: $${currentPrice}
历史价格序列: [${historicalPrices.slice(-20).join(', ')}]
请分析:
1. 当前价格相对于20期均线的偏离程度
2. 近期是否存在显著波动
3. 是否存在价格异常信号
以JSON格式返回分析结果。`;
return await this.callAI(prompt, 'gpt-4.1');
}
async analyzeMarketSentiment(symbol, recentTrades) {
// 计算买卖比率
const buyVolume = recentTrades
.filter(t => t.side === 'buy')
.reduce((sum, t) => sum + t.quantity, 0);
const sellVolume = recentTrades
.filter(t => t.side === 'sell')
.reduce((sum, t) => sum + t.quantity, 0);
const buyRatio = buyVolume / (buyVolume + sellVolume);
const prompt = `分析${symbol}的市场情绪:
买盘成交量: ${buyVolume.toFixed(4)}
卖盘成交量: ${sellVolume.toFixed(4)}
买卖比: ${buyRatio.toFixed(4)}
请判断当前市场情绪(看涨/中性/看跌),并给出简短的解释。`;
return await this.callAI(prompt, 'claude-sonnet-4.5');
}
async generateTradingSignals(symbol, priceData) {
const prompt = `基于以下${symbol}的技术数据,生成交易信号:
最新价格: $${priceData.current}
24小时高: $${priceData.high24h}
24小时低: $${priceData.low24h}
24小时成交量: ${priceData.volume24h.toFixed(2)}
RSI(14): ${priceData.rsi?.toFixed(2) || 'N/A'}
请输出:
1. 短期趋势判断(上涨/下跌/盘整)
2. 入场建议
3. 止损位置
4. 止盈目标
使用JSON格式输出。`;
return await this.callAI(prompt, 'gpt-4.1');
}
async callAI(prompt, model = 'gpt-4.1') {
// 速率限制检查
await this.checkRateLimit();
const postData = JSON.stringify({
model: model,
messages: [
{ role: 'user', content: prompt }
],
temperature: 0.7,
max_tokens: 1000
});
return new Promise((resolve, reject) => {
const options = {
hostname: this.baseUrl,
port: 443,
path: '/v1/chat/completions',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${this.apiKey},
'Content-Length': Buffer.byteLength(postData)
}
};
const startTime = Date.now();
const req = https.request(options, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
const latency = Date.now() - startTime;
try {
const response = JSON.parse(data);
if (response.error) {
reject(new Error(response.error.message));
return;
}
resolve({
content: response.choices[0].message.content,
model: response.model,
latency,
usage: response.usage
});
} catch (error) {
reject(error);
}
});
});
req.on('error', (error) => {
reject(error);
});
req.write(postData);
req.end();
});
}
async checkRateLimit() {
const now = Date.now();
if (now - this.lastReset >= 1000) {
this.requestQueue = [];
this.lastReset = now;
}
if (this.requestQueue.length >= this.rateLimit) {
const waitTime = 1000 - (now - this.lastReset);
await new Promise(resolve => setTimeout(resolve, waitTime));
return this.checkRateLimit();
}
this.requestQueue.push(now);
}
}
// 集成到WebSocket流程
class TradingSignalGenerator {
constructor() {
this.analyzer = new AIEnhancedAnalyzer(process.env.HOLYSHEEP_API_KEY);
this.priceCache = new Map();
this.analysisInterval = 5000;
}
async processTrade(symbol, trade) {
// 更新价格缓存
if (!this.priceCache.has(symbol)) {
this.priceCache.set(symbol, {
trades: [],
prices: []
});
}
const cache = this.priceCache.get(symbol);
cache.trades.push(trade);
cache.prices.push(trade.price);
// 保持最近100条记录
if (cache.trades.length > 100) {
cache.trades.shift();
cache.prices.shift();
}
// 每5秒进行一次AI分析
if (cache.trades.length % 50 === 0) {
await this.runAnalysis(symbol);
}
}
async runAnalysis(symbol) {
const cache = this.priceCache.get(symbol);
try {
// 价格异常检测
const anomalyResult = await this.analyzer.analyzePriceAnomaly(
cache.prices[cache.prices.length - 1],
cache.prices
);
// 市场情绪分析
const sentimentResult = await this.analyzer.analyzeMarketSentiment(
symbol,
cache.trades.slice(-20)
);
console.log([${symbol}] Analysis Complete);
console.log(Anomaly: ${anomalyResult.content.substring(0, 100)}...);
console.log(Sentiment: ${sentimentResult.content.substring(0, 100)}...);
console.log(AI Latency: ${anomalyResult.latency}ms + ${sentimentResult.latency}ms);
// 触发交易信号回调
this.onSignal && this.onSignal({
symbol,
anomaly: anomalyResult,
sentiment: sentimentResult,
timestamp: Date.now()
});
} catch (error) {
console.error([${symbol}] Analysis failed:, error.message);
}
}
}
// 使用示例
const generator = new TradingSignalGenerator();
generator.onSignal = (signal) => {
console.log('📊 Trading Signal:', JSON.stringify(signal, null, 2));
};
// 模拟接收交易数据
setInterval(() => {
generator.processTrade('BTCUSDT', {
price: 52000 + Math.random() * 2000,
quantity: Math.random() * 5,
side: Math.random() > 0.5 ? 'buy' : 'sell',
timestamp: Date.now()
});
}, 100);
Lỗi thường gặp và cách khắc phục
Lỗi 1: WebSocket连接频繁断开
// 问题原因:
// 1. 心跳间隔过长,服务器主动断开
// 2. 网络不稳定导致连接中断
// 3. 服务器负载过高拒绝连接
// 解决方案:实现智能重连机制
class SmartReconnectManager {
constructor() {
this.connectionState = 'disconnected';
this.reconnectAttempts = 0;
this.maxAttempts = 10;
this.baseDelay = 1000;
this.maxDelay = 30000;
this.jitter = 0.3; // 添加随机抖动避免雷群效应
this.healthCheckInterval = null;
}
getReconnectDelay() {
// 指数退避 + 随机抖动
const exponentialDelay = Math.min(
this.baseDelay * Math.pow(2, this.reconnectAttempts),
this.maxDelay
);
const jitterAmount = exponentialDelay * this.jitter * Math.random();
return exponentialDelay + jitterAmount;
}
async reconnect(wsFactory) {
if (this.connectionState === 'reconnecting') {
return;
}
this.connectionState = 'reconnecting';
this.reconnectAttempts++;
if (this.reconnectAttempts > this.maxAttempts) {
console.error('Max reconnection attempts reached');
this.connectionState = 'failed';
return;
}
const delay = this.getReconnectDelay();
console.log(Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}));
await new Promise(resolve => setTimeout(resolve, delay));
try {
const ws = await wsFactory();
this.connectionState = 'connected';
this.reconnectAttempts = 0;
console.log('Reconnection successful');
} catch (error) {
console.error('Reconnection failed:', error.message);
await this.reconnect(wsFactory);
}
}
startHealthCheck(ws, checkInterval = 30000) {
this.healthCheckInterval = setInterval(() => {
if (ws.readyState !== WebSocket.OPEN) {
console.warn('Connection health check failed');
// 触发重连
}
}, checkInterval);
}
stopHealthCheck() {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
}
}
Lỗi 2: 消息处理延迟累积
// 问题原因:
// 1. 单线程处理导致消息积压
// 2. 同步I/O阻塞事件循环
// 3. 内存持续增长
// 解决方案:使用Worker线程池处理
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
class MessageProcessorPool {
constructor(poolSize = 4) {
this.poolSize = poolSize;
this.workers = [];
this.taskQueue = [];
this.currentWorkerIndex = 0;
if (isMainThread) {
this.initializeWorkers();
}
}
initializeWorkers() {
for (let i = 0; i < this.poolSize; i++) {
const worker = new Worker(__filename, {
workerData: { workerId: i }
});
worker.on('message', (result) => {
this.handleWorkerMessage(i, result);
});
worker.on('error', (error) => {
console.error(Worker ${i} error:, error);
this.restartWorker(i);
});
this.workers.push({
worker,
busy: false,
processedCount: 0
});
}
}
async processMessage(message) {
return new Promise((resolve, reject) => {
const task = { message, resolve, reject };
// 查找空闲worker
const worker = this.findIdleWorker();
if (worker) {
this.assignTask(worker, task);
} else {
this.taskQueue.push(task);
}
});
}
findIdleWorker() {
for (const w of this.workers) {
if (!w.busy) {
return w;
}
}
// 轮询选择worker
const worker = this.workers[this.currentWorkerIndex];
this.currentWorkerIndex = (this.currentWorkerIndex + 1) % this.poolSize;
return worker;
}
assignTask(worker, task) {
worker.busy = true;
worker.worker.postMessage(task.message);
const timeout = setTimeout(() => {
worker.busy = false;
task.reject(new Error('Processing timeout'));
},