暗号通貨取引所のリアルタイムデータ処理は、高頻度取引bot、ポートフォリオ管理、リスク管理システムなど、多くのユースケースで必要とされています。本稿では、KafkaとWebSocketを活用した堅牢なデータパイプラインの構築方法を、Pythonでの実装例とともに解説します。

リアルタイム取引データ処理の基礎知識

暗号通貨取引所(Bybit、Binance、OKXなど)は、WebSocket APIを通じてリアルタイムの約定・注文・開発板データを提供します。生のWebSocketストリームをそのままアプリケーションで処理すると、接続断やデータ欠損のリスクがあります。

ここにKafkarokerを追加することで、以下の利点が得られます:

システムアーキテクチャ


┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   取引所A        │     │   取引所B        │     │   取引所C        │
│  (Binance)      │     │  (Bybit)        │     │  (OKX)          │
│  WebSocket      │     │  WebSocket      │     │  WebSocket      │
└────────┬────────┘     └────────┬────────┘     └────────┬────────┘
         │                       │                       │
         │ JSON messages         │ JSON messages         │ JSON messages
         ▼                       ▼                       ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Apache Kafka Cluster                            │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐   │
│  │ Topic: trades    │  │ Topic: orderbook │  │ Topic: ticker    │   │
│  │ Partitions: 8    │  │ Partitions: 8    │  │ Partitions: 4    │   │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  コンシューマーA  │     │  コンシューマーB  │     │  コンシューマーC  │
│  (取引bot)       │     │  (分析基盤)       │     │  (通知サービス)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘

実装コード:WebSocket → Kafka Producer

まず、WebSocketからデータを受信し、Kafkaトピックにpublishするプロデューサーアプリケーションを作成します。

#!/usr/bin/env python3
"""
暗号通貨取引所WebSocket → Kafka プロデューサー
Binance / Bybit 対応
"""

import asyncio
import json
import signal
from datetime import datetime
from typing import Optional

import websockets
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic


