Das Problem, das wir alle kennen

Stellen Sie sich folgendes Szenario vor: Es ist Freitagabend, 23:47 Uhr. Ihr Produktions-System läuft seit Monaten stabil mit Embeddings der Version "text-embedding-3-small". Plötzlich meldet Ihr Monitoring einen Drift in den Suchergebnissen. Der Grund: Der API-Anbieter hat stillschweigend die Modelldimension von 1536 auf 384 geändert, um Kosten zu sparen. Ihre Vektor-Datenbank erwartet jedoch weiterhin 1536-Dimensionale Vektoren. Das Ergebnis ist eine flaschenartige Exception, die Ihre gesamte semantische Suche lahmlegt. Sie stehen vor der Wahl: sofortiges Re-Indexing von Millionen Dokumenten (Stunden bis Tage) oder das System offline nehmen. Genau dieses Dilemma hat mich vor achtzehn Monaten dazu inspiriert, eine robuste Architektur für versionierte Embeddings zu entwickeln. In diesem Tutorial zeige ich Ihnen, wie Sie solche Szenarien vermeiden und Ihr System zukunftssicher machen.

Warum Modellversionen zum Albtraum werden können

Embedding-Modelle entwickeln sich rasant weiter. Anbieter veröffentlichen regelmäßig verbesserte Versionen mit besseren Repräsentationen, niedrigeren Kosten oder reduzierter Latenz. Das Problem: Selbst kleine Änderungen in der Dimension oder im Trainingsprozess führen dazu, dass alte Vektoren nicht mehr kompatibel mit neuen Modellen sind. Wenn Sie zehn Millionen Dokumente indiziert haben, ist ein vollständiges Re-Indexing keine triviale Angelegenheit. Die Lösung liegt in einer cleveren Abstraktionsschicht, die ich "Version-Aware Embedding Proxy" nenne. Diese Schicht ermöglicht es Ihnen, Modelle zu wechseln, ohne Ihre bestehenden Indizes zu berühren. Ich werde Ihnen drei bewährte Strategien vorstellen, die ich in Produktionsumgebungen getestet habe.

Strategie 1: Der Dimensions-Normalisierer

Die eleganteste Lösung für Dimensionsinkompatibilitäten ist die Projektion. Dabei werden Vektoren mit höherer Dimensionalität auf niedrigere Dimensionen projiziert, ohne dass ein vollständiges Re-Indexing erforderlich ist.
"""
Embedding Version Manager mit Dimensionsprojektion
Kompatibel mit HolySheep AI API
"""

import numpy as np
from typing import List, Optional, Dict
import hashlib

class EmbeddingVersionManager:
    """Verwaltet mehrere Embedding-Versionen mit automatischer Projektion"""
    
    SUPPORTED_VERSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "holysheep-embed-v2": 1024,  # HolySheep Standard
        "holysheep-embed-v3": 2048   # HolySheep High-Precision
    }
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.cache: Dict[str, np.ndarray] = {}
        self.projection_matrices: Dict[str, np.ndarray] = {}
    
    def _get_projection_matrix(
        self, 
        source_dim: int, 
        target_dim: int
    ) -> np.ndarray:
        """Erstellt eine orthogonale Projektionsmatrix"""
        cache_key = f"{source_dim}-{target_dim}"
        
        if cache_key in self.projection_matrices:
            return self.projection_matrices[cache_key]
        
        # Generiere deterministische Projektion basierend auf Seed
        np.random.seed(hash(cache_key) % (2**32))
        projection = np.random.randn(source_dim, target_dim)
        
        # QR-Zerlegung für orthogonale Projektion
        Q, _ = np.linalg.qr(projection)
        self.projection_matrices[cache_key] = Q / np.sqrt(target_dim)
        
        return self.projection_matrices[cache_key]
    
    def project_embedding(
        self, 
        embedding: np.ndarray, 
        target_dim: int
    ) -> np.ndarray:
        """Proiziert einen Vektor auf die Zieldimension"""
        source_dim = len(embedding)
        
        if source_dim == target_dim:
            return embedding
        
        projection = self._get_projection_matrix(source_dim, target_dim)
        return embedding @ projection
    
    def embed_with_version(
        self, 
        texts: List[str], 
        model_version: str = "holysheep-embed-v3"
    ) -> np.ndarray:
        """Erstellt Embeddings mit automatischem Dimensionsmanagement"""
        import requests
        
        # Bestimme Zieldimension basierend auf Metadaten im Index
        target_dim = 1024  # Annahme: Index nutzt 1024-Dim
        
        # API-Call zu HolySheep
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "input": texts,
            "model": model_version,
            "encoding_format": "float"
        }
        
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers=headers,
            json=payload,
            timeout=30
        )
        
        if response.status_code == 200:
            data = response.json()
            embeddings = np.array([item["embedding"] for item in data["data"]])
            
            # Automatische Projektion wenn Dimensionen nicht übereinstimmen
            if embeddings.shape[1] != target_dim:
                embeddings = np.array([
                    self.project_embedding(e, target_dim) 
                    for e in embeddings
                ])
            
            return embeddings
        
        raise Exception(f"Embedding API Fehler: {response.status_code}")


