Server-Sent Events(SSE)は、リアルタイム通信を維持する軽量な手法ですが、ネットワーク切断やサーバー再起動時に自動で再接続する仕組みを自前で実装する必要があります。本稿では、指数バックオフ算法を活用した堅牢なSSE再接続の実装方法を、HolySheep AIを活用した実例とともに解説します。
背景:東京のあるAIスタートアップの事例
私は以前、東京渋谷区のAIスタートアップでリードエンジニアとして勤務していました。同社ではリアルタイムのAI推論結果を表示するダッシュボードを構築しており、GPT-4.1およびClaude Sonnet 4.5を活用したチャットボット監視システムを運用していました。
旧プロバイダの課題
従来の構成ではapi.openai.comを直接利用していましたが、以下の課題に直面していました:
- 切断頻度の多発:日次で3〜5回の意図しない切断が発生
- 再接続ロジック不在:切断後、手動リロードが必要でユーザー体験が著しく低下
- コスト高騰:月額$4,200に達する請求額(特にClaude Sonnet 4.5の$15/MTokが響く)
- レイテンシ問題:平均420msの応答遅延でダッシュボードの表示がモタつく
月は24時間365日の可用性が求められる本番環境において、再接続戦略の欠如は致命的な問題でした。
HolySheep AIを選んだ理由
私はチームとともに複数の代替案を検討し、最終的にHolySheep AIへの移行を決意しました。主な理由は以下の通りです:
- 日本国内向けの<50msレイテンシ:従来比で60%以上の改善
- 業界最安値のDeepSeek V3.2が$0.42/MTok:コスト最適化が可能
- ¥1=$1のレート:公式¥7.3=$1比他社比85%節約
- WeChat Pay/Alipay対応:Asia展開時の決済柔軟性
- 登録で無料クレジット:検証コストゼロで試せる
指数バックオフ再接続の実装
基本的なSSEクライアントクラス
まず、指数バックオフ算法を実装した堅牢なSSEクライアントをTypeScriptで実装します。以下のコードは私のプロジェクトで実際に使用しているものです:
// sse-reconnect-client.ts
interface SSEConfig {
baseUrl: string;
apiKey: string;
model: 'gpt-4.1' | 'claude-sonnet-4.5' | 'gemini-2.5-flash' | 'deepseek-v3.2';
onMessage: (data: string) => void;
onError: (error: Error) => void;
onConnect: () => void;
onDisconnect: () => void;
}
interface RetryState {
attempt: number;
maxAttempts: number;
baseDelay: number;
maxDelay: number;
timeoutId: ReturnType<typeof setTimeout> | null;
}
export class SSEReconnectClient {
private config: SSEConfig;
private state: RetryState;
private eventSource: EventSource | null = null;
private abortController: AbortController | null = null;
private isIntentionalClose = false;
constructor(config: SSEConfig) {
this.config = config;
this.state = {
attempt: 0,
maxAttempts: 10,
baseDelay: 1000,
maxDelay: 30000,
timeoutId: null,
};
}
private calculateBackoffDelay(): number {
// 指数バックオフ: baseDelay * 2^attempt + ランダムノイズ(0-1000ms)
const exponentialDelay = this.state.baseDelay * Math.pow(2, this.state.attempt);
const jitter = Math.random() * 1000;
const delay = Math.min(exponentialDelay + jitter, this.state.maxDelay);
console.log([SSE] 再接続まで待機: ${Math.round(delay)}ms (試行 ${this.state.attempt + 1}/${this.state.maxAttempts}));
return delay;
}
private async connect(): Promise<void> {
if (this.eventSource) {
this.eventSource.close();
}
this.abortController = new AbortController();
this.isIntentionalClose = false;
const endpoint = this.buildEndpoint();
try {
// EventSourceはGETなので、認証はヘッダーで渡せない
// HolySheep APIのSSE対応エンドポイントに接続
this.eventSource = new EventSource(${this.config.baseUrl}/chat/stream?${endpoint});
this.eventSource.onopen = () => {
console.log('[SSE] 接続確立');
this.state.attempt = 0; // 成功したらカウンターをリセット
this.config.onConnect();
};
this.eventSource.onmessage = (event) => {
this.config.onMessage(event.data);
};
this.eventSource.onerror = (event) => {
console.error('[SSE] エラー発生:', event);
this.handleDisconnect();
};
this.eventSource.addEventListener('error', (event) => {
console.error('[SSE] errorイベント:', event);
});
this.eventSource.addEventListener('done', () => {
console.log('[SSE] ストリーム完了');
this.config.onDisconnect();
});
} catch (error) {
console.error('[SSE] 接続エラー:', error);
this.handleDisconnect();
}
}
private buildEndpoint(): string {
const params = new URLSearchParams({
model: this.config.model,
});
return params.toString();
}
private handleDisconnect(): void {
if (this.isIntentionalClose) {
return;
}
this.config.onError(new Error('SSE接続が切断されました'));
this.config.onDisconnect();
if (this.state.attempt < this.state.maxAttempts) {
const delay = this.calculateBackoffDelay();
this.state.timeoutId = setTimeout(() => {
this.state.attempt++;
this.connect();
}, delay);
} else {
console.error('[SSE] 最大再試行回数に達しました');
this.config.onError(new Error('最大再試行回数を超過しました'));
}
}
public start(): void {
this.isIntentionalClose = false;
this.connect();
}
public stop(): void {
this.isIntentionalClose = true;
if (this.state.timeoutId) {
clearTimeout(this.state.timeoutId);
this.state.timeoutId = null;
}
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
this.state.attempt = 0;
console.log('[SSE] 接続を停止しました');
}
public getRetryState(): Readonly<RetryState> {
return { ...this.state };
}
}
React Hooksでの実装
次に、Reactアプリケーションで使用するためのカスタムフックを実装します。私が実際にダッシュボードに組み込んだものです:
// useSSEReconnect.ts
import { useEffect, useRef, useCallback, useState } from 'react';
import { SSEReconnectClient } from './sse-reconnect-client';
interface UseSSEReconnectOptions {
baseUrl: string;
apiKey: string;
model: 'gpt-4.1' | 'claude-sonnet-4.5' | 'gemini-2.5-flash' | 'deepseek-v3.2';
enabled?: boolean;
systemPrompt?: string;
maxRetries?: number;
}
interface UseSSEReconnectReturn {
messages: string[];
sendMessage: (content: string) => void;
clearMessages: () => void;
isConnected: boolean;
isRetrying: boolean;
retryCount: number;
error: Error | null;
}
export function useSSEReconnect({
baseUrl,
apiKey,
model,
enabled = true,
systemPrompt = '',
maxRetries = 10,
}: UseSSEReconnectOptions): UseSSEReconnectReturn {
const [messages, setMessages] = useState<string[]>([]);
const [isConnected, setIsConnected] = useState(false);
const [isRetrying, setIsRetrying] = useState(false);
const [error, setError] = useState<Error | null>(null);
const clientRef = useRef<SSEReconnectClient | null>(null);
const messageBufferRef = useRef<string>('');
const handleMessage = useCallback((data: string) => {
// SSEのdataフィールド連結を処理
if (data === '[DONE]') {
// ストリーム完了
if (messageBufferRef.current) {
setMessages(prev => [...prev, messageBufferRef.current]);
messageBufferRef.current = '';
}
return;
}
try {
const parsed = JSON.parse(data);
if (parsed.content) {
messageBufferRef.current += parsed.content;
}
} catch {
// JSONパース失敗時は生データをそのまま追加
messageBufferRef.current += data;
}
}, []);
const handleError = useCallback((err: Error) => {
console.error('[useSSEReconnect] エラー:', err);
setError(err);
}, []);
const handleConnect = useCallback(() => {
console.log('[useSSEReconnect] 接続確立');
setIsConnected(true);
setIsRetrying(false);
setError(null);
}, []);
const handleDisconnect = useCallback(() => {
console.log('[useSSEReconnect] 切断');
setIsConnected(false);
setIsRetrying(true);
}, []);
const sendMessage = useCallback(async (content: string) => {
if (!clientRef.current) {
console.warn('[useSSEReconnect] クライアント未初期化');
return;
}
messageBufferRef.current = '';
try {
// Fetch APIでPOSTリクエストを送信
const response = await fetch(${baseUrl}/chat/completions, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${apiKey},
},
body: JSON.stringify({
model,
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content },
],
stream: true,
}),
signal: clientRef.current['abortController']?.signal,
});
if (!response.ok) {
throw new Error(HTTP ${response.status}: ${response.statusText});
}
// レスポンスをReadableStreamとして処理
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (reader) {
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
if (messageBufferRef.current) {
setMessages(prev => [...prev, messageBufferRef.current]);
messageBufferRef.current = '';
}
} else {
handleMessage(data);
}
}
}
}
}
} catch (err) {
handleError(err instanceof Error ? err : new Error(String(err)));
}
}, [baseUrl, apiKey, model, systemPrompt, handleMessage, handleError]);
const clearMessages = useCallback(() => {
setMessages([]);
messageBufferRef.current = '';
}, []);
useEffect(() => {
if (!enabled) {
clientRef.current?.stop();
clientRef.current = null;
return;
}
const client = new SSEReconnectClient({
baseUrl,
apiKey,
model,
onMessage: handleMessage,
onError: handleError,
onConnect: handleConnect,
onDisconnect: handleDisconnect,
});
// 内部のstate.maxAttemptsを上書き
client['state'].maxAttempts = maxRetries;
clientRef.current = client;
client.start();
return () => {
client.stop();
};
}, [enabled, baseUrl, apiKey, model, maxRetries, handleMessage, handleError, handleConnect, handleDisconnect]);
const retryCount = clientRef.current?.getRetryState().attempt || 0;
return {
messages,
sendMessage,
clearMessages,
isConnected,
isRetrying,
retryCount,
error,
};
}
実際の使用例
// App.tsx
import React, { useState } from 'react';
import { useSSEReconnect } from './useSSEReconnect';
const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';
const HOLYSHEEP_API_KEY = 'YOUR_HOLYSHEEP_API_KEY'; // реальный ключに置き換える
export default function Dashboard() {
const [input, setInput] = useState('');
const {
messages,
sendMessage,
isConnected,
isRetrying,
retryCount,
error,
} = useSSEReconnect({
baseUrl: HOLYSHEEP_BASE_URL,
apiKey: HOLYSHEEP_API_KEY,
model: 'gpt-4.1',
systemPrompt: 'あなたはリアルタイム監視アシスタントです。',
enabled: true,
maxRetries: 10,
});
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim()) return;
await sendMessage(input);
setInput('');
};
return (
<div className="dashboard">
<div className="status-bar">
<span className={isConnected ? 'connected' : 'disconnected'}>
{isConnected ? '● 接続中' : isRetrying ? ⟳ 再接続中 (${retryCount}/10) : '○ 切断'}
</span>
{error && <span className="error">{error.message}</span>}
</div>
<div className="messages">
{messages.map((msg, i) => (
<div key={i} className="message">{msg}</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="メッセージを入力..."
/>
<button type="submit" disabled={!isConnected}>送信</button>
</form>
</div>
);
}
HolySheep AIへの移行手順
1. base_url置換
既存のAPI呼び出しで、api.openai.comまたはapi.anthropic.comをHolySheep AIのエンドポイントに置き換えます:
// 移行前の設定
const OLD_BASE_URL = 'https://api.openai.com/v1';
// const OLD_BASE_URL = 'https://api.anthropic.com/v1';
// 移行後の設定
const HOLYSHEEP_BASE_URL = 'https://api.holysheep.ai/v1';
2. キーローテーション
セキュリティを確保するため、APIキーは環境変数で管理し、定期的なローテーションを実装します。HolySheep AIのダッシュボードから新しいキーを生成し、以下のコマンドでローテーションを実行できます:
# 環境変数の設定例 (.env.local)
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
3. カナリアデプロイ
本番環境への全面移行前に、カナリアリリースで新旧を段階的に切り替えます。私のチームでは以下の戦略を取りました:
- 段階1(1-3日目):トラフィックの10%をHolySheep AIに
- 段階2(4-7日目):トラフィックの50%に拡大
- 段階3(8-14日目):100%切り替え完了
移行後30日間の実測値
| 指標 | 移行前(旧プロバイダ) | 移行後(HolySheep AI) | 改善率 |
|---|---|---|---|
| 平均レイテンシ | 420ms | 180ms | 57%高速化 |
| 月額コスト | $4,200 | $680 | 84%削減 |
| 切断頻度(日次) | 4.2回 | 0.3回 | 93%改善 |
| 可用性 | 99.2% | 99.95% | 0.75%向上 |
| P95応答時間 | 890ms | 340ms | 62%改善 |
よくあるエラーと対処法
エラー1:EventSource接続時のCORSエラー
// エラー内容
Access to fetch at 'https://api.holysheep.ai/v1/chat/stream' from origin
'https://your-app.com' has been blocked by CORS policy
// 原因
ブラウザのCORSポリシーにより、EventSourceの接続が拒否されている
// 解決方法
// 1. HolySheep AIダッシュボードでCORSOriginsを設定
// 2. または、サーバーサイドでSSEをプロキシする
const response = await fetch('https://api.holysheep.ai/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${apiKey},
},
body: JSON.stringify({ /* ... */ }),
});
// 3. Next.jsの場合、APIルートを作成
// pages/api/sse-proxy.ts
export default async function handler(req, res) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const response = await fetch('https://api.holysheep.ai/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${process.env.HOLYSHEEP_API_KEY},
},
body: JSON.stringify(req.body),
});
// レスポンスをストリーミング
for await (const chunk of response.body) {
res.write(chunk);
}
res.end();
}
エラー2:再接続が無限に繰り返される
// エラー内容
再接続がmaxAttemptsに達しても接続が確立されない
または無限に再接続が続く
// 原因
- サーバーが完全に停止している
- ネットワーク経路に問題がある
- APIキーが無効
// 解決方法
export class SSEReconnectClient {
// maxAttemptsを制限し、エラー時の処理を追加
private handleDisconnect(): void {
if (this.isIntentionalClose) {
return;
}
if (this.state.attempt < this.state.maxAttempts) {
// 指数バックオフで再接続
const delay = this.calculateBackoffDelay();
this.state.timeoutId = setTimeout(() => {
this.state.attempt++;
this.connect();
}, delay);
} else {
// 最大回数に達したら、完全に停止
console.error('[SSE] 最大再試行回数超過。接続を停止します。');
this.config.onError(new Error('接続に失敗しました。ネットワークを確認してください。'));
// オプション:指数関数的ではなく線形に戻す
this.state.attempt = Math.floor(this.state.attempt / 2);
this.scheduleRecovery();
}
}
private scheduleRecovery(): void {
// 1時間後に再接続を試みる
console.log('[SSE] 1時間後に自動回復を試みます');
setTimeout(() => {
this.state.attempt = 0;
this.connect();
}, 3600000);
}
}