class ExchangeKafkaProducer:
    """取引所WebSocketストリームをKafkaにpublishするクラス"""
    
    def __init__(
        self,
        bootstrap_servers: str = "localhost:9092",
        exchange: str = "binance"
    ):
        self.bootstrap_servers = bootstrap_servers
        self.exchange = exchange
        self.running = True
        
        # Kafka Producer設定
        self.producer_config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': f'producer-{exchange}-websocket',
            'acks': 'all',                    # 全レプリカからの応答を待つ
            'retries': 3,
            'retry.backoff.ms': 100,
            'compression.type': 'gzip',      # 帯域幅節約
            'batch.size': 16384,              # 16KBバッチ
            'linger.ms': 5,                   # 5ms待機でバッチ送信
        }
        self.producer = Producer(self.producer_config)
        
        # 取引所別エンドポイント
        self.endpoints = {
            'binance': {
                'trades': 'wss://stream.binance.com:9443/ws/!trade@arr',
                'ticker': 'wss://stream.binance.com:9443/ws/!miniTicker@arr',
                'depth': 'wss://stream.binance.com:9443/ws/!depth@100ms@arr',
            },
            'bybit': {
                'trades': 'wss://stream.bybit.com/v5/public/spot',
                'ticker': 'wss://stream.bybit.com/v5/public/spot',
            }
        }
        
        # Kafkaトピック定義
        self.topics = ['exchange-trades', 'exchange-ticker', 'exchange-depth']
    
    def _delivery_callback(self, err, msg):
        """Kafka publishコールバック"""
        if err is not None:
            print(f"[ERROR] Message delivery failed: {err}")
        else:
            topic = msg.topic()
            partition = msg.partition()
            offset = msg.offset()
            # 本番環境ではロギングライブラリを使用
            # print(f"Delivered to {topic}[{partition}] @ {offset}")
    
    def _ensure_topics_exist(self):
        """必要なKafkaトピックが存在しない場合は作成"""
        admin = AdminClient({
            'bootstrap.servers': self.bootstrap_servers
        })
        
        existing = admin.list_topics().topics
        topics_to_create = []
        
        for topic in self.topics:
            if topic not in existing:
                topics_to_create.append(NewTopic(
                    topic,
                    num_partitions=8,
                    replication_factor=1,  # 本番環境では3以上を推奨
                    config={
                        'retention.ms': str(86400000 * 7),  # 7日間保持
                        'cleanup.policy': 'delete',
                    }
                ))
        
        if topics_to_create:
            futures = admin.create_topics(topics_to_create)
            for topic, future in futures.items():
                try:
                    future.result()
                    print(f"[INFO] Topic '{topic}' created successfully")
                except Exception as e:
                    print(f"[WARN] Topic '{topic}' creation: {e}")
    
    def _transform_binance_trade(self, data: dict) -> dict:
        """Binance tradeデータを正規化"""
        return {
            'exchange': 'binance',
            'symbol': data.get('s', ''),
            'trade_id': data.get('t', ''),
            'price': float(data.get('p', 0)),
            'quantity': float(data.get('q', 0)),
            'timestamp': int(data.get('T', 0)),
            'is_buyer_maker': data.get('m', False),
            'ingested_at': int(datetime.utcnow().timestamp() * 1000),
        }
    
    def _transform_binance_ticker(self, data: dict) -> dict:
        """Binanceティッカーデータを正規化"""
        return {
            'exchange': 'binance',
            'symbol': data.get('s', ''),
            'last_price': float(data.get('c', 0)),
            'open_price': float(data.get('o', 0)),
            'high_price': float(data.get('h', 0)),
            'low_price': float(data.get('l', 0)),
            'volume': float(data.get('v', 0)),
            'quote_volume': float(data.get('q', 0)),
            'timestamp': int(data.get('E', 0)),
            'ingested_at': int(datetime.utcnow().timestamp() * 1000),
        }
    
    def publish(self, topic: str, data: dict):
        """Kafkaトピックにメッセージを送信"""
        key = data.get('symbol', '').encode('utf-8')
        value = json.dumps(data, separators=(',', ':')).encode('utf-8')
        
        self.producer.produce(
            topic=topic,
            key=key,
            value=value,
            callback=self._delivery_callback
        )
        # 批処理送信のflushを待たずにパブリッシュ
        self.producer.poll(0)
    
    async def connect_binance_depth(self, symbol: str = 'btcusdt'):
        """Binance_DEPTH WebSocketに接続"""
        uri = f"wss://stream.binance.com:9443/ws/{symbol}@depth@100ms"
        print(f"[INFO] Connecting to Binance depth: {uri}")
        
        while self.running:
            try:
                async with websockets.connect(uri) as ws:
                    print(f"[INFO] Connected to Binance depth stream")
                    
                    while self.running:
                        message = await asyncio.wait_for(ws.recv(), timeout=30)
                        data = json.loads(message)
                        
                        transformed = {
                            'exchange': 'binance',
                            'symbol': symbol.upper(),
                            'bids': [[float(p), float(q)] for p, q in data.get('b', [])],
                            'asks': [[float(p), float(q)] for p, q in data.get('a', [])],
                            'last_update_id': data.get('lastUpdateId', 0),
                            'timestamp': int(datetime.utcnow().timestamp() * 1000),
                            'ingested_at': int(datetime.utcnow().timestamp() * 1000),
                        }
                        
                        self.publish('exchange-depth', transformed)
                        
            except websockets.ConnectionClosed:
                print("[WARN] Connection closed, reconnecting...")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"[ERROR] {e}")
                await asyncio.sleep(5)
    
    async def connect_binance_ticker(self, symbols: list = None):
        """Binance ティッカーWebSocketに接続(複数シンボル対応)"""
        if symbols is None:
            symbols = ['btcusdt', 'ethusdt', 'solusdt']
        
        streams = '/'.join([f"{s}@miniTicker" for s in symbols])
        uri = f"wss://stream.binance.com:9443/stream?streams={streams}"
        print(f"[INFO] Connecting to Binance ticker: {len(symbols)} symbols")
        
        while self.running:
            try:
                async with websockets.connect(uri) as ws:
                    print(f"[INFO] Connected to Binance ticker stream")
                    
                    while self.running:
                        message = await asyncio.wait_for(ws.recv(), timeout=30)
                        wrapper = json.loads(message)
                        
                        if 'data' in wrapper:
                            for ticker_data in wrapper['data']:
                                transformed = self._transform_binance_ticker(ticker_data)
                                self.publish('exchange-ticker', transformed)
                        
            except websockets.ConnectionClosed:
                print("[WARN] Connection closed, reconnecting...")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"[ERROR] {e}")
                await asyncio.sleep(5)
    
    async def connect_binance_trades(self, symbols: list = None):
        """Binance 約定WebSocketに接続"""
        if symbols is None:
            symbols = ['btcusdt', 'ethusdt', 'solusdt', 'bnbusdt']
        
        streams = '/'.join([f"{s}@aggTrade" for s in symbols])
        uri = f"wss://stream.binance.com:9443/stream?streams={streams}"
        print(f"[INFO] Connecting to Binance trades: {len(symbols)} symbols")
        
        while self.running:
            try:
                async with websockets.connect(uri) as ws:
                    print(f"[INFO] Connected to Binance aggTrade stream")
                    
                    while self.running:
                        message = await asyncio.wait_for(ws.recv(), timeout=30)
                        wrapper = json.loads(message)
                        
                        if 'data' in wrapper:
                            trade_data = wrapper['data']
                            transformed = {
                                'exchange': 'binance',
                                'symbol': trade_data.get('s', ''),
                                'aggregate_trade_id': trade_data.get('a', ''),
                                'price': float(trade_data.get('p', 0)),
                                'quantity': float(trade_data.get('q', 0)),
                                'first_trade_id': trade_data.get('f', 0),
                                'last_trade_id': trade_data.get('l', 0),
                                'timestamp': trade_data.get('T', 0),
                                'is_buyer_maker': trade_data.get('m', False),
                                'ingested_at': int(datetime.utcnow().timestamp() * 1000),
                            }
                            self.publish('exchange-trades', transformed)
                        
            except websockets.ConnectionClosed:
                print("[WARN] Connection closed, reconnecting...")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"[ERROR] {e}")
                await asyncio.sleep(5)
    
    def start(self):
        """プロデューサーサービスを開始"""
        self._ensure_topics_exist()
        
        # シグナルハンドリング
        signal.signal(signal.SIGINT, lambda s, f: self.stop())
        signal.signal(signal.SIGTERM, lambda s, f: self.stop())
        
        # 非同期タスクを開始
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        
        tasks = [
            loop.create_task(self.connect_binance_trades()),
            loop.create_task(self.connect_binance_ticker()),
            loop.create_task(self.connect_binance_depth()),
        ]
        
        try:
            loop.run_until_complete(asyncio.gather(*tasks))
        finally:
            self.stop()
    
    def stop(self):
        """プロデューサーを停止"""
        print("[INFO] Shutting down producer...")
        self.running = False
        self.producer.flush(timeout=10)
        print("[INFO] Producer stopped")


