AIアプリケーションにおいて、長時間かかる処理の進捗をリアルタイムでユーザーに伝えることは、ユーザー体験を左右する重要な要素です。本稿では、東京のAIスタートアップ「TechFlow Labs」の事例を通じて、Server-Sent Events(SSE)を使った進捗インジケーターの実装方法を詳細に解説します。
業務背景:バッチ処理のユーザー体験課題
TechFlow Labsは、大量のドキュメント解析サービスを展開している企業です。同社の主力サービスでは、毎日数千件のPDFファイルをAIに分析させ、重要な情報を抽出する処理が実行されています。
従来のシステムでは、処理開始後から結果返信まで数分以上待たせる必要があり、ユーザーから「処理が正常に進んでいるのかわからない」「いつ終わるのか予測できない」といった不満が頻発していました。特に、月次レポート生成処理では最長10分以上待たせるケースもあり、CSScancellation(ユーザー離反)に直結する深刻な問題となっていました。
旧プロバイダの課題とHolySheep AIへの移行
旧プロバイダの問題点
- 処理状況可視化の欠如:非同期処理後の結果取得のみとなり、途中の進捗が把握できない
- 高レイテンシ:旧プロバイダの平均応答遅延が420msと長く、ユーザー体験を損ねていた
- コスト増大:月額コストが$4,200に達し、スケーラビリティに課題があった
- レート制限の厳格さ:ピーク時間帯にレート制限に引っかかり、処理が途切れることがあった
HolySheep AIを選んだ理由
TechFlow LabsがHolySheep AIへの登録決めた理由は以下の通りです。
- 超高レイテンシ性能:平均レイテンシが50ms未満という高速応答を実現
- 圧倒的なコスト効率:レートが¥1=$1の固定レート(旧プロバイダ比85%節約)
- 柔軟な決済手段:WeChat PayやAlipayにも対応し、国際的なチームでも困ることはない
- деле pesquisacheCKPOHNOСТорerningSSEサポート:リアルタイム進捗通知に最適なSSEプロトコルをネイティブサポート
- 無料クレジット提供:登録時点で無料クレジットが付与され、リスクなく試用可能
SSE進捗インジケーターのアーキテクチャ設計
システム構成概要
┌─────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Client │────▶│ Backend API │────▶│ HolySheep AI │
│ (Browser) │◀────│ (FastAPI) │◀────│ Streaming API │
└─────────────┘ SSE └─────────────────┘ └──────────────────┘
│ │
│ ▼
│ ┌─────────────────┐
│ │ Task Queue │
│ │ (Redis/Queue) │
│ └─────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Progress: 45% | ETA: 2m 30s | Tokens: 12,450 │
└─────────────────────────────────────────────────┘
実装ステップ1:HolySheep AI SDKのセットアップ
# requirements.txt
fastapi==0.109.0
uvicorn==0.27.0
sse-starlette==1.8.2
httpx==0.26.0
python-dotenv==1.0.0
redis==5.0.1
asyncio-redis==0.16.0
インストール
pip install -r requirements.txt
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
HolySheep AI設定
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = os.getenv("YOUR_HOLYSHEEP_API_KEY") # реальный ключ
アプリケーション設定
HOST = "0.0.0.0"
PORT = 8000
TASK_TIMEOUT = 600 # 10分間
print(f"HolySheep AI endpoint configured: {HOLYSHEEP_BASE_URL}")
実装ステップ2:SSE進捗エンドポイントの実装
# main.py
import asyncio
import json
import time
from typing import AsyncGenerator
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import httpx
from config import HOLYSHEEP_BASE_URL, HOLYSHEEP_API_KEY
app = FastAPI(title="AI Document Analysis API with SSE Progress")
進捗状態を保持するメモリ内ストア(本番ではRedisを使用)
task_progress = {}
async def stream_ai_processing(
task_id: str,
documents: list[str],
model: str = "gpt-4.1"
) -> AsyncGenerator[dict, None]:
"""
HolySheep AI APIを呼び出し、進捗状況をSSEでストリーミングする
"""
total_docs = len(documents)
processed_tokens = 0
start_time = time.time()
async with httpx.AsyncClient(timeout=120.0) as client:
for idx, doc_content in enumerate(documents):
# 処理開始の通知
yield {
"event": "progress",
"data": json.dumps({
"task_id": task_id,
"status": "processing",
"current": idx + 1,
"total": total_docs,
"progress_percent": int((idx / total_docs) * 100),
"current_document": f"doc_{idx + 1}.pdf",
"timestamp": datetime.utcnow().isoformat()
})
}
# HolySheep AI APIへのリクエスト
headers = {
"Authorization": f"Bearer {HOLYSHEEP_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [
{
"role": "system",
"content": "あなたは文書分析の専門家です。提供された文書を分析し、主要な情報を抽出してください。"
},
{
"role": "user",
"content": f"次の文書を分析してください:\n\n{doc_content[:2000]}"
}
],
"stream": True,
"temperature": 0.3,
"max_tokens": 2048
}
try:
response = await client.post(
f"{HOLYSHEEP_BASE_URL}/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
# ストリーミングレスポンスの処理
full_response = ""
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
if "choices" in chunk and len(chunk["choices"]) > 0:
delta = chunk["choices"][0].get("delta", {})
if "content" in delta:
content = delta["content"]
full_response += content
processed_tokens += 1
# 100トークンごとに進捗更新
if processed_tokens % 100 == 0:
yield {
"event": "tokens",
"data": json.dumps({
"task_id": task_id,
"tokens_processed": processed_tokens,
"elapsed_seconds": int(time.time() - start_time)
})
}
except json.JSONDecodeError:
continue
except httpx.HTTPStatusError as e:
yield {
"event": "error",
"data": json.dumps({
"task_id": task_id,
"error": f"API Error: {e.response.status_code}",
"message": str(e)
})
}
return
# ドキュメント間待機(レート制限対応)
await asyncio.sleep(0.1)
# 完了通知
elapsed = int(time.time() - start_time)
yield {
"event": "complete",
"data": json.dumps({
"task_id": task_id,
"status": "completed",
"total_documents": total_docs,
"total_tokens": processed_tokens,
"total_time_seconds": elapsed,
"avg_latency_ms": int((elapsed / processed_tokens) * 1000) if processed_tokens > 0 else 0
})
}
@app.get("/api/analyze/documents")
async def analyze_documents_stream(request: Request):
"""
ドキュメント分析の進捗をSSEでストリーミングするエンドポイント
"""
# サンプルドキュメント(実際はリクエストボディから取得)
sample_docs = [
"これは最初のサンプル文書です。AIが重要情報を抽出します。",
" второй документ содержит важные данные для анализа.",
"三番目の文書には財務データが含まれています。",
"4番目の文書は技術仕様書です。",
"最後の文書には市場分析が含まれています。"
] * 10 # テスト用に50文書に膨張
task_id = f"task_{int(time.time())}"
return EventSourceResponse(
stream_ai_processing(task_id, sample_docs),
media_type="text/event-stream"
)
@app.get("/api/task/{task_id}/status")
async def get_task_status(task_id: str):
"""特定のタスクのステータス取得"""
progress = task_progress.get(task_id, {"status": "not_found"})
return progress
if __name__ == "__main__":
import uvicorn
print(f"Starting server at http://localhost:{8000}")
print(f"HolySheep AI endpoint: {HOLYSHEEP_BASE_URL}")
uvicorn.run(app, host="0.0.0.0", port=8000)
実装ステップ3:フロントエンド進捗表示コンポーネント
<!-- index.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Document Analyzer - Real-time Progress</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 40px 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
background: white;
border-radius: 16px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
overflow: hidden;
}
.header {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%);
color: white;
padding: 30px;
text-align: center;
}
.header h1 {
font-size: 28px;
margin-bottom: 10px;
}
.content {
padding: 40px;
}
.progress-section {
display: none;
margin-top: 30px;
}
.progress-section.active {
display: block;
}
.progress-container {
background: #f0f0f0;
border-radius: 12px;
height: 30px;
overflow: hidden;
position: relative;
}
.progress-bar {
height: 100%;
background: linear-gradient(90deg, #00d4aa, #00b894);
border-radius: 12px;
transition: width 0.3s ease;
display: flex;
align-items: center;
justify-content: flex-end;
padding-right: 10px;
color: white;
font-weight: bold;
font-size: 14px;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 20px;
margin-top: 30px;
}
.stat-card {
background: #f8f9fa;
padding: 20px;
border-radius: 12px;
text-align: center;
}
.stat-label {
color: #666;
font-size: 12px;
text-transform: uppercase;
margin-bottom: 8px;
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: #1a1a2e;
}
.log-container {
margin-top: 30px;
max-height: 300px;
overflow-y: auto;
background: #1a1a2e;
color: #00ff88;
padding: 20px;
border-radius: 12px;
font-family: 'Courier New', monospace;
font-size: 13px;
line-height: 1.6;
}
.log-entry {
padding: 4px 0;
border-bottom: 1px solid #333;
}
.log-entry:last-child {
border-bottom: none;
}
.log-timestamp {
color: #888;
margin-right: 10px;
}
.btn-analyze {
width: 100%;
padding: 18px;
font-size: 18px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 12px;
cursor: pointer;
transition: transform 0.2s, box-shadow 0.2s;
}
.btn-analyze:hover {
transform: translateY(-2px);
box-shadow: 0 10px 30px rgba(102, 126, 234, 0.4);
}
.btn-analyze:disabled {
background: #ccc;
cursor: not-allowed;
transform: none;
}
.status-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
animation: pulse 1.5s infinite;
}
.status-processing {
background: #f39c12;
}
.status-complete {
background: #27ae60;
animation: none;
}
.status-error {
background: #e74c3c;
animation: none;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📊 AI Document Analyzer</h1>
<p>リアルタイム進捗確認付きドキュメント分析システム</p>
</div>
<div class="content">
<button id="btnAnalyze" class="btn-analyze" onclick="startAnalysis()">
🚀 ドキュメント分析を開始
</button>
<div id="progressSection" class="progress-section">
<h3>
<span id="statusIndicator" class="status-indicator status-processing"></span>
<span id="statusText">処理中...</span>
</h3>
<div class="progress-container">
<div id="progressBar" class="progress-bar" style="width: 0%">0%</div>
</div>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-label">処理済ドキュメント</div>
<div class="stat-value" id="docCount">0 / 0</div>
</div>
<div class="stat-card">
<div class="stat-label">処理済トークン</div>
<div class="stat-value" id="tokenCount">0</div>
</div>
<div class="stat-card">
<div class="stat-label">経過時間</div>
<div class="stat-value" id="elapsedTime">0秒</div>
</div>
</div>
<div id="logContainer" class="log-container">
<div class="log-entry">
<span class="log-timestamp">[--:--:--]</span>
ログを待機中...
</div>
</div>
</div>
</div>
</div>
<script>
let eventSource = null;
let startTime = null;
function addLog(message) {
const container = document.getElementById('logContainer');
const timestamp = new Date().toLocaleTimeString('ja-JP');
const entry = document.createElement('div');
entry.className = 'log-entry';
entry.innerHTML = <span class="log-timestamp">[${timestamp}]</span>${message};
container.appendChild(entry);
container.scrollTop = container.scrollHeight;
}
function updateUI(data) {
// 進捗バー更新
const progressBar = document.getElementById('progressBar');
progressBar.style.width = ${data.progress_percent}%;
progressBar.textContent = ${data.progress_percent}%;
// ドキュメント数更新
document.getElementById('docCount').textContent =
${data.current} / ${data.total};
// ステータステキスト更新
document.getElementById('statusText').textContent =
ドキュメント ${data.current_document} を処理中...;
addLog(📄 進捗: ${data.progress_percent}% (${data.current}/${data.total}));
}
function updateTokens(data) {
document.getElementById('tokenCount').textContent =
data.tokens_processed.toLocaleString();
addLog(🔢 トークン処理: ${data.tokens_processed} (${data.elapsed_seconds}秒));
}
function showComplete(data) {
const statusIndicator = document.getElementById('statusIndicator');
statusIndicator.className = 'status-indicator status-complete';
document.getElementById('statusText').textContent = '処理完了 ✓';
document.getElementById('btnAnalyze').disabled = false;
document.getElementById('btnAnalyze').textContent = '🔄 もう一度分析';
addLog(✅ 完了: ${data.total_documents}ドキュメント, ${data.total_tokens}トークン);
addLog(⏱️ 合計時間: ${data.total_time_seconds}秒);
addLog(⚡ 平均レイテンシ: ${data.avg_latency_ms}ms);
}
function showError(data) {
const statusIndicator = document.getElementById('statusIndicator');
statusIndicator.className = 'status-indicator status-error';
document.getElementById('statusText').textContent = エラー: ${data.error};
document.getElementById('btnAnalyze').disabled = false;
addLog(❌ エラー: ${data.message});
}
function startAnalysis() {
// UI初期化
document.getElementById('btnAnalyze').disabled = true;
document.getElementById('btnAnalyze').textContent = '処理中...';
document.getElementById('progressSection').classList.add('active');
const statusIndicator = document.getElementById('statusIndicator');
statusIndicator.className = 'status-indicator status-processing';
document.getElementById('statusText').textContent = '接続中...';
document.getElementById('progressBar').style.width = '0%';
document.getElementById('progressBar').textContent = '0%';
document.getElementById('docCount').textContent = '0 / 0';
document.getElementById('tokenCount').textContent = '0';
document.getElementById('elapsedTime').textContent = '0秒';
document.getElementById('logContainer').innerHTML = '';
addLog('🔌 HolySheep AIに接続中...');
// SSE接続開始