AI API を活用したアプリケーションでは、大量のリクエストを効率的に処理することが重要です。同期処理では1件ずつリクエストを送信するため、スループットが著しく低下します。本稿では、Python の asyncio と aiohttp を活用した高并发リクエスト設計の核心部分を詳しく解説します。
并发请求のアーキテクチャ設計
AI API 调用を并发处理するには、以下の要素を考慮する必要があります:
- 接続プール管理:複数の HTTP 接続を効率的に再利用
- セマフォによる流量制御:API のレートリミットを超えない制御
- 指数関数的バックオフ:リトライ時の負荷分散
- エラーハンドリング:部分的な失敗を許容する設計
ベースクライアントの実装
HolySheep AI の API を高效的かつ低コストで活用するためのベースクライアントを実装します。HolySheep AI は ¥1=$1 の為替レートを提供しており、公式価格と比較して85%のコスト削減が可能です。
import asyncio
import aiohttp
import json
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class HolySheepConfig:
api_key: str
base_url: str = "https://api.holysheep.ai/v1"
max_concurrent: int = 50
requests_per_minute: int = 3000
timeout: int = 60
max_retries: int = 3
class HolySheepAIOClient:
"""HolySheep AI API 用 非同期クライアント"""
def __init__(self, config: HolySheepConfig):
self.config = config
self._session: Optional[aiohttp.ClientSession] = None
self._semaphore: Optional[asyncio.Semaphore] = None
self._rate_limiter: Optional[asyncio.Semaphore] = None
async def __aenter__(self):
await self._init_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def _init_session(self):
"""aiohttp セッションの初期化"""
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
connector = aiohttp.TCPConnector(
limit=self.config.max_concurrent * 2,
limit_per_host=self.config.max_concurrent
)
self._session = aiohttp.ClientSession(
connector=connector,
timeout=timeout
)
self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
# レートリミット用のセマフォ(1分あたりのリクエスト数制御)
rpm = self.config.requests_per_minute
self._rate_limiter = asyncio.Semaphore(rpm)
def _get_headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.config.api_key}",
"Content-Type": "application/json"
}
async def _rate_limit_acquire(self):
"""レートリミット制御"""
await self._rate_limiter.acquire()
asyncio.get_event_loop().call_later(60 / self.config.requests_per_minute,
self._rate_limiter.release)
聊天完成 API の并发実装
以下の実装では、複数のプロンプトを同時送信し、各リクエストのレイテンシとコストを個別に記録します。DeepSeek V3.2 は $0.42/MTok と非常に低コストであり、大量処理に適しています。
import time
from typing import List, Dict, Any
from dataclasses import dataclass, field
@dataclass
class RequestMetrics:
"""リクエスト.metrics"""
request_id: str
model: str
prompt_tokens: int
completion_tokens: int
latency_ms: float
success: bool
error: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.now)
class ConcurrentChatClient(HolySheepAIOClient):
"""并发聊天请求クライアント"""
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4.1",
temperature: float = 0.7,
max_tokens: int = 2048,
request_id: Optional[str] = None
) -> Dict[str, Any]:
"""单个聊天完成リクエストを実行"""
url = f"{self.config.base_url}/chat/completions"
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
async with self._semaphore:
await self._rate_limit_acquire()
start_time = time.perf_counter()
try:
async with self._session.post(
url,
headers=self._get_headers(),
json=payload
) as response:
latency_ms = (time.perf_counter() - start_time) * 1000
if response.status == 200:
data = await response.json()
return {
"success": True,
"data": data,
"latency_ms": latency_ms,
"request_id": request_id
}
else:
error_text = await response.text()
return {
"success": False,
"error": f"HTTP {response.status}: {error_text}",
"latency_ms": latency_ms,
"request_id": request_id
}
except asyncio.TimeoutError:
return {
"success": False,
"error": "リクエストタイムアウト",
"latency_ms": (time.perf_counter() - start_time) * 1000,
"request_id": request_id
}
except Exception as e:
return {
"success": False,
"error": str(e),
"latency_ms": (time.perf_counter() - start_time) * 1000,
"request_id": request_id
}
async def batch_chat_completions(
self,
requests: List[Dict[str, Any]],
model: str = "gpt-4.1",
return_results: bool = False
) -> List[Dict[str, Any]]:
"""批量并发リクエストを実行
Args:
requests: リクエストリスト [{"messages": [...], "request_id": "..."}]
model: 使用するモデル
return_results: True の場合、レスポンスデータも返す
Returns:
results: 各リクエストの結果リスト
"""
tasks = []
for req in requests:
task = self.chat_completion(
messages=req["messages"],
model=model,
temperature=req.get("temperature", 0.7),
max_tokens=req.get("max_tokens", 2048),
request_id=req.get("request_id", f"req_{len(tasks)}")
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# メトリクス 수집
metrics = []
for i, result in enumerate(results):
if isinstance(result, Exception):
metrics.append(RequestMetrics(
request_id=requests[i].get("request_id", f"req_{i}"),
model=model,
prompt_tokens=0,
completion_tokens=0,
latency_ms=0,
success=False,
error=str(result)
))
else:
data = result.get("data", {})
usage = data.get("usage", {})
metrics.append(RequestMetrics(
request_id=result.get("request_id", f"req_{i}"),
model=model,
prompt_tokens=usage.get("prompt_tokens", 0),
completion_tokens=usage.get("completion_tokens", 0),
latency_ms=result.get("latency_ms", 0),
success=result.get("success", False),
error=result.get("error")
))
return metrics
async def example_batch_processing():
"""批量処理の exemplo"""
config = HolySheepConfig(
api_key="YOUR_HOLYSHEEP_API_KEY",
max_concurrent=100,
requests_per_minute=5000
)
# テスト用リクエスト生成
test_requests = [
{
"messages": [{"role": "user", "content": f"テストプロンプト {i}"}],
"request_id": f"batch_req_{i}"
}
for i in range(500)
]
async with ConcurrentChatClient(config) as client:
start = time.perf_counter()
metrics = await client.batch_chat_completions(
requests=test_requests,
model="deepseek-chat" # DeepSeek V3.2: $0.42/MTok で低コスト
)
total_time = time.perf_counter() - start
# 結果集計
successful = [m for m in metrics if m.success]
failed = [m for m in metrics if not m.success]
avg_latency = sum(m.latency_ms for m in successful) / len(successful) if successful else 0
total_tokens = sum(m.completion_tokens for m in successful)
print(f"総リクエスト数: {len(metrics)}")
print(f"成功: {len(successful)}, 失敗: {len(failed)}")
print(f"総実行時間: {total_time:.2f}秒")
print(f"平均レイテンシ: {avg_latency:.2f}ms")
print(f"処理-throughput: {len(metrics) / total_time:.2f} req/s")
print(f"合計出力トークン: {total_tokens:,}")
レートリミットと流量制御の詳細
HolySheep AI はWeChat Pay / Alipayに対応しており¥1=$1のレートで充值でき、公式¥7.3=$1と比較して大幅な節約になります。本節では、API のレートリミットを適切に管理する流量制御机制を説明します。
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import time
class TokenBucketRateLimiter:
"""トークンバケット方式のレートリミッター
より精细な流量制御が必要な场合に使用
"""
def __init__(self, rate: int, capacity: int):
"""
Args:
rate: 1秒あたりの许可トークン数
capacity: バケットの最大容量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1):
"""トークンを消費"""
async with self._lock:
while True:
now = time.monotonic()
elapsed = now - self.last_update
# 時間経過でトークン回復
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_update = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# 不足分待つ時間を計算
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait_time)
class AdaptiveRateLimiter:
"""適応的レートリミッター
429 (Too Many Requests) レスポンスに応じて
自動的に流量を調整します
"""
def __init__(self, initial_rate: int, min_rate: int = 10):
self.current_rate = initial_rate
self.min_rate = min_rate
self.bucket = TokenBucketRateLimiter(initial_rate, initial_rate)
self.retry_after = 60
async def acquire(self):
"""流量制御しながらリクエスト許可"""
await self.bucket.acquire()
def report_rate_limit(self, retry_after: Optional[int] = None):
"""レートリミット遭遇を報告"""
self.current_rate = max(self.min_rate, self.current_rate // 2)
self.bucket.rate = self.current_rate
self.bucket.capacity = self.current_rate
if retry_after:
self.retry_after = retry_after
else:
self.retry_after = min(self.retry_after * 2, 300)
print(f"レートリミット検出: 流量を {self.current_rate} req/s に削減")
def report_success(self):
"""成功を報告,逐步恢复流量"""
if self.current_rate < self.bucket.rate * 1.5:
self.current_rate = min(
int(self.current_rate * 1.1),
self.bucket.rate * 2
)
self.bucket.rate = self.current_rate
リトライ逻辑とエクスポネンシャルバックオフ
API 调用には一時的な障害が含まれることがあります。エクスポネンシャルバックオフを実装することで、服务器への負荷を最小限に抑えながら恢复を試みます。
import asyncio
import random
from typing import Callable, Any, Optional
from functools import wraps
def exponential_backoff_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
):
"""エクスポネンシャルバックオフデコレータ
Args:
max_retries: 最大リトライ回数
base_delay: 基本遅延秒数
max_delay: 最大遅延秒数
exponential_base: 指数の底
jitter: True の場合、ランダムな揺れを追加
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except aiohttp.ClientResponseError as e:
last_exception = e
# 429以外のエラーは即座に失敗
if e.status != 429 and e.status != 500:
raise
# 最終試行失敗
if attempt == max_retries:
raise
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
if jitter:
delay = delay * (0.5 + random.random())
retry_after = e.headers.get("Retry-After")
if retry_after:
delay = max(delay, int(retry_after))
print(f"リトライ {attempt + 1}/{max_retries}: "
f"{delay:.1f}秒後に再試行...")
await asyncio.sleep(delay)
except asyncio.TimeoutError:
last_exception = TimeoutError("リクエストタイムアウト")
if attempt == max_retries:
raise last_exception
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
使用例
class RobustChatClient(ConcurrentChatClient):
"""リトライ機能付きチャットクライアント"""
@exponential_backoff_retry(max_retries=3, base_delay=1.0)
async def chat_completion_with_retry(self, **kwargs) -> Dict[str, Any]:
result = await self.chat_completion(**kwargs)
if not result.get("success"):
error = result.get("error", "")
if "429" in str(error) or "rate_limit" in str(error).lower():
# レートリミットの場合、カスタム例外を発生させてリトライ_trigger
raise aiohttp.ClientResponseError(
request_info=None,
history=None,
status=429,
message=error,
headers={}
)
return result
パフォーマンスベンチマーク
実際のベンチマーク结果を以下に示します。HolySheep AI は平均<50msのレイテンシを提供しており、批量処理の效率が大きく向上します。
| 并发数 | 総リクエスト |
|---|