AI APIを高速に呼び出す必要がある場面では、マルチスレッド活用が不可欠ですが、同時にrace condition(競合状態)という厄介な問題が発生します。本稿では、私が実際に遭遇した具体的なエラーシナリオから始まり、HolySheep AIを活用した安全な多线程実装方法を解説します。
実際に発生したエラーシナリオ
私のプロジェクトでは、1秒あたり100件以上のプロンプトを処理する必要があり、以下のようなコードを実装していました:
import requests
import threading
from queue import Queue
class AIBatchProcessor:
def __init__(self, api_key):
self.api_key = api_key
self.url = "https://api.holysheep.ai/v1/chat/completions"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
self.results = []
self.lock = threading.Lock()
def process_single(self, prompt, task_id):
"""スレッド内で実行される単一リクエスト処理"""
payload = {
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500
}
try:
response = requests.post(
self.url,
headers=self.headers,
json=payload,
timeout=30
)
result = response.json()
# ❌ これがrace conditionの温床
self.results.append({
"task_id": task_id,
"content": result["choices"][0]["message"]["content"]
})
except Exception as e:
print(f"Task {task_id} failed: {e}")
問題のある実装
processor = AIBatchProcessor("YOUR_HOLYSHEEP_API_KEY")
threads = []
for i in range(100):
t = threading.Thread(
target=processor.process_single,
args=(f"タスク{i}のプロンプト内容", i)
)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"結果数: {len(processor.results)}") # 本当は100件なのに...
実行結果は以下のようになりました:
# 出力例
Task 45 failed: ConnectionError: timeout
Task 78 failed: 401 Unauthorized
Task 89 failed: 429 Too Many Requests
結果数: 97 # 本当は100件なければならない
【エラーなし】でも results に不整合が発生
race conditionの根本原因
上記のコードには3つの重大な問題があります:
1. 共有リソースへの同時アクセス
self.results.append()に対するthreading.Lock()がありません。Pythonのリスト操作はアトミックではないため、複数のスレッドが同時にappend()を実行するとデータが破損します。
2. レート制限の無視
HolySheep AIを含む主要なAPIは、1秒あたりのリクエスト数に制限があります。無制御で100スレッドを一気に起動すると、429 Too Many Requestsエラーが頻発します。
3. 認証情報の競合
複数のスレッドが同時にrequests.session()を共有すると稀に認証エラーが発生します。
解決策:スレッドセーフな実装
Solution 1: Semaphoreによる同時接続制御
import requests
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class HolySheepAIClient:
"""HolySheep AI API 専用スレッドセーフクライアント"""
def __init__(self, api_key, max_concurrent=10):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.max_concurrent = max_concurrent
self.semaphore = threading.Semaphore(max_concurrent)
self.results = []
self.results_lock = threading.Lock()
self.error_log = []
self.error_lock = threading.Lock()
def _make_request(self, prompt, task_id):
"""スレッド内で実行されるAPIリクエスト(セマフォ制御付き)"""
with self.semaphore: # 同時接続数を制限
payload = {
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500,
"temperature": 0.7
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
try:
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=60
)
response.raise_for_status()
result = response.json()
output = {
"task_id": task_id,
"content": result["choices"][0]["message"]["content"],
"usage": result.get("usage", {}),
"status": "success"
}
# スレッドセーフなリスト操作
with self.results_lock:
self.results.append(output)
return output
except requests.exceptions.Timeout:
error = {"task_id": task_id, "error": "timeout", "retry": True}
with self.error_lock:
self.error_log.append(error)
return error
except requests.exceptions.HTTPError as e:
error = {"task_id": task_id, "error": str(e), "status_code": e.response.status_code}
with self.error_lock:
self.error_log.append(error)
return error
def batch_process(self, prompts, max_workers=20):
"""一括処理のメインエントリーポイント"""
tasks = [(prompt, i) for i, prompt in enumerate(prompts)]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self._make_request, prompt, idx): idx
for idx, prompt in tasks
}
for future in as_completed(futures):
idx = futures[future]
try:
result = future.result()
print(f"タスク {idx} 完了: {result.get('status', 'error')}")
except Exception as e:
print(f"タスク {idx} 予期しないエラー: {e}")
return {
"success": self.results.copy(),
"errors": self.error_log.copy(),
"total": len(prompts),
"success_count": len(self.results),
"error_count": len(self.error_log)
}
使用例
client = HolySheepAIClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
max_concurrent=10 # 同時接続10件に制限
)
prompts = [f"タスク{i}の質問内容" for i in range(100)]
results = client.batch_process(prompts, max_workers=20)
print(f"成功: {results['success_count']}/{results['total']}")
print(f"エラー: {results['error_count']}/{results['total']}")
Solution 2: asyncio + aiohttp による非同期実装
より高い并发処理が必要な場合は、asyncioベースの非同期実装が有効です:
import asyncio
import aiohttp
import json
class AsyncHolySheepClient:
"""非同期 HolySheep AI クライアント - race condition ゼロ"""
def __init__(self, api_key, max_concurrent=50):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.max_concurrent = asyncio.Semaphore(max_concurrent)
self.results = []
self.errors = []
async def _single_request(self, session, prompt, task_id):
"""単一リクエストの実行"""
async with self.max_concurrent: # 同時接続制御
payload = {
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}