if __name__ == '__main__':
    producer = ExchangeKafkaProducer(
        bootstrap_servers='localhost:9092',
        exchange='binance'
    )
    producer.start()

実装コード:Kafka Consumer(取引bot用)

Kafkaトピックからデータを消費し、取引シグナルを生成するコンシューマーアプリケーションの例です。

#!/usr/bin/env python3
"""
Kafka コンシューマー - 取引シグナル生成
HolySheep AI API統合で感情分析やニュースベースのリスク判定も可能
"""

import json
import signal
import asyncio
from datetime import datetime
from typing import Optional, Callable

import websockets
from confluent_kafka import Consumer, KafkaError, KafkaException


class TradingSignalGenerator:
    """Kafkaからリアルタイム約定データを消費し、取引シグナルを生成"""
    
    def __init__(
        self,
        bootstrap_servers: str = "localhost:9092",
        group_id: str = "trading-signal-consumer",
        holysheep_api_key: Optional[str] = None,
    ):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.holysheep_api_key = holysheep_api_key
        self.running = True
        
        # Kafka Consumer設定
        self.consumer_config = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'latest',        # 最新から開始
            'enable.auto.commit': False,          # 手動コミット
            'max.poll.interval.ms': 300000,
            'session.timeout.ms': 45000,
            'heartbeat.interval.ms': 15000,
        }
        self.consumer = Consumer(self.consumer_config)
        
        #  Subscribe to topics
        self.consumer.subscribe(['exchange-trades', 'exchange-ticker'])
        
        # 価格状態管理
        self.price_history = {}  # symbol -> list of (timestamp, price)
        self.ticker_cache = {}   # symbol -> latest ticker
        
        # シグナルコールバック
        self.signal_callbacks: list[Callable] = []
    
    def add_signal_callback(self, callback: Callable):
        """シグナルコールバックを追加"""
        self.signal_callbacks.append(callback)
    
    async def call_holysheep_analysis(self, text: str) -> Optional[dict]:
        """HolySheep AI APIでテキスト分析(リスク判定など)"""
        if not self.holysheep_api_key:
            return None
        
        try:
            async with websockets.connect(
                'https://api.holysheep.ai/v1/chat/completions'
            ) as ws:
                request = {
                    'model': 'gpt-4.1',
                    'messages': [
                        {
                            'role': 'system',
                            'content': 'You are a crypto market analyst. Analyze the sentiment and risk level.'
                        },
                        {
                            'role': 'user', 
                            'content': text
                        }
                    ],
                    'temperature': 0.3,
                    'max_tokens': 150,
                }
                await ws.send(json.dumps(request))
                response = await asyncio.wait_for(ws.recv(), timeout=10)
                return json.loads(response)
        except Exception as e:
            print(f"[WARN] HolySheep API call failed: {e}")
            return None
    
    def _calculate_volatility(self, symbol: str, window: int = 20) -> float:
        """指定 window 足の価格データからボラティリティを計算"""
        if symbol not in self.price_history:
            return 0.0
        
        prices = [p for _, p in self.price_history[symbol][-window:]]
        if len(prices) < 2:
            return 0.0
        
        mean = sum(prices) / len(prices)
        variance = sum((p - mean) ** 2 for p in prices) / len(prices)
        return variance ** 0.5 / mean if mean > 0 else 0.0
    
    def _calculate_momentum(self, symbol: str, window: int = 10) -> float:
        """モメンタム(価格変化率)を計算"""
        if symbol not in self.price_history or len(self.price_history[symbol]) < window:
            return 0.0
        
        prices = self.price_history[symbol]
        old_price = prices[-window][1]
        new_price = prices[-1][1]
        
        return (new_price - old_price) / old_price if old_price > 0 else 0.0
    
    def _generate_signal(self, symbol: str) -> dict:
        """現在の状態から取引シグナルを生成"""
        volatility = self._calculate_volatility(symbol)
        momentum = self._calculate_momentum(symbol)
        ticker = self.ticker_cache.get(symbol, {})
        
        # シグナル判定ロジック
        signal_type = 'HOLD'
        confidence = 0.0
        reason = []
        
        if momentum > 0.02 and volatility < 0.005:
            signal_type = 'BUY'
            confidence = min(abs(momentum) * 10, 0.95)
            reason.append(f'強気モメンタム (+{momentum:.2%})')
        elif momentum < -0.02 and volatility < 0.005:
            signal_type = 'SELL'
            confidence = min(abs(momentum) * 10, 0.95)
            reason.append(f'弱気モメンタム ({momentum:.2%})')
        elif volatility > 0.01:
            signal_type = 'WATCH'
            confidence = volatility * 5
            reason.append(f'高ボラティリティ ({volatility:.2%})')
        
        return {
            'signal_type': signal_type,
            'symbol': symbol,
            'confidence': round(confidence, 4),
            'momentum': round(momentum, 6),
            'volatility': round(volatility, 6),
            'last_price': ticker.get('last_price', 0),
            'volume_24h': ticker.get('volume', 0),
            'reason': '; '.join(reason),
            'generated_at': int(datetime.utcnow().timestamp() * 1000),
        }
    
    def process_trade(self, data: dict):
        """約定データを処理"""
        symbol = data.get('symbol')
        price = data.get('price')
        timestamp = data.get('timestamp', 0)
        
        if not symbol or not price:
            return
        
        # 価格履歴を更新(最大1000件保持)
        if symbol not in self.price_history:
            self.price_history[symbol] = []
        
        self.price_history[symbol].append((timestamp, price))
        if len(self.price_history[symbol]) > 1000:
            self.price_history[symbol] = self.price_history[symbol][-1000:]
    
    def process_ticker(self, data: dict):
        """ティッカーデータを処理"""
        symbol = data.get('symbol')
        if symbol:
            self.ticker_cache[symbol] = data
            
            # シグナル生成(ティッカー更新時のみ)
            if len(self.price_history.get(symbol, [])) >= 10:
                signal_data = self._generate_signal(symbol)
                
                # コールバック実行
                for callback in self.signal_callbacks:
                    try:
                        callback(signal_data)
                    except Exception as e:
                        print(f"[ERROR] Signal callback failed: {e}")
    
    def start(self):
        """コンシューマーサービスを開始"""
        print(f"[INFO] Starting consumer group: {self.group_id}")
        
        signal.signal(signal.SIGINT, lambda s, f: self.stop())
        signal.signal(signal.SIGTERM, lambda s, f: self.stop())
        
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print(f"[INFO] End of partition reached")
                    else:
                        raise KafkaException(msg.error())
                    continue
                
                try:
                    data = json.loads(msg.value().decode('utf-8'))
                    topic = msg.topic()
                    
                    if topic == 'exchange-trades':
                        self.process_trade(data)
                    elif topic == 'exchange-ticker':
                        self.process_ticker(data)
                    
                    # オフセットコミット(5件ごと)
                    if msg.offset() % 5 == 0:
                        self.consumer.commit(asynchronous=True)
                        
                except json.JSONDecodeError as e:
                    print(f"[ERROR] JSON decode failed: {e}")
                except Exception as e:
                    print(f"[ERROR] Processing error: {e}")
                    
        finally:
            self.stop()
    
    def stop(self):
        """コンシューマーを停止"""
        print("[INFO] Shutting down consumer...")
        self.running = False
        self.consumer.close()
        print("[INFO] Consumer stopped")


