ベクトルデータベースとEmbedding技術の進化により、数百万〜数十億の高次元ベクトルから類似アイテムを高速に検索する必要性が急増しています。本稿では、Approximate Nearest Neighbor (ANN) 検索を百万スケールベクトルに実装する実践的なアプローチを、HolySheep AIを活用した具体的なコード例と共に解説します。

なぜ近似探索が必要なのか

100万件のベクトル(例:1536次元GPT-4Embedding)で線形探索を行う場合、1クエリあたり最大100万回の距離計算が必要です。これは実用的ではありません。ANNアルゴリズムは、数百〜数千件の候補のみを探索することで、99%以上の精度を保ちつつ100〜1000倍高速な検索を実現します。

代表的なANNアルゴリズム

実装前の準備

まずは環境構築とAPI接続の確認を行いましょう。HolySheep AIでは、今すぐ登録して無料クレジットを獲得できます。HolySheep AIのAPIは<50msのレイテンシを実現しており、レートは¥1=$1(公式¥7.3=$1比85%節約)と非常にコスト効率的です。

プロジェクト構造

ann-search-project/
├── config.py              # API設定
├── embeddings.py          # ベクトル生成
├── hnsw_index.py          # HNSWインデックス構築
├── search.py              # 類似性検索
├── requirements.txt
└── main.py                # メイン処理

設定ファイルの作成

# config.py
import os

HolySheep AI設定

HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY") BASE_URL = "https://api.holysheep.ai/v1"

ANNインデックス設定

HNSW_M = 16 # 接続数(精度と速度のトレードオフ) HNSW_EF_CONSTRUCTION = 200 # 構築時の探索幅 HNSW_EF_SEARCH = 100 # 検索時の探索幅 DIMENSION = 1536 # Embedding次元数(text-embedding-3-small)

データパス

INDEX_PATH = "hnsw_index.bin" VECTORS_PATH = "vectors.npy" METADATA_PATH = "metadata.json"

HolySheep AIでEmbeddingを生成

# embeddings.py
import json
import numpy as np
from typing import List, Dict
import requests

class HolySheepEmbeddings:
    """HolySheep AI APIを使用したEmbedding生成"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
    
    def embed_texts(self, texts: List[str], model: str = "text-embedding-3-small") -> np.ndarray:
        """
        テキストリストからEmbeddingベクトルを生成
        
        Args:
            texts: 埋め込むテキストのリスト
            model: 使用するEmbeddingモデル
            
        Returns:
            numpy.ndarray: ベクトル配列 (len(texts), dimension)
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "input": texts,
            "model": model
        }
        
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers=headers,
            json=payload,
            timeout=30
        )
        
        if response.status_code == 401:
            raise ConnectionError("401 Unauthorized: Invalid API key. Please check HOLYSHEEP_API_KEY")
        
        if response.status_code == 429:
            raise ConnectionError("429 Too Many Requests: Rate limit exceeded. Retry after cooldown.")
        
        response.raise_for_status()
        
        data = response.json()
        embeddings = np.array([item["embedding"] for item in data["data"]])
        
        return embeddings
    
    def generate_dataset_embeddings(self, texts: List[str], batch_size: int = 100) -> tuple:
        """
        百万スケールデータセットのEmbeddingをバッチ処理で生成
        
        Args:
            texts: 全テキストデータ
            batch_size: バッチサイズ
            
        Returns:
            tuple: (embeddings, usage_info)
        """
        all_embeddings = []
        total_tokens = 0
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            embeddings = self.embed_texts(batch)
            all_embeddings.append(embeddings)
            
            if i % 1000 == 0:
                print(f"Progress: {i}/{len(texts)} ({100*i/len(texts):.1f}%)")
        
        return np.vstack(all_embeddings), {"total_items": len(texts)}


使用例

if __name__ == "__main__": # HolySheep AI APIキー設定 api_key = "YOUR_HOLYSHEEP_API_KEY" # テストテキスト sample_texts = [ "機械学習の基礎概念", "深層学習のアーキテクチャ", "自然言語処理の最新手法", "ベクトルデータベースの設計", "ANN検索アルゴリズムの実装" ] embeddings_client = HolySheepEmbeddings(api_key) try: vectors = embeddings_client.embed_texts(sample_texts) print(f"Generated {vectors.shape[0]} embeddings with dimension {vectors.shape[1]}") print(f"Vector sample (first 5 dims): {vectors[0][:5]}") except ConnectionError as e: print(f"Connection Error: {e}")

