AI API を運用していると、一時的なネットワーク障害、レートリミット、サーバー過負荷によるエラーに直面することは避けられません。私は以前、重要なバッチ処理中最に ConnectionError: timeout が発生し、大量のリクエストが丸ごと失敗した経験があります。この問題を解決するのが、Python の tenacity ライブラリです。

本稿では、tenacity を使ったインテリジェントなリトライ戦略の構築方法を、HolySheep AI のようなプロダクショングレードの API 呼び出しを例にとって解説します。tenacity は指数関数的バックオフから条件付きリトライまで、幅広い再試行戦略をサポートしています。

tenacity の基本設定

tenacity は pip install tenacity でインストールでき、Python 3.7 以上で動作します。 Decorator ベースの API が主流で、最小限のコードで強力なリトライロジックを実装できます。

pip install tenacity requests

基本的なリトライの実装

最もシンプルな設定から見ていきましょう。HolySheep AI API への基本的な呼び出しでエラーが発生した際に自動的にリトライする例です:

import requests
from tenacity import retry, stop_after_attempt, wait_exponential

HolySheep AI API設定

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = "YOUR_HOLYSHEEP_API_KEY" headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } @retry( stop=stop_after_attempt(5), # 最大5回試行 wait=wait_exponential(multiplier=1, min=2, max=10) # 指数関数的バックオフ ) def call_holysheep_api(messages: list) -> dict: """HolySheep AI Chat Completions APIを呼び出す""" response = requests.post( f"{BASE_URL}/chat/completions", headers=headers, json={ "model": "gpt-4.1", "messages": messages, "max_tokens": 1000 }, timeout=30 ) # 4xx/5xx エラーの場合は例外を発生させてリトライをトリガー response.raise_for_status() return response.json()

使用例

try: result = call_holysheep_api([ {"role": "user", "content": "Hello, HolySheep!"} ]) print(f"成功: {result['choices'][0]['message']['content']}") except Exception as e: print(f"最終エラー: {e}")

この設定では、最大5回の試行を行い、各試行間の待機時間は指数関数的に増加します(1秒→2秒→4秒→8秒、最大10秒)。multiplier=1 は基底の待機秒数、min=2 は最小待機時間、max=10 は最大待機時間を意味します。

エラー条件に応じた柔軟なリトライ

すべてのエラーに同じ戦略を適用するのは非効率です。レートリミットには長めの待機が必要ですが、認証エラーではリトライしても無駄です。tenacity の retry_if_exception_typeretry_if_result を使って条件を精密に制御しましょう:

import requests
from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type, retry_if_result,
    RetryError, TryAgain
)
import time

例外クラスの定義

class RateLimitError(Exception): """レートリミットExceededエラー""" def __init__(self, retry_after: int = None): self.retry_after = retry_after super().__init__(f"Rate limit exceeded. Retry after {retry_after}s") class TemporaryServerError(Exception): """一時的なサーバーエラー""" pass

条件付きリトライデコレータ

@retry( stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=60), retry=( retry_if_exception_type(TemporaryServerError) | retry_if_exception_type(requests.exceptions.ConnectionError) | retry_if_exception_type(requests.exceptions.Timeout) | retry_if_exception_type(RateLimitError) ), before_sleep=lambda retry_state: print( f"リトライ {retry_state.attempt_number}回目 - " f"待機 {retry_state.next_action.sleep}s秒" ) ) def call_api_with_conditional_retry(messages: list, model: str = "deepseek-v3.2") -> dict: """条件付きリトライ対応のAPI呼び出し""" response = requests.post( f"{BASE_URL}/chat/completions", headers=headers, json={ "model": model, "messages": messages, "max_tokens": 500 }, timeout=30 ) # ステータスコードによる分岐処理 if response.status_code == 429: # Rate LimitExceeded retry_after = int(response.headers.get("Retry-After", 30)) raise RateLimitError(retry_after=retry_after) elif response.status_code == 500 or response.status_code == 502 or response.status_code == 503: # サーバーサイドの一時エラー raise TemporaryServerError(f"Server error: {response.status_code}") elif response.status_code == 401: # 認証エラー - リトライしても解決しないのですぐ失敗させる raise PermissionError("Invalid API key - no point retrying") elif response.status_code == 400: # リクエストエラー - パラメータ修正が必要 error_detail = response.json().get("error", {}).get("message", "Bad request") raise ValueError(f"Invalid request: {error_detail}") response.raise_for_status() return response.json()

実際の使用例