def example_signal_callback(signal_data: dict):
    """シグナル処理の例"""
    if signal_data['signal_type'] in ['BUY', 'SELL']:
        print(f"[SIGNAL] {signal_data['signal_type']} {signal_data['symbol']} "
              f"(confidence: {signal_data['confidence']:.2%})")
        print(f"  Reason: {signal_data['reason']}")
        print(f"  Momentum: {signal_data['momentum']:.4f}, "
              f"Volatility: {signal_data['volatility']:.4f}")


if __name__ == '__main__':
    consumer = TradingSignalGenerator(
        bootstrap_servers='localhost:9092',
        group_id='trading-signal-v1',
        # HolySheep APIキーを設定することでAI分析を有効化
        # holysheep_api_key='YOUR_HOLYSHEEP_API_KEY'
    )
    
    # シグナルコールバックを登録
    consumer.add_signal_callback(example_signal_callback)
    
    # コンシューマー開始
    consumer.start()

docker-composeでのKafkaクラスタ構築

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-network

  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-1
    container_name: kafka-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    networks:
      - kafka-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka-1
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge
# Kafkaクラスタ起動
docker-compose up -d

トピック一覧確認

docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 --list

プロデューサー/コンシューマー接続確認

docker exec -it kafka-1 kafka-console-producer --bootstrap-server localhost:9092 --topic exchange-trades docker exec -it kafka-1 kafka-console-consumer --bootstrap-server localhost:9092 --topic exchange-trades --from-beginning

