私はRAG(Retrieval-Augmented Generation)システムの運用において、最も頭を悩ませてきたのが「データの新鲜度」と「インデックス更新の効率性」の両立です。ドキュメントが更新されるたびにフル再インデックスを行うと、処理時間とコストが膨大になります。本稿では、HolySheep AIを活用した增量索引更新(Incremental Indexing)の実装戦略と、データ新鲜度を保証するための実践的なアプローチを解説します。
RAGにおける增量更新の必要性と課題
RAGシステムでは、ベクトルデータベースに埋め込み(Embedding)を保存し、ユーザーのクエリに最適なドキュメント断片を検索します。しかし、以下の課題が存在します:
- フル再インデックスのコスト:1万ドキュメントの再インデックスに数十分〜数時間
- ダウンタイム:更新中のサービス停止や性能低下
- データ整合性:更新途中の不整合な検索結果
- コスト増大:OpenAIのEmbedding APIは1,000トークンあたり$0.0001〜$0.0004
HolySheep AIでは、DeepSeek V3.2が$0.42/MTokという低価格で高品質なEmbeddingを提供しており、增量更新の経済的負担を大幅に軽減できます。
增量索引更新アーキテクチャの設計
1. 変更検知システムの実装
最初のステップは、ドキュメントの変更を効率的に検知することです。私は以下の方針で実装しています:
import hashlib
import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class DocumentMetadata:
"""ドキュメントメタデータ"""
doc_id: str
version: int
content_hash: str
last_modified: datetime
source_path: str
chunk_count: int = 0
@dataclass
class ChangeDetectionResult:
"""変更検知結果"""
doc_id: str
status: str # 'new', 'modified', 'unchanged', 'deleted'
old_hash: Optional[str] = None
new_hash: Optional[str] = None
affected_chunks: List[str] = field(default_factory=list)
class IncrementalChangeDetector:
"""
增量変更検知クラス
ドキュメントのCRC/MD5ハッシュを監視し、
変更があったドキュメントのみを検出
"""
def __init__(self, metadata_store: Dict[str, DocumentMetadata]):
self.metadata_store = metadata_store
self.hash_algorithm = hashlib.md5
def compute_content_hash(self, content: str) -> str:
"""コンテンツからハッシュ値を計算"""
normalized = content.strip().replace('\r\n', '\n')
return self.hash_algorithm(normalized.encode('utf-8')).hexdigest()
def detect_changes(
self,
doc_id: str,
new_content: str,
source_path: str
) -> ChangeDetectionResult:
"""ドキュメントの変更を検知"""
new_hash = self.compute_content_hash(new_content)
# 新規ドキュメント
if doc_id not in self.metadata_store:
return ChangeDetectionResult(
doc_id=doc_id,
status='new',
new_hash=new_hash
)
old_metadata = self.metadata_store[doc_id]
# 削除
if new_content is None or new_content == "":
return ChangeDetectionResult(
doc_id=doc_id,
status='deleted',
old_hash=old_metadata.content_hash
)
# 変更なし
if old_metadata.content_hash == new_hash:
return ChangeDetectionResult(
doc_id=doc_id,
status='unchanged',
old_hash=old_hash,
new_hash=new_hash
)
# 更新あり
return ChangeDetectionResult(
doc_id=doc_id,
status='modified',
old_hash=old_metadata.content_hash,
new_hash=new_hash,
affected_chunks=self._identify_affected_chunks(doc_id)
)
def _identify_affected_chunks(self, doc_id: str) -> List[str]:
"""影響を受けるチャンクを特定(高性能実装)"""
# 実際の実装では、ベクトルDBのチャンクマッピングを参照
# チャンクプレフィックスに基づいて影響範囲を限定
return [f"{doc_id}_chunk_{i}" for i in range(
self.metadata_store[doc_id].chunk_count
)]
2. HolySheep AI APIを活用したEmbedding生成
変更が検知されたドキュメントのEmbeddingを生成します。HolySheep AIのAPIは<50msのレイテンシを実現しており、リアルタイム更新Requirementsに応えます。
import httpx
import asyncio
from typing import List, Dict, Any
import json
class HolySheepEmbeddingClient:
"""
HolySheep AI APIクライアント
增量更新専用のEmbedding生成クラス
"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str, timeout: float = 30.0):
self.api_key = api_key
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
async def generate_embeddings(
self,
texts: List[str],
model: str = "deepseek-embed"
) -> List[List[float]]:
"""
批量でEmbeddingを生成
Args:
texts: 埋め込み対象テキストリスト
model: 使用するEmbeddingモデル
Returns:
ベクトルEmbeddingリスト
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"input": texts,
"model": model,
"encoding_format": "float"
}
try:
response = await self.client.post(
f"{self.BASE_URL}/embeddings",
headers=headers,
json=payload
)
response.raise_for_status()
result = response.json()
return [item["embedding"] for item in result["data"]]
except httpx.TimeoutException:
raise ConnectionError(f"Embedding生成タイムアウト: {len(texts)}件のテキスト")
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise AuthenticationError(
"APIキー認証失敗: 正しいYOUR_HOLYSHEEP_API_KEYを設定してください"
)
raise RuntimeError(f"Embedding生成エラー: {e.response.status_code}")
async def batch_embed_documents(
self,
documents: List[Dict[str, Any]],
batch_size: int = 100
) -> List[Dict[str, Any]]:
"""
ドキュメントの一括Embedding生成(增量更新対応)
Args:
documents: [{"id": "doc1", "content": "...", "metadata": {...}}]
batch_size: 一批あたりの処理数
Returns:
[{"id": "doc1", "embedding": [...], "token_count": 123}, ...]
"""
results = []
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
texts = [doc["content"] for doc in batch]
# HolySheep AI API呼び出し
embeddings = await self.generate_embeddings(texts)
for doc, embedding in zip(batch, embeddings):
results.append({
"id": doc["id"],
"embedding": embedding,
"token_count": len(doc["content"]) // 4, # 概算
"metadata": doc.get("metadata", {})
})
# レート制限を考慮した待機(HolySheepは寛容な制限)
if i + batch_size < len(documents):
await asyncio.sleep(0.1)
return results
async def close(self):
await self.client.aclose()
使用例
async def main():
client = HolySheepEmbeddingClient(api_key="YOUR_HOLYSHEEP_API_KEY")
documents = [
{
"id": "doc_001",
"content": "RAGシステムの新着ドキュメントです...",
"metadata": {"source": "blog", "category": "tech"}
},
{
"id": "doc_002",
"content": "HolySheep AIの活用方法...",
"metadata": {"source": "blog", "category": "ai"}
}
]
results = await client.batch_embed_documents(documents)
for result in results:
print(f"Document: {result['id']}, "
f"Tokens: {result['token_count']}, "
f"Embedding次元: {len(result['embedding'])}")
await client.close()
asyncio.run(main())
データ新鲜度を保証する3つの戦略
戦略1:TTL(Time-To-Live)ベースの自動失效
最もシンプルな方法は、各ドキュメントにTTLを設定し、一定時間後に自動的に再インデックスをトリガーすることです。
from datetime import datetime, timedelta
from enum import Enum
class FreshnessLevel(Enum):
"""データ新鲜度レベル"""
REALTIME = timedelta(seconds=0) # 即時更新
NEAR_REALTIME = timedelta(minutes=5) # 5分以内
HOURLY = timedelta(hours=1) # 1時間以内
DAILY = timedelta(days=1) # 1日以内
WEEKLY = timedelta(weeks=1) # 1週間以内
@dataclass
class FreshnessConfig:
"""新鲜度設定"""
level: FreshnessLevel
ttl: timedelta
auto_refresh: bool
max_age_hours: int
class TTLBasedRefreshManager:
"""
TTLベースの自動更新マネージャー
ドキュメントの新鲜度を監視し、必要に応じて更新をトリガー
"""
def __init__(self, config: FreshnessConfig):
self.config = config
def should_refresh(self, last_updated: datetime) -> bool:
"""更新が必要か判定"""
now = datetime.utcnow()
age = now - last_updated
return age >= self.config.ttl
def get_refresh_priority(self, last_updated: datetime) -> int:
"""
更新の優先度を計算
返り値が大きいほど優先度が高い
"""
age = datetime.utcnow() - last_updated
if age >= self.config.ttl:
# 期限切れ = 最高優先度
return 1000
elif age >= self.config.ttl * 0.8:
# 8割超過 = 高優先度
return 100
elif age >= self.config.ttl * 0.5:
# 5割超過 = 中優先度
return 10
else:
# 5割未満 = 低優先度
return 1
def schedule_refresh(self, documents: List[DocumentMetadata]) -> List[str]:
"""更新が必要なドキュメントIDリストを返す"""
priority_docs = []
for doc in documents:
if self.should_refresh(doc.last_modified):
priority_docs.append(doc.doc_id)
# 優先度順にソート
return sorted(
priority_docs,
key=lambda d: self.get_refresh_priority(
next(m.last_modified for m in documents if m.doc_id == d)
),
reverse=True
)
戦略2:Webhook/イベント駆動型更新
ドキュメント管理システムからWebhookを受け取り、変更時に即座にインデックスを更新します。
from fastapi import FastAPI, WebHook, Header, HTTPException
from pydantic import BaseModel
import hmac
import hashlib
import asyncio
app = FastAPI()
class DocumentChangeEvent(BaseModel):
"""ドキュメント変更イベント"""
event_type: str # 'create', 'update', 'delete'
doc_id: str
source: str
timestamp: str
content_hash: str
metadata: dict = {}
class EventDrivenIndexUpdater:
"""
イベント駆動型インデックス更新
Webhookを受けてリアルタイムで更新を処理
"""
def __init__(
self,
embedding_client: HolySheepEmbeddingClient,
vector_store: Any, # 実際のベクトルストア
webhook_secret: str
):
self.embedding_client = embedding_client
self.vector_store = vector_store
self.webhook_secret = webhook_secret
self.update_queue = asyncio.Queue()
def verify_webhook_signature(
self,
payload: bytes,
signature: str
) -> bool:
"""Webhook署名の検証"""
expected = hmac.new(
self.webhook_secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
async def process_change_event(self, event: DocumentChangeEvent):
"""変更イベントを処理"""
if event.event_type == 'delete':
# 削除処理
await self.vector_store.delete(event.doc_id)
return
# 変更または作成の場合、コンテンツを取得して再Embedding
content = await self._fetch_document_content(event.doc_id)
if not content:
raise ValueError(f"ドキュメントが見つかりません: {event.doc_id}")
# HolySheep AIでEmbedding生成
embeddings = await self.embedding_client.generate_embeddings([content])
# ベクトルストアに保存
await self.vector_store.upsert(
id=event.doc_id,
embedding=embeddings[0],
content=content,
metadata=event.metadata
)
async def _fetch_document_content(self, doc_id: str) -> str:
"""ドキュメントコンテンツを取得(実装はシステムに応じて)"""
# 実際の実装では、CMSやデータベースから取得
pass
FastAPIエンドポイント
updater = EventDrivenIndexUpdater(
embedding_client=None, # 初期化時に設定
vector_store=None,
webhook_secret="your_webhook_secret"
)
@app.post("/webhook/documents")
async def handle_document_webhook(
event: DocumentChangeEvent,
x_webhook_signature: str = Header(None)
):
"""ドキュメント変更Webhookエンドポイント"""
if not updater.verify_webhook_signature(
event.json().encode(),
x_webhook_signature or ""
):
raise HTTPException(status_code=401, detail="署名検証失敗")
try:
await updater.process_change_event(event)
return {"status": "success", "doc_id": event.doc_id}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
戦略3:バージョン管理とロールバック
インデックス更新時にバージョンを記録し、問題発生時にロールバックできる体制を構築します。
from datetime import datetime
from typing import Optional
import json
@dataclass
class IndexVersion:
"""インデックスバージョン情報"""
version_id: str
timestamp: datetime
doc_ids: List[str]
checksum: str
status: str # 'active', 'archived', 'rolling_back'
parent_version: Optional[str] = None
class VersionedIndexManager:
"""
バージョン管理付きインデックスマネージャー
增量更新の安全なロールバックをサポート
"""
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.versions: Dict[str, IndexVersion] = {}
self.current_version: Optional[str] = None
async def create_checkpoint(
self,
doc_ids: List[str],
vector_store_state: Any
) -> IndexVersion:
"""現在のインデックス状態をチェックポイントとして保存"""
version_id = f"v_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
# ベクトルストアの状態をシリアライズ
state_checksum = self._compute_state_checksum(vector_store_state)
version = IndexVersion(
version_id=version_id,
timestamp=datetime.utcnow(),
doc_ids=doc_ids,
checksum=state_checksum,
status='active',
parent_version=self.current_version
)
self.versions[version_id] = version
self.current_version = version_id
# 永続化
await self._persist_version(version)
return version
def _compute_state_checksum(self, state: Any) -> str:
"""状態のチェックサムを計算"""
state_str = json.dumps(state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()[:16]
async def rollback_to_version(self, version_id: str) -> bool:
"""
指定バージョンにロールバック
Returns:
成功したかどうか
"""
if version_id not in self.versions:
raise ValueError(f"バージョンが見つかりません: {version_id}")
version = self.versions[version_id]
version.status = 'rolling_back'
# 実際のロールバック処理
# ベクトルストア的状态を旧バージョンに戻す
# 旧バージョンをアクティブにする
version.status = 'active'
self.current_version = version_id
# 間のバージョンをアーカイブ
for v_id, v in self.versions.items():
if v_id != version_id and v.status == 'active':
v.status = 'archived'
return True
RAGパイプラインの統合実装
以上のコンポーネントを統合した、完全なRAGパイプラインを構築します。
from typing import List, Dict, Optional, Callable
import asyncio
class IncrementalRAGPipeline:
"""
增量更新対応RAGパイプライン
HolySheep AI APIを活用した完全な実装
"""
def __init__(
self,
api_key: str,
vector_store,
chunk_size: int = 512,
chunk_overlap: int = 64
):
self.embedding_client = HolySheepEmbeddingClient(api_key)
self.vector_store = vector_store
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.change_detector = IncrementalChangeDetector({})
self.refresh_manager = TTLBasedRefreshManager(
FreshnessConfig(
level=FreshnessLevel.HOURLY,
ttl=timedelta(hours=1),
auto_refresh=True,
max_age_hours=24
)
)
def chunk_document(self, content: str, doc_id: str) -> List[Dict]:
"""ドキュメントをチャンクに分割"""
words = content.split()
chunks = []
start = 0
chunk_idx = 0
while start < len(words):
end = min(start + self.chunk_size, len(words))
chunk_text = ' '.join(words[start:end])
chunks.append({
"id": f"{doc_id}_chunk_{chunk_idx}",
"content": chunk_text,
"metadata": {
"doc_id": doc_id,
"chunk_index": chunk_idx,
"position": start
}
})
start += self.chunk_size - self.chunk_overlap
chunk_idx += 1
return chunks
async def add_or_update_documents(
self,
documents: List[Dict[str, str]]
) -> Dict[str, str]:
"""
ドキュメントを追加または更新
Args:
documents: [{"id": "...", "content": "..."}]
Returns:
処理結果 {"added": N, "updated": N, "unchanged": N}
"""
results = {"added": 0, "updated": 0, "unchanged": 0}
chunks_to_embed = []
for doc in documents:
doc_id = doc["id"]
content = doc["content"]
# 変更検知
change = self.change_detector.detect_changes(
doc_id, content, doc.get("source", "")
)
if change.status == 'unchanged':
results["unchanged"] += 1
continue
# チャンク分割
chunks = self.chunk_document(content, doc_id)
chunks_to_embed.extend(chunks)
# メタデータ更新
self.change_detector.metadata_store[doc_id] = DocumentMetadata(
doc_id=doc_id,
version=1,
content_hash=change.new_hash,
last_modified=datetime.utcnow(),
source_path=doc.get("source", ""),
chunk_count=len(chunks)
)
if change.status == 'new':
results["added"] += 1
else:
results["updated"] += 1
# HolySheep AIでEmbedding生成
if chunks_to_embed:
embeddings = await self.embedding_client.batch_embed_documents(
chunks_to_embed
)
# ベクトルストアに保存
for chunk, embedding in zip(chunks_to_embed, embeddings):
await self.vector_store.upsert(
id=chunk["id"],
embedding=embedding["embedding"],
content=chunk["content"],
metadata=chunk["metadata"]
)
return results
async def query(
self,
query_text: str,
top_k: int = 5,
filters: Optional[Dict] = None
) -> List[Dict]:
"""
RAGクエリを実行
Returns:
関連ドキュメントとスコア
"""
# クエリの