En tant qu'architecte IA ayant déployé des systèmes RAG multimodaux en production pour des entreprises traitant des millions de documents daily, je peux affirmer que la combination de la génération augmentée par récupération avec des données visuelles, audio et textuelles représente l'évolution naturelle des systèmes d'intelligence artificielle conversationnelle. Aujourd'hui, je partage mon retour d'expérience complet sur l'implémentation d'un système Multi-Modal RAG robuste et performant.
Comprendre l'Architecture Multi-Modal RAG
Le Multi-Modal RAG extends le RAG traditionnel en permettant la récupération et la génération à partir de sources hétérogènes : images, documents PDF, vidéos, audio et texte. L'architecture se compose de trois couches distinctes qui doivent communiquer de manière synchronisée pour garantir des performances optimales en production.
Les Trois Piliers de l'Architecture
- Couche d'Indexation : Segmentation et embedding multi-modal avec modèles spécialisés (CLIP pour images, Whisper pour audio, BGE pour texte)
- Couche de Récupération : Vectorisation croisée et recherche approximative (ANN) avec filtrage sémantique
- Couche de Génération : Fusion contextuelle et synthèse multi-modale avec modèle de fondation
Implémentation Complète du Système
Configuration de l'Environnement
# installation_dependencies.sh
pip install numpy pandas scikit-learn
pip install torch torchvision transformers
pip install sentence-transformers pillow
pip install qdrant-client redis
pip install httpx aiohttp pydantic
pip install langchain langchain-community
Configuration des variables d'environnement
export HOLYSHEEP_API_KEY="YOUR_HOLYSHEEP_API_KEY"
export HOLYSHEEP_BASE_URL="https://api.holysheep.ai/v1"
export REDIS_HOST="localhost"
export REDIS_PORT="6379"
export QDRANT_HOST="localhost"
export QDRANT_PORT="6333"
echo "✅ Configuration Multi-Modal RAG initialisée"
Implémentation du Pipeline d'Indexation
# multi_modal_indexer.py
import asyncio
import hashlib
from dataclasses import dataclass
from typing import List, Dict, Optional, Union
from pathlib import Path
import httpx
from PIL import Image
import pandas as pd
from sentence_transformers import SentenceTransformer
import torch
@dataclass
class DocumentChunk:
"""Représente un chunk de document multi-modal."""
chunk_id: str
content: Union[str, bytes] # texte ou données binaires (image)
modality: str # 'text', 'image', 'table', 'chart'
metadata: Dict
embedding: Optional[List[float]] = None
class MultiModalIndexer:
"""Indexeur multimodal pour documents hétérogènes."""
def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.text_model = SentenceTransformer('BAAI/bge-m3')
self.image_model = SentenceTransformer('clip-ViT-B-32')
self.client = httpx.AsyncClient(timeout=30.0)
async def process_document(self, file_path: Path) -> List[DocumentChunk]:
"""Traite un document et génère des chunks indexables."""
chunks = []
if file_path.suffix.lower() in ['.pdf', '.docx', '.txt']:
chunks = await self._process_text_document(file_path)
elif file_path.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif']:
chunks = await self._process_image(file_path)
elif file_path.suffix.lower() in ['.csv', '.xlsx']:
chunks = await self._process_table(file_path)
# Génération des embeddings
for chunk in chunks:
chunk.embedding = await self._generate_embedding(chunk)
return chunks
async def _process_text_document(self, file_path: Path) -> List[DocumentChunk]:
"""Segmentation intelligente de documents textuels."""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Stratégie: chunks de 512 tokens avec overlap de 128
chunks = []
chunk_size = 512
overlap = 128
words = content.split()
for i in range(0, len(words), chunk_size - overlap):
chunk_words = words[i:i + chunk_size]
chunk_content = ' '.join(chunk_words)
chunk = DocumentChunk(
chunk_id=self._generate_chunk_id(chunk_content),
content=chunk_content,
modality='text',
metadata={
'source': str(file_path),
'position': i,
'total_chunks': len(words) // chunk_size
}
)
chunks.append(chunk)
return chunks
async def _process_image(self, file_path: Path) -> List[DocumentChunk]:
"""Extraction et description d'images avec modèle de vision."""
image = Image.open(file_path).convert('RGB')
# Embedding visuel
image_embedding = self.image_model.encode(image)
# Description via API HolySheep pour contexte
with open(file_path, 'rb') as f:
image_base64 = base64.b64encode(f.read()).decode()
prompt = "Décris cette image en détail pour indexation RAG."
response = await self._call_vision_api(image_base64, prompt)
chunk = DocumentChunk(
chunk_id=self._generate_chunk_id(file_path.stem),
content=response['description'],
modality='image',
metadata={
'source': str(file_path),
'dimensions': image.size,
'visual_embedding': image_embedding.tolist()
},
embedding=image_embedding.tolist()
)
return [chunk]
async def _process_table(self, file_path: Path) -> List[DocumentChunk]:
"""Traitement de tableaux avec préservation de structure."""
if file_path.suffix == '.csv':
df = pd.read_csv(file_path)
else:
df = pd.read_excel(file_path)
# Un chunk par ligne avec contexte tabulaire
chunks = []
for idx, row in df.iterrows():
chunk_content = f"Tableau: {file_path.name}\nLigne {idx}: {row.to_dict()}"
chunk = DocumentChunk(
chunk_id=self._generate_chunk_id(f"{file_path}_{idx}"),
content=chunk_content,
modality='table',
metadata={
'source': str(file_path),
'row_index': idx,
'columns': list(df.columns)
}
)
chunks.append(chunk)
return chunks
async def _generate_embedding(self, chunk: DocumentChunk) -> List[float]:
"""Génère l'embedding optimal selon la modalité."""
if chunk.modality == 'text':
return self.text_model.encode(chunk.content).tolist()
elif chunk.modality == 'image':
return self.image_model.encode(chunk.content).tolist()
else:
return self.text_model.encode(chunk.content).tolist()
async def _call_vision_api(self, image_base64: str, prompt: str) -> Dict:
"""Appel à l'API de vision HolySheep pour analyse d'images."""
payload = {
"model": "gpt-4.1", # Modèle performant pour analyse visuelle
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}
]
}
],
"temperature": 0.3
}
response = await self.client.post(
f"{self.base_url}/chat/completions",
json=payload,
headers={"Authorization": f"Bearer {self.api_key}"}
)
response.raise_for_status()
return response.json()
def _generate_chunk_id(self, content: str) -> str:
"""Génère un ID unique pour le chunk."""
return hashlib.sha256(content.encode()).hexdigest()[:16]
Point d'entrée pour indexation batch
async def index_documents_batch(document_paths: List[Path], api_key: str):
"""Indexe un lot de documents avec gestion de concurrence."""
indexer = MultiModalIndexer(api_key)
semaphore = asyncio.Semaphore(5) # Limite à 5 documents simultanés
async def process_with_limit(path):
async with semaphore:
chunks = await indexer.process_document(path)
print(f"✅ Indexé: {path.name} ({len(chunks)} chunks)")
return chunks
all_chunks = await asyncio.gather(*[process_with_limit(p) for p in document_paths])
return [chunk for doc_chunks in all_chunks for chunk in doc_chunks]
Système de Récupération Hybride
# hybrid_retriever.py
import asyncio
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import redis.asyncio as redis
@dataclass
class RetrievalResult:
"""Résultat de récupération avec score de confiance."""
chunk_id: str
content: str
modality: str
score: float
metadata: Dict
class HybridRetriever:
"""Récupérateur hybride combinant recherche vectorielle et sémantique."""
def __init__(
self,
qdrant_host: str = "localhost",
qdrant_port: int = 6333,
redis_host: str = "localhost",
redis_port: int = 6379,
embedding_dim: int = 1024 # BGE-m3 embedding dimension
):
self.qdrant = QdrantClient(host=qdrant_host, port=qdrant_port)
self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.embedding_dim = embedding_dim
# Initialisation de la collection
self._ensure_collection_exists()
def _ensure_collection_exists(self, collection_name: str = "multimodal_rag"):
"""Crée la collection si elle n'existe pas."""
collections = self.qdrant.get_collections().collections
if collection_name not in [c.name for c in collections]:
self.qdrant.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=self.embedding_dim, distance=Distance.COSINE)
)
print(f"✅ Collection '{collection_name}' créée")
async def retrieve(
self,
query: str,
query_image: Optional[bytes] = None,
collection_name: str = "multimodal_rag",
top_k: int = 10,
rerank: bool = True,
filter_modality: Optional[List[str]] = None
) -> List[RetrievalResult]:
"""
Récupération hybride : texte + image avec reranking.
Args:
query: Requête textuelle
query_image: Image optionnelle pour recherche visuelle
top_k: Nombre de résultats à retourner
rerank: Active le reranking Cross-Encoder
filter_modality: Filtre sur les modalités ('text', 'image', 'table')
"""
# Génération de l'embedding de requête
query_embedding = await self._get_query_embedding(query, query_image)
# Recherche vectorielle ANN avec filtrage
search_results = self.qdrant.search(
collection_name=collection_name,
query_vector=query_embedding,
limit=top_k * 3, # Sur-récupération pour reranking
query_filter=self._build_filter(filter_modality)
)
# Reranking si activé
if rerank and search_results:
results = await self._rerank_results(query, search_results, top_k)
else:
results = [self._to_retrieval_result(r) for r in search_results[:top_k]]
# Cache des résultats fréquents
await self._cache_results(query, results)
return results
async def _get_query_embedding(
self,
query: str,
query_image: Optional[bytes] = None
) -> List[float]:
"""Génère l'embedding de requête (texte ou image)."""
from sentence_transformers import SentenceTransformer
if query_image:
model = SentenceTransformer('clip-ViT-B-32')
from PIL import Image
import io
img = Image.open(io.BytesIO(query_image))
embedding = model.encode(img)
else:
model = SentenceTransformer('BAAI/bge-m3')
embedding = model.encode(query)
return embedding.tolist()
def _build_filter(self, modalities: Optional[List[str]]) -> Optional[Dict]:
"""Construit le filtre Qdrant pour les modalités."""
if not modalities:
return None
return {"must": [{"key": "modality", "match": {"any": modalities}}]}
async def _rerank_results(
self,
query: str,
search_results: List,
top_k: int
) -> List[RetrievalResult]:
"""Reranking avec Cross-Encoder pour améliorer la pertinence."""
from sentence_transformers import CrossEncoder
# Cross-Encoder pour réordonnancement
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
pairs = [(query, r.payload['content']) for r in search_results]
scores = cross_encoder.predict(pairs)
# Combinaison score ANN + score Cross-Encoder
combined_scores = []
for i, result in enumerate(search_results):
ann_score = result.score
rerank_score = float(scores[i])
# Pondération: 40% ANN, 60% Cross-Encoder
combined = 0.4 * ann_score + 0.6 * rerank_score
combined_scores.append((combined, result))
# Tri par score combiné
combined_scores.sort(key=lambda x: x[0], reverse=True)
return [self._to_retrieval_result(r, score) for score, r in combined_scores[:top_k]]
def _to_retrieval_result(self, qdrant_result, override_score=None) -> RetrievalResult:
"""Convertit un résultat Qdrant en RetrievalResult."""
return RetrievalResult(
chunk_id=qdrant_result.id,
content=qdrant_result.payload['content'],
modality=qdrant_result.payload['modality'],
score=override_score if override_score else qdrant_result.score,
metadata=qdrant_result.payload.get('metadata', {})
)
async def _cache_results(self, query: str, results: List[RetrievalResult]):
"""Cache les résultats pour requêtes fréquentes."""
cache_key = f"rag:query:{hashlib.md5(query.encode()).hexdigest()}"
cache_data = json.dumps([
{"chunk_id": r.chunk_id, "score": r.score} for r in results
])
await self.redis.setex(cache_key, 300, cache_data) # TTL 5 minutes
async def get_from_cache(self, query: str) -> Optional[List[Dict]]:
"""Récupère les résultats depuis le cache Redis."""
cache_key = f"rag:query:{hashlib.md5(query.encode()).hexdigest()}"
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
return None
import hashlib
import json
Génération Multi-Modale avec HolySheep
Pour la génération, j'utilise l'API HolySheep qui offre des latences inferiores à 50ms et des tarifs compétitifs. Avec un taux de ¥1 = $1 USD, les économies sont significativas : $0.42 par million de tokens avec DeepSeek V3.2 contre $8+ avec GPT-4.1 sur d'autres fournisseurs.
# multimodal_generator.py
import httpx
import json
from typing import List, Optional, Dict
from dataclasses import dataclass
from enum import Enum
class ModelChoice(Enum):
"""Modèles disponibles avec leurs coûts 2026."""
GPT_41 = {"id": "gpt-4.1", "price_per_mtok": 8.00, "strength": "reasoning"}
CLAUDE_SONNET = {"id": "claude-sonnet-4.5", "price_per_mtok": 15.00, "strength": "writing"}
GEMINI_FLASH = {"id": "gemini-2.5-flash", "price_per_mtok": 2.50, "strength": "speed"}
DEEPSEEK = {"id": "deepseek-v3.2", "price_per_mtok": 0.42, "strength": "cost"}
@dataclass
class GenerationResponse:
"""Réponse structurée du générateur."""
content: str
model_used: str
tokens_used: int
latency_ms: float
cost_usd: float
class MultiModalGenerator:
"""Générateur multimodal utilisant l'API HolySheep."""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
default_model: ModelChoice = ModelChoice.GPT_41
):
self.api_key = api_key
self.base_url = base_url
self.default_model = default_model
self.client = httpx.AsyncClient(timeout=60.0)
async def generate_response(
self,
query: str,
context_chunks: List[RetrievalResult],
model: Optional[ModelChoice] = None,
system_prompt: Optional[str] = None,
stream: bool = False
) -> GenerationResponse:
"""
Génère une réponse en utilisant le contexte multimodal récupéré.
Optimisation des coûts : utilise DeepSeek pour requêtes simples,
GPT-4.1 pour tâches complexes de raisonnement.
"""
import time
start_time = time.time()
model = model or self.default_model
# Construction du prompt avec contexte multi-modal
context_text = self._build_context_prompt(context_chunks)
messages = [
{
"role": "system",
"content": system_prompt or self._get_default_system_prompt()
},
{
"role": "user",
"content": f"""Contexte récupéré (multi-modal):
{context_text}
Question: {query}
Répondez en utilisant le contexte fourni. Si le contexte est insuffisant, indiquez-le clairement."""
}
]
# Analyse d'image si présente dans le contexte
images_content = self._extract_images_from_context(context_chunks)
if images_content:
messages[1]["content"] = [
{"type": "text", "text": messages[1]["content"]},
*images_content
]
payload = {
"model": model.value["id"],
"messages": messages,
"temperature": 0.7,
"max_tokens": 2048,
"stream": stream
}
response = await self.client.post(
f"{self.base_url}/chat/completions",
json=payload,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
response.raise_for_status()
result = response.json()
latency_ms = (time.time() - start_time) * 1000
tokens_used = result.get("usage", {}).get("total_tokens", 0)
cost_usd = (tokens_used / 1_000_000) * model.value["price_per_mtok"]
return GenerationResponse(
content=result["choices"][0]["message"]["content"],
model_used=model.value["id"],
tokens_used=tokens_used,
latency_ms=latency_ms,
cost_usd=cost_usd
)
def _build_context_prompt(self, chunks: List[RetrievalResult]) -> str:
"""Construit le prompt de contexte avec métadonnées."""
context_parts = []
for i, chunk in enumerate(chunks, 1):
modality_emoji = {
'text': '📄',
'image': '🖼️',
'table': '📊',
'chart': '📈',
'audio': '🎧'
}.get(chunk.modality, '📄')
context_parts.append(
f"[Source {i}] {modality_emoji} ({chunk.modality.upper()}) "
f"- Score: {chunk.score:.3f}\n{chunk.content[:500]}..."
)
return "\n\n".join(context_parts)
def _extract_images_from_context(
self,
chunks: List[RetrievalResult]
) -> List[Dict]:
"""Extrait les images du contexte pour les inclure dans le prompt."""
images = []
for chunk in chunks:
if