向いている人・向いていない人

向いている人 向いていない人
高頻度取引システムの開発者 少量データの定期処理だけで十分な人
複数の取引所データを統合分析したい人 Kafkaの運用コストを払いたくない人
リアルタイムのリスク管理が必要不可欠な人 SQLベースのバッチ処理で十分な人
マイクロサービスアーキテクチャを採用しているチーム Infrastructure as Code的学习曲線を避けたい人
データバックフィル(過去データ再生)が必要な人 1つの取引所で少額取引をしているだけの個人投資家

価格とROI

Kafkaクラスタを自社運用する場合のコスト構造と、HolySheep AIなどのManaged Service比較を示します。

項目 自社運用(Kafka) Managed Service
インフラコスト ¥50,000〜/月(EC2 3台構成) ¥15,000〜/月(CvoKfa on Confluent Cloud)
運用工数 月40〜80時間(監視・障害対応・アップグレード) 月5〜10時間
初期構築 2〜4週間 1〜3日
スケーラビリティ 手動対応が必要 自動スケール
可用性 設計に依存(SLA自己管理) 99.99% SLO提供
1年総コスト ¥600,000 + 運用人件費 ¥180,000 + 運用人件費

HolySheepを選ぶ理由

リアルタイム取引データ処理にAI分析を組み込む場合、APIコストも重要な要素です。HolySheep AIは2026年現在の pricingで他社と比較しても大幅なコスト優位性があります。

