私はRAG(Retrieval-Augmented Generation)システムの運用において、最も頭を悩ませてきたのが「データの新鲜度」と「インデックス更新の効率性」の両立です。ドキュメントが更新されるたびにフル再インデックスを行うと、処理時間とコストが膨大になります。本稿では、HolySheep AIを活用した增量索引更新(Incremental Indexing)の実装戦略と、データ新鲜度を保証するための実践的なアプローチを解説します。

RAGにおける增量更新の必要性と課題

RAGシステムでは、ベクトルデータベースに埋め込み(Embedding)を保存し、ユーザーのクエリに最適なドキュメント断片を検索します。しかし、以下の課題が存在します:

HolySheep AIでは、DeepSeek V3.2が$0.42/MTokという低価格で高品質なEmbeddingを提供しており、增量更新の経済的負担を大幅に軽減できます。

增量索引更新アーキテクチャの設計

1. 変更検知システムの実装

最初のステップは、ドキュメントの変更を効率的に検知することです。私は以下の方針で実装しています:

import hashlib
import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field

@dataclass
class DocumentMetadata:
    """ドキュメントメタデータ"""
    doc_id: str
    version: int
    content_hash: str
    last_modified: datetime
    source_path: str
    chunk_count: int = 0

@dataclass
class ChangeDetectionResult:
    """変更検知結果"""
    doc_id: str
    status: str  # 'new', 'modified', 'unchanged', 'deleted'
    old_hash: Optional[str] = None
    new_hash: Optional[str] = None
    affected_chunks: List[str] = field(default_factory=list)

class IncrementalChangeDetector:
    """
    增量変更検知クラス
    ドキュメントのCRC/MD5ハッシュを監視し、
    変更があったドキュメントのみを検出
    """
    
    def __init__(self, metadata_store: Dict[str, DocumentMetadata]):
        self.metadata_store = metadata_store
        self.hash_algorithm = hashlib.md5
    
    def compute_content_hash(self, content: str) -> str:
        """コンテンツからハッシュ値を計算"""
        normalized = content.strip().replace('\r\n', '\n')
        return self.hash_algorithm(normalized.encode('utf-8')).hexdigest()
    
    def detect_changes(
        self, 
        doc_id: str, 
        new_content: str,
        source_path: str
    ) -> ChangeDetectionResult:
        """ドキュメントの変更を検知"""
        
        new_hash = self.compute_content_hash(new_content)
        
        # 新規ドキュメント
        if doc_id not in self.metadata_store:
            return ChangeDetectionResult(
                doc_id=doc_id,
                status='new',
                new_hash=new_hash
            )
        
        old_metadata = self.metadata_store[doc_id]
        
        # 削除
        if new_content is None or new_content == "":
            return ChangeDetectionResult(
                doc_id=doc_id,
                status='deleted',
                old_hash=old_metadata.content_hash
            )
        
        # 変更なし
        if old_metadata.content_hash == new_hash:
            return ChangeDetectionResult(
                doc_id=doc_id,
                status='unchanged',
                old_hash=old_hash,
                new_hash=new_hash
            )
        
        # 更新あり
        return ChangeDetectionResult(
            doc_id=doc_id,
            status='modified',
            old_hash=old_metadata.content_hash,
            new_hash=new_hash,
            affected_chunks=self._identify_affected_chunks(doc_id)
        )
    
    def _identify_affected_chunks(self, doc_id: str) -> List[str]:
        """影響を受けるチャンクを特定(高性能実装)"""
        # 実際の実装では、ベクトルDBのチャンクマッピングを参照
        # チャンクプレフィックスに基づいて影響範囲を限定
        return [f"{doc_id}_chunk_{i}" for i in range(
            self.metadata_store[doc_id].chunk_count
        )]

2. HolySheep AI APIを活用したEmbedding生成

変更が検知されたドキュメントのEmbeddingを生成します。HolySheep AIのAPIは<50msのレイテンシを実現しており、リアルタイム更新Requirementsに応えます。

import httpx
import asyncio
from typing import List, Dict, Any
import json