def batch_process(prompts: list) -> list: """一括処理でリトライロジックを活用""" results = [] for prompt in prompts: try: result = call_api_with_conditional_retry([ {"role": "user", "content": prompt} ]) results.append(result["choices"][0]["message"]["content"]) except PermissionError: # 認証エラーは致命的 raise RuntimeError("致命的エラー: APIキーが無効です") except ValueError: # リクエストエラーも致命的 raise RuntimeError("致命的エラー: リクエストパラメータを確認してください") except Exception as e: print(f"最終リトライ後も失敗: {prompt[:30]}... - {e}") results.append(None) return results

このコードでは、5xx サーバーエラーとネットワークエラー、レートリミットには指数関数的バックオフでリトライしますが、401/400 エラー時には即座に例外を送出します。これにより、無意味なリトライを排除し、応答性を向上させます。

Jitter(ジッター)を追加した高度なバックオフ

単純な指数関数的バックオフでは、複数のクライアントが同時に同じ間隔で再試行し、 thundering herd problem を引き起こす可能性があります。これを防ぐには、Jitter(ランダムな待機時間の追加)が有効です:

import random
import asyncio
import aiohttp
from tenacity import (
    retry, stop_after_attempt, wait_random_exponential,
    AsyncRetrying
)

@retry(
    stop=stop_after_attempt(4),
    wait=wait_random_exponential(multiplier=0.5, max=10)
)
async def async_call_api(messages: list) -> dict:
    """Jitter付き非同期API呼び出し"""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{BASE_URL}/chat/completions",
            headers=headers,
            json={
                "model": "gemini-2.5-flash",
                "messages": messages
            },
            timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            if response.status == 429:
                await asyncio.sleep(int(response.headers.get("Retry-After", 5)))
                raise TryAgain  # 明示的なリトライトリガー
            
            response.raise_for_status()
            return await response.json()