Provider Model Output Price ($/M tokens) ¥1での取得量 DeepSeek比
HolySheep AI DeepSeek V3.2 $0.42 2,381,000 tokens -
Google Gemini 2.5 Flash $2.50 400,000 tokens 83%高い
OpenAI GPT-4.1 $8.00 125,000 tokens 95%高い
Anthropic Claude Sonnet 4.5 $15.00 66,667 tokens 97%高い

月1000万トークン使用の場合、成本差は歴然です:

HolySheep AIを選ぶ具体的な理由は:

  1. レート面の優位性:¥1=$1の固定レートで、公式サイト¥7.3=$1比85%節約
  2. Asia-Pacific最適化:東京リージョンからの<50msレイテンシ
  3. 決済の柔軟性:WeChat Pay / Alipay対応で中国人民間決済が可能
  4. 無料クレジット登録時に無料クレジット付与
  5. LLM互換性:OpenAI API互換でコード変更不要

よくあるエラーと対処法

エラー1: Kafka接続時の「LEADER_NOT_AVAILABLE」

# 症状
KafkaError{val=3,str='Broker: Leader not available'}

原因

ブローカーの再起動直後や、ネットワーク分断時に発生

解決法

1. トピック一覧確認

docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 --list

2. 該当トピックの詳細確認

docker exec kafka-1 kafka-topics --bootstrap-server localhost:9092 \ --describe --topic exchange-trades

3. リーダーを再選出任せる

docker exec kafka-1 kafka-leader-election \ --bootstrap-server localhost:9092 \ --topic exchange-trades \ --election-type UNCLEAN

4. 或いは、焦らず待ってから再試行(通常30秒以内に自動回復)

sleep 30 && docker restart kafka-1

エラー2: WebSocket切断と再接続の無限ループ

# 症状
ConnectionClosedException が連続発生

原因

- レートリミット超過(Too Many Requests) - ネットワーク不安定 - サーバー側の接続数制限

解決法(指数バックオフの実装)

import asyncio import random class ResilientWebSocket: def __init__(self, max_retries: int = 10): self.max_retries = max_retries async def connect_with_backoff(self, uri: str): base_delay = 1 max_delay = 60 for attempt in range(self.max_retries): try: async with websockets.connect(uri, ping_interval=20) as ws: await self._consume_messages(ws) except websockets.ConnectionClosed as e: delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay) print(f"[WARN] Connection closed: {e.code} - Retry in {delay:.1f}s") await asyncio.sleep(delay) except Exception as e: print(f"[ERROR] Unexpected error: {e}") await asyncio.sleep(5) print("[ERROR]