AI API を活用したアプリケーションでは、大量のリクエストを効率的に処理することが重要です。同期処理では1件ずつリクエストを送信するため、スループットが著しく低下します。本稿では、Python の asyncio と aiohttp を活用した高并发リクエスト設計の核心部分を詳しく解説します。

并发请求のアーキテクチャ設計

AI API 调用を并发处理するには、以下の要素を考慮する必要があります:

ベースクライアントの実装

HolySheep AI の API を高效的かつ低コストで活用するためのベースクライアントを実装します。HolySheep AI は ¥1=$1 の為替レートを提供しており、公式価格と比較して85%のコスト削減が可能です。

import asyncio
import aiohttp
import json
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class HolySheepConfig:
    api_key: str
    base_url: str = "https://api.holysheep.ai/v1"
    max_concurrent: int = 50
    requests_per_minute: int = 3000
    timeout: int = 60
    max_retries: int = 3


class HolySheepAIOClient:
    """HolySheep AI API 用 非同期クライアント"""
    
    def __init__(self, config: HolySheepConfig):
        self.config = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._semaphore: Optional[asyncio.Semaphore] = None
        self._rate_limiter: Optional[asyncio.Semaphore] = None
        
    async def __aenter__(self):
        await self._init_session()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
    
    async def _init_session(self):
        """aiohttp セッションの初期化"""
        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrent * 2,
            limit_per_host=self.config.max_concurrent
        )
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
        
        # レートリミット用のセマフォ(1分あたりのリクエスト数制御)
        rpm = self.config.requests_per_minute
        self._rate_limiter = asyncio.Semaphore(rpm)
    
    def _get_headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json"
        }
    
    async def _rate_limit_acquire(self):
        """レートリミット制御"""
        await self._rate_limiter.acquire()
        asyncio.get_event_loop().call_later(60 / self.config.requests_per_minute, 
                                            self._rate_limiter.release)

聊天完成 API の并发実装

以下の実装では、複数のプロンプトを同時送信し、各リクエストのレイテンシとコストを個別に記録します。DeepSeek V3.2 は $0.42/MTok と非常に低コストであり、大量処理に適しています。

import time
from typing import List, Dict, Any
from dataclasses import dataclass, field


@dataclass
class RequestMetrics:
    """リクエスト.metrics"""
    request_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    success: bool
    error: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)