async def concurrent_api_calls(prompts: list, max_concurrent: int = 10) -> list:
    """同時実行数制限付きのバッチ処理"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def limited_call(prompt: str):
        async with semaphore:
            return await async_call_api([
                {"role": "user", "content": prompt}
            ])
    
    tasks = [limited_call(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

実行例

if __name__ == "__main__": prompts = [f"Query {i}: Explain AI retry mechanisms" for i in range(20)] results = asyncio.run(concurrent_api_calls(prompts)) successful = [r for r in results if not isinstance(r, Exception)] print(f"成功率: {len(successful)}/{len(prompts)}")

wait_random_exponential は base * 0.5 ~ base * 1.0 秒の範囲でランダムな待機時間を生成します。これにより、複数のリクエストが同時に再来するリスクを軽減し、HolySheep AI のような API への負荷を平準化できます。

リトライコールバックの活用

tenacity はリトライの各段階でカスタムロジックを実行できるコールバックをサポートしています。これを活用すると、メトリクス収集、アラート送信、失敗時の代替処理などが実装できます:

from tenacity import (
    Retrying, StopAfterAttempt, WaitExponential,
    before_sleep_nothing, after_nothing, before_nothing
)

def log_retry(retry_state):
    """リトライ時のログ記録"""
    if retry_state.outcome and retry_state.outcome.failed:
        exception = retry_state.outcome.exception()
        print(
            f"[リトライログ] Attempt {retry_state.attempt_number} 失敗 - "
            f"Exception: {type(exception).__name__}: {exception}"
        )

def send_alert(retry_state):
    """一定回数失敗後のアラート送信"""
    if retry_state.attempt_number >= 3:
        # 実際には Slack/PagerDuty などに通知
        print(f"[⚠️ アラート] {retry_state.attempt_number}回リトライしても失敗中")

def cleanup_on_failure(retry_state):
    """失敗時のリソースクリーンアップ"""
    # データベース接続の解放、一時ファイルの削除など
    print("[クリーンアップ] リソース解放を実行")

retrying = Retrying(
    stop=StopAfterAttempt(5),
    wait=WaitExponential(multiplier=1, min=2, max=30),
    retry=(
        retry_if_exception_type(requests.exceptions.ConnectionError) |
        retry_if_exception_type(TemporaryServerError)
    ),
    before_sleep=log_retry,  # 各リトライ前にログ出力
    after=send_alert,         # リトライ後にアラートチェック
    before=before_nothing,    # リトライ前に実行(今回は何もしない)
    reraise=True              # 全リトライ失敗時に例外を再送出
)

def call_api_with_logging(messages: list) -> dict:
    """ログ・モニタリング付きAPI呼び出し"""
    try:
        result = retrying(call_api_with_conditional_retry, messages)
        return result
    except RetryError as e:
        # 全リトライ失敗時の処理
        cleanup_on_failure(None)
        raise RuntimeError(f"API呼び出しが完全に失敗: {e}")

HolySheep AI での実装例

HolySheep AI は 登録 すると無料クレジットを獲得でき、レート ¥1=$1(公式比85%節約)というコスト効率の良さを提供しています。私のプロジェクトでは、DeepSeek V3.2($0.42/MTok)や Gemini 2.5 Flash($2.50/MTok)を組み合わせたマルチモデル構成で運用していますが、各モデルのエラー特性に合わせたリトライ戦略が必須でした。

以下の例は、本番環境での設定を簡略化したものです:

import requests
from tenacity import (
    retry, stop_after_attempt, wait_exponential_jitter,
    retry_if_exception_type
)

モデル別設定

MODEL_CONFIGS = { "gpt-4.1": {"max_retries": 3, "timeout": 60}, "claude-sonnet-4.5": {"max_retries": 4, "timeout": 45}, "gemini-2.5-flash": {"max_retries": 5, "timeout": 30}, "deepseek-v3.2": {"max_retries": 6, "timeout": 25} } def create_model_specific_retry(model: str): """モデル別のリトライ設定でAPI呼び出しを行うデコレータを生成""" config = MODEL_CONFIGS.get(model, MODEL_CONFIGS["deepseek-v3.2"]) return retry( stop=stop_after_attempt(config["max_retries"]), wait=wait_exponential_jitter( initial=1, max=30, exp_base=2, jitter=Jitter() ), retry=retry_if_exception_type((ConnectionError, TimeoutError)), before_sleep=lambda rs: print( f"[{model}] Retry {rs.attempt_number}/{config['max_retries']} " f"after {rs.next_action.sleep:.1f}s" ) ) @create_model_specific_retry("deepseek-v3.2") def call_deepseek(messages: list) -> dict: """DeepSeek V3.2 向け最適化されたAPI呼び出し""" response = requests.post( f"{BASE_URL}/chat/completions", headers=headers, json={ "model": "deepseek-v3.2", "messages": messages, "temperature": 0.7 }, timeout=MODEL_CONFIGS["deepseek-v3.2"]["timeout"] ) response.raise_for_status() return response.json()

コスト効率重視のバッチ処理

def cost_optimized_batch(prompts: list, budget_limit: float = 10.0) -> list: """コスト上限付きのバッチ処理""" total_cost = 0.0 results = [] for i, prompt in enumerate(prompts): # 予算チェック estimated_cost = 0.001 # 実際のトークン数で計算すべき if total_cost + estimated_cost > budget_limit: print(f"予算上限(${budget_limit})到達、{len(prompts) - i}件をスキップ") results.extend([None] * (len(prompts) - i)) break try: result = call_deepseek([{"role": "user", "content": prompt}]) content = result["choices"][0]["message"]["content"] results.append(content) # 実際のコスト計算(usage フィールドから) # total_cost += calculate_cost(result["usage"], "deepseek-v3.2") except Exception as e: print(f"エラー({prompt[:20]}...): {e}") results.append(None) return results

よくあるエラーと対処法

エラー1: ConnectionError: timeout(接続タイムアウト)

発生状況:ネットワーク不安定時、または API サーバーが高負荷時のリクエスト送信中に発生。HolySheep AI でも稀に発生しますが、概して50ms未満のレイテンシを維持しているため、再試行でほぼ解決します。

# 解決コード
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=15),
    retry_if_exception_type(requests.exceptions.Timeout)
)
def robust_api_call(messages: list) -> dict:
    """
    タイムアウトに対して頑健なAPI呼び出し
    - 最初の試行: 即座に実行
    - 2回目: 2秒待機
    - 3回目: 4秒待機
    - 4回目: 8秒待機
    - 5回目: 15秒待機(最大値)
    """
    response = requests.post(
        f"{BASE_URL}/chat/completions",
        headers=headers,
        json={"model": "deepseek-v3.2", "messages": messages},
        timeout=(10, 30)  # (接続タイムアウト, 読み取りタイムアウト)
    )
    response.raise_for_status()
    return response.json()

エラー2: 401 Unauthorized(認証エラー)

発生状況:無効な API キー、使用期限切れのキー、フォーマットミスの Authorization ヘッダー。大規模言語モデルの API は軒並みこのエラーを返しますが、リトライでは解決しないため、早期検出とログ記録が重要です。

# 解決コード
import requests

def validate_and_call_api(messages: list) -> dict:
    """
    認証エラーを早期検出し、無意味なリトライを回避
    """
    response = requests.post(
        f"{BASE_URL}/chat/completions",
        headers=headers,
        json={"model": "gpt-4.1", "messages": messages},
        timeout=30
    )
    
    if response.status_code == 401:
        # リトライしても無駄 - 立即終了してログ記録
        error_detail = response.json().get("error", {}).get("message", "Unauthorized")
        print(f"[致命的エラー] 認証失敗 - {error_detail}")
        print("APIキーの有効性を https://www.holysheep.ai/dashboard で確認してください")
        raise PermissionError(f"Authentication failed: {error_detail}")
    
    response.raise_for_status()
    return response.json()

APIキーの事前検証

def validate_api_key(api_key: str) -> bool: """APIキーの有効性を軽量なリクエストで検証""" try: response = requests.get( f"{BASE_URL}/models", headers={"Authorization": f"Bearer {api_key}"}, timeout=10 ) return response.status_code == 200 except Exception: return False

エラー3: 429 Rate Limit Exceeded(レート制限超過)

発生状況:短時間に大量のリクエストを送信ikor時、または HolySheep AI の秒間リクエスト制限を超えた場合。WeChat Pay や Alipay での支払いでもこの制限に達することがありますが、適切なバックオフで回避できます。

# 解決コード
import time
import requests

class RateLimitHandler:
    """レート制限対応のAPIクライアント"""
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.last_request_time = 0
        self.min_interval = 0.1  # 秒間最大10リクエスト
    
    def _respect_rate_limit(self):
        """レート制限を遵守するための待機"""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()
    
    def call_with_retry(self, messages: list, model: str = "gemini-2.5-flash") -> dict:
        """レートリミット対応のリトライ付きAPI呼び出し"""
        max_retries = 10
        retry_count = 0
        
        while retry_count < max_retries:
            self._respect_rate_limit()
            
            try:
                response = requests.post(
                    f"{self.base_url}/chat/completions",
                    headers=self.headers,
                    json={"model": model, "messages": messages},
                    timeout=30
                )
                
                if response.status_code == 429:
                    # Retry-After ヘッダの確認
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"レート制限 - {retry_after}秒待機后再試行({retry_count + 1}/{max_retries})")
                    time.sleep(retry_after)
                    retry_count += 1
                    continue
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.RequestException as e:
                if retry_count < max_retries - 1:
                    wait_time = 2 ** retry_count  # 指数バックオフ
                    print(f"接続エラー - {wait_time}秒待機后再試行({retry_count + 1}/{max_retries})")
                    time.sleep(wait_time)
                    retry_count += 1
                else:
                    raise
        
        raise RuntimeError(f"最大リトライ回数({max_retries})を超えた")

使用例

client = RateLimitHandler(BASE_URL, "YOUR_HOLYSHEEP_API_KEY") result = client.call_with_retry([{"role": "user", "content": "Hello"}])

エラー4: 503 Service Unavailable(サービス一時停止)

発生状況:サーバー maintenanc 或いは過負荷時の意図的なシャットダウン。HolySheep AI でもメンテナンスウィンドウ中に発生することがあり、通常は数分以内に回復します。

# 解決コード
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(8),
    wait=wait_exponential(multiplier=2, min=10, max=120),
    retry_if_exception_type(requests.exceptions.HTTPError)
)
def call_with_service_unavailable_handling(messages: list) -> dict:
    """503エラーに対して段階的にバックオフするAPI呼び出し"""
    response = requests.post(
        f"{BASE_URL}/chat/completions",
        headers=headers,
        json={"model": "claude-sonnet-4.5", "messages": messages},
        timeout=60
    )
    
    # 503 Service Unavailable の特別処理
    if response.status_code == 503:
        retry_after = response.headers.get("Retry-After", 30)
        raise TryAgain(f"503 Service Unavailable, retry after {retry_after}s")
    
    response.raise_for_status()
    return response.json()

メンテナンス検出と通知

def check_service_status() -> dict: """APIサービスの状態を確認""" try: response = requests.get( f"{BASE_URL}/health", timeout=5 ) if response.status_code == 200: return {"status": "healthy", "details": response.json()} else: return {"status": "degraded", "code": response.status_code} except Exception as e: return {"status": "unavailable", "error": str(e)}

まとめ

tenacity ライブラリを活用することで、AI API 呼び出しの信頼性を大幅に向上させられます。重要なポイントは3つです:

AI API の安定稼働には適切なリトライ戦略が不可欠です。HolySheep AI の ¥1=$1 汇率と50ms未満のレイテンシを組み合わせることで、コスト効率と信頼性の両方を実現できます。

👉 HolySheep AI に登録して無料クレジットを獲得 ```