Beispielnutzung

manager = EmbeddingVersionManager(api_key="YOUR_HOLYSHEEP_API_KEY") embeddings = manager.embed_with_version( ["Hallo Welt", "Maschinelles Lernen ist faszinierend"], model_version="holysheep-embed-v3" ) print(f"Embedding-Shape: {embeddings.shape}") # Komppatibel mit 1024-Dim Index
Dieser Ansatz bietet Ihnen volle Flexibilität. Sie können jederzeit auf leistungsfähigere Modelle umsteigen, ohne Ihre bestehenden Indizes zu berühren. Die Projektion ist deterministisch und reproduzierbar, was für Konsistenz in verteilten Systemen sorgt.

Strategie 2: Hybrid-Index mit Metadaten-Tagging

Eine weitere bewährte Methode ist die Verwendung von Metadaten, um verschiedene Embedding-Versionen im selben Index zu verwalten. Diese Technik nutze ich besonders gerne bei schrittweisen Migrationen.
"""
Vector Store Adapter für versionierte Embeddings
"""

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
import numpy as np

@dataclass
class EmbeddingMetadata:
    """Metadaten für versionierte Embedding-Einträge"""
    model_version: str
    original_dim: int
    indexed_dim: int
    created_at: str
    project_id: str = "default"

@dataclass
class VectorDocument:
    """Dokument mit versionierten Embeddings"""
    id: str
    content: str
    vector: np.ndarray
    metadata: EmbeddingMetadata
    chunk_index: int = 0

class VersionAwareVectorStore:
    """
    Verwaltet mehrere Embedding-Versionen im selben Index
    """
    
    def __init__(self, target_dimension: int = 1024):
        self.target_dimension = target_dimension
        self.documents: Dict[str, VectorDocument] = {}
        self.version_stats: Dict[str, int] = {}
    
    def add_document(
        self, 
        content: str, 
        vector: np.ndarray,
        model_version: str,
        doc_id: Optional[str] = None
    ) -> str:
        """Fügt ein Dokument mit Versionsinformationen hinzu"""
        
        if doc_id is None:
            doc_id = hashlib.md5(content.encode()).hexdigest()
        
        metadata = EmbeddingMetadata(
            model_version=model_version,
            original_dim=len(vector),
            indexed_dim=self.target_dimension,
            created_at=datetime.now().isoformat()
        )
        
        # Normalisiere Dimension falls nötig
        normalized_vector = self._normalize_dimension(vector)
        
        document = VectorDocument(
            id=doc_id,
            content=content,
            vector=normalized_vector,
            metadata=metadata
        )
        
        self.documents[doc_id] = document
        self.version_stats[model_version] = self.version_stats.get(model_version, 0) + 1
        
        return doc_id
    
    def _normalize_dimension(self, vector: np.ndarray) -> np.ndarray:
        """Normalisiert Vektordimension auf Zieldimension"""
        if len(vector) == self.target_dimension:
            return vector
        
        if len(vector) > self.target_dimension:
            # Projektion mit Qualitätserhalt
            return self._project_with_pca(vector, self.target_dimension)
        else:
            # Padding mit Nullen (nicht ideal, aber funktional)
            padded = np.zeros(self.target_dimension)
            padded[:len(vector)] = vector
            return padded
    
    def _project_with_pca(self, vector: np.ndarray, target: int) -> np.ndarray:
        """PCA-basierte Projektion für Dimensionsreduktion"""
        # Vereinfachte PCA-Projektion
        from numpy.linalg import svd
        
        v = vector[:target]  # Lineare Projektion
        norm = np.linalg.norm(v)
        return v / norm * np.sqrt(target)  # Reskalierung
    
    def search(
        self, 
        query_vector: np.ndarray,
        top_k: int = 10,
        version_filter: Optional[str] = None
    ) -> List[Tuple[str, float]]:
        """
        Ähnlichkeitssuche mit optionaler Versionsfilterung
        
        Args:
            query_vector: Der Suchvektor
            top_k: Anzahl der Ergebnisse
            version_filter: Optionaler Filter nach Modellversion
        
        Returns:
            Liste von (doc_id, similarity_score) Tuples
        """
        query_normalized = self._normalize_dimension(query_vector)
        
        results = []
        for doc_id, doc in self.documents.items():
            if version_filter and doc.metadata.model_version != version_filter:
                continue
            
            similarity = self._cosine_similarity(query_normalized, doc.vector)
            results.append((doc_id, similarity))
        
        # Sortiere nach Ähnlichkeit und gebe Top-k zurück
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Berechnet Kosinus-Ähnlichkeit zwischen zwei Vektoren"""
        dot_product = np.dot(a, b)
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        return dot_product / (norm_a * norm_b)
    
    def get_version_stats(self) -> Dict[str, int]:
        """Gibt Statistiken über die Modellversionen im Index"""
        return self.version_stats.copy()
    
    def migrate_version(
        self, 
        doc_ids: List[str], 
        new_model_version: str,
        new_embeddings: np.ndarray
    ) -> int:
        """
        Migriert spezifische Dokumente zu neuer Version
        
        Returns:
            Anzahl der migrierten Dokumente
        """
        migrated = 0
        for i, doc_id in enumerate(doc_ids):
            if doc_id in self.documents:
                doc = self.documents[doc_id]
                old_version = doc.metadata.model_version
                
                doc.vector = self._normalize_dimension(new_embeddings[i])
                doc.metadata = EmbeddingMetadata(
                    model_version=new_model_version,
                    original_dim=len(new_embeddings[i]),
                    indexed_dim=self.target_dimension,
                    created_at=datetime.now().isoformat()
                )
                
                # Statistiken aktualisieren
                self.version_stats[old_version] -= 1
                self.version_stats[new_model_version] = \
                    self.version_stats.get(new_model_version, 0) + 1
                
                migrated += 1
        
        return migrated


import hashlib
from datetime import datetime

Beispielnutzung

store = VersionAwareVectorStore(target_dimension=1024)

Alte Dokumente mit Version v2

old_vectors = np.random.randn(3, 2048) for i, content in enumerate(["Artikel 1", "Artikel 2", "Artikel 3"]): store.add_document(content, old_vectors[i], "holysheep-embed-v2")

Neue Dokumente mit Version v3

new_vectors = np.random.randn(2, 1024) for i, content in enumerate(["Artikel 4", "Artikel 5"]): store.add_document(content, new_vectors[i], "holysheep-embed-v3") print(f"Version-Statistiken: {store.get_version_stats()}")

Ausgabe: {'holysheep-embed-v2': 3, 'holysheep-embed-v3': 2}

Diese Architektur ermöglicht Ihnengranulare Kontrolle über die Migration. Sie können beispielsweise zunächst neue Dokumente mit dem aktuellen Modell indexieren und ältere Dokumente schrittweise migrieren, je nach Abfragehäufigkeit oder Geschäftswert.

Strategie 3: Livemigration mit Read-Through-Caching

Die dritte Strategie kombiniert beide Ansätze und fügt einen intelligenten Cache hinzu, der die Migration für den Endbenutzer unsichtbar macht. Dies ist besonders nützlich bei großskaligen Systemen.
"""
Livemigration mit automatischer Cache-Auffüllung
"""

import asyncio
import hashlib
from collections import OrderedDict
from typing import List, Dict, Callable, Awaitable
import numpy as np

class MigrationCache:
    """
    LRU-Cache mit automaticcher Versionskonvertierung
    """
    
    def __init__(self, max_size: int = 10000, target_dim: int = 1024):
        self.cache: OrderedDict[str, np.ndarray] = OrderedDict()
        self.max_size = max_size
        self.target_dim = target_dim
        self.hits = 0
        self.misses = 0
        self.conversions = 0
    
    def _generate_key(self, text: str, version: str) -> str:
        """Generiert eindeutigen Cache-Key"""
        content = f"{version}:{text}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]
    
    def get(
        self, 
        text: str, 
        version: str,
        converter: Callable[[np.ndarray, int], np.ndarray]
    ) -> np.ndarray:
        """Holt Embedding aus Cache oder erstellt es"""
        key = self._generate_key(text, version)
        
        if key in self.cache:
            self.hits += 1
            # Move to end (most recently used)
            self.cache.move_to_end(key)
            return self.cache[key]
        
        self.misses += 1
        raise KeyError(f"Cache Miss für Key: {key[:8]}...")
    
    def put(
        self, 
        text: str, 
        version: str, 
        embedding: np.ndarray,
        converter: Callable[[np.ndarray, int], np.ndarray]
    ):
        """Speichert Embedding im Cache mit automatischer Konvertierung"""
        key = self._generate_key(text, version)
        
        # Konvertiere wenn nötig
        if len(embedding) != self.target_dim:
            embedding = converter(embedding, self.target_dim)
            self.conversions += 1
        
        # LRU-Eviction falls nötig
        if len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)
        
        self.cache[key] = embedding
        self.cache.move_to_end(key)
    
    def get_stats(self) -> Dict[str, float]:
        """Gibt Cache-Statistiken zurück"""
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "conversions": self.conversions
        }


class AsyncEmbeddingMigrator:
    """
    Asynchroner Migrator für Livemigration ohne Ausfallzeit
    """
    
    def __init__(
        self,
        api_key: str,
        source_version: str,
        target_version: str,
        target_dim: int = 1024
    ):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.source_version = source_version
        self.target_version = target_version
        self.target_dim = target_dim
        self.cache = MigrationCache(max_size=50000, target_dim=target_dim)
        self.migration_queue: asyncio.Queue = asyncio.Queue()
        self.migration_progress = 0
        self.migration_total = 0
    
    async def embed_texts_async(
        self,
        texts: List[str],
        version: str = None
    ) -> np.ndarray:
        """
        Asynchrones Embedding mit automatischem Version-Handling
        """
        import aiohttp
        
        if version is None:
            version = self.target_version
        
        # Prüfe Cache zuerst
        cached_results = []
        uncached_indices = []
        uncached_texts = []
        
        for i, text in enumerate(texts):
            try:
                cached = self.cache.get(text, version, self._convert_dimension)
                cached_results.append((i, cached))
            except KeyError:
                uncached_indices.append(i)
                uncached_texts.append(text)
        
        # Hole fehlende Embeddings asynchron
        if uncached_texts:
            new_embeddings = await self._fetch_embeddings_async(
                uncached_texts, 
                version
            )
            
            # Cache auffüllen und Ergebnisse sammeln
            for idx, (text, embedding) in zip(uncached_indices, new_embeddings):
                self.cache.put(text, version, embedding, self._convert_dimension)
                cached_results.append((idx, embedding))
        
        # Sortiere nach Original-Index und extrahiere Vektoren
        cached_results.sort(key=lambda x: x[0])
        return np.array([emb for _, emb in cached_results])
    
    async def _fetch_embeddings_async(
        self, 
        texts: List[str], 
        version: str
    ) -> List[np.ndarray]:
        """Holt Embeddings von HolySheep API"""
        import aiohttp
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "input": texts,
            "model": version,
            "encoding_format": "float"
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}/embeddings",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=60)
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return [np.array(item["embedding"]) for item in data["data"]]
                else:
                    raise Exception(f"API Error: {response.status}")
    
    def _convert_dimension(
        self, 
        embedding: np.ndarray, 
        target: int
    ) -> np.ndarray:
        """Konvertiert Embedding-Dimension"""
        if len(embedding) == target:
            return embedding
        
        # Lineare Projektion mit Reskalierung
        if len(embedding) > target:
            projected = embedding[:target]
        else:
            projected = np.pad(embedding, (0, target - len(embedding)))
        
        return projected / np.linalg.norm(projected) * np.sqrt(target)
    
    async def migrate_batch(
        self, 
        texts: List[str],
        batch_size: int = 100,
        progress_callback: Callable[[float], Awaitable[None]] = None
    ):
        """
        Migriert einen Batch von Dokumenten zur neuen Version
        
        Args:
            texts: Liste der zu migrierenden Texte
            batch_size: Anzahl pro Batch
            progress_callback: Async Callback für Fortschrittsanzeige
        """
        self.migration_total = len(texts)
        self.migration_progress = 0
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            
            # Hole Quellembeddings falls noch nicht gecached
            try:
                await self.embed_texts_async(batch, self.source_version)
            except Exception:
                pass  # Ignoriere fehlende Quellversion
            
            # Hole Zielembeddings (oder erstelle sie)
            await self.embed_texts_async(batch, self.target_version)
            
            self.migration_progress += len(batch)
            
            if progress_callback:
                progress = self.migration_progress / self.migration_total
                await progress_callback(progress)
    
    async def get