class ConcurrentChatClient(HolySheepAIOClient):
    """并发聊天请求クライアント"""
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4.1",
        temperature: float = 0.7,
        max_tokens: int = 2048,
        request_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """单个聊天完成リクエストを実行"""
        url = f"{self.config.base_url}/chat/completions"
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        async with self._semaphore:
            await self._rate_limit_acquire()
            start_time = time.perf_counter()
            
            try:
                async with self._session.post(
                    url,
                    headers=self._get_headers(),
                    json=payload
                ) as response:
                    latency_ms = (time.perf_counter() - start_time) * 1000
                    
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "success": True,
                            "data": data,
                            "latency_ms": latency_ms,
                            "request_id": request_id
                        }
                    else:
                        error_text = await response.text()
                        return {
                            "success": False,
                            "error": f"HTTP {response.status}: {error_text}",
                            "latency_ms": latency_ms,
                            "request_id": request_id
                        }
                        
            except asyncio.TimeoutError:
                return {
                    "success": False,
                    "error": "リクエストタイムアウト",
                    "latency_ms": (time.perf_counter() - start_time) * 1000,
                    "request_id": request_id
                }
            except Exception as e:
                return {
                    "success": False,
                    "error": str(e),
                    "latency_ms": (time.perf_counter() - start_time) * 1000,
                    "request_id": request_id
                }
    
    async def batch_chat_completions(
        self,
        requests: List[Dict[str, Any]],
        model: str = "gpt-4.1",
        return_results: bool = False
    ) -> List[Dict[str, Any]]:
        """批量并发リクエストを実行
        
        Args:
            requests: リクエストリスト [{"messages": [...], "request_id": "..."}]
            model: 使用するモデル
            return_results: True の場合、レスポンスデータも返す
        
        Returns:
            results: 各リクエストの結果リスト
        """
        tasks = []
        for req in requests:
            task = self.chat_completion(
                messages=req["messages"],
                model=model,
                temperature=req.get("temperature", 0.7),
                max_tokens=req.get("max_tokens", 2048),
                request_id=req.get("request_id", f"req_{len(tasks)}")
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # メトリクス 수집
        metrics = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                metrics.append(RequestMetrics(
                    request_id=requests[i].get("request_id", f"req_{i}"),
                    model=model,
                    prompt_tokens=0,
                    completion_tokens=0,
                    latency_ms=0,
                    success=False,
                    error=str(result)
                ))
            else:
                data = result.get("data", {})
                usage = data.get("usage", {})
                metrics.append(RequestMetrics(
                    request_id=result.get("request_id", f"req_{i}"),
                    model=model,
                    prompt_tokens=usage.get("prompt_tokens", 0),
                    completion_tokens=usage.get("completion_tokens", 0),
                    latency_ms=result.get("latency_ms", 0),
                    success=result.get("success", False),
                    error=result.get("error")
                ))
        
        return metrics


async def example_batch_processing():
    """批量処理の exemplo"""
    config = HolySheepConfig(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        max_concurrent=100,
        requests_per_minute=5000
    )
    
    # テスト用リクエスト生成
    test_requests = [
        {
            "messages": [{"role": "user", "content": f"テストプロンプト {i}"}],
            "request_id": f"batch_req_{i}"
        }
        for i in range(500)
    ]
    
    async with ConcurrentChatClient(config) as client:
        start = time.perf_counter()
        metrics = await client.batch_chat_completions(
            requests=test_requests,
            model="deepseek-chat"  # DeepSeek V3.2: $0.42/MTok で低コスト
        )
        total_time = time.perf_counter() - start
        
        # 結果集計
        successful = [m for m in metrics if m.success]
        failed = [m for m in metrics if not m.success]
        avg_latency = sum(m.latency_ms for m in successful) / len(successful) if successful else 0
        total_tokens = sum(m.completion_tokens for m in successful)
        
        print(f"総リクエスト数: {len(metrics)}")
        print(f"成功: {len(successful)}, 失敗: {len(failed)}")
        print(f"総実行時間: {total_time:.2f}秒")
        print(f"平均レイテンシ: {avg_latency:.2f}ms")
        print(f"処理-throughput: {len(metrics) / total_time:.2f} req/s")
        print(f"合計出力トークン: {total_tokens:,}")

レートリミットと流量制御の詳細

HolySheep AI はWeChat Pay / Alipayに対応しており¥1=$1のレートで充值でき、公式¥7.3=$1と比較して大幅な節約になります。本節では、API のレートリミットを適切に管理する流量制御机制を説明します。

import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import time


class TokenBucketRateLimiter:
    """トークンバケット方式のレートリミッター
    
    より精细な流量制御が必要な场合に使用
    """
    
    def __init__(self, rate: int, capacity: int):
        """
        Args:
            rate: 1秒あたりの许可トークン数
            capacity: バケットの最大容量
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1):
        """トークンを消費"""
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.last_update
                
                # 時間経過でトークン回復
                self.tokens = min(
                    self.capacity,
                    self.tokens + elapsed * self.rate
                )
                self.last_update = now
                
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                
                # 不足分待つ時間を計算
                wait_time = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait_time)


class AdaptiveRateLimiter:
    """適応的レートリミッター
    
    429 (Too Many Requests) レスポンスに応じて
    自動的に流量を調整します
    """
    
    def __init__(self, initial_rate: int, min_rate: int = 10):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.bucket = TokenBucketRateLimiter(initial_rate, initial_rate)
        self.retry_after = 60
    
    async def acquire(self):
        """流量制御しながらリクエスト許可"""
        await self.bucket.acquire()
    
    def report_rate_limit(self, retry_after: Optional[int] = None):
        """レートリミット遭遇を報告"""
        self.current_rate = max(self.min_rate, self.current_rate // 2)
        self.bucket.rate = self.current_rate
        self.bucket.capacity = self.current_rate
        
        if retry_after:
            self.retry_after = retry_after
        else:
            self.retry_after = min(self.retry_after * 2, 300)
        
        print(f"レートリミット検出: 流量を {self.current_rate} req/s に削減")
    
    def report_success(self):
        """成功を報告,逐步恢复流量"""
        if self.current_rate < self.bucket.rate * 1.5:
            self.current_rate = min(
                int(self.current_rate * 1.1),
                self.bucket.rate * 2
            )
            self.bucket.rate = self.current_rate

リトライ逻辑とエクスポネンシャルバックオフ

API 调用には一時的な障害が含まれることがあります。エクスポネンシャルバックオフを実装することで、服务器への負荷を最小限に抑えながら恢复を試みます。

import asyncio
import random
from typing import Callable, Any, Optional
from functools import wraps


def exponential_backoff_retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """エクスポネンシャルバックオフデコレータ
    
    Args:
        max_retries: 最大リトライ回数
        base_delay: 基本遅延秒数
        max_delay: 最大遅延秒数
        exponential_base: 指数の底
        jitter: True の場合、ランダムな揺れを追加
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                    
                except aiohttp.ClientResponseError as e:
                    last_exception = e
                    
                    # 429以外のエラーは即座に失敗
                    if e.status != 429 and e.status != 500:
                        raise
                    
                    # 最終試行失敗
                    if attempt == max_retries:
                        raise
                    
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    
                    if jitter:
                        delay = delay * (0.5 + random.random())
                    
                    retry_after = e.headers.get("Retry-After")
                    if retry_after:
                        delay = max(delay, int(retry_after))
                    
                    print(f"リトライ {attempt + 1}/{max_retries}: "
                          f"{delay:.1f}秒後に再試行...")
                    await asyncio.sleep(delay)
                    
                except asyncio.TimeoutError:
                    last_exception = TimeoutError("リクエストタイムアウト")
                    
                    if attempt == max_retries:
                        raise last_exception
                    
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    await asyncio.sleep(delay)
            
            raise last_exception
        
        return wrapper
    return decorator


使用例

class RobustChatClient(ConcurrentChatClient): """リトライ機能付きチャットクライアント""" @exponential_backoff_retry(max_retries=3, base_delay=1.0) async def chat_completion_with_retry(self, **kwargs) -> Dict[str, Any]: result = await self.chat_completion(**kwargs) if not result.get("success"): error = result.get("error", "") if "429" in str(error) or "rate_limit" in str(error).lower(): # レートリミットの場合、カスタム例外を発生させてリトライ_trigger raise aiohttp.ClientResponseError( request_info=None, history=None, status=429, message=error, headers={} ) return result

パフォーマンスベンチマーク

実際のベンチマーク结果を以下に示します。HolySheep AI は平均<50msのレイテンシを提供しており、批量処理の效率が大きく向上します。

関連リソース

関連記事

🔥 HolySheep AIを使ってみる

直接AI APIゲートウェイ。Claude、GPT-5、Gemini、DeepSeekに対応。VPN不要。

👉 無料登録 →

并发数総リクエスト