class HybridLongContextRAG(LongContextRAG):
"""ベクトル検索 + キーワード検索のハイブリッド方式"""
def __init__(self):
super().__init__()
import re
self.bm25_weights = [0.3, 0.7] # [BM25, Vector]
def bm25_score(self, query: str, document: str, k1: float = 1.5,
b: float = 0.75) -> float:
"""BM25 スコア計算"""
query_terms = query.lower().split()
doc_terms = document.lower().split()
doc_len = len(doc_terms)
avg_len = doc_len # 簡略化
score = 0
doc_freq = {}
# IDF計算(簡略化)
for term in query_terms:
freq = doc_terms.count(term)
idf = np.log((len(self.documents) - freq + 0.5) / (freq + 0.5) + 1)
tf = (k1 * freq) / (freq + k1 * (1 - b + b * doc_len / avg_len))
score += idf * tf
return score
def hybrid_retrieve(self, query: str, chunks: List[Dict],
top_k: int = 20) -> List[Dict]:
"""ハイブリッド検索の実行"""
# ベクトル類似度
vector_scores = self._get_vector_scores(query, chunks)
# BM25スコア
bm25_scores = []
for chunk in chunks:
bm25 = self.bm25_score(query, chunk["text"])
bm25_scores.append(bm25)
# 正規化
vector_scores_norm = np.array(vector_scores) / max(vector_scores)
bm25_scores_norm = np.array(bm25_scores) / max(bm25_scores) if max(bm25_scores) > 0 else bm25_scores
# 重み付け合成
combined_scores = (
self.bm25_weights[0] * bm25_scores_norm +
self.bm25_weights[1] * vector_scores_norm
)
top_indices = np.argsort(combined_scores)[-top_k:][::-1]
return [chunks[i] for i in top_indices]
def _get_vector_scores(self, query: str, chunks: List[Dict]) -> List[float]:
"""ベクトル検索スコアの取得"""
query_emb = self._get_embedding(query)
chunk_embs = self._batch_get_embeddings([c["text"] for c in chunks])
scores = []
for emb in chunk_embs:
sim = np.dot(query_emb, emb) / (np.linalg.norm(query_emb) * np.linalg.norm(emb))
scores.append(float(sim))
return scores
よくあるエラーと対処法
エラー1:ConnectionError: timeout after 30s
# ❌ 失敗する例:タイムアウト
response = client.chat.completions.create(
model="gemini-2.5-flash",
messages=[{"role": "user", "content": large_text}]
# timeout デフォルトは 60s だが、長文脈では不足
)
✅ 解決法:タイムアウト延長 + リトライロジック
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=10, max=60))
def robust_generate(client, prompt, max_tokens=4096):
try:
response = client.chat.completions.create(
model="gemini-2.5-flash",
messages=[{"role": "user", "content": prompt}],
timeout=120, # 2分に延長
max_tokens=max_tokens
)
return response
except Exception as e:
print(f"エラー発生: {e}, リトライ中...")
raise
エラー2:401 Unauthorized — Invalid API Key
# ❌ 失敗する例:環境変数の読み込みミス
client = OpenAI(
api_key="sk-xxxx" # 直接記述は非推奨
)
✅ 解決法:環境変数 + バリデーション
import os
from dotenv import load_dotenv
load_dotenv() # .env ファイルから読み込み
API_KEY = os.environ.get("HOLYSHEEP_API_KEY")
if not API_KEY or not API_KEY.startswith("sk-"):
raise ValueError(
"有効な HOLYSHEEP_API_KEY を設定してください。\n"
"https://www.holysheep.ai/register で取得できます"
)
client = OpenAI(
api_key=API_KEY,
base_url="https://api.holysheep.ai/v1"
)
接続テスト
def test_connection():
try:
models = client.models.list()
print("接続成功!利用可能なモデル:")
for model in models.data:
print(f" - {model.id}")
except Exception as e:
print(f"接続エラー: {e}")
エラー3:RateLimitError — レート制限超過
# ❌ 失敗する例:一括送信でレート制限
for chunk in huge_chunks:
response = client.chat.completions.create(...) # 同時呼び出しで制限
✅ 解決法:キュー + 指数関数的バックオフ
import asyncio
from collections import deque
import time
class RateLimitedClient:
def __init__(self, client, requests_per_minute=60):
self.client = client
self.rpm = requests_per_minute
self.request_times = deque(maxlen=requests_per_minute)
self.lock = asyncio.Lock()
async def throttled_generate(self, prompt, max_retries=5):
for attempt in range(max_retries):
async with self.lock:
now = time.time()
# 1分以内のリクエストをクリア
while self.request_times and now - self.request_times[0] < 60:
self.request_times.popleft()
if len(self.request_times) >= self.rpm:
wait_time = 60 - (now - self.request_times[0])
print(f"レート制限: {wait_time:.1f}秒待機")
await asyncio.sleep(wait_time)
self.request_times.append(time.time())
try:
response = self.client.chat.completions.create(
model="gemini-2.5-flash",
messages=[{"role": "user", "content": prompt}]
)
return response
except Exception as e:
if "rate_limit" in str(e).lower():
wait = (2 ** attempt) * 1.5 # 指数関数的バックオフ
print(f"リトライ {attempt+1}: {wait:.1f}秒待機")
await asyncio.sleep(wait)
else:
raise
raise Exception("最大リトライ回数を超過")
使用例
async def process_documents():
client_wrapper = RateLimitedClient(client)
tasks = [client_wrapper.throttled_generate(prompt) for prompt in prompts]
results = await asyncio.gather(*tasks)
return results
エラー4:JSONDecodeError — 無効なJSON応答
# ❌ 失敗する例:応答のバリデーションなし
response = client.chat.completions.create(...)
result = json.loads(response.choices[0].message.content) # 失敗の可能性
✅ 解決法:構造化出力の活用
from pydantic import BaseModel, ValidationError
from typing import List, Optional
class DocumentSummary(BaseModel):
title: str
key_points: List[str]
confidence: float
citations: Optional[List[str]] = None
def structured_generate(query: str, context: str) -> DocumentSummary:
prompt = f"""{context}
質問: {query}
結果を以下のJSON形式で返してください:
{{
"title": "文書のタイトル",
"key_points": ["要点1", "要点2", "要点3"],
"confidence": 0.0-1.0の確信度,
"citations": ["出典1", "出典2"]
}}"""
response = client.chat.completions.create(
model="gemini-2.5-flash",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"} # JSON出力の強制
)
content = response.choices[0].message.content
try:
data = json.loads(content)
return DocumentSummary(**data)
except (json.JSONDecodeError, ValidationError) as e:
print(f"解析エラー: {e}, 生データを返します")
return {"raw_content": content}
コスト最適化技巧
HolySheep AI の ¥1=$1 レート(公式¥7.3=$1 比85%節約)を活用したコスト最適化の実践例:
class CostOptimizedRAG:
"""コストを最小化しつつ精度を維持するRAG"""
PRICING = {
"gemini-2.5-flash": {"input": 2.50, "output": 10.00}, # $/MTok
"gemini-2.5-pro": {"input": 15.00, "output": 60.00},
"deepseek-v3.2": {"input": 0.42, "output": 2.70} # 更なるコスト削減
}
def smart_model_selection(self, query_complexity: str,
context_length: int) -> str:
"""
タスク复杂度に応じて最適なモデルを選択
"""
token_count = context_length / 4 # 概算
if query_complexity == "simple" and token_count < 100_000:
# 軽量タスクは DeepSeek V3.2 ($0.42/MTok)
return "deepseek-v3.2"
elif token_count > 1_000_000:
# 超長文脈は Gemini 2.5 Flash
return "gemini-2.5-flash"
else:
# 標準タスクは Gemini 2.5 Flash
return "gemini-2.5-flash"
def estimate_cost(self, input_tokens: int, output_tokens: int,
model: str) -> float:
"""コスト見積もり(USD)"""
pricing = self.PRICING.get(model, self.PRICING["gemini-2.5-flash"])
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return input_cost + output_cost
def optimize_context(self, chunks: List[Dict],
max_tokens: int = 1_800_000) -> List[Dict]:
"""
コンテキストを最適化してコストを削減
2Mトークンのうち200Kをバッファとして確保
"""
optimized = []
total_tokens = 0
for chunk in chunks:
chunk_tokens = chunk.get("token_count", len(chunk["text"]) // 4)
if total_tokens + chunk_tokens <= max_tokens:
optimized.append(chunk)
total_tokens += chunk_tokens
else:
break
return optimized
まとめ
本稿では、Gemini 2.5 の2Mトークンコンテキストを活用した長文脈 RAG システムの構築方法を解説しました。筆者の实践经验から、以下の点が重要だと確信しています:
- HolySheep AI の ¥1=$1 レートと <50ms レイテンシを組み合わせることで、商用レベルの RAG システムを低コスト・高速度で構築可能
- ハイブリッド