AI APIを高速に呼び出す必要がある場面では、マルチスレッド活用が不可欠ですが、同時にrace condition(競合状態)という厄介な問題が発生します。本稿では、私が実際に遭遇した具体的なエラーシナリオから始まり、HolySheep AIを活用した安全な多线程実装方法を解説します。

実際に発生したエラーシナリオ

私のプロジェクトでは、1秒あたり100件以上のプロンプトを処理する必要があり、以下のようなコードを実装していました:

import requests
import threading
from queue import Queue

class AIBatchProcessor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.url = "https://api.holysheep.ai/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.results = []
        self.lock = threading.Lock()
    
    def process_single(self, prompt, task_id):
        """スレッド内で実行される単一リクエスト処理"""
        payload = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 500
        }
        
        try:
            response = requests.post(
                self.url, 
                headers=self.headers, 
                json=payload,
                timeout=30
            )
            result = response.json()
            
            # ❌ これがrace conditionの温床
            self.results.append({
                "task_id": task_id,
                "content": result["choices"][0]["message"]["content"]
            })
        except Exception as e:
            print(f"Task {task_id} failed: {e}")

問題のある実装

processor = AIBatchProcessor("YOUR_HOLYSHEEP_API_KEY") threads = [] for i in range(100): t = threading.Thread( target=processor.process_single, args=(f"タスク{i}のプロンプト内容", i) ) threads.append(t) t.start() for t in threads: t.join() print(f"結果数: {len(processor.results)}") # 本当は100件なのに...

実行結果は以下のようになりました:

# 出力例
Task 45 failed: ConnectionError: timeout
Task 78 failed: 401 Unauthorized
Task 89 failed: 429 Too Many Requests
結果数: 97  # 本当は100件なければならない
【エラーなし】でも results に不整合が発生

race conditionの根本原因

上記のコードには3つの重大な問題があります:

1. 共有リソースへの同時アクセス

self.results.append()に対するthreading.Lock()がありません。Pythonのリスト操作はアトミックではないため、複数のスレッドが同時にappend()を実行するとデータが破損します。

2. レート制限の無視

HolySheep AIを含む主要なAPIは、1秒あたりのリクエスト数に制限があります。無制御で100スレッドを一気に起動すると、429 Too Many Requestsエラーが頻発します。

3. 認証情報の競合

複数のスレッドが同時にrequests.session()を共有すると稀に認証エラーが発生します。

解決策:スレッドセーフな実装

Solution 1: Semaphoreによる同時接続制御

import requests
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class HolySheepAIClient:
    """HolySheep AI API 専用スレッドセーフクライアント"""
    
    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)
        self.results = []
        self.results_lock = threading.Lock()
        self.error_log = []
        self.error_lock = threading.Lock()
        
    def _make_request(self, prompt, task_id):
        """スレッド内で実行されるAPIリクエスト(セマフォ制御付き)"""
        with self.semaphore:  # 同時接続数を制限
            payload = {
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 500,
                "temperature": 0.7
            }
            
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            
            try:
                response = requests.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=60
                )
                response.raise_for_status()
                
                result = response.json()
                output = {
                    "task_id": task_id,
                    "content": result["choices"][0]["message"]["content"],
                    "usage": result.get("usage", {}),
                    "status": "success"
                }
                
                # スレッドセーフなリスト操作
                with self.results_lock:
                    self.results.append(output)
                    
                return output
                
            except requests.exceptions.Timeout:
                error = {"task_id": task_id, "error": "timeout", "retry": True}
                with self.error_lock:
                    self.error_log.append(error)
                return error
                
            except requests.exceptions.HTTPError as e:
                error = {"task_id": task_id, "error": str(e), "status_code": e.response.status_code}
                with self.error_lock:
                    self.error_log.append(error)
                return error
    
    def batch_process(self, prompts, max_workers=20):
        """一括処理のメインエントリーポイント"""
        tasks = [(prompt, i) for i, prompt in enumerate(prompts)]
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self._make_request, prompt, idx): idx
                for idx, prompt in tasks
            }
            
            for future in as_completed(futures):
                idx = futures[future]
                try:
                    result = future.result()
                    print(f"タスク {idx} 完了: {result.get('status', 'error')}")
                except Exception as e:
                    print(f"タスク {idx} 予期しないエラー: {e}")
        
        return {
            "success": self.results.copy(),
            "errors": self.error_log.copy(),
            "total": len(prompts),
            "success_count": len(self.results),
            "error_count": len(self.error_log)
        }

使用例

client = HolySheepAIClient( api_key="YOUR_HOLYSHEEP_API_KEY", max_concurrent=10 # 同時接続10件に制限 ) prompts = [f"タスク{i}の質問内容" for i in range(100)] results = client.batch_process(prompts, max_workers=20) print(f"成功: {results['success_count']}/{results['total']}") print(f"エラー: {results['error_count']}/{results['total']}")

Solution 2: asyncio + aiohttp による非同期実装

より高い并发処理が必要な場合は、asyncioベースの非同期実装が有効です:

import asyncio
import aiohttp
import json

class AsyncHolySheepClient:
    """非同期 HolySheep AI クライアント - race condition ゼロ"""
    
    def __init__(self, api_key, max_concurrent=50):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.max_concurrent = asyncio.Semaphore(max_concurrent)
        self.results = []
        self.errors = []
        
    async def _single_request(self, session, prompt, task_id):
        """単一リクエストの実行"""
        async with self.max_concurrent:  # 同時接続制御
            payload = {
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 500
            }
            
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }