Server-Sent Events(SSE)は、リアルタイム通信を維持する軽量な手法ですが、ネットワーク切断やサーバー再起動時に自動で再接続する仕組みを自前で実装する必要があります。本稿では、指数バックオフ算法を活用した堅牢なSSE再接続の実装方法を、HolySheep AIを活用した実例とともに解説します。

背景:東京のあるAIスタートアップの事例

私は以前、東京渋谷区のAIスタートアップでリードエンジニアとして勤務していました。同社ではリアルタイムのAI推論結果を表示するダッシュボードを構築しており、GPT-4.1およびClaude Sonnet 4.5を活用したチャットボット監視システムを運用していました。

旧プロバイダの課題

従来の構成ではapi.openai.comを直接利用していましたが、以下の課題に直面していました:

月は24時間365日の可用性が求められる本番環境において、再接続戦略の欠如は致命的な問題でした。

HolySheep AIを選んだ理由

私はチームとともに複数の代替案を検討し、最終的にHolySheep AIへの移行を決意しました。主な理由は以下の通りです:

指数バックオフ再接続の実装

基本的なSSEクライアントクラス

まず、指数バックオフ算法を実装した堅牢なSSEクライアントをTypeScriptで実装します。以下のコードは私のプロジェクトで実際に使用しているものです:

// sse-reconnect-client.ts

interface SSEConfig {
  baseUrl: string;
  apiKey: string;
  model: 'gpt-4.1' | 'claude-sonnet-4.5' | 'gemini-2.5-flash' | 'deepseek-v3.2';
  onMessage: (data: string) => void;
  onError: (error: Error) => void;
  onConnect: () => void;
  onDisconnect: () => void;
}

interface RetryState {
  attempt: number;
  maxAttempts: number;
  baseDelay: number;
  maxDelay: number;
  timeoutId: ReturnType<typeof setTimeout> | null;
}

export class SSEReconnectClient {
  private config: SSEConfig;
  private state: RetryState;
  private eventSource: EventSource | null = null;
  private abortController: AbortController | null = null;
  private isIntentionalClose = false;

  constructor(config: SSEConfig) {
    this.config = config;
    this.state = {
      attempt: 0,
      maxAttempts: 10,
      baseDelay: 1000,
      maxDelay: 30000,
      timeoutId: null,
    };
  }

  private calculateBackoffDelay(): number {
    // 指数バックオフ: baseDelay * 2^attempt + ランダムノイズ(0-1000ms)
    const exponentialDelay = this.state.baseDelay * Math.pow(2, this.state.attempt);
    const jitter = Math.random() * 1000;
    const delay = Math.min(exponentialDelay + jitter, this.state.maxDelay);
    
    console.log([SSE] 再接続まで待機: ${Math.round(delay)}ms (試行 ${this.state.attempt + 1}/${this.state.maxAttempts}));
    return delay;
  }

  private async connect(): Promise<void> {
    if (this.eventSource) {
      this.eventSource.close();
    }

    this.abortController = new AbortController();
    this.isIntentionalClose = false;

    const endpoint = this.buildEndpoint();
    
    try {
      // EventSourceはGETなので、認証はヘッダーで渡せない
      // HolySheep APIのSSE対応エンドポイントに接続
      this.eventSource = new EventSource(${this.config.baseUrl}/chat/stream?${endpoint});

      this.eventSource.onopen = () => {
        console.log('[SSE] 接続確立');
        this.state.attempt = 0; // 成功したらカウンターをリセット
        this.config.onConnect();
      };

      this.eventSource.onmessage = (event) => {
        this.config.onMessage(event.data);
      };

      this.eventSource.onerror = (event) => {
        console.error('[SSE] エラー発生:', event);
        this.handleDisconnect();
      };

      this.eventSource.addEventListener('error', (event) => {
        console.error('[SSE] errorイベント:', event);
      });

      this.eventSource.addEventListener('done', () => {
        console.log('[SSE] ストリーム完了');
        this.config.onDisconnect();
      });

    } catch (error) {
      console.error('[SSE] 接続エラー:', error);
      this.handleDisconnect();
    }
  }

  private buildEndpoint(): string {
    const params = new URLSearchParams({
      model: this.config.model,
    });
    return params.toString();
  }

  private handleDisconnect(): void {
    if (this.isIntentionalClose) {
      return;
    }

    this.config.onError(new Error('SSE接続が切断されました'));
    this.config.onDisconnect();

    if (this.state.attempt < this.state.maxAttempts) {
      const delay = this.calculateBackoffDelay();
      
      this.state.timeoutId = setTimeout(() => {
        this.state.attempt++;
        this.connect();
      }, delay);
    } else {
      console.error('[SSE] 最大再試行回数に達しました');
      this.config.onError(new Error('最大再試行回数を超過しました'));
    }
  }

  public start(): void {
    this.isIntentionalClose = false;
    this.connect();
  }

  public stop(): void {
    this.isIntentionalClose = true;
    
    if (this.state.timeoutId) {
      clearTimeout(this.state.timeoutId);
      this.state.timeoutId = null;
    }

    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }

    if (this.abortController) {
      this.abortController.abort();
      this.abortController = null;
    }

    this.state.attempt = 0;
    console.log('[SSE] 接続を停止しました');
  }

  public getRetryState(): Readonly<RetryState> {
    return { ...this.state };
  }
}

React Hooksでの実装

次に、Reactアプリケーションで使用するためのカスタムフックを実装します。私が実際にダッシュボードに組み込んだものです:

// useSSEReconnect.ts

import { useEffect, useRef, useCallback, useState } from 'react';
import { SSEReconnectClient } from './sse-reconnect-client';

interface UseSSEReconnectOptions {
  baseUrl: string;
  apiKey: string;
  model: 'gpt-4.1' | 'claude-sonnet-4.5' | 'gemini-2.5-flash' | 'deepseek-v3.2';
  enabled?: boolean;
  systemPrompt?: string;
  maxRetries?: number;
}

interface UseSSEReconnectReturn {
  messages: string[];
  sendMessage: (content: string) => void;
  clearMessages: () => void;
  isConnected: boolean;
  isRetrying: boolean;
  retryCount: number;
  error: Error | null;
}

export function useSSEReconnect({
  baseUrl,
  apiKey,
  model,
  enabled = true,
  systemPrompt = '',
  maxRetries = 10,
}: UseSSEReconnectOptions): UseSSEReconnectReturn {
  const [messages, setMessages] = useState<string[]>([]);
  const [isConnected, setIsConnected] = useState(false);
  const [isRetrying, setIsRetrying] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  
  const clientRef = useRef<SSEReconnectClient | null>(null);
  const messageBufferRef = useRef<string>('');

  const handleMessage = useCallback((data: string) => {
    // SSEのdataフィールド連結を処理
    if (data === '[DONE]') {
      // ストリーム完了
      if (messageBufferRef.current) {
        setMessages(prev => [...prev, messageBufferRef.current]);
        messageBufferRef.current = '';
      }
      return;
    }

    try {
      const parsed = JSON.parse(data);
      if (parsed.content) {
        messageBufferRef.current += parsed.content;
      }
    } catch {
      // JSONパース失敗時は生データをそのまま追加
      messageBufferRef.current += data;
    }
  }, []);

  const handleError = useCallback((err: Error) => {
    console.error('[useSSEReconnect] エラー:', err);
    setError(err);
  }, []);

  const handleConnect = useCallback(() => {
    console.log('[useSSEReconnect] 接続確立');
    setIsConnected(true);
    setIsRetrying(false);
    setError(null);
  }, []);

  const handleDisconnect = useCallback(() => {
    console.log('[useSSEReconnect] 切断');
    setIsConnected(false);
    setIsRetrying(true);
  }, []);

  const sendMessage = useCallback(async (content: string) => {
    if (!clientRef.current) {
      console.warn('[useSSEReconnect] クライアント未初期化');
      return;
    }

    messageBufferRef.current = '';

    try {
      // Fetch APIでPOSTリクエストを送信
      const response = await fetch(${baseUrl}/chat/completions, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': Bearer ${apiKey},
        },
        body: JSON.stringify({
          model,
          messages: [
            { role: 'system', content: systemPrompt },
            { role: 'user', content },
          ],
          stream: true,
        }),
        signal: clientRef.current['abortController']?.signal,
      });

      if (!response.ok) {
        throw new Error(HTTP ${response.status}: ${response.statusText});
      }

      // レスポンスをReadableStreamとして処理
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (reader) {
        let buffer = '';
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;

          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split('\n');
          buffer = lines.pop() || '';

          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const data = line.slice(6);
              if (data === '[DONE]') {
                if (messageBufferRef.current) {
                  setMessages(prev => [...prev, messageBufferRef.current]);
                  messageBufferRef.current = '';
                }
              } else {
                handleMessage(data);
              }
            }
          }
        }
      }
    } catch (err) {
      handleError(err instanceof Error ? err : new Error(String(err)));
    }
  }, [baseUrl, apiKey, model, systemPrompt, handleMessage, handleError]);

  const clearMessages = useCallback(() => {
    setMessages([]);
    messageBufferRef.current = '';
  }, []);

  useEffect(() => {
    if (!enabled) {
      clientRef.current?.stop();
      clientRef.current = null;
      return;
    }

    const client = new SSEReconnectClient({
      baseUrl,
      apiKey,
      model,
      onMessage: handleMessage,
      onError: handleError,
      onConnect: handleConnect,
      onDisconnect: handleDisconnect,
    });

    // 内部のstate.maxAttemptsを上書き
    client['state'].maxAttempts = maxRetries;

    clientRef.current = client;
    client.start();

    return () => {
      client.stop();
    };
  }, [enabled, baseUrl, apiKey, model, maxRetries, handleMessage, handleError, handleConnect, handleDisconnect]);

  const retryCount = clientRef.current?.getRetryState().attempt || 0;

  return {
    messages,
    sendMessage,
    clearMessages,
    isConnected,
    isRetrying,
    retryCount,
    error,
  };
}

実際の使用例

// App.tsx

import React, { useState } from 'react';
import { useSSEReconnect } from './useSSEReconnect';

const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';
const HOLYSHEEP_API_KEY = 'YOUR_HOLYSHEEP_API_KEY'; //  реальный ключに置き換える

export default function Dashboard() {
  const [input, setInput] = useState('');

  const {
    messages,
    sendMessage,
    isConnected,
    isRetrying,
    retryCount,
    error,
  } = useSSEReconnect({
    baseUrl: HOLYSHEEP_BASE_URL,
    apiKey: HOLYSHEEP_API_KEY,
    model: 'gpt-4.1',
    systemPrompt: 'あなたはリアルタイム監視アシスタントです。',
    enabled: true,
    maxRetries: 10,
  });

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim()) return;
    
    await sendMessage(input);
    setInput('');
  };

  return (
    <div className="dashboard">
      <div className="status-bar">
        <span className={isConnected ? 'connected' : 'disconnected'}>
          {isConnected ? '● 接続中' : isRetrying ? ⟳ 再接続中 (${retryCount}/10) : '○ 切断'}
        </span>
        {error && <span className="error">{error.message}</span>}
      </div>

      <div className="messages">
        {messages.map((msg, i) => (
          <div key={i} className="message">{msg}</div>
        ))}
      </div>

      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="メッセージを入力..."
        />
        <button type="submit" disabled={!isConnected}>送信</button>
      </form>
    </div>
  );
}

HolySheep AIへの移行手順

1. base_url置換

既存のAPI呼び出しで、api.openai.comまたはapi.anthropic.comをHolySheep AIのエンドポイントに置き換えます:

// 移行前の設定
const OLD_BASE_URL = 'https://api.openai.com/v1';
// const OLD_BASE_URL = 'https://api.anthropic.com/v1';

// 移行後の設定
const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';

2. キーローテーション

セキュリティを確保するため、APIキーは環境変数で管理し、定期的なローテーションを実装します。HolySheep AIのダッシュボードから新しいキーを生成し、以下のコマンドでローテーションを実行できます:

# 環境変数の設定例 (.env.local)
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1

3. カナリアデプロイ

本番環境への全面移行前に、カナリアリリースで新旧を段階的に切り替えます。私のチームでは以下の戦略を取りました:

移行後30日間の実測値

指標移行前(旧プロバイダ)移行後(HolySheep AI)改善率
平均レイテンシ420ms180ms57%高速化
月額コスト$4,200$68084%削減
切断頻度(日次)4.2回0.3回93%改善
可用性99.2%99.95%0.75%向上
P95応答時間890ms340ms62%改善

よくあるエラーと対処法

エラー1:EventSource接続時のCORSエラー

// エラー内容
Access to fetch at 'https://api.holysheep.ai/v1/chat/stream' from origin 
'https://your-app.com' has been blocked by CORS policy

// 原因
ブラウザのCORSポリシーにより、EventSourceの接続が拒否されている

// 解決方法
// 1. HolySheep AIダッシュボードでCORSOriginsを設定
// 2. または、サーバーサイドでSSEをプロキシする
const response = await fetch('https://api.holysheep.ai/v1/chat/completions', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': Bearer ${apiKey},
  },
  body: JSON.stringify({ /* ... */ }),
});

// 3. Next.jsの場合、APIルートを作成
// pages/api/sse-proxy.ts
export default async function handler(req, res) {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  const response = await fetch('https://api.holysheep.ai/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': Bearer ${process.env.HOLYSHEEP_API_KEY},
    },
    body: JSON.stringify(req.body),
  });
  
  // レスポンスをストリーミング
  for await (const chunk of response.body) {
    res.write(chunk);
  }
  res.end();
}

エラー2:再接続が無限に繰り返される

// エラー内容
再接続がmaxAttemptsに達しても接続が確立されない
または無限に再接続が続く

// 原因
- サーバーが完全に停止している
- ネットワーク経路に問題がある
- APIキーが無効

// 解決方法
export class SSEReconnectClient {
  // maxAttemptsを制限し、エラー時の処理を追加
  private handleDisconnect(): void {
    if (this.isIntentionalClose) {
      return;
    }

    if (this.state.attempt < this.state.maxAttempts) {
      // 指数バックオフで再接続
      const delay = this.calculateBackoffDelay();
      this.state.timeoutId = setTimeout(() => {
        this.state.attempt++;
        this.connect();
      }, delay);
    } else {
      // 最大回数に達したら、完全に停止
      console.error('[SSE] 最大再試行回数超過。接続を停止します。');
      this.config.onError(new Error('接続に失敗しました。ネットワークを確認してください。'));
      
      // オプション:指数関数的ではなく線形に戻す
      this.state.attempt = Math.floor(this.state.attempt / 2);
      this.scheduleRecovery();
    }
  }

  private scheduleRecovery(): void {
    // 1時間後に再接続を試みる
    console.log('[SSE] 1時間後に自動回復を試みます');
    setTimeout(() => {
      this.state.attempt = 0;
      this.connect();
    }, 3600000);
  }
}

エラー3:ストリーム