HNSWインデックスの構築と検索

# hnsw_index.py
import numpy as np
import hnswlib
import json
from typing import List, Dict, Tuple, Optional
import pickle
from config import (
    HNSW_M, HNSW_EF_CONSTRUCTION, HNSW_EF_SEARCH,
    DIMENSION, INDEX_PATH, VECTORS_PATH, METADATA_PATH
)

class MillionScaleVectorIndex:
    """百万スケール対応HNSWインデックス"""
    
    def __init__(self, dimension: int = DIMENSION, space: str = "cosine"):
        """
        Args:
            dimension: ベクトルの次元数
            space: 距離尺度 ("l2", "ip"=内積, "cosine"=コサイン類似度)
        """
        self.dimension = dimension
        self.space = space
        self.index: Optional[hnswlib.Index] = None
        self.metadata: List[Dict] = []
        self.id_to_idx: Dict[int, int] = {}
    
    def build_index(self, vectors: np.ndarray, metadata: List[Dict], 
                    ef_construction: int = HNSW_EF_CONSTRUCTION,
                    m: int = HNSW_M) -> Dict:
        """
        HNSWインデックスを構築
        
        Args:
            vectors: ベクトル配列 (N, dimension)
            metadata: 各ベクトルに対応するメタデータ
            ef_construction: 構築時の探索幅(大きい=精度高いが低速)
            m: 各層の最大接続数
            
        Returns:
            Dict: 構築統計情報
        """
        print(f"Building HNSW index for {len(vectors):,} vectors...")
        
        # インデックス初期化
        self.index = hnswlib.Index(space=self.space, dim=self.dimension)
        
        # インデックスパラメータ設定
        self.index.init_index(
            max_elements=len(vectors) * 2,  # 拡張用バッファ
            ef_construction=ef_construction,
            M=m,
            random_seed=42
        )
        
        # ベクトル追加(IDはインデックス内の連番)
        self.index.add_items(vectors, np.arange(len(vectors)))
        
        # メタデータ保存
        self.metadata = metadata
        for idx, item in enumerate(metadata):
            self.id_to_idx[item.get("id", idx)] = idx
        
        stats = {
            "total_vectors": len(vectors),
            "dimension": self.dimension,
            "ef_construction": ef_construction,
            "M": m,
            "space": self.space
        }
        
        print(f"Index built successfully. {stats}")
        return stats
    
    def set_search_params(self, ef_search: int = HNSW_EF_SEARCH):
        """
        検索パラメータを設定
        
        Args:
            ef_search: 検索時の探索幅(大きい=精度高いが低速)
        """
        if self.index is None:
            raise RuntimeError("Index not built yet. Call build_index() first.")
        self.index.set_ef(ef_search)
    
    def search(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
        """
        近似最近傍検索を実行
        
        Args:
            query_vector: クエリベクトル (dimension,)
            k: 取得する近傍数
            
        Returns:
            List[Dict]: 上位k件の検索結果 (id, text, distance, score)
        """
        if self.index is None:
            raise RuntimeError("Index not built yet. Call build_index() first.")
        
        # ベクトルを2Dにreshape(hnswlibの要件)
        if query_vector.ndim == 1:
            query_vector = query_vector.reshape(1, -1)
        
        # ANN検索実行
        labels, distances = self.index.knn_query(query_vector, k=k)
        
        # 結果成型
        results = []
        for idx, label in enumerate(labels[0]):
            if label < len(self.metadata):
                result = {
                    "rank": len(results) + 1,
                    "id": self.metadata[label].get("id", int(label)),
                    "distance": float(distances[0][len(results)]),
                    "score": float(1 / (1 + distances[0][len(results)])),  # コサイン距離から類似度に変換
                    **self.metadata[label]
                }
                results.append(result)
        
        return results
    
    def batch_search(self, query_vectors: np.ndarray, k: int = 10) -> List[List[Dict]]:
        """
        批量検索(複数のクエリを同時処理)
        
        Args:
            query_vectors: クエリベクトル配列 (N, dimension)
            k: 各クエリに対して取得する近傍数
            
        Returns:
            List[List[Dict]]: 各クエリの検索結果リスト
        """
        if self.index is None:
            raise RuntimeError("Index not built yet.")
        
        labels, distances = self.index.knn_query(query_vectors, k=k)
        
        all_results = []
        for query_idx in range(len(query_vectors)):
            results = []
            for i in range(k):
                label = labels[query_idx][i]
                if label < len(self.metadata):
                    result = {
                        "rank": i + 1,
                        "id": self.metadata[label].get("id", int(label)),
                        "distance": float(distances[query_idx][i]),
                        "score": float(1 / (1 + distances[query_idx][i])),
                        **self.metadata[label]
                    }
                    results.append(result)
            all_results.append(results)
        
        return all_results
    
    def save(self, index_path: str = INDEX_PATH, metadata_path: str = METADATA_PATH):
        """インデックスとメタデータを保存"""
        if self.index is None:
            raise RuntimeError("No index to save.")
        
        self.index.save_index(index_path)
        
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        
        print(f"Saved index to {index_path} and metadata to {metadata_path}")
    
    def load(self, index_path: str = INDEX_PATH, metadata_path: str = METADATA_PATH):
        """インデックスとメタデータを読み込み"""
        self.index = hnswlib.Index(space=self.space, dim=self.dimension)
        self.index.load_index(index_path)
        
        with open(metadata_path, "r", encoding="utf-8") as f:
            self.metadata = json.load(f)
        
        self.id_to_idx = {item.get("id", idx): idx for idx, item in enumerate(self.metadata)}
        
        print(f"Loaded index with {len(self.metadata):,} vectors")


メイン処理例

if __name__ == "__main__": import time # インデックス生成 index = MillionScaleVectorIndex(dimension=1536, space="cosine") # ダミーデータでテスト(実際には100万件のEmbeddingを使用) num_vectors = 10000 dummy_vectors = np.random.randn(num_vectors, 1536).astype("float32") # L2正規化(コサイン類似度用) norms = np.linalg.norm(dummy_vectors, axis=1, keepdims=True) dummy_vectors = dummy_vectors / (norms + 1e-8) # メタデータ生成 dummy_metadata = [ {"id": i, "text": f"Document {i}", "category": f"cat_{i % 100}"} for i in range(num_vectors) ] # インデックス構築時間測定 start_time = time.time() stats = index.build_index(dummy_vectors, dummy_metadata) build_time = time.time() - start_time print(f"Build time: {build_time:.2f}s") print(f"Throughput: {num_vectors / build_time:.0f} vectors/sec") # 検索パフォーマンス測定 index.set_search_params(ef_search=100) query = dummy_vectors[0].reshape(1, -1) start_time = time.time() for _ in range(100): results = index.search(query[0], k=10) search_time = (time.time() - start_time) / 100 * 1000 # ms print(f"Average search time: {search_time:.2f}ms") print(f"Top 3 results: {results[:3]}")

HolySheep AI API統合の完全な例

# main.py - 完全なパイプライン
import numpy as np
import json
import time
import requests
from typing import List, Dict
from embeddings import HolySheepEmbeddings
from hnsw_index import MillionScaleVectorIndex
from config import HOLYSHEEP_API_KEY, DIMENSION

class ANNVectorPipeline:
    """Embedding生成からANN検索までの完全パイプライン"""
    
    def __init__(self, api_key: str):
        self.embeddings_client = HolySheepEmbeddings(api_key)
        self.index = MillionScaleVectorIndex(dimension=DIMENSION, space="cosine")
    
    def initialize_index(self, texts: List[str], batch_size: int = 100) -> Dict:
        """
        インデックスを初期化(全データの一括処理)
        
        Args:
            texts: インデックス化するテキストリスト
            batch_size: Embedding生成のバッチサイズ
            
        Returns:
            Dict: 処理統計
        """
        start_time = time.time()
        
        # Step 1: 全テキストのEmbedding生成
        print(f"[1/3] Generating embeddings for {len(texts):,} texts...")
        vectors, _ = self.embeddings_client.generate_dataset_embeddings(
            texts, batch_size=batch_size
        )
        
        # L2正規化(コサイン類似度用)
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        vectors = vectors / (norms + 1e-8)
        
        # Step 2: メタデータ準備
        print(f"[2/3] Preparing metadata...")
        metadata = [{"id": i, "text": text[:500]} for i, text in enumerate(texts)]
        
        # Step 3: インデックス構築
        print(f"[3/3] Building HNSW index...")
        stats = self.index.build_index(vectors, metadata)
        
        total_time = time.time() - start_time
        
        return {
            **stats,
            "total_time_seconds": total_time,
            "throughput_per_second": len(texts) / total_time
        }
    
    def semantic_search(self, query: str, top_k: int = 10) -> List[Dict]:
        """
        セマンティック検索を実行
        
        Args:
            query: 検索クエリ
            top_k: 取得件数
            
        Returns:
            List[Dict]: 検索結果
        """
        start_time = time.time()
        
        # クエリのEmbedding生成
        query_vector = self.embeddings_client.embed_texts([query])[0]
        
        # L2正規化
        query_vector = query_vector / (np.linalg.norm(query_vector) + 1e-8)
        
        # ANN検索
        results = self.index.search(query_vector, k=top_k)
        
        search_time_ms = (time.time() - start_time) * 1000
        
        # 結果に処理時間を追加
        for r in results:
            r["query_time_ms"] = search_time_ms
        
        return results
    
    def batch_semantic_search(self, queries: List[str], top_k: int = 10) -> List[List[Dict]]:
        """
        批量セマンティック検索
        
        Args:
            queries: 検索クエリリスト
            top_k: 各クエリの取得件数
            
        Returns:
            List[List[Dict]]: 各クエリの検索結果
        """
        start_time = time.time()
        
        # 全クエリのEmbedding一括生成(API呼び出し回数を最小化)
        query_vectors = self.embeddings_client.embed_texts(queries)
        
        # L2正規化
        norms = np.linalg.norm(query_vectors, axis=1, keepdims=True)
        query_vectors = query_vectors / (norms + 1e-8)
        
        # 批量検索
        all_results = self.index.batch_search(query_vectors, k=top_k)
        
        total_time_ms = (time.time() - start_time) * 1000
        
        # 結果に処理時間を追加
        for results in all_results:
            for r in results:
                r["query_time_ms"] = total_time_ms / len(queries)
        
        return all_results


実行例

if __name__ == "__main__": # HolySheep AI APIキー api_key = "YOUR_HOLYSHEEP_API_KEY" # パイプライン初期化 pipeline = ANNVectorPipeline(api_key) # サンプルデータ sample_documents = [ "深層学習を用いた画像認識技术的发展与应用", "自然言語処理におけるTransformerモデルの革新", "推薦システムにおける協調フィルタリングの実装方法", "クラウドネイティブアプリケーションのアーキテクチャ設計", "マイクロサービス間の通信とサービスメッシュ" ] * 1000 # 5000ドキュメント try: # インデックス初期化 stats = pipeline.initialize_index(sample_documents) print(f"Initialization complete: {json.dumps(stats, indent=2)}") # セマンティック検索テスト query = "機械学習とAIの最新技術" results = pipeline.semantic_search(query, top_k=5) print(f"\nSearch results for: '{query}'") for r in results: print(f" [{r['rank']}] Score: {r['score']:.4f} - {r['text'][:60]}...") except requests.exceptions.ConnectionError as e: print(f"Connection Error: {e}") print("Please check your internet connection and API key.") except requests.exceptions.Timeout as e: print(f"Timeout Error: {e}") print("The request took too long. Try again or check network conditions.") except ValueError as e: print(f"Validation Error: {e}") print("Please check your input data format.")

パフォーマンス最適化Tips

よくあるエラーと対処法

1. ConnectionError: 401 Unauthorized

# 問題: APIキーが無効または期限切れ

原因:

- 環境変数HOLYSHEEP_API_KEYが正しく設定されていない

- APIキーが無効化されている

- スペルミス("HOLYSHEEP_API_KEY" vs "HOLYSHEEP_APIKEY")

解決方法:

import os

正しい環境変数名の確認

API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "") if not API_KEY or API_KEY == "YOUR_HOLYSHEEP_API_KEY": raise ConnectionError( "Invalid API Key. Please set HOLYSHEEP_API_KEY environment variable.\n" "Register at: https://www.holysheep.ai/register" )

または直接指定(開発時のみ)

client = HolySheepEmbeddings(api_key="sk-holysheep-xxxxxxxxxxxx")

2. ConnectionError: 429 Too Many Requests(レート制限)

# 問題: リクエスト数が上限を超えた

原因:

- 短時間に大量のAPI呼び出しを行った

- プランのクォータに達した

解決方法: リトライロジックと指数バックオフを実装

import time