暗号通貨取引所の板情報(Orderbook)は、市場の流動性・価格発見・トレーディング戦略の根幹を成すデータ構造です。しかし、板情報の履歴データは通常リアルタイム取引のために最適化されており、長期保存されることは稀です。本稿では、過去の板情報を高精度で復元する技術的手法を解説し、私自身の实践经验に基づく実装ポイントと、よくある課題への解決策を詳述します。
Orderbook Reconstructionとは
Orderbook Reconstructionとは、取引履歴(Trade Tick Data)や板快照(Snapshot)などの部分的なデータから、任意の時間点における板情報全体を再構築する技術です。主な用途は以下の通りです:
- バックテスト:高頻度取引戦略の精度向上
- マーケットマイクロストラクチャー研究:流動性供給・吸収の分析
- 約定履歴の検証:、特定注文の執行品質評価
- 価格操作検知:異常取引パターンの特定
向いている人・向いていない人
| 向いている人 | 向いていない人 |
|---|---|
| 高頻度取引(HFT)戦略を開發するクオンツ開発者 | 日足ベースの長期投資のみを行う投資家 |
| マーケットメイク戦略を検証するプロトレーダー | リアルタイム板データだけで十分な使用者 |
| 学術研究で板力学を分析する研究者 | свежихデータのみで十分なケース |
| 取引所やブローカーの執行品質を監査する人物 | 構築済みデータを提供する有料サービスを探しているだけのユーザー |
技術的背景:板情報の基本構造
暗号通貨取引所の板情報は一般的に以下の構造を持ちます:
- Bids(買い注文):価格降順で並んだ指値買い注文
- Asks(売り注文):価格昇順で並んだ指値売り注文
- Price Levels:各価格レベルでの累積数量
- Timestamp:マイクロ秒単位の時刻情報
実装:Historical Orderbook Reconstructionシステム
以下に、私が実際に構築した板情報復元システムの核心部分をPythonで実装します。HolySheep AIのAPI用于自然言語による分析クエリ生成과市場レポート作成を行います。
import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from collections import defaultdict
import heapq
from datetime import datetime
import json
===== 板情報データクラス =====
@dataclass
class PriceLevel:
"""価格レベルを表現"""
price: float
quantity: float
order_count: int = 0
@dataclass
class Orderbook:
"""一枚の板情報を表現"""
timestamp: int # ミリ秒タイムスタンプ
bids: List[PriceLevel] = field(default_factory=list)
asks: List[PriceLevel] = field(default_factory=list)
symbol: str = ""
def best_bid(self) -> Optional[float]:
return self.bids[0].price if self.bids else None
def best_ask(self) -> Optional[float]:
return self.ask[0].price if self.asks else None
def spread(self) -> Optional[float]:
b, a = self.best_bid(), self.best_ask()
return a - b if b and a else None
===== Orderbook Reconstruction Engine =====
class OrderbookReconstructor:
"""板情報履歴復元エンジン"""
def __init__(self, symbol: str, depth: int = 20):
self.symbol = symbol
self.depth = depth
# アクティブな注文を價格で管理(売りは最小ヒープ、買いは最大ヒープ)
self.active_bids: Dict[float, float] = {} # price -> quantity
self.active_asks: Dict[float, float] = {}
self.trades: List[Dict] = []
self.snapshots: List[Orderbook] = []
def apply_snapshot(self, snapshot: Dict):
"""初期快照を適用"""
self.active_bids.clear()
self.active_asks.clear()
for bid in snapshot.get('bids', [])[:self.depth]:
self.active_bids[float(bid['price'])] = float(bid['quantity'])
for ask in snapshot.get('asks', [])[:self.depth]:
self.active_asks[float(ask['price'])] = float(ask['quantity'])
ts = snapshot.get('timestamp', 0)
self.snapshots.append(self._build_orderbook(ts))
def apply_trade(self, trade: Dict):
"""取引を適用して板を更新"""
price = float(trade['price'])
quantity = float(trade['quantity'])
side = trade.get('side', 'buy') # 'buy' or 'sell'
timestamp = trade.get('timestamp', 0)
self.trades.append({
'price': price,
'quantity': quantity,
'side': side,
'timestamp': timestamp
})
# 板の更新
if side == 'buy':
self._match_bid(price, quantity)
else:
self._match_ask(price, quantity)
def apply_order_update(self, update: Dict):
"""注文更新(新規・変更・キャンセル)を適用"""
update_type = update.get('type', 'unknown')
price = float(update['price'])
quantity = float(update.get('quantity', 0))
if update_type == 'new':
if update.get('side') == 'buy':
self.active_bids[price] = quantity
else:
self.active_asks[price] = quantity
elif update_type == 'cancel':
if update.get('side') == 'buy':
self.active_bids.pop(price, None)
else:
self.active_asks.pop(price, None)
elif update_type == 'modify':
if update.get('side') == 'buy':
self.active_bids[price] = quantity
else:
self.active_asks[price] = quantity
def _match_bid(self, price: float, quantity: float):
"""買い約定を処理:板の売り注文からマッチング"""
remaining = quantity
# 最安値の売りから処理
for ask_price in sorted(self.active_asks.keys()):
if ask_price > price:
break
if remaining <= 0:
break
available = self.active_asks[ask_price]
matched = min(remaining, available)
remaining -= matched
if available == matched:
del self.active_asks[ask_price]
else:
self.active_asks[ask_price] = available - matched
def _match_ask(self, price: float, quantity: float):
"""売り約定を処理:板の買い注文からマッチング"""
remaining = quantity
# 最高値の買いから処理
for bid_price in sorted(self.active_bids.keys(), reverse=True):
if bid_price < price:
break
if remaining <= 0:
break
available = self.active_bids[bid_price]
matched = min(remaining, available)
remaining -= matched
if available == matched:
del self.active_bids[bid_price]
else:
self.active_bids[bid_price] = available - matched
def _build_orderbook(self, timestamp: int) -> Orderbook:
"""現在の状態でOrderbookオブジェクトを構築"""
bids = [
PriceLevel(price=p, quantity=q)
for p, q in sorted(self.active_bids.items(), reverse=True)[:self.depth]
]
asks = [
PriceLevel(price=p, quantity=q)
for p, q in sorted(self.active_asks.items())[:self.depth]
]
return Orderbook(timestamp=timestamp, bids=bids, asks=asks, symbol=self.symbol)
def get_state(self, timestamp: int) -> Orderbook:
"""指定タイムスタンプの板状態を取得"""
return self._build_orderbook(timestamp)
def get_snapshots(self, start_ts: int, end_ts: int) -> List[Orderbook]:
"""期間内の板快照リストを取得"""
return [
s for s in self.snapshots
if start_ts <= s.timestamp <= end_ts
]
===== データソース抽象化 =====
class DataSource:
"""various exchange data sources"""
def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.api_key}"}
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def fetch_historical_data(
self,
exchange: str,
symbol: str,
start_ts: int,
end_ts: int
) -> Dict:
""" exchanges APIから履歴データを取得(例:Binance, Coinbase, Kraken)"""
# 実際の実装では該当するexchangeのAPIを呼び出す
# ここでは統一インターフェースを示す
endpoints = {
'binance': f"https://api.binance.com/api/v3",
'coinbase': f"https://api.exchange.coinbase.com",
'kraken': f"https://api.kraken.com/0"
}
# ダミーの実装(実際のexchange API呼び出しに置き換え)
return {
'snapshots': [], # List[Dict] - 板快照
'trades': [], # List[Dict] - 取引履歴
'updates': [] # List[Dict] - 注文更新
}
===== Reconstruction 実行例 =====
async def reconstruct_orderbook_example():
"""復元処理の実行例"""
reconstructor = OrderbookReconstructor(symbol="BTC-USDT", depth=20)
# 実際の実装ではDataSourceからデータを取得
# async with DataSource(api_key="YOUR_API_KEY") as ds:
# data = await ds.fetch_historical_data("binance", "BTC-USDT", start_ts, end_ts)
# ① 初期快照を適用
sample_snapshot = {
'timestamp': 1704067200000, # 2024-01-01 00:00:00 UTC
'bids': [
{'price': '42150.00', 'quantity': '1.234'},
{'price': '42149.50', 'quantity': '2.456'},
],
'asks': [
{'price': '42150.50', 'quantity': '0.876'},
{'price': '42151.00', 'quantity': '1.543'},
]
}
reconstructor.apply_snapshot(sample_snapshot)
# ② 取引・更新を時系列順に適用
sample_trades = [
{'price': '42150.50', 'quantity': '0.500', 'side': 'buy', 'timestamp': 1704067201000},
{'price': '42151.00', 'quantity': '0.300', 'side': 'sell', 'timestamp': 1704067202000},
]
for trade in sample_trades:
reconstructor.apply_trade(trade)
# ③ 特定時点の板状態を取得
current_state = reconstructor.get_state(timestamp=1704067202000)
print(f"Best Bid: {current_state.best_bid()}")
print(f"Best Ask: {current_state.best_ask()}")
print(f"Spread: {current_state.spread()}")
if __name__ == "__main__":
asyncio.run(reconstruct_orderbook_example())
HolySheep AIによる分析統合
復元された板情報はそのままでは有用ですが、HolySheep AIのAPIを組み合わせることで、自然言語による分析クエリや自動化された市場レポート生成が可能になります。以下に、私が必要と感じたAI分析統合の実装例を示します:
import aiohttp
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
===== HolySheep AI分析クライアント =====
class HolySheepAnalysisClient:
"""
HolySheep AI API統合クライアント
ドキュメント: https://docs.holysheep.ai
注册: https://www.holysheep.ai/register
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def analyze_market_structure(
self,
orderbook_snapshots: List[Dict],
trades: List[Dict]
) -> Dict:
"""
板構造と取引データから市場分析レポートを生成
Args:
orderbook_snapshots: 時系列の板快照リスト
trades: 取引履歴リスト
Returns:
AI生成による分析結果
"""
# 統計サマリーを計算
if trades:
avg_trade_size = sum(t.get('quantity', 0) for t in trades) / len(trades)
price_volatility = self._calculate_volatility(trades)
else:
avg_trade_size = 0
price_volatility = 0
# HolySheep AIに送信する分析プロンプト構築
analysis_prompt = self._build_analysis_prompt(
snapshots=orderbook_snapshots,
trades=trades,
avg_trade_size=avg_trade_size,
volatility=price_volatility
)
# DeepSeek V3.2モデルで分析実行($0.42/MTok - 低コスト高性能)
response = await self._call_llm(
model="deepseek-chat-v3.2",
messages=[
{
"role": "system",
"content": """あなたは暗号通貨市場マイクロストラクチャーの専門家です。
提供された板情報と取引データに基づき、流動性・ volatility・注文パターンを分析してください。
日本語で構造化されたレポートを出力してください。"""
},
{
"role": "user",
"content": analysis_prompt
}
],
max_tokens=2000,
temperature=0.3
)
return {
'analysis': response,
'statistics': {
'total_snapshots': len(orderbook_snapshots),
'total_trades': len(trades),
'avg_trade_size': avg_trade_size,
'volatility': price_volatility
}
}
async def _call_llm(
self,
model: str,
messages: List[Dict],
max_tokens: int = 1000,
temperature: float = 0.7
) -> str:
"""HolySheep AI Chat Completions API呼び出し"""
payload = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature
}
async with self.session.post(
f"{self.base_url}/chat/completions",
json=payload
) as response:
if response.status != 200:
error_body = await response.text()
raise Exception(f"API Error {response.status}: {error_body}")
result = await response.json()
return result['choices'][0]['message']['content']
def _build_analysis_prompt(
self,
snapshots: List[Dict],
trades: List[Dict],
avg_trade_size: float,
volatility: float
) -> str:
"""分析用プロンプト構築"""
# 最新と最古の板状態を抽出
latest = snapshots[-1] if snapshots else {}
earliest = snapshots[0] if snapshots else {}
prompt = f"""
分析対象データ概要
- データポイント数: {len(snapshots)} 快照, {len(trades)} 取引
- 平均取引サイズ: {avg_trade_size:.6f}
- 価格変動性: {volatility:.4f}
最新板状態(Bid/Ask各3レベル)
【買い注文】
{self._format_price_levels(latest.get('bids', [])[:3])}
【売り注文】
{self._format_price_levels(latest.get('asks', [])[:3])}
分析依頼
1. 流動性の分布と深度の評価
2. 買い手・売り力のバランス分析
3. 潜在的な流動性バランス(板の歪みやクラスタリング)
4. 異常注文パターンや市場操作的兆候の有無
5. トレーディング戦略への示唆
日本語で詳細に分析してください。
"""
return prompt
def _format_price_levels(self, levels: List[Dict]) -> str:
"""価格レベルリストの整形表示"""
if not levels:
return "(データなし)"
return "\n".join([
f" - {lv.get('price', 'N/A')}: {lv.get('quantity', 'N/A')} ({lv.get('order_count', 1)}件)"
for lv in levels
])
def _calculate_volatility(self, trades: List[Dict]) -> float:
"""単純価格変動性を計算"""
if len(trades) < 2:
return 0.0
prices = [float(t['price']) for t in trades]
mean = sum(prices) / len(prices)
variance = sum((p - mean) ** 2 for p in prices) / len(prices)
return variance ** 0.5
===== 自動化された日次レポート生成 =====
async def generate_daily_market_report(
holy_sheep_client: HolySheepAnalysisClient,
date: str, # "2024-01-01"
snapshots: List[Dict],
trades: List[Dict]
) -> str:
"""
日次の市場分析レポートを自動生成
HolySheep AIのDeepSeek V3.2モデルでコスト効率良く処理
"""
# 時間帯別分析のためデータを分割
hourly_data = defaultdict(lambda: {'snapshots': [], 'trades': []})
for snap in snapshots:
ts = snap.get('timestamp', 0)
hour = datetime.utcfromtimestamp(ts / 1000).hour
hourly_data[hour]['snapshots'].append(snap)
for trade in trades:
ts = trade.get('timestamp', 0)
hour = datetime.utcfromtimestamp(ts / 1000).hour
hourly_data[hour]['trades'].append(trade)
# レポートセクション生成
sections = [f"# {date} 市場分析レポート\n"]
for hour in sorted(hourly_data.keys()):
data = hourly_data[hour]
if not data['trades']:
continue
section = await holy_sheep_client.analyze_market_structure(
orderbook_snapshots=data['snapshots'],
trades=data['trades']
)
sections.append(f"## {hour:02d}:00 - {hour:02d}:59\n")
sections.append(section['analysis'])
sections.append("\n---\n")
return "\n".join(sections)
===== 使用例 =====
async def main():
"""HolySheep AI分析統合の完全な使用例"""
# HolySheep AIクライアント初期化
# 登録: https://www.holysheep.ai/register
async with HolySheepAnalysisClient(api_key="YOUR_HOLYSHEEP_API_KEY") as client:
# サンプル板データ
sample_snapshots = [
{
'timestamp': 1704067200000,
'bids': [{'price': '42100', 'quantity': '1.5'}, {'price': '42099', 'quantity': '2.0'}],
'asks': [{'price': '42101', 'quantity': '1.2'}, {'price': '42102', 'quantity': '0.8'}]
},
{
'timestamp': 1704067260000,
'bids': [{'price': '42102', 'quantity': '2.1'}, {'price': '42101', 'quantity': '1.8'}],
'asks': [{'price': '42103', 'quantity': '1.5'}, {'price': '42104', 'quantity': '0.9'}]
}
]
sample_trades = [
{'price': '42101', 'quantity': '0.5', 'side': 'buy', 'timestamp': 1704067230000},
{'price': '42102', 'quantity': '0.3', 'side': 'sell', 'timestamp': 1704067240000},
]
# 市場構造分析実行
result = await client.analyze_market_structure(
orderbook_snapshots=sample_snapshots,
trades=sample_trades
)
print("=== 分析結果 ===")
print(result['analysis'])
print(f"\n統計サマリー: {result['statistics']}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
価格とROI
板情報復元システムの運用において、HolySheep AIの導入は分析業務の自動化とコスト最適化の両面で大きなROIをもたらします。以下に主要なLLMプロバイダーとの比較を示します:
| プロバイダー | モデル | 入力価格($/MTok) | 出力価格($/MTok) | HolySheep比 |
|---|---|---|---|---|
| HolySheep AI | DeepSeek V3.2 | $0.21 | $0.42 | 基準 |
| OpenAI | GPT-4.1 | $2.50 | $8.00 | 19x高 |
| Anthropic | Claude Sonnet 4.5 | $3.00 | $15.00 | 36x高 |
| Gemini 2.5 Flash | $0.30 | $2.50 | 6x高 | |
| HolySheep AI | Gemini 2.5 Flash | $0.15 | $2.50 | 6x(入力側50%off) |
計算例:日次レポート1,000件(各2,000トークン出力)の場合
- GPT-4.1使用時:$8.00 × 2,000 × 1,000 / 1,000 = $16,000/月
- HolySheep DeepSeek V3.2使用時:$0.42 × 2,000 × 1,000 / 1,000 = $840/月
- 節約額:$15,160/月(95%削減)
HolySheep AIは登録時に無料クレジットが付与され、レートは¥1=$1( 공식¥7.3=$1比85%節約)という破格のコストで運用を開始できます。
HolySheepを選ぶ理由
私自身が板情報復元システムにHolySheep AIを採用している理由は以下の通りです:
- コスト効率:DeepSeek V3.2モデルの出力価格が$0.42/MTokと業界最安水準。1日1,000回の分析クエリを処理しても月額$1,000以下に抑えられます。
- 低レイテンシ:<50msの応答速度を実現。リアルタイム市場分析ワークロードでもストレスなく処理できます。
- 柔軟な決済手段:WeChat Pay・Alipay対応により、国際的なクレジットカード不要で即座にクレディットを追加可能。
- シンプルなAPI統合:OpenAI互換のインターフェース设计により、既存のLangChain・LlamaIndexワークフローに最小限の変更で統合可能。
- 日本語ドキュメントの充実:HolySheepのドキュメントは私が普段使うシナリオに即した日本語 достаで、社区のレスポンスも迅速。
よくあるエラーと対処法
エラー1:Snapshot缺失による復元不能
# 問題:初期快照がない場合、板復元が不可能
エラー例:
ValueError: No snapshot available for timestamp reconstruction
解決策:複数のSnapshotを補間するクラス実装
class SnapshotInterpolator:
"""缺失したSnapshotを線形補間で補完"""
def __init__(self, snapshots: List[Orderbook]):
self.snapshots = sorted(snapshots, key=lambda x: x.timestamp)
def get_snapshot(self, timestamp: int) -> Optional[Orderbook]:
"""指定時刻に最も近いSnapshotを返す"""
if not self.snapshots:
return None
# 時刻以前の最新Snapshot
before = None
for snap in reversed(self.snapshots):
if snap.timestamp <= timestamp:
before = snap
break
# 時刻以降の最初Snapshot
after = None
for snap in self.snapshots:
if snap.timestamp >= timestamp:
after = snap
break
if before is None:
# 目標時刻より古いSnapshotがない場合
if after:
return after
return None
if after is None:
# 目標時刻より新しいSnapshotがない場合
return before
# 両方が存在する場合:加重平均で補間
weight = (timestamp - before.timestamp) / (after.timestamp - before.timestamp)
return self._interpolate(before, after, weight)
def _interpolate(
self,
before: Orderbook,
after: Orderbook,
weight: float
) -> Orderbook:
"""2つのSnapshot間の数量を線形補間"""
interpolated_bids = []
all_prices = set(
[b.price for b in before.bids] +
[a.price for a in after.bids]
)
for price in sorted(all_prices, reverse=True)[:20]:
q_before = next((b.quantity for b in before.bids if b.price == price), 0)
q_after = next((a.quantity for a in after.bids if a.price == price), 0)
q_interp = q_before + (q_after - q_before) * weight
interpolated_bids.append(PriceLevel(price=price, quantity=q_interp))
# Asksも同様に処理(省略)
interpolated_asks = []
return Orderbook(
timestamp=int(before.timestamp + (after.timestamp - before.timestamp) * weight),
bids=interpolated_bids,
asks=interpolated_asks
)
エラー2:時刻同期の不整合
# 問題:exchange間のタイムスタンプ形式が異なる
Binance: ミリ秒タイムスタンプ
Coinbase: RFC3339文字列
Kraken: Unix秒(小数点以下6桁)
解決策:統一的な時刻正規化ユーティリティ
from datetime import datetime, timezone
from typing import Union
class TimestampNormalizer:
"""exchange間の時刻形式差异を吸収"""
@staticmethod
def normalize(timestamp: Union[int, float, str], exchange: str) -> int:
"""
すべてのタイムスタンプをミリ秒単位のUnixタイムスタンプに変換
Args:
timestamp: 各exchange原生の日時表現
exchange: 'binance', 'coinbase', 'kraken', 'bybit'など
Returns:
ミリ秒単位のUnixタイムスタンプ
"""
if exchange == 'binance':
# Binance: ミリ秒そのまま
return int(timestamp)
elif exchange == 'coinbase':
# Coinbase: ISO8601/RFC3339文字列
if isinstance(timestamp, str):
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
return int(dt.timestamp() * 1000)
return int(timestamp) # すでにミリ秒の場合
elif exchange == 'kraken':
# Kraken: Unix秒(小数点6桁)またはミリ秒
if isinstance(timestamp, str):
ts = float(timestamp)
else:
ts = timestamp
# 小数点以下の桁数で判定
if ts < 1e12: # 秒単位と判定
ts = ts * 1000
return int(ts)
elif exchange == 'bybit':
# Bybit: ミリ秒(秒の場合もある)
ts = float(timestamp) if isinstance(timestamp, str) else timestamp
if ts < 1e12:
ts = ts * 1000
return int(ts)
else:
# デフォルト:ミリ秒として扱う
return int(timestamp)
@staticmethod
def to_datetime(ms_timestamp: int) -> datetime:
"""ミリ秒タイムスタンプをdatetimeに変換"""
return datetime.fromtimestamp(ms_timestamp / 1000, tz=timezone.utc)
@staticmethod
def to_iso(ms_timestamp: int) -> str:
"""ミリ秒タイムスタンプをISO8601文字列に変換"""
return TimestampNormalizer.to_datetime(ms_timestamp).isoformat()
使用例
normalizer = TimestampNormalizer()
ts_binance = normalizer.normalize(1704067200000, 'binance')
ts_coinbase = normalizer.normalize('2024-01-01T00:00:00.000Z', 'coinbase')
ts_kraken = normalizer.normalize('1704067200.123456', 'kraken')
print(f"Binance: {ts_binance} -> {normalizer.to_iso(ts_binance)}")
print(f"Coinbase: {ts_coinbase} -> {normalizer.to_iso(ts_coinbase)}")
print(f"Kraken: {ts_kraken} -> {normalizer.to_iso(ts_kraken)}")
すべて同じISO8601時刻を表す
エラー3:HolySheep API呼び出しのレートリミット超過
# 問題:API呼び出しがレートリミットで失敗
HTTP 429: Too Many Requests
解決策:指数バックオフ+リクエストキュー実装
import asyncio
import time
from collections import deque
from typing import Optional, Callable, Any
import aiohttp
class RateLimitedClient:
"""指数バックオフ付きレート制限クライアント"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
requests_per_minute: int = 60,
max_retries: int = 5
):
self.api_key = api_key
self.base_url = base_url
self.rpm = requests_per_minute
self.max_retries = max_retries
self.request_times: deque = deque(maxlen=requests_per_minute)
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.api_key}"}
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def call_with_retry(
self,
endpoint: str,
method: str = "POST",
payload: Optional[Dict] = None,
base_delay: float = 1.0
) -> Dict:
"""
レート制限を考慮してAPI呼び出しを実行
Args:
endpoint: APIエンドポイント
method: HTTPメソッド
payload: リクエストボディ
base_delay: 初期バックオフ遅延(秒)
Returns:
APIレスポンス
"""
for attempt in range(self.max_retries):
try:
# レート制限チェック
await self._throttle()
# API呼び出し
async with self.session.request(
method=method,
url=f"{self.base_url}/{endpoint}",
json=payload
) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
# レート制限超過:指数バックオフ
retry_after = response.headers.get('Retry-After', '1')
delay = max(float(retry_after), base_delay * (2 ** attempt))
print(f"[RateLimit] Attempt {attempt+1}/{self.max_retries}: "
f"Waiting {delay:.1f}s")
await asyncio.sleep(delay)
elif response.status == 500:
# サーバーエラー:再試行
delay = base_delay * (2 ** attempt)
print(f"[ServerError] Attempt {attempt+1}/{self.max_retries}: "
f"Waiting {delay:.1f}s")
await asyncio.sleep(delay)
else:
error_body = await response.text()
raise Exception(f"API Error {response.status}: {error_body}")
except aiohttp.ClientError as e:
delay = base_delay * (2 ** attempt)
print(f"[NetworkError] Attempt {attempt+1}/{self.max_retries}: {e}")
await asyncio.sleep(delay)
raise Exception(f"Failed after {self.max_retries} retries")
async def _throttle(self):
"""リクエスト間のスロットリング"""
now = time.time()
# 過去1分間のリクエストをクリア
while self.request_times and now - self.request_times[0] > 60:
self.request_times.popleft()
# RPM制限に達している場合は待機
if len(self.request_times) >= self.rpm:
wait_time = 60 - (now - self.request_times[0])
if wait_time > 0:
print(f"[Throttle] Rate limit reached, waiting {wait_time:.1f}s")
await asyncio.sleep(wait_time)
self.request_times.append(time.time())
使用例
async def rate_limited_analysis():
async with RateLimitedClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
requests_per_minute=30 # 安全マージン
) as client:
for i in range(100):
result = await client.call_with_retry(
endpoint="chat/completions",
payload={
"model": "deepseek-chat-v3.2",
"messages": [{"role": "user", "content": f"Query {i}"}]
}
)
print(f"Request {i+1} completed")