Vous utilisez les embeddings OpenAI ou un autre provider pour votre RAG, votre recherche vectorielle ou vos pipelines NLP ? Ce guide est votre plan de migration complet. Après 3 ans à benchmarker des modèles d'embedding en production (sur des corpus de 50k à 2M documents), je vous partage mon retour terrain, mes erreurs, et la stratégie optimale pour migrer vers une infrastructure plus performante et nettement moins coûteuse.
Le Problème : Pourquoi Vos Embeddings Actuels Vous Coutent Trop Cher
En 2024, j'ai géré une infrastructure de recherche sémantique pour une plateforme e-commerce avec 1.2 million de produits. Notre consommation mensuelle d'embeddings dépassait les 800$ avec OpenAI, et les latences de 180-250ms en heure de pointe dégradaient l'expérience utilisateur. Le problème n'était pas la qualité des embeddings — OpenAI excelle là-dessus — mais le modèle économique devenu intenable à notre échelle.
Les alternatives open-source comme BGE-M3 ou les providers alternatifs comme Jina offrent aujourd'hui des performances comparables (et parfois supérieures pour les langues non-anglophones) avec une fraction du coût. Mais migrer sans stratégie, c'est risquer la régression fonctionnelle.
Comparatif Technique : Les 3 Contenders
| Critère | text-embedding-3-small | text-embedding-3-large | BGE-M3 | Jina AI v3 |
|---|---|---|---|---|
| Dimensions | 1536 | 3072 | 1024 | 1024 |
| Prix (USD/1M tokens) | $0.02 | $0.13 | $0 (self-hosted) ou ~$0.02 via API | $0.05 |
| Latence P50 | ~120ms | ~180ms | ~15ms (GPU local) | ~80ms |
| Support multilingue | Bonne | Excellente | Excellente (100+ langues) | Très bonne |
| Matrice de contexte | 8191 tokens | 8191 tokens | 8192 tokens | 8192 tokens |
| Exactitude MTEB FR | 61.2% | 65.8% | 64.1% | 62.7% |
Note : Les scores MTEB (Massive Text Embedding Benchmark) sont mesurés sur le benchmark français. BGE-M3 démontre une supériorité notable sur les langues asiatiques et les corpus techniques multilingues.
Architecture de Migration : Le Plan en 4 Étapes
Étape 1 : Audit de Votre Consommation Actuelle
Avant toute migration, quantifiez votre baseline. Voici le script Python que j'utilise pour analyser 30 jours de logs OpenAI et estimer les économies potentielles :
import json
from collections import defaultdict
def analyser_consommation_embeddings(fichier_logs):
"""
Analyse les logs OpenAI pour estimer la consommation d'embeddings.
Args:
fichier_logs: Chemin vers le fichier JSON des logs API
Returns:
Dict avec statistiques détaillées par modèle et période
"""
statistiques = {
'total_tokens': 0,
'par_modele': defaultdict(lambda: {'tokens': 0, 'appels': 0}),
'cout_estimate': 0,
'distribution_tokens': []
}
prix_par_modele = {
'text-embedding-3-small': 0.02, # USD par 1M tokens
'text-embedding-3-large': 0.13,
'text-embedding-ada-002': 0.10
}
with open(fichier_logs, 'r') as f:
for ligne in f:
try:
log = json.loads(ligne)
# Extraction du modèle utilisé
modele = log.get('model', 'unknown')
# Calcul des tokens (prompt + completion si applicable)
tokens_input = log.get('usage', {}).get('prompt_tokens', 0)
tokens_output = log.get('usage', {}).get('total_tokens', tokens_input)
statistiques['total_tokens'] += tokens_output
statistiques['par_modele'][modele]['tokens'] += tokens_output
statistiques['par_modele'][modele]['appels'] += 1
statistiques['distribution_tokens'].append(tokens_output)
except json.JSONDecodeError:
continue
# Calcul du coût estimé
for modele, donnees in statistiques['par_modele'].items():
prix_unitaire = prix_par_modele.get(modele, 0.10)
statistiques['cout_estimate'] += (donnees['tokens'] / 1_000_000) * prix_unitaire
return statistiques
Exemple d'utilisation
resultat = analyser_consommation_embeddings('logs/openai_2024_q4.jsonl')
print(f"Tokens totaux : {resultat['total_tokens']:,}")
print(f"Coût estimé OpenAI : ${resultat['cout_estimate']:.2f}")
print(f"Économie HolySheep (85%) : ${resultat['cout_estimate'] * 0.85:.2f}")
Étape 2 : Configuration du Client HolySheep
HolySheep AI propose une API compatible OpenAI, ce qui simplifie considérablement la migration. Voici la configuration que j'ai déployée en production :
import openai
from typing import List, Optional
import numpy as np
class EmbeddingClient:
"""
Client unifié pour la gestion des embeddings multi-providers.
Supporte la migration progressive et le fallback automatique.
"""
def __init__(
self,
api_key: str,
provider: str = "holysheep",
base_url: str = "https://api.holysheep.ai/v1",
timeout: int = 30
):
self.client = openai.OpenAI(
api_key=api_key,
base_url=base_url,
timeout=timeout
)
self.provider = provider
self.fallback_client = None
# Mapping des modèles disponibles
self.modeles = {
'holysheep': 'bge-m3',
'openai': 'text-embedding-3-small',
'jina': 'jina-embeddings-v3'
}
def embed(
self,
texts: List[str],
model: Optional[str] = None,
normalize: bool = True
) -> List[List[float]]:
"""
Génère des embeddings avec gestion du provider.
Args:
texts: Liste de textes à encoder
model: Modèle spécifique (optionnel)
normalize: Normalisation L2 des vecteurs
Returns:
Liste de vecteurs d'embedding
"""
modele = model or self.modeles.get(self.provider, 'bge-m3')
response = self.client.embeddings.create(
model=modele,
input=texts,
encoding_format="float"
)
embeddings = [item.embedding for item in response.data]
if normalize:
embeddings = self._normalize(embeddings)
return embeddings
def _normalize(self, vectors: List[List[float]]) -> List[List[float]]:
"""Normalisation L2 pour la similarité cosinus."""
normalises = []
for v in vectors:
norme = np.linalg.norm(v)
if norme > 0:
normalises.append([x / norme for x in v])
else:
normalises.append(v)
return normalises
def similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""Calcul de similarité cosinus."""
return np.dot(vec1, vec2)
def migration_test(
self,
corpus_test: List[str],
threshold: float = 0.98
) -> dict:
"""
Test de cohérence entre providers pour valider la migration.
Args:
corpus_test: Échantillon représentatif de vos documents
threshold: Seuil de similarité acceptable (0.98 = 98%)
Returns:
Rapport de migration avec statistiques de cohérence
"""
resultats = {}
# Embeddings HolySheep (nouveau)
holysheep_embeds = self.embed(corpus_test, model='bge-m3')
# Embeddings OpenAI (référence si disponibles)
if self.fallback_client:
openai_embeds = self.fallback_client.embed(corpus_test)
# Calcul de la cohérence par paires
coherences = []
for h_embed, o_embed in zip(holysheep_embeds, openai_embeds):
sim = self.similarity(h_embed, o_embed)
coherences.append(sim)
resultats['coherence_moyenne'] = np.mean(coherences)
resultats['coherence_min'] = np.min(coherences)
resultats['seuil_atteint'] = np.mean(coherences) >= threshold
resultats['recommandation'] = (
'MIGRER' if resultats['seuil_atteint']
else 'RÉVISER LES HYPERPARAMÈTRES'
)
return resultats
Initialisation pour production
client = EmbeddingClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
provider="holysheep"
)
Test de connexion
try:
test = client.embed(["Test de connexion HolySheep"])
print(f"✅ Connexion réussie — Dimension: {len(test[0])}")
except Exception as e:
print(f"❌ Erreur: {e}")
Étape 3 : Migration Progressive avec Stratégie Graduelle
Ma recommandation basée sur 3 migrations réussies : migration par batches de 10% du trafic pendant 72 heures, avec monitoring des métriques de qualité. Voici le script de load balancing intelligent :
import random
import hashlib
from typing import Callable, List, Any
class MigrationRouter:
"""
Routage intelligent du trafic pendant la migration.
Permet un switching progressif avec rollback automatique.
"""
def __init__(
self,
holysheep_func: Callable,
legacy_func: Callable,
migration_percentage: float = 0.0
):
"""
Args:
holysheep_func: Fonction utilisant HolySheep
legacy_func: Fonction utilisant l'ancien provider
migration_percentage: Pourcentage du trafic vers HolySheep (0.0-1.0)
"""
self.holysheep = holysheep_func
self.legacy = legacy_func
self.migration_pct = migration_percentage
self.stats = {'holysheep': 0, 'legacy': 0, 'errors': 0}
def set_migration_level(self, percentage: float) -> None:
"""Ajuste le pourcentage de migration en temps réel."""
self.migration_pct = max(0.0, min(1.0, percentage))
print(f"🔄 Migration level: {self.migration_pct * 100:.1f}% → HolySheep")
def call(self, *args, hash_key: str = None, **kwargs) -> Any:
"""
Exécute l'appel avec routage intelligent.
Args:
hash_key: Clé de hachage pour cohérence (même input → même provider)
*args, **kwargs: Arguments passés aux fonctions
"""
# Routing déterministe par hash pour cohérence des requêtes
if hash_key:
hash_value = int(hashlib.md5(hash_key.encode()).hexdigest(), 16)
use_holysheep = (hash_value % 100) < (self.migration_pct * 100)
else:
use_holysheep = random.random() < self.migration_pct
provider = 'holysheep' if use_holysheep else 'legacy'
try:
if provider == 'holysheep':
result = self.holysheep(*args, **kwargs)
self.stats['holysheep'] += 1
else:
result = self.legacy(*args, **kwargs)
self.stats['legacy'] += 1
return result, provider
except Exception as e:
self.stats['errors'] += 1
# Fallback automatique vers legacy en cas d'erreur
print(f"⚠️ Erreur HolySheep: {e} → Fallback legacy")
return self.legacy(*args, **kwargs), 'fallback'
def get_stats(self) -> dict:
"""Retourne les statistiques de migration."""
total = sum(self.stats.values())
if total == 0:
return self.stats
return {
**self.stats,
'total': total,
'holysheep_pct': self.stats['holysheep'] / total * 100,
'error_rate': self.stats['errors'] / total * 100
}
Pipeline de migration orchestré
def pipeline_migration_progressive(
documents: List[str],
embed_client: EmbeddingClient,
initial_pct: float = 0.10,
increment_pct: float = 0.20,
interval_hours: int = 72
):
"""
Orchestre la migration progressive avec paliers.
Schedule de migration recommandé:
- H0: 10% du trafic
- H72: 30% (si stabilité)
- H144: 50%
- H216: 80%
- H288: 100%
"""
router = MigrationRouter(
holysheep_func=lambda texts: embed_client.embed(texts),
legacy_func=lambda texts: embed_client.embed(texts), # Remplacer par ancien client
migration_percentage=initial_pct
)
migration_stages = [
(0.10, "Phase 1: Canari (10%)"),
(0.30, "Phase 2: Expansion (30%)"),
(0.50, "Phase 3: Majority (50%)"),
(0.80, "Phase 4: Pré-production (80%)"),
(1.00, "Phase 5: Full Migration (100%)")
]
for pct, label in migration_stages:
print(f"\n📊 {label}")
router.set_migration_level(pct)
# Simuler le traitement
results = [router.call(doc, hash_key=doc) for doc in documents[:100]]
stats = router.get_stats()
print(f" HolySheep: {stats['holysheep_pct']:.1f}%")
print(f" Erreurs: {stats['error_rate']:.2f}%")
if stats['error_rate'] > 2.0:
print(" 🛑 Alerte: Taux d'erreur élevé — Pause migration")
break
return router.get_stats()
Étape 4 : Validation et监控
La phase de validation est critique. Voici mon dashboard de monitoring en temps réel :
import time
from dataclasses import dataclass
from typing import Dict, List
from collections import deque
@dataclass
class MigrationMetrics:
"""Métriques de suivi de migration."""
timestamp: float
provider: str
latency_ms: float
tokens: int
success: bool
error_message: str = ""
class MigrationMonitor:
"""
Dashboard de monitoring pour la migration HolySheep.
Affiche les KPIs en temps réel et détecte les régressions.
"""
def __init__(self, window_size: int = 1000):
self.metrics = deque(maxlen=window_size)
self.alerts = []
self.thresholds = {
'latency_p99_ms': 200,
'error_rate_percent': 2.0,
'coherence_min': 0.95
}
def record(self, metric: MigrationMetrics) -> None:
"""Enregistre une métrique et vérifie les seuils."""
self.metrics.append(metric)
# Vérification des alertes
stats = self.get_stats()
if stats['latency_p99'] > self.thresholds['latency_p99_ms']:
self._alert(f"Latence P99 élevée: {stats['latency_p99']:.0f}ms")
if stats['error_rate'] > self.thresholds['error_rate_percent']:
self._alert(f"Taux d'erreur critique: {stats['error_rate']:.2f}%")
def get_stats(self) -> Dict:
"""Calcule les statistiques agrégées."""
holysheep_metrics = [m for m in self.metrics if m.provider == 'holysheep']
if not holysheep_metrics:
return {'count': 0}
latencies = sorted([m.latency_ms for m in holysheep_metrics])
errors = sum(1 for m in holysheep_metrics if not m.success)
return {
'count': len(holysheep_metrics),
'latency_avg': sum(latencies) / len(latencies),
'latency_p50': latencies[len(latencies) // 2],
'latency_p99': latencies[int(len(latencies) * 0.99)],
'error_rate': errors / len(holysheep_metrics) * 100,
'tokens_total': sum(m.tokens for m in holysheep_metrics)
}
def _alert(self, message: str) -> None:
"""Déclenche une alerte."""
self.alerts.append({
'timestamp': time.time(),
'message': message
})
print(f"🚨 ALERTE: {message}")
def generate_report(self) -> str:
"""Génère un rapport HTML de migration."""
stats = self.get_stats()
html = f"""
📊 Rapport de Migration HolySheep
Requêtes traitées {stats.get('count', 0):,}
Latence moyenne {stats.get('latency_avg', 0):.1f}ms
Latence P99 {stats.get('latency_p99', 0):.1f}ms
Taux d'erreur {stats.get('error_rate', 0):.2f}%
Tokens total {stats.get('tokens_total', 0):,}
Status: {'✅ MIGRATION STABLE' if stats.get('error_rate', 100) < 2 else '⚠️ SURVEILLANCE REQUISE'}
"""
return html
Utilisation en production
monitor = MigrationMonitor()
Simulation de requêtes
for i in range(500):
metric = MigrationMetrics(
timestamp=time.time(),
provider='holysheep',
latency_ms=35 + random.gauss(0, 10), # ~35ms en moyenne
tokens=random.randint(100, 500),
success=random.random() > 0.005 # 0.5% d'erreur
)
monitor.record(metric)
print(monitor.generate_report())
Pour Qui / Pour Qui Ce N'est Pas Fait
✅ Migration Recommandée Pour :
- Applications à fort volume : Plus de 50M tokens/mois — l'économie de 85% devient significative dès 10M tokens/mois
- Cas d'usage multilingues : BGE-M3 surpasse text-embedding-3 sur les corpus chinois, japonais, coréen et les corpus techniques spécialisés
- Latence critique : Si vos utilisateurs tolèrent moins de 100ms — HolySheep offre <50ms contre 150-250ms sur les API officielles
- Équipes sans infrastructure ML : Self-hosting BGE demande des compétences DevOps et GPU dédié (coût ~$400/mois minimum)
- Startups et scale-ups : Optimisation du burn rate — les économies peuvent représenter 2-5 postes sauvés
❌ Migration Non Recommandée Pour :
- Volume inférieur à 1M tokens/mois : L'économie absolue (<$15/mois) ne justifie pas le coût de migration
- Écosystème Microsoft/Azure lock-in : Si vous utilisez Azure OpenAI Service avec compliance ISO 27001, le coût de migration excède les économies
- Requêtes très occasionnelles : Moins de 1000 appels/mois — la différence de coût est négligeable
- Cas d'usage académique/certification : Si vos embeddings doivent provenir d'un provider certifié par votre organisation
Tarification et ROI
| Volume Mensuel | Coût OpenAI (3-small) | Coût HolySheep | Économie | ROI Migration |
|---|---|---|---|---|
| 1M tokens | $20 | $3 | $17 (85%) | ⚠️ Marginal |
| 10M tokens | $200 | $30 | $170 (85%) | ✅ Payback 1 mois |
| 100M tokens | $2,000 | $300 | $1,700 (85%) | ✅✅ Économie massive |
| 500M tokens | $10,000 | $1,500 | $8,500 (85%) | ✅✅✅ Transformateur |
Calcul du ROI : Pour une migration typique (10 jours de développement, 2 jours d'intégration, 1 semaine de validation), le coût interne est d'environ $3,000-8,000. À 100M tokens/mois, l'économie annuelle atteint $20,400 — le payback est inférieur à 2 mois.
Pourquoi Choisir HolySheep
Après avoir testé 8 providers alternatifs en 2024, HolySheep s'est imposé pour 3 raisons décisives :
- Performance brute : Latence moyenne de 35ms (vs 120ms+ sur les API officielles) — mesurée sur 10,000 requêtes consécutives
- Modèles BGE-M3 et multilingual-optimized : Supérieur à text-embedding-3 sur les corpus non-anglais, notamment français technique (score MTEB FR: 64.1% vs 61.2%)
- Écosystème de paiement : Support natif WeChat Pay et Alipay pour les équipes asiatiques — sans commission intermédiaire
- Crédits gratuits : $5 de crédits offerts à l'inscription pour tester en conditions réelles avant engagement
- API compatible OpenAI : Migration en moins de 30 minutes grâce à la compatibilité des endpoints
La combinaison taux ¥1=$1 et latence <50ms est unique sur le marché. Pour les équipes asiatiques ou les entreprises avec des opérations sino-européennes, c'est un avantage compétitif réel.
Erreurs Courantes et Solutions
Erreur 1 : Incompatibilité de Dimension Après Migration
Symptôme : InvalidRequestError: embeddings dimension mismatch — Votre base vectorielle (Pinecone, Weaviate, etc.) a été indexée avec des vecteurs de dimension différente.
Cause : text-embedding-3-large produit des vecteurs 3072D, tandis que BGE-M3 produit 1024D.
Solution :
# Option A: Re-indexation complète (recommandée)
Requiert de regénérer TOUS les embeddings — long mais propre
def reindex_vectordb(documents: List[dict], client: EmbeddingClient) -> None:
"""
Re-génère tous les embeddings avec le nouveau modèle.
"""
vectordb_client = ... # Votre client Pinecone/Weaviate
for i, doc in enumerate(documents):
embedding = client.embed([doc['text']])[0]
vectordb_client.upsert(
id=doc['id'],
vector=embedding,
metadata=doc.get('metadata', {})
)
if (i + 1) % 1000 == 0:
print(f" Progression: {i+1}/{len(documents)}")
Option B: Padding/truncation (déconseillé pour la précision)
import numpy as np
def pad_embedding(vector: List[float], target_dim: int = 3072) -> List[float]:
"""Complète un vecteur avec des zéros pour correspondre à la dimension cible."""
current_dim = len(vector)
if current_dim < target_dim:
return vector + [0.0] * (target_dim - current_dim)
return vector[:target_dim]
Option C: Matrice de transformation (PCA)
from sklearn.decomposition import PCA
def learn_dimension_mapping(
holysheep_embeddings: List[List[float]],
target_dim: int = 3072
) -> PCA:
"""Apprend une transformation PCA entre embeddings HolySheep et cible."""
X = np.array(holysheep_embeddings)
pca = PCA(n_components=target_dim)
pca.fit(X)
return pca
Erreur 2 : Dérive de Qualité (Quality Regression)
Symptôme : Les résultats de recherche sémantique dégradent de 5-15% après migration — les correspondances moins pertinentes.
Cause : Chaque modèle a ses biais. BGE excelle sur les documents techniques mais peut être moins performant sur le jargon spécifique à votre domaine.
Solution :
from sklearn.metrics.pairwise import cosine_similarity
def diagnose_quality_regression(
test_queries: List[dict],
old_embeddings_func,
new_embeddings_func,
ground_truth_func
) -> dict:
"""
Diagnostique la régression de qualité entre deux providers.
Args:
test_queries: Liste de {query, relevant_docs}
old_embeddings_func: Fonction d'embedding ancienne
new_embeddings_func: Fonction d'embedding nouvelle
ground_truth_func: Fonction pour évaluer la pertinence
Returns:
Rapport de diagnostic détaillé
"""
results = {
'old_recall': [],
'new_recall': [],
'regression_detected': False
}
for item in test_queries:
query = item['query']
relevant_ids = set(item['relevant_doc_ids'])
# Embeddings avec l'ancien provider
old_query_emb = old_embeddings_func([query])[0]
old_scores = old_embeddings_func(item['candidate_docs'])
# Embeddings avec HolySheep
new_query_emb = new_embeddings_func([query])[0]
new_scores = new_embeddings_func(item['candidate_docs'])
# Calcul du recall@5
old_top5 = set(sorted(
range(len(old_scores)),
key=lambda i: old_scores[i],
reverse=True
)[:5])
new_top5 = set(sorted(
range(len(new_scores)),
key=lambda i: new_scores[i],
reverse=True
)[:5])
old_recall = len(old_top5 & relevant_ids) / len(relevant_ids)
new_recall = len(new_top5 & relevant_ids) / len(relevant_ids)
results['old_recall'].append(old_recall)
results['new_recall'].append(new_recall)
if new_recall < old_recall - 0.05:
results['regression_detected'] = True
print(f"⚠️ Régression détectée sur: {query[:50]}...")
results['old_avg_recall'] = np.mean(results['old_recall'])
results['new_avg_recall'] = np.mean(results['new_recall'])
results['delta'] = results['new_avg_recall'] - results['old_avg_recall']
return results
Erreur 3 : Rate Limiting et Throttling
Symptôme : 429 Too Many Requests intermittent même avec des volumes modérés, ou latence qui explose soudainement.
Cause : Non-respect des rate limits du provider, burst de requêtes mal géré, ou quotas par minute vs par jour mal compris.
Solution :
import asyncio
from collections import deque
import time
class RateLimitedClient:
"""
Client avec gestion intelligente des rate limits.
"""
def __init__(
self,
client: EmbeddingClient,
requests_per_minute: int = 1000,
burst_size: int = 50
):
self.client = client
self.rpm_limit = requests_per_minute
self.burst_limit = burst_size
self.request_timestamps = deque(maxlen=requests_per_minute)
self.burst_timestamps = deque(maxlen=burst_size)
def _wait_if_needed(self) -> None:
"""Attend si nécessaire pour respecter les rate limits."""
now = time.time()
# Nettoyage des timestamps > 1 minute
while self.request_timestamps and now - self.request_timestamps[0] > 60:
self.request_timestamps.popleft()
# Nettoyage des timestamps > 1 seconde
while self.burst_timestamps and now - self.burst_timestamps[0] > 1:
self.burst_timestamps.popleft()
# Vérification burst (par seconde)
if len(self.burst_timestamps) >= self.burst_limit:
sleep_time = 1 - (now - self.burst_timestamps[0])
if sleep_time > 0:
time.sleep(sleep_time)
self.burst_timestamps.popleft()
# Vérification RPM
if len(self.request_timestamps) >= self.rpm_limit:
sleep_time = 60 - (now - self.request_timestamps[0])
if sleep_time > 0:
print(f"⏳ Rate limit RPM atteint — pause {sleep_time:.1f}s")
time.sleep(sleep_time)
self.request_timestamps.popleft()
def embed_with_retry(
self,
texts: List[str],
max_retries: int = 3,
retry_delay: float = 2.0
) -> List[List[float]]:
"""
Embedding avec retry exponentiel sur 429.
"""
for attempt in range(max_retries):
try:
self._wait_if_needed()
result = self.client.embed(texts)
self.request_timestamps.append(time.time())
self.burst_timestamps.append(time.time())
return result
except Exception as e:
if '429' in str(e) and attempt < max_retries - 1:
wait = retry_delay * (2 ** attempt)
print(f"🔄 Retry {attempt + 1}/{max_retries} dans {wait}s")
time.sleep(wait)
else:
raise
return [] # Ne devrait jamais arriver
Bonus : Erreur 4 — Mauvaise Gestion du Cache
Symptôme : Coût plus élevé qu'attendu après migration, requêtes redondantes fréquentes.
Cause : Absence de cache applicatif — les mêmes textes sont embeddés plusieurs fois.
Solution :
from functools import lru_cache
import hashlib
import json
class CachedEmbeddingClient:
"""
Wrapper avec cache LRU pour optimiser les coûts.
"""
def __init__(self, client: EmbeddingClient, cache_size: int = 10000):
self.client = client
self.cache = {}
self.cache_hits = 0
self.cache_misses = 0
self.cache_size