class HolySheepEmbeddingClient:
    """
    HolySheep AI APIクライアント
    增量更新専用のEmbedding生成クラス
    """
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(self, api_key: str, timeout: float = 30.0):
        self.api_key = api_key
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)
    
    async def generate_embeddings(
        self, 
        texts: List[str],
        model: str = "deepseek-embed"
    ) -> List[List[float]]:
        """
        批量でEmbeddingを生成
        
        Args:
            texts: 埋め込み対象テキストリスト
            model: 使用するEmbeddingモデル
        
        Returns:
            ベクトルEmbeddingリスト
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "input": texts,
            "model": model,
            "encoding_format": "float"
        }
        
        try:
            response = await self.client.post(
                f"{self.BASE_URL}/embeddings",
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            
            result = response.json()
            return [item["embedding"] for item in result["data"]]
            
        except httpx.TimeoutException:
            raise ConnectionError(f"Embedding生成タイムアウト: {len(texts)}件のテキスト")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise AuthenticationError(
                    "APIキー認証失敗: 正しいYOUR_HOLYSHEEP_API_KEYを設定してください"
                )
            raise RuntimeError(f"Embedding生成エラー: {e.response.status_code}")
    
    async def batch_embed_documents(
        self,
        documents: List[Dict[str, Any]],
        batch_size: int = 100
    ) -> List[Dict[str, Any]]:
        """
        ドキュメントの一括Embedding生成(增量更新対応)
        
        Args:
            documents: [{"id": "doc1", "content": "...", "metadata": {...}}]
            batch_size: 一批あたりの処理数
        
        Returns:
            [{"id": "doc1", "embedding": [...], "token_count": 123}, ...]
        """
        results = []
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            texts = [doc["content"] for doc in batch]
            
            # HolySheep AI API呼び出し
            embeddings = await self.generate_embeddings(texts)
            
            for doc, embedding in zip(batch, embeddings):
                results.append({
                    "id": doc["id"],
                    "embedding": embedding,
                    "token_count": len(doc["content"]) // 4,  # 概算
                    "metadata": doc.get("metadata", {})
                })
            
            # レート制限を考慮した待機(HolySheepは寛容な制限)
            if i + batch_size < len(documents):
                await asyncio.sleep(0.1)
        
        return results
    
    async def close(self):
        await self.client.aclose()


使用例

async def main(): client = HolySheepEmbeddingClient(api_key="YOUR_HOLYSHEEP_API_KEY") documents = [ { "id": "doc_001", "content": "RAGシステムの新着ドキュメントです...", "metadata": {"source": "blog", "category": "tech"} }, { "id": "doc_002", "content": "HolySheep AIの活用方法...", "metadata": {"source": "blog", "category": "ai"} } ] results = await client.batch_embed_documents(documents) for result in results: print(f"Document: {result['id']}, " f"Tokens: {result['token_count']}, " f"Embedding次元: {len(result['embedding'])}") await client.close()

asyncio.run(main())

データ新鲜度を保証する3つの戦略

戦略1:TTL(Time-To-Live)ベースの自動失效

最もシンプルな方法は、各ドキュメントにTTLを設定し、一定時間後に自動的に再インデックスをトリガーすることです。

from datetime import datetime, timedelta
from enum import Enum

class FreshnessLevel(Enum):
    """データ新鲜度レベル"""
    REALTIME = timedelta(seconds=0)      # 即時更新
    NEAR_REALTIME = timedelta(minutes=5)  # 5分以内
    HOURLY = timedelta(hours=1)           # 1時間以内
    DAILY = timedelta(days=1)             # 1日以内
    WEEKLY = timedelta(weeks=1)           # 1週間以内

@dataclass
class FreshnessConfig:
    """新鲜度設定"""
    level: FreshnessLevel
    ttl: timedelta
    auto_refresh: bool
    max_age_hours: int

class TTLBasedRefreshManager:
    """
    TTLベースの自動更新マネージャー
    ドキュメントの新鲜度を監視し、必要に応じて更新をトリガー
    """
    
    def __init__(self, config: FreshnessConfig):
        self.config = config
    
    def should_refresh(self, last_updated: datetime) -> bool:
        """更新が必要か判定"""
        now = datetime.utcnow()
        age = now - last_updated
        return age >= self.config.ttl
    
    def get_refresh_priority(self, last_updated: datetime) -> int:
        """
        更新の優先度を計算
        返り値が大きいほど優先度が高い
        """
        age = datetime.utcnow() - last_updated
        
        if age >= self.config.ttl:
            # 期限切れ = 最高優先度
            return 1000
        elif age >= self.config.ttl * 0.8:
            # 8割超過 = 高優先度
            return 100
        elif age >= self.config.ttl * 0.5:
            # 5割超過 = 中優先度
            return 10
        else:
            # 5割未満 = 低優先度
            return 1
    
    def schedule_refresh(self, documents: List[DocumentMetadata]) -> List[str]:
        """更新が必要なドキュメントIDリストを返す"""
        priority_docs = []
        
        for doc in documents:
            if self.should_refresh(doc.last_modified):
                priority_docs.append(doc.doc_id)
        
        # 優先度順にソート
        return sorted(
            priority_docs,
            key=lambda d: self.get_refresh_priority(
                next(m.last_modified for m in documents if m.doc_id == d)
            ),
            reverse=True
        )

戦略2:Webhook/イベント駆動型更新

ドキュメント管理システムからWebhookを受け取り、変更時に即座にインデックスを更新します。

from fastapi import FastAPI, WebHook, Header, HTTPException
from pydantic import BaseModel
import hmac
import hashlib
import asyncio

app = FastAPI()

class DocumentChangeEvent(BaseModel):
    """ドキュメント変更イベント"""
    event_type: str  # 'create', 'update', 'delete'
    doc_id: str
    source: str
    timestamp: str
    content_hash: str
    metadata: dict = {}

class EventDrivenIndexUpdater:
    """
    イベント駆動型インデックス更新
    Webhookを受けてリアルタイムで更新を処理
    """
    
    def __init__(
        self, 
        embedding_client: HolySheepEmbeddingClient,
        vector_store: Any,  # 実際のベクトルストア
        webhook_secret: str
    ):
        self.embedding_client = embedding_client
        self.vector_store = vector_store
        self.webhook_secret = webhook_secret
        self.update_queue = asyncio.Queue()
    
    def verify_webhook_signature(
        self, 
        payload: bytes, 
        signature: str
    ) -> bool:
        """Webhook署名の検証"""
        expected = hmac.new(
            self.webhook_secret.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(f"sha256={expected}", signature)
    
    async def process_change_event(self, event: DocumentChangeEvent):
        """変更イベントを処理"""
        
        if event.event_type == 'delete':
            # 削除処理
            await self.vector_store.delete(event.doc_id)
            return
        
        # 変更または作成の場合、コンテンツを取得して再Embedding
        content = await self._fetch_document_content(event.doc_id)
        
        if not content:
            raise ValueError(f"ドキュメントが見つかりません: {event.doc_id}")
        
        # HolySheep AIでEmbedding生成
        embeddings = await self.embedding_client.generate_embeddings([content])
        
        # ベクトルストアに保存
        await self.vector_store.upsert(
            id=event.doc_id,
            embedding=embeddings[0],
            content=content,
            metadata=event.metadata
        )
    
    async def _fetch_document_content(self, doc_id: str) -> str:
        """ドキュメントコンテンツを取得(実装はシステムに応じて)"""
        # 実際の実装では、CMSやデータベースから取得
        pass

FastAPIエンドポイント

updater = EventDrivenIndexUpdater( embedding_client=None, # 初期化時に設定 vector_store=None, webhook_secret="your_webhook_secret" ) @app.post("/webhook/documents") async def handle_document_webhook( event: DocumentChangeEvent, x_webhook_signature: str = Header(None) ): """ドキュメント変更Webhookエンドポイント""" if not updater.verify_webhook_signature( event.json().encode(), x_webhook_signature or "" ): raise HTTPException(status_code=401, detail="署名検証失敗") try: await updater.process_change_event(event) return {"status": "success", "doc_id": event.doc_id} except Exception as e: raise HTTPException(status_code=500, detail=str(e))

戦略3:バージョン管理とロールバック

インデックス更新時にバージョンを記録し、問題発生時にロールバックできる体制を構築します。

from datetime import datetime
from typing import Optional
import json

@dataclass
class IndexVersion:
    """インデックスバージョン情報"""
    version_id: str
    timestamp: datetime
    doc_ids: List[str]
    checksum: str
    status: str  # 'active', 'archived', 'rolling_back'
    parent_version: Optional[str] = None

class VersionedIndexManager:
    """
    バージョン管理付きインデックスマネージャー
    增量更新の安全なロールバックをサポート
    """
    
    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: Dict[str, IndexVersion] = {}
        self.current_version: Optional[str] = None
    
    async def create_checkpoint(
        self, 
        doc_ids: List[str],
        vector_store_state: Any
    ) -> IndexVersion:
        """現在のインデックス状態をチェックポイントとして保存"""
        
        version_id = f"v_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
        
        # ベクトルストアの状態をシリアライズ
        state_checksum = self._compute_state_checksum(vector_store_state)
        
        version = IndexVersion(
            version_id=version_id,
            timestamp=datetime.utcnow(),
            doc_ids=doc_ids,
            checksum=state_checksum,
            status='active',
            parent_version=self.current_version
        )
        
        self.versions[version_id] = version
        self.current_version = version_id
        
        # 永続化
        await self._persist_version(version)
        
        return version
    
    def _compute_state_checksum(self, state: Any) -> str:
        """状態のチェックサムを計算"""
        state_str = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(state_str.encode()).hexdigest()[:16]
    
    async def rollback_to_version(self, version_id: str) -> bool:
        """
        指定バージョンにロールバック
        
        Returns:
            成功したかどうか
        """
        if version_id not in self.versions:
            raise ValueError(f"バージョンが見つかりません: {version_id}")
        
        version = self.versions[version_id]
        version.status = 'rolling_back'
        
        # 実際のロールバック処理
        # ベクトルストア的状态を旧バージョンに戻す
        
        # 旧バージョンをアクティブにする
        version.status = 'active'
        self.current_version = version_id
        
        # 間のバージョンをアーカイブ
        for v_id, v in self.versions.items():
            if v_id != version_id and v.status == 'active':
                v.status = 'archived'
        
        return True

RAGパイプラインの統合実装

以上のコンポーネントを統合した、完全なRAGパイプラインを構築します。

from typing import List, Dict, Optional, Callable
import asyncio

class IncrementalRAGPipeline:
    """
    增量更新対応RAGパイプライン
    HolySheep AI APIを活用した完全な実装
    """
    
    def __init__(
        self,
        api_key: str,
        vector_store,
        chunk_size: int = 512,
        chunk_overlap: int = 64
    ):
        self.embedding_client = HolySheepEmbeddingClient(api_key)
        self.vector_store = vector_store
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.change_detector = IncrementalChangeDetector({})
        self.refresh_manager = TTLBasedRefreshManager(
            FreshnessConfig(
                level=FreshnessLevel.HOURLY,
                ttl=timedelta(hours=1),
                auto_refresh=True,
                max_age_hours=24
            )
        )
    
    def chunk_document(self, content: str, doc_id: str) -> List[Dict]:
        """ドキュメントをチャンクに分割"""
        words = content.split()
        chunks = []
        
        start = 0
        chunk_idx = 0
        
        while start < len(words):
            end = min(start + self.chunk_size, len(words))
            chunk_text = ' '.join(words[start:end])
            
            chunks.append({
                "id": f"{doc_id}_chunk_{chunk_idx}",
                "content": chunk_text,
                "metadata": {
                    "doc_id": doc_id,
                    "chunk_index": chunk_idx,
                    "position": start
                }
            })
            
            start += self.chunk_size - self.chunk_overlap
            chunk_idx += 1
        
        return chunks
    
    async def add_or_update_documents(
        self,
        documents: List[Dict[str, str]]
    ) -> Dict[str, str]:
        """
        ドキュメントを追加または更新
        
        Args:
            documents: [{"id": "...", "content": "..."}]
        
        Returns:
            処理結果 {"added": N, "updated": N, "unchanged": N}
        """
        results = {"added": 0, "updated": 0, "unchanged": 0}
        chunks_to_embed = []
        
        for doc in documents:
            doc_id = doc["id"]
            content = doc["content"]
            
            # 変更検知
            change = self.change_detector.detect_changes(
                doc_id, content, doc.get("source", "")
            )
            
            if change.status == 'unchanged':
                results["unchanged"] += 1
                continue
            
            # チャンク分割
            chunks = self.chunk_document(content, doc_id)
            chunks_to_embed.extend(chunks)
            
            # メタデータ更新
            self.change_detector.metadata_store[doc_id] = DocumentMetadata(
                doc_id=doc_id,
                version=1,
                content_hash=change.new_hash,
                last_modified=datetime.utcnow(),
                source_path=doc.get("source", ""),
                chunk_count=len(chunks)
            )
            
            if change.status == 'new':
                results["added"] += 1
            else:
                results["updated"] += 1
        
        # HolySheep AIでEmbedding生成
        if chunks_to_embed:
            embeddings = await self.embedding_client.batch_embed_documents(
                chunks_to_embed
            )
            
            # ベクトルストアに保存
            for chunk, embedding in zip(chunks_to_embed, embeddings):
                await self.vector_store.upsert(
                    id=chunk["id"],
                    embedding=embedding["embedding"],
                    content=chunk["content"],
                    metadata=chunk["metadata"]
                )
        
        return results
    
    async def query(
        self, 
        query_text: str, 
        top_k: int = 5,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """
        RAGクエリを実行
        
        Returns:
            関連ドキュメントとスコア
        """
        # クエリの