暗号通貨取引所のリアルタイムデータ処理は、高頻度取引bot、ポートフォリオ管理、リスク管理システムなど、多くのユースケースで必要とされています。本稿では、KafkaとWebSocketを活用した堅牢なデータパイプラインの構築方法を、Pythonでの実装例とともに解説します。
リアルタイム取引データ処理の基礎知識
暗号通貨取引所(Bybit、Binance、OKXなど)は、WebSocket APIを通じてリアルタイムの約定・注文・開発板データを提供します。生のWebSocketストリームをそのままアプリケーションで処理すると、接続断やデータ欠損のリスクがあります。
ここにKafkarokerを追加することで、以下の利点が得られます:
- バッファリング:データ消費者の処理速度が生産速度を下回ってもデータが失われない
- 耐障害性:コンシューマー障害時にメッセージをリプレイ可能
- ファンアウト:同じデータを複数のコンシューマーに同時配信可能
- バックプレッシャー:システム全体の流量制御が可能
システムアーキテクチャ
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 取引所A │ │ 取引所B │ │ 取引所C │
│ (Binance) │ │ (Bybit) │ │ (OKX) │
│ WebSocket │ │ WebSocket │ │ WebSocket │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ JSON messages │ JSON messages │ JSON messages
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Apache Kafka Cluster │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Topic: trades │ │ Topic: orderbook │ │ Topic: ticker │ │
│ │ Partitions: 8 │ │ Partitions: 8 │ │ Partitions: 4 │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ コンシューマーA │ │ コンシューマーB │ │ コンシューマーC │
│ (取引bot) │ │ (分析基盤) │ │ (通知サービス) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
実装コード:WebSocket → Kafka Producer
まず、WebSocketからデータを受信し、Kafkaトピックにpublishするプロデューサーアプリケーションを作成します。
#!/usr/bin/env python3
"""
暗号通貨取引所WebSocket → Kafka プロデューサー
Binance / Bybit 対応
"""
import asyncio
import json
import signal
from datetime import datetime
from typing import Optional
import websockets
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
class ExchangeKafkaProducer:
"""取引所WebSocketストリームをKafkaにpublishするクラス"""
def __init__(
self,
bootstrap_servers: str = "localhost:9092",
exchange: str = "binance"
):
self.bootstrap_servers = bootstrap_servers
self.exchange = exchange
self.running = True
# Kafka Producer設定
self.producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'producer-{exchange}-websocket',
'acks': 'all', # 全レプリカからの応答を待つ
'retries': 3,
'retry.backoff.ms': 100,
'compression.type': 'gzip', # 帯域幅節約
'batch.size': 16384, # 16KBバッチ
'linger.ms': 5, # 5ms待機でバッチ送信
}
self.producer = Producer(self.producer_config)
# 取引所別エンドポイント
self.endpoints = {
'binance': {
'trades': 'wss://stream.binance.com:9443/ws/!trade@arr',
'ticker': 'wss://stream.binance.com:9443/ws/!miniTicker@arr',
'depth': 'wss://stream.binance.com:9443/ws/!depth@100ms@arr',
},
'bybit': {
'trades': 'wss://stream.bybit.com/v5/public/spot',
'ticker': 'wss://stream.bybit.com/v5/public/spot',
}
}
# Kafkaトピック定義
self.topics = ['exchange-trades', 'exchange-ticker', 'exchange-depth']
def _delivery_callback(self, err, msg):
"""Kafka publishコールバック"""
if err is not None:
print(f"[ERROR] Message delivery failed: {err}")
else:
topic = msg.topic()
partition = msg.partition()
offset = msg.offset()
# 本番環境ではロギングライブラリを使用
# print(f"Delivered to {topic}[{partition}] @ {offset}")
def _ensure_topics_exist(self):
"""必要なKafkaトピックが存在しない場合は作成"""
admin = AdminClient({
'bootstrap.servers': self.bootstrap_servers
})
existing = admin.list_topics().topics
topics_to_create = []
for topic in self.topics:
if topic not in existing:
topics_to_create.append(NewTopic(
topic,
num_partitions=8,
replication_factor=1, # 本番環境では3以上を推奨
config={
'retention.ms': str(86400000 * 7), # 7日間保持
'cleanup.policy': 'delete',
}
))
if topics_to_create:
futures = admin.create_topics(topics_to_create)
for topic, future in futures.items():
try:
future.result()
print(f"[INFO] Topic '{topic}' created successfully")
except Exception as e:
print(f"[WARN] Topic '{topic}' creation: {e}")
def _transform_binance_trade(self, data: dict) -> dict:
"""Binance tradeデータを正規化"""
return {
'exchange': 'binance',
'symbol': data.get('s', ''),
'trade_id': data.get('t', ''),
'price': float(data.get('p', 0)),
'quantity': float(data.get('q', 0)),
'timestamp': int(data.get('T', 0)),
'is_buyer_maker': data.get('m', False),
'ingested_at': int(datetime.utcnow().timestamp() * 1000),
}
def _transform_binance_ticker(self, data: dict) -> dict:
"""Binanceティッカーデータを正規化"""
return {
'exchange': 'binance',
'symbol': data.get('s', ''),
'last_price': float(data.get('c', 0)),
'open_price': float(data.get('o', 0)),
'high_price': float(data.get('h', 0)),
'low_price': float(data.get('l', 0)),
'volume': float(data.get('v', 0)),
'quote_volume': float(data.get('q', 0)),
'timestamp': int(data.get('E', 0)),
'ingested_at': int(datetime.utcnow().timestamp() * 1000),
}
def publish(self, topic: str, data: dict):
"""Kafkaトピックにメッセージを送信"""
key = data.get('symbol', '').encode('utf-8')
value = json.dumps(data, separators=(',', ':')).encode('utf-8')
self.producer.produce(
topic=topic,
key=key,
value=value,
callback=self._delivery_callback
)
# 批処理送信のflushを待たずにパブリッシュ
self.producer.poll(0)
async def connect_binance_depth(self, symbol: str = 'btcusdt'):
"""Binance_DEPTH WebSocketに接続"""
uri = f"wss://stream.binance.com:9443/ws/{symbol}@depth@100ms"
print(f"[INFO] Connecting to Binance depth: {uri}")
while self.running:
try:
async with websockets.connect(uri) as ws:
print(f"[INFO] Connected to Binance depth stream")
while self.running:
message = await asyncio.wait_for(ws.recv(), timeout=30)
data = json.loads(message)
transformed = {
'exchange': 'binance',
'symbol': symbol.upper(),
'bids': [[float(p), float(q)] for p, q in data.get('b', [])],
'asks': [[float(p), float(q)] for p, q in data.get('a', [])],
'last_update_id': data.get('lastUpdateId', 0),
'timestamp': int(datetime.utcnow().timestamp() * 1000),
'ingested_at': int(datetime.utcnow().timestamp() * 1000),
}
self.publish('exchange-depth', transformed)
except websockets.ConnectionClosed:
print("[WARN] Connection closed, reconnecting...")
await asyncio.sleep(5)
except Exception as e:
print(f"[ERROR] {e}")
await asyncio.sleep(5)
async def connect_binance_ticker(self, symbols: list = None):
"""Binance ティッカーWebSocketに接続(複数シンボル対応)"""
if symbols is None:
symbols = ['btcusdt', 'ethusdt', 'solusdt']
streams = '/'.join([f"{s}@miniTicker" for s in symbols])
uri = f"wss://stream.binance.com:9443/stream?streams={streams}"
print(f"[INFO] Connecting to Binance ticker: {len(symbols)} symbols")
while self.running:
try:
async with websockets.connect(uri) as ws:
print(f"[INFO] Connected to Binance ticker stream")
while self.running:
message = await asyncio.wait_for(ws.recv(), timeout=30)
wrapper = json.loads(message)
if 'data' in wrapper:
for ticker_data in wrapper['data']:
transformed = self._transform_binance_ticker(ticker_data)
self.publish('exchange-ticker', transformed)
except websockets.ConnectionClosed:
print("[WARN] Connection closed, reconnecting...")
await asyncio.sleep(5)
except Exception as e:
print(f"[ERROR] {e}")
await asyncio.sleep(5)
async def connect_binance_trades(self, symbols: list = None):
"""Binance 約定WebSocketに接続"""
if symbols is None:
symbols = ['btcusdt', 'ethusdt', 'solusdt', 'bnbusdt']
streams = '/'.join([f"{s}@aggTrade" for s in symbols])
uri = f"wss://stream.binance.com:9443/stream?streams={streams}"
print(f"[INFO] Connecting to Binance trades: {len(symbols)} symbols")
while self.running:
try:
async with websockets.connect(uri) as ws:
print(f"[INFO] Connected to Binance aggTrade stream")
while self.running:
message = await asyncio.wait_for(ws.recv(), timeout=30)
wrapper = json.loads(message)
if 'data' in wrapper:
trade_data = wrapper['data']
transformed = {
'exchange': 'binance',
'symbol': trade_data.get('s', ''),
'aggregate_trade_id': trade_data.get('a', ''),
'price': float(trade_data.get('p', 0)),
'quantity': float(trade_data.get('q', 0)),
'first_trade_id': trade_data.get('f', 0),
'last_trade_id': trade_data.get('l', 0),
'timestamp': trade_data.get('T', 0),
'is_buyer_maker': trade_data.get('m', False),
'ingested_at': int(datetime.utcnow().timestamp() * 1000),
}
self.publish('exchange-trades', transformed)
except websockets.ConnectionClosed:
print("[WARN] Connection closed, reconnecting...")
await asyncio.sleep(5)
except Exception as e:
print(f"[ERROR] {e}")
await asyncio.sleep(5)
def start(self):
"""プロデューサーサービスを開始"""
self._ensure_topics_exist()
# シグナルハンドリング
signal.signal(signal.SIGINT, lambda s, f: self.stop())
signal.signal(signal.SIGTERM, lambda s, f: self.stop())
# 非同期タスクを開始
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
loop.create_task(self.connect_binance_trades()),
loop.create_task(self.connect_binance_ticker()),
loop.create_task(self.connect_binance_depth()),
]
try:
loop.run_until_complete(asyncio.gather(*tasks))
finally:
self.stop()
def stop(self):
"""プロデューサーを停止"""
print("[INFO] Shutting down producer...")
self.running = False
self.producer.flush(timeout=10)
print("[INFO] Producer stopped")
if __name__ == '__main__':
producer = ExchangeKafkaProducer(
bootstrap_servers='localhost:9092',
exchange='binance'
)
producer.start()
実装コード:Kafka Consumer(取引bot用)
Kafkaトピックからデータを消費し、取引シグナルを生成するコンシューマーアプリケーションの例です。
#!/usr/bin/env python3
"""
Kafka コンシューマー - 取引シグナル生成
HolySheep AI API統合で感情分析やニュースベースのリスク判定も可能
"""
import json
import signal
import asyncio
from datetime import datetime
from typing import Optional, Callable
import websockets
from confluent_kafka import Consumer, KafkaError, KafkaException
class TradingSignalGenerator:
"""Kafkaからリアルタイム約定データを消費し、取引シグナルを生成"""
def __init__(
self,
bootstrap_servers: str = "localhost:9092",
group_id: str = "trading-signal-consumer",
holysheep_api_key: Optional[str] = None,
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.holysheep_api_key = holysheep_api_key
self.running = True
# Kafka Consumer設定
self.consumer_config = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'latest', # 最新から開始
'enable.auto.commit': False, # 手動コミット
'max.poll.interval.ms': 300000,
'session.timeout.ms': 45000,
'heartbeat.interval.ms': 15000,
}
self.consumer = Consumer(self.consumer_config)
# Subscribe to topics
self.consumer.subscribe(['exchange-trades', 'exchange-ticker'])
# 価格状態管理
self.price_history = {} # symbol -> list of (timestamp, price)
self.ticker_cache = {} # symbol -> latest ticker
# シグナルコールバック
self.signal_callbacks: list[Callable] = []
def add_signal_callback(self, callback: Callable):
"""シグナルコールバックを追加"""
self.signal_callbacks.append(callback)
async def call_holysheep_analysis(self, text: str) -> Optional[dict]:
"""HolySheep AI APIでテキスト分析(リスク判定など)"""
if not self.holysheep_api_key:
return None
try:
async with websockets.connect(
'https://api.holysheep.ai/v1/chat/completions'
) as ws:
request = {
'model': 'gpt-4.1',
'messages': [
{
'role': 'system',
'content': 'You are a crypto market analyst. Analyze the sentiment and risk level.'
},
{
'role': 'user',
'content': text
}
],
'temperature': 0.3,
'max_tokens': 150,
}
await ws.send(json.dumps(request))
response = await asyncio.wait_for(ws.recv(), timeout=10)
return json.loads(response)
except Exception as e:
print(f"[WARN] HolySheep API call failed: {e}")
return None
def _calculate_volatility(self, symbol: str, window: int = 20) -> float:
"""指定 window 足の価格データからボラティリティを計算"""
if symbol not in self.price_history:
return 0.0
prices = [p for _, p in self.price_history[symbol][-window:]]
if len(prices) < 2:
return 0.0
mean = sum(prices) / len(prices)
variance = sum((p - mean) ** 2 for p in prices) / len(prices)
return variance ** 0.5 / mean if mean > 0 else 0.0
def _calculate_momentum(self, symbol: str, window: int = 10) -> float:
"""モメンタム(価格変化率)を計算"""
if symbol not in self.price_history or len(self.price_history[symbol]) < window:
return 0.0
prices = self.price_history[symbol]
old_price = prices[-window][1]
new_price = prices[-1][1]
return (new_price - old_price) / old_price if old_price > 0 else 0.0
def _generate_signal(self, symbol: str) -> dict:
"""現在の状態から取引シグナルを生成"""
volatility = self._calculate_volatility(symbol)
momentum = self._calculate_momentum(symbol)
ticker = self.ticker_cache.get(symbol, {})
# シグナル判定ロジック
signal_type = 'HOLD'
confidence = 0.0
reason = []
if momentum > 0.02 and volatility < 0.005:
signal_type = 'BUY'
confidence = min(abs(momentum) * 10, 0.95)
reason.append(f'強気モメンタム (+{momentum:.2%})')
elif momentum < -0.02 and volatility < 0.005:
signal_type = 'SELL'
confidence = min(abs(momentum) * 10, 0.95)
reason.append(f'弱気モメンタム ({momentum:.2%})')
elif volatility > 0.01:
signal_type = 'WATCH'
confidence = volatility * 5
reason.append(f'高ボラティリティ ({volatility:.2%})')
return {
'signal_type': signal_type,
'symbol': symbol,
'confidence': round(confidence, 4),
'momentum': round(momentum, 6),
'volatility': round(volatility, 6),
'last_price': ticker.get('last_price', 0),
'volume_24h': ticker.get('volume', 0),
'reason': '; '.join(reason),
'generated_at': int(datetime.utcnow().timestamp() * 1000),
}
def process_trade(self, data: dict):
"""約定データを処理"""
symbol = data.get('symbol')
price = data.get('price')
timestamp = data.get('timestamp', 0)
if not symbol or not price:
return
# 価格履歴を更新(最大1000件保持)
if symbol not in self.price_history:
self.price_history[symbol] = []
self.price_history[symbol].append((timestamp, price))
if len(self.price_history[symbol]) > 1000:
self.price_history[symbol] = self.price_history[symbol][-1000:]
def process_ticker(self, data: dict):
"""ティッカーデータを処理"""
symbol = data.get('symbol')
if symbol:
self.ticker_cache[symbol] = data
# シグナル生成(ティッカー更新時のみ)
if len(self.price_history.get(symbol, [])) >= 10:
signal_data = self._generate_signal(symbol)
# コールバック実行
for callback in self.signal_callbacks:
try:
callback(signal_data)
except Exception as e:
print(f"[ERROR] Signal callback failed: {e}")
def start(self):
"""コンシューマーサービスを開始"""
print(f"[INFO] Starting consumer group: {self.group_id}")
signal.signal(signal.SIGINT, lambda s, f: self.stop())
signal.signal(signal.SIGTERM, lambda s, f: self.stop())
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"[INFO] End of partition reached")
else:
raise KafkaException(msg.error())
continue
try:
data = json.loads(msg.value().decode('utf-8'))
topic = msg.topic()
if topic == 'exchange-trades':
self.process_trade(data)
elif topic == 'exchange-ticker':
self.process_ticker(data)
# オフセットコミット(5件ごと)
if msg.offset() % 5 == 0:
self.consumer.commit(asynchronous=True)
except json.JSONDecodeError as e:
print(f"[ERROR] JSON decode failed: {e}")
except Exception as e:
print(f"[ERROR] Processing error: {e}")
finally:
self.stop()
def stop(self):
"""コンシューマーを停止"""
print("[INFO] Shutting down consumer...")
self.running = False
self.consumer.close()
print("[INFO] Consumer stopped")
def example_signal_callback(signal_data: dict):
"""シグナル処理の例"""
if signal_data['signal_type'] in ['BUY', 'SELL']:
print(f"[SIGNAL] {signal_data['signal_type']} {signal_data['symbol']} "
f"(confidence: {signal_data['confidence']:.2%})")
print(f" Reason: {signal_data['reason']}")
print(f" Momentum: {signal_data['momentum']:.4f}, "
f"Volatility: {signal_data['volatility']:.4f}")
if __name__ == '__main__':
consumer = TradingSignalGenerator(
bootstrap_servers='localhost:9092',
group_id='trading-signal-v1',
# HolySheep APIキーを設定することでAI分析を有効化
# holysheep_api_key='YOUR_HOLYSHEEP_API_KEY'
)
# シグナルコールバックを登録
consumer.add_signal_callback(example_signal_callback)
# コンシューマー開始
consumer.start()
docker-composeでのKafkaクラスタ構築
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- kafka-network
kafka-1:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka-1
container_name: kafka-1
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
networks:
- kafka-network
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka-1
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
# Kafkaクラスタ起動
docker-compose up -d
トピック一覧確認
docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 --list
プロデューサー/コンシューマー接続確認
docker exec -it kafka-1 kafka-console-producer --bootstrap-server localhost:9092 --topic exchange-trades
docker exec -it kafka-1 kafka-console-consumer --bootstrap-server localhost:9092 --topic exchange-trades --from-beginning
向いている人・向いていない人
| 向いている人 | 向いていない人 |
|---|---|
| 高頻度取引システムの開発者 | 少量データの定期処理だけで十分な人 |
| 複数の取引所データを統合分析したい人 | Kafkaの運用コストを払いたくない人 |
| リアルタイムのリスク管理が必要不可欠な人 | SQLベースのバッチ処理で十分な人 |
| マイクロサービスアーキテクチャを採用しているチーム | Infrastructure as Code的学习曲線を避けたい人 |
| データバックフィル(過去データ再生)が必要な人 | 1つの取引所で少額取引をしているだけの個人投資家 |
価格とROI
Kafkaクラスタを自社運用する場合のコスト構造と、HolySheep AIなどのManaged Service比較を示します。
| 項目 | 自社運用(Kafka) | Managed Service |
|---|---|---|
| インフラコスト | ¥50,000〜/月(EC2 3台構成) | ¥15,000〜/月(CvoKfa on Confluent Cloud) |
| 運用工数 | 月40〜80時間(監視・障害対応・アップグレード) | 月5〜10時間 |
| 初期構築 | 2〜4週間 | 1〜3日 |
| スケーラビリティ | 手動対応が必要 | 自動スケール |
| 可用性 | 設計に依存(SLA自己管理) | 99.99% SLO提供 |
| 1年総コスト | ¥600,000 + 運用人件費 | ¥180,000 + 運用人件費 |
HolySheepを選ぶ理由
リアルタイム取引データ処理にAI分析を組み込む場合、APIコストも重要な要素です。HolySheep AIは2026年現在の pricingで他社と比較しても大幅なコスト優位性があります。
| Provider | Model | Output Price ($/M tokens) | ¥1での取得量 | DeepSeek比 |
|---|---|---|---|---|
| HolySheep AI | DeepSeek V3.2 | $0.42 | 2,381,000 tokens | - |
| Gemini 2.5 Flash | $2.50 | 400,000 tokens | 83%高い | |
| OpenAI | GPT-4.1 | $8.00 | 125,000 tokens | 95%高い |
| Anthropic | Claude Sonnet 4.5 | $15.00 | 66,667 tokens | 97%高い |
月1000万トークン使用の場合、成本差は歴然です:
- HolySheep AI(DeepSeek V3.2): 約$4.20/月(¥30.7相当)
- Gemini 2.5 Flash: 約$25/月(¥183相当)
- GPT-4.1: 約$80/月(¥584相当)
- Claude Sonnet 4.5: 約$150/月(¥1,095相当)
HolySheep AIを選ぶ具体的な理由は:
- レート面の優位性:¥1=$1の固定レートで、公式サイト¥7.3=$1比85%節約
- Asia-Pacific最適化:東京リージョンからの<50msレイテンシ
- 決済の柔軟性:WeChat Pay / Alipay対応で中国人民間決済が可能
- 無料クレジット:登録時に無料クレジット付与
- LLM互換性:OpenAI API互換でコード変更不要
よくあるエラーと対処法
エラー1: Kafka接続時の「LEADER_NOT_AVAILABLE」
# 症状
KafkaError{val=3,str='Broker: Leader not available'}
原因
ブローカーの再起動直後や、ネットワーク分断時に発生
解決法
1. トピック一覧確認
docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 --list
2. 該当トピックの詳細確認
docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 \
--describe --topic exchange-trades
3. リーダーを再選出任せる
docker exec kafka-1 kafka-leader-election \
--bootstrap-server localhost:9092 \
--topic exchange-trades \
--election-type UNCLEAN
4. 或いは、焦らず待ってから再試行(通常30秒以内に自動回復)
sleep 30 && docker restart kafka-1
エラー2: WebSocket切断と再接続の無限ループ
# 症状
ConnectionClosedException が連続発生
原因
- レートリミット超過(Too Many Requests)
- ネットワーク不安定
- サーバー側の接続数制限
解決法(指数バックオフの実装)
import asyncio
import random
class ResilientWebSocket:
def __init__(self, max_retries: int = 10):
self.max_retries = max_retries
async def connect_with_backoff(self, uri: str):
base_delay = 1
max_delay = 60
for attempt in range(self.max_retries):
try:
async with websockets.connect(uri, ping_interval=20) as ws:
await self._consume_messages(ws)
except websockets.ConnectionClosed as e:
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
print(f"[WARN] Connection closed: {e.code} - Retry in {delay:.1f}s")
await asyncio.sleep(delay)
except Exception as e:
print(f"[ERROR] Unexpected error: {e}")
await asyncio.sleep(5)
print("[ERROR]