Das Problem, das wir alle kennen
Stellen Sie sich folgendes Szenario vor: Es ist Freitagabend, 23:47 Uhr. Ihr Produktions-System läuft seit Monaten stabil mit Embeddings der Version "text-embedding-3-small". Plötzlich meldet Ihr Monitoring einen Drift in den Suchergebnissen. Der Grund: Der API-Anbieter hat stillschweigend die Modelldimension von 1536 auf 384 geändert, um Kosten zu sparen. Ihre Vektor-Datenbank erwartet jedoch weiterhin 1536-Dimensionale Vektoren. Das Ergebnis ist eine flaschenartige Exception, die Ihre gesamte semantische Suche lahmlegt. Sie stehen vor der Wahl: sofortiges Re-Indexing von Millionen Dokumenten (Stunden bis Tage) oder das System offline nehmen.
Genau dieses Dilemma hat mich vor achtzehn Monaten dazu inspiriert, eine robuste Architektur für versionierte Embeddings zu entwickeln. In diesem Tutorial zeige ich Ihnen, wie Sie solche Szenarien vermeiden und Ihr System zukunftssicher machen.
Warum Modellversionen zum Albtraum werden können
Embedding-Modelle entwickeln sich rasant weiter. Anbieter veröffentlichen regelmäßig verbesserte Versionen mit besseren Repräsentationen, niedrigeren Kosten oder reduzierter Latenz. Das Problem: Selbst kleine Änderungen in der Dimension oder im Trainingsprozess führen dazu, dass alte Vektoren nicht mehr kompatibel mit neuen Modellen sind. Wenn Sie zehn Millionen Dokumente indiziert haben, ist ein vollständiges Re-Indexing keine triviale Angelegenheit.
Die Lösung liegt in einer cleveren Abstraktionsschicht, die ich "Version-Aware Embedding Proxy" nenne. Diese Schicht ermöglicht es Ihnen, Modelle zu wechseln, ohne Ihre bestehenden Indizes zu berühren. Ich werde Ihnen drei bewährte Strategien vorstellen, die ich in Produktionsumgebungen getestet habe.
Strategie 1: Der Dimensions-Normalisierer
Die eleganteste Lösung für Dimensionsinkompatibilitäten ist die Projektion. Dabei werden Vektoren mit höherer Dimensionalität auf niedrigere Dimensionen projiziert, ohne dass ein vollständiges Re-Indexing erforderlich ist.
"""
Embedding Version Manager mit Dimensionsprojektion
Kompatibel mit HolySheep AI API
"""
import numpy as np
from typing import List, Optional, Dict
import hashlib
class EmbeddingVersionManager:
"""Verwaltet mehrere Embedding-Versionen mit automatischer Projektion"""
SUPPORTED_VERSIONS = {
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"holysheep-embed-v2": 1024, # HolySheep Standard
"holysheep-embed-v3": 2048 # HolySheep High-Precision
}
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.cache: Dict[str, np.ndarray] = {}
self.projection_matrices: Dict[str, np.ndarray] = {}
def _get_projection_matrix(
self,
source_dim: int,
target_dim: int
) -> np.ndarray:
"""Erstellt eine orthogonale Projektionsmatrix"""
cache_key = f"{source_dim}-{target_dim}"
if cache_key in self.projection_matrices:
return self.projection_matrices[cache_key]
# Generiere deterministische Projektion basierend auf Seed
np.random.seed(hash(cache_key) % (2**32))
projection = np.random.randn(source_dim, target_dim)
# QR-Zerlegung für orthogonale Projektion
Q, _ = np.linalg.qr(projection)
self.projection_matrices[cache_key] = Q / np.sqrt(target_dim)
return self.projection_matrices[cache_key]
def project_embedding(
self,
embedding: np.ndarray,
target_dim: int
) -> np.ndarray:
"""Proiziert einen Vektor auf die Zieldimension"""
source_dim = len(embedding)
if source_dim == target_dim:
return embedding
projection = self._get_projection_matrix(source_dim, target_dim)
return embedding @ projection
def embed_with_version(
self,
texts: List[str],
model_version: str = "holysheep-embed-v3"
) -> np.ndarray:
"""Erstellt Embeddings mit automatischem Dimensionsmanagement"""
import requests
# Bestimme Zieldimension basierend auf Metadaten im Index
target_dim = 1024 # Annahme: Index nutzt 1024-Dim
# API-Call zu HolySheep
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"input": texts,
"model": model_version,
"encoding_format": "float"
}
response = requests.post(
f"{self.base_url}/embeddings",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
data = response.json()
embeddings = np.array([item["embedding"] for item in data["data"]])
# Automatische Projektion wenn Dimensionen nicht übereinstimmen
if embeddings.shape[1] != target_dim:
embeddings = np.array([
self.project_embedding(e, target_dim)
for e in embeddings
])
return embeddings
raise Exception(f"Embedding API Fehler: {response.status_code}")
Beispielnutzung
manager = EmbeddingVersionManager(api_key="YOUR_HOLYSHEEP_API_KEY")
embeddings = manager.embed_with_version(
["Hallo Welt", "Maschinelles Lernen ist faszinierend"],
model_version="holysheep-embed-v3"
)
print(f"Embedding-Shape: {embeddings.shape}") # Komppatibel mit 1024-Dim Index
Dieser Ansatz bietet Ihnen volle Flexibilität. Sie können jederzeit auf leistungsfähigere Modelle umsteigen, ohne Ihre bestehenden Indizes zu berühren. Die Projektion ist deterministisch und reproduzierbar, was für Konsistenz in verteilten Systemen sorgt.
Strategie 2: Hybrid-Index mit Metadaten-Tagging
Eine weitere bewährte Methode ist die Verwendung von Metadaten, um verschiedene Embedding-Versionen im selben Index zu verwalten. Diese Technik nutze ich besonders gerne bei schrittweisen Migrationen.
"""
Vector Store Adapter für versionierte Embeddings
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
import numpy as np
@dataclass
class EmbeddingMetadata:
"""Metadaten für versionierte Embedding-Einträge"""
model_version: str
original_dim: int
indexed_dim: int
created_at: str
project_id: str = "default"
@dataclass
class VectorDocument:
"""Dokument mit versionierten Embeddings"""
id: str
content: str
vector: np.ndarray
metadata: EmbeddingMetadata
chunk_index: int = 0
class VersionAwareVectorStore:
"""
Verwaltet mehrere Embedding-Versionen im selben Index
"""
def __init__(self, target_dimension: int = 1024):
self.target_dimension = target_dimension
self.documents: Dict[str, VectorDocument] = {}
self.version_stats: Dict[str, int] = {}
def add_document(
self,
content: str,
vector: np.ndarray,
model_version: str,
doc_id: Optional[str] = None
) -> str:
"""Fügt ein Dokument mit Versionsinformationen hinzu"""
if doc_id is None:
doc_id = hashlib.md5(content.encode()).hexdigest()
metadata = EmbeddingMetadata(
model_version=model_version,
original_dim=len(vector),
indexed_dim=self.target_dimension,
created_at=datetime.now().isoformat()
)
# Normalisiere Dimension falls nötig
normalized_vector = self._normalize_dimension(vector)
document = VectorDocument(
id=doc_id,
content=content,
vector=normalized_vector,
metadata=metadata
)
self.documents[doc_id] = document
self.version_stats[model_version] = self.version_stats.get(model_version, 0) + 1
return doc_id
def _normalize_dimension(self, vector: np.ndarray) -> np.ndarray:
"""Normalisiert Vektordimension auf Zieldimension"""
if len(vector) == self.target_dimension:
return vector
if len(vector) > self.target_dimension:
# Projektion mit Qualitätserhalt
return self._project_with_pca(vector, self.target_dimension)
else:
# Padding mit Nullen (nicht ideal, aber funktional)
padded = np.zeros(self.target_dimension)
padded[:len(vector)] = vector
return padded
def _project_with_pca(self, vector: np.ndarray, target: int) -> np.ndarray:
"""PCA-basierte Projektion für Dimensionsreduktion"""
# Vereinfachte PCA-Projektion
from numpy.linalg import svd
v = vector[:target] # Lineare Projektion
norm = np.linalg.norm(v)
return v / norm * np.sqrt(target) # Reskalierung
def search(
self,
query_vector: np.ndarray,
top_k: int = 10,
version_filter: Optional[str] = None
) -> List[Tuple[str, float]]:
"""
Ähnlichkeitssuche mit optionaler Versionsfilterung
Args:
query_vector: Der Suchvektor
top_k: Anzahl der Ergebnisse
version_filter: Optionaler Filter nach Modellversion
Returns:
Liste von (doc_id, similarity_score) Tuples
"""
query_normalized = self._normalize_dimension(query_vector)
results = []
for doc_id, doc in self.documents.items():
if version_filter and doc.metadata.model_version != version_filter:
continue
similarity = self._cosine_similarity(query_normalized, doc.vector)
results.append((doc_id, similarity))
# Sortiere nach Ähnlichkeit und gebe Top-k zurück
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k]
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Berechnet Kosinus-Ähnlichkeit zwischen zwei Vektoren"""
dot_product = np.dot(a, b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
return dot_product / (norm_a * norm_b)
def get_version_stats(self) -> Dict[str, int]:
"""Gibt Statistiken über die Modellversionen im Index"""
return self.version_stats.copy()
def migrate_version(
self,
doc_ids: List[str],
new_model_version: str,
new_embeddings: np.ndarray
) -> int:
"""
Migriert spezifische Dokumente zu neuer Version
Returns:
Anzahl der migrierten Dokumente
"""
migrated = 0
for i, doc_id in enumerate(doc_ids):
if doc_id in self.documents:
doc = self.documents[doc_id]
old_version = doc.metadata.model_version
doc.vector = self._normalize_dimension(new_embeddings[i])
doc.metadata = EmbeddingMetadata(
model_version=new_model_version,
original_dim=len(new_embeddings[i]),
indexed_dim=self.target_dimension,
created_at=datetime.now().isoformat()
)
# Statistiken aktualisieren
self.version_stats[old_version] -= 1
self.version_stats[new_model_version] = \
self.version_stats.get(new_model_version, 0) + 1
migrated += 1
return migrated
import hashlib
from datetime import datetime
Beispielnutzung
store = VersionAwareVectorStore(target_dimension=1024)
Alte Dokumente mit Version v2
old_vectors = np.random.randn(3, 2048)
for i, content in enumerate(["Artikel 1", "Artikel 2", "Artikel 3"]):
store.add_document(content, old_vectors[i], "holysheep-embed-v2")
Neue Dokumente mit Version v3
new_vectors = np.random.randn(2, 1024)
for i, content in enumerate(["Artikel 4", "Artikel 5"]):
store.add_document(content, new_vectors[i], "holysheep-embed-v3")
print(f"Version-Statistiken: {store.get_version_stats()}")
Ausgabe: {'holysheep-embed-v2': 3, 'holysheep-embed-v3': 2}
Diese Architektur ermöglicht Ihnengranulare Kontrolle über die Migration. Sie können beispielsweise zunächst neue Dokumente mit dem aktuellen Modell indexieren und ältere Dokumente schrittweise migrieren, je nach Abfragehäufigkeit oder Geschäftswert.
Strategie 3: Livemigration mit Read-Through-Caching
Die dritte Strategie kombiniert beide Ansätze und fügt einen intelligenten Cache hinzu, der die Migration für den Endbenutzer unsichtbar macht. Dies ist besonders nützlich bei großskaligen Systemen.
"""
Livemigration mit automatischer Cache-Auffüllung
"""
import asyncio
import hashlib
from collections import OrderedDict
from typing import List, Dict, Callable, Awaitable
import numpy as np
class MigrationCache:
"""
LRU-Cache mit automaticcher Versionskonvertierung
"""
def __init__(self, max_size: int = 10000, target_dim: int = 1024):
self.cache: OrderedDict[str, np.ndarray] = OrderedDict()
self.max_size = max_size
self.target_dim = target_dim
self.hits = 0
self.misses = 0
self.conversions = 0
def _generate_key(self, text: str, version: str) -> str:
"""Generiert eindeutigen Cache-Key"""
content = f"{version}:{text}"
return hashlib.sha256(content.encode()).hexdigest()[:32]
def get(
self,
text: str,
version: str,
converter: Callable[[np.ndarray, int], np.ndarray]
) -> np.ndarray:
"""Holt Embedding aus Cache oder erstellt es"""
key = self._generate_key(text, version)
if key in self.cache:
self.hits += 1
# Move to end (most recently used)
self.cache.move_to_end(key)
return self.cache[key]
self.misses += 1
raise KeyError(f"Cache Miss für Key: {key[:8]}...")
def put(
self,
text: str,
version: str,
embedding: np.ndarray,
converter: Callable[[np.ndarray, int], np.ndarray]
):
"""Speichert Embedding im Cache mit automatischer Konvertierung"""
key = self._generate_key(text, version)
# Konvertiere wenn nötig
if len(embedding) != self.target_dim:
embedding = converter(embedding, self.target_dim)
self.conversions += 1
# LRU-Eviction falls nötig
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = embedding
self.cache.move_to_end(key)
def get_stats(self) -> Dict[str, float]:
"""Gibt Cache-Statistiken zurück"""
total = self.hits + self.misses
hit_rate = self.hits / total if total > 0 else 0
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": hit_rate,
"conversions": self.conversions
}
class AsyncEmbeddingMigrator:
"""
Asynchroner Migrator für Livemigration ohne Ausfallzeit
"""
def __init__(
self,
api_key: str,
source_version: str,
target_version: str,
target_dim: int = 1024
):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.source_version = source_version
self.target_version = target_version
self.target_dim = target_dim
self.cache = MigrationCache(max_size=50000, target_dim=target_dim)
self.migration_queue: asyncio.Queue = asyncio.Queue()
self.migration_progress = 0
self.migration_total = 0
async def embed_texts_async(
self,
texts: List[str],
version: str = None
) -> np.ndarray:
"""
Asynchrones Embedding mit automatischem Version-Handling
"""
import aiohttp
if version is None:
version = self.target_version
# Prüfe Cache zuerst
cached_results = []
uncached_indices = []
uncached_texts = []
for i, text in enumerate(texts):
try:
cached = self.cache.get(text, version, self._convert_dimension)
cached_results.append((i, cached))
except KeyError:
uncached_indices.append(i)
uncached_texts.append(text)
# Hole fehlende Embeddings asynchron
if uncached_texts:
new_embeddings = await self._fetch_embeddings_async(
uncached_texts,
version
)
# Cache auffüllen und Ergebnisse sammeln
for idx, (text, embedding) in zip(uncached_indices, new_embeddings):
self.cache.put(text, version, embedding, self._convert_dimension)
cached_results.append((idx, embedding))
# Sortiere nach Original-Index und extrahiere Vektoren
cached_results.sort(key=lambda x: x[0])
return np.array([emb for _, emb in cached_results])
async def _fetch_embeddings_async(
self,
texts: List[str],
version: str
) -> List[np.ndarray]:
"""Holt Embeddings von HolySheep API"""
import aiohttp
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"input": texts,
"model": version,
"encoding_format": "float"
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/embeddings",
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=60)
) as response:
if response.status == 200:
data = await response.json()
return [np.array(item["embedding"]) for item in data["data"]]
else:
raise Exception(f"API Error: {response.status}")
def _convert_dimension(
self,
embedding: np.ndarray,
target: int
) -> np.ndarray:
"""Konvertiert Embedding-Dimension"""
if len(embedding) == target:
return embedding
# Lineare Projektion mit Reskalierung
if len(embedding) > target:
projected = embedding[:target]
else:
projected = np.pad(embedding, (0, target - len(embedding)))
return projected / np.linalg.norm(projected) * np.sqrt(target)
async def migrate_batch(
self,
texts: List[str],
batch_size: int = 100,
progress_callback: Callable[[float], Awaitable[None]] = None
):
"""
Migriert einen Batch von Dokumenten zur neuen Version
Args:
texts: Liste der zu migrierenden Texte
batch_size: Anzahl pro Batch
progress_callback: Async Callback für Fortschrittsanzeige
"""
self.migration_total = len(texts)
self.migration_progress = 0
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# Hole Quellembeddings falls noch nicht gecached
try:
await self.embed_texts_async(batch, self.source_version)
except Exception:
pass # Ignoriere fehlende Quellversion
# Hole Zielembeddings (oder erstelle sie)
await self.embed_texts_async(batch, self.target_version)
self.migration_progress += len(batch)
if progress_callback:
progress = self.migration_progress / self.migration_total
await progress_callback(progress)
async def get
Verwandte Ressourcen
Verwandte Artikel