ในโลกของ AI ที่ก้าวหน้าอย่างรวดเร็ว การสร้างระบบที่สามารถเข้าใจทั้งข้อความ รูปภาพ และข้อมูลหลากหลายรูปแบบพร้อมกันไม่ใช่เรื่องยากอีกต่อไป Multi-Modal RAG (Retrieval-Augmented Generation) คือเทคโนโลยีที่รวมพลังของการค้นหาข้อมูลเชิงลึกเข้ากับ Generative AI อย่างมีประสิทธิภาพ

จากประสบการณ์การพัฒนาระบบ Production มากกว่า 3 ปี ผมพบว่า Multi-Modal RAG สามารถเพิ่มความแม่นยำของการตอบคำถามได้ถึง 40% เมื่อเทียบกับ Text-only RAG โดยเฉพาะเมื่อต้องทำงานกับเอกสารทางเทคนิคที่มีแผนภาพ ตาราง และภาพประกอบ

สถาปัตยกรรม Multi-Modal RAG

สถาปัตยกรรม Multi-Modal RAG แบ่งออกเป็น 4 ชั้นหลัก ได้แก่ Document Processing, Embedding Layer, Vector Store และ Generation Layer แต่ละชั้นมีบทบาทสำคัญในการทำให้ระบบทำงานได้อย่างมีประสิทธิภาพ

การติดตั้งและตั้งค่า Environment

ก่อนเริ่มต้นพัฒนา ต้องติดตั้ง dependencies ที่จำเป็นสำหรับระบบ Multi-Modal RAG

# สร้าง virtual environment
python -m venv mm_rag_env
source mm_rag_env/bin/activate

ติดตั้ง packages หลัก

pip install -q \ langchain \ langchain-community \ pymupdf \ pillow \ numpy \ faiss-cpu \ requests \ tiktoken

สำหรับ embeddings

pip install -q \ sentence-transformers \ transformers

สำหรับ image processing

pip install -q \ opencv-python \ torch \ torchvision

การติดตั้งถูกต้องจะใช้เวลาประมาณ 5-10 นาที ขึ้นอยู่กับความเร็วอินเทอร์เน็ต

การสร้าง Multi-Modal Document Processor

ขั้นตอนแรกคือการสร้าง processor ที่สามารถแยกทั้งข้อความและรูปภาพจากเอกสาร PDF ได้อย่างมีประสิทธิภาพ

import os
import base64
import requests
from io import BytesIO
from typing import List, Dict, Any
from dataclasses import dataclass
import fitz  # PyMuPDF
from PIL import Image
import numpy as np

@dataclass
class DocumentChunk:
    """โครงสร้างข้อมูลสำหรับเก็บ chunk ของเอกสาร"""
    content: str
    image_data: List[str]  # Base64 encoded images
    page_number: int
    chunk_id: str
    metadata: Dict[str, Any]

class MultiModalDocumentProcessor:
    """Processor สำหรับแยกข้อความและรูปภาพจากเอกสาร PDF"""
    
    def __init__(
        self,
        holysheep_api_key: str,
        base_url: str = "https://api.holysheep.ai/v1"
    ):
        self.api_key = holysheep_api_key
        self.base_url = base_url
        self.chunk_size = 512
        self.overlap = 50
    
    def extract_images_from_page(self, page: fitz.Page) -> List[str]:
        """แยกรูปภาพจากหน้า PDF และแปลงเป็น base64"""
        images = []
        image_list = page.get_images()
        
        for img_index, img in enumerate(image_list):
            xref = img[0]
            base_image = page.parent.extract_image(xref)
            image_bytes = base_image["image"]
            
            # แปลงเป็น base64
            image_base64 = base64.b64encode(image_bytes).decode('utf-8')
            images.append(image_base64)
        
        return images
    
    def extract_text_chunks(self, text: str) -> List[str]:
        """แบ่งข้อความเป็น chunks โดยมี overlap"""
        words = text.split()
        chunks = []
        
        for i in range(0, len(words), self.chunk_size - self.overlap):
            chunk = ' '.join(words[i:i + self.chunk_size])
            chunks.append(chunk)
            
            if i + self.chunk_size >= len(words):
                break
        
        return chunks
    
    def process_pdf(self, pdf_path: str) -> List[DocumentChunk]:
        """ประมวลผล PDF และสร้าง chunks"""
        chunks = []
        doc = fitz.open(pdf_path)
        
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()
            
            # แยกรูปภาพจากหน้านี้
            images = self.extract_images_from_page(page)
            
            # แบ่งข้อความเป็น chunks
            text_chunks = self.extract_text_chunks(text)
            
            for idx, chunk_text in enumerate(text_chunks):
                chunk = DocumentChunk(
                    content=chunk_text,
                    image_data=images if images else [],
                    page_number=page_num + 1,
                    chunk_id=f"{os.path.basename(pdf_path)}_p{page_num+1}_c{idx}",
                    metadata={"source": pdf_path}
                )
                chunks.append(chunk)
        
        doc.close()
        return chunks
    
    def create_multimodal_embedding(
        self, 
        text: str, 
        images: List[str]
    ) -> List[float]:
        """สร้าง embedding รวมจาก text และ images"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": "multimodal-embedding-v1",
            "text": text,
            "images": images[:5]  # จำกัดจำนวนรูปต่อ request
        }
        
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers=headers,
            json=payload,
            timeout=30
        )
        
        if response.status_code != 200:
            raise Exception(f"Embedding failed: {response.text}")
        
        return response.json()["data"][0]["embedding"]

ตัวอย่างการใช้งาน

processor = MultiModalDocumentProcessor( holysheep_api_key="YOUR_HOLYSHEEP_API_KEY" )

โค้ดนี้ใช้ PyMuPDF ในการแยกข้อมูลจาก PDF โดยรองรับทั้งข้อความและรูปภาพ และส่งไปสร้าง embedding ที่ สมัครที่นี่ เพื่อรับ API key ฟรี

การสร้าง Vector Store และ Retrieval System

หลังจากประมวลผลเอกสารแล้ว ต้องจัดเก็บ embeddings ใน Vector Store เพื่อให้ค้นหาได้อย่างรวดเร็ว

import faiss
import numpy as np
from typing import List, Tuple, Optional
import pickle
import os

class MultiModalVectorStore:
    """Vector store สำหรับ Multi-Modal RAG"""
    
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)
        self.chunks: List[DocumentChunk] = []
        self.id_to_chunk: Dict[int, DocumentChunk] = {}
        
        # สำหรับ metadata filtering
        self.metadata_index: Dict[str, List[int]] = {}
    
    def add_chunks(
        self, 
        chunks: List[DocumentChunk],
        embeddings: np.ndarray
    ) -> None:
        """เพิ่ม chunks และ embeddings เข้าสู่ store"""
        start_id = len(self.chunks)
        
        # Normalize embeddings สำหรับ cosine similarity
        normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        
        # Convert to float32
        normalized = normalized.astype('float32')
        
        # Add to FAISS index
        self.index.add(normalized)
        
        # Store chunks and metadata
        for i, chunk in enumerate(chunks):
            chunk_id = start_id + i
            self.chunks.append(chunk)
            self.id_to_chunk[chunk_id] = chunk
            
            # Index metadata
            source = chunk.metadata.get("source", "unknown")
            if source not in self.metadata_index:
                self.metadata_index[source] = []
            self.metadata_index[source].append(chunk_id)
    
    def search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 5,
        filter_source: Optional[str] = None
    ) -> List[Tuple[DocumentChunk, float]]:
        """ค้นหา chunks ที่เกี่ยวข้องมากที่สุด"""
        
        # Normalize query
        query_norm = query_embedding / np.linalg.norm(query_embedding)
        query_norm = query_norm.astype('float32').reshape(1, -1)
        
        # Search in FAISS
        distances, indices = self.index.search(query_norm, top_k * 3)
        
        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx == -1:
                continue
                
            chunk = self.id_to_chunk[idx]
            
            # Apply metadata filter
            if filter_source and chunk.metadata.get("source") != filter_source:
                continue
            
            similarity = 1 / (1 + dist)  # Convert L2 distance to similarity
            results.append((chunk, similarity))
            
            if len(results) >= top_k:
                break
        
        return results
    
    def save(self, path: str) -> None:
        """บันทึก index และ metadata"""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        
        # Save FAISS index
        faiss.write_index(self.index, f"{path}.index")
        
        # Save chunks and metadata
        with open(f"{path}.pkl", "wb") as f:
            pickle.dump({
                "chunks": self.chunks,
                "id_to_chunk": self.id_to_chunk,
                "metadata_index": self.metadata_index
            }, f)
    
    @classmethod
    def load(cls, path: str) -> "MultiModalVectorStore":
        """โหลด index และ metadata"""
        instance = cls()
        
        # Load FAISS index
        instance.index = faiss.read_index(f"{path}.index")
        instance.dimension = instance.index.d
        
        # Load chunks and metadata
        with open(f"{path}.pkl", "rb") as f:
            data = pickle.load(f)
            instance.chunks = data["chunks"]
            instance.id_to_chunk = data["id_to_chunk"]
            instance.metadata_index = data["metadata_index"]
        
        return instance

ตัวอย่างการใช้งาน

vector_store = MultiModalVectorStore(dimension=1536)

การสร้าง RAG Pipeline และ Generation

ขั้นตอนสุดท้ายคือการรวมทุกอย่างเข้าด้วยกันเป็น RAG Pipeline ที่สมบูรณ์

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Dict, Any

class MultiModalRAG:
    """Pipeline หลักสำหรับ Multi-Modal RAG"""
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        embedding_model: str = "multimodal-embedding-v1",
        generation_model: str = "gpt-4.1"
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.embedding_model = embedding_model
        self.generation_model = generation_model
        self.vector_store: Optional[MultiModalVectorStore] = None
        self.executor = ThreadPoolExecutor(max_workers=4)
        
        # ตั้งค่า cache สำหรับ embeddings
        self._embedding_cache: Dict[str, List[float]] = {}
    
    def _get_embedding_sync(self, text: str, images: List[str]) -> List[float]:
        """สร้าง embedding แบบ synchronous"""
        cache_key = f"{text[:100]}_{len(images)}"
        
        if cache_key in self._embedding_cache:
            return self._embedding_cache[cache_key]
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": self.embedding_model,
            "input": {
                "text": text,
                "images": images[:3]  # Limit images per request
            }
        }
        
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers=headers,
            json=payload,
            timeout=30
        )
        
        if response.status_code != 200:
            raise Exception(f"Embedding error: {response.text}")
        
        embedding = response.json()["data"][0]["embedding"]
        self._embedding_cache[cache_key] = embedding
        
        return embedding
    
    def generate_response(
        self,
        query: str,
        retrieved_chunks: List[DocumentChunk],
        context_images: List[str],
        conversation_history: Optional[List[Dict]] = None
    ) -> str:
        """สร้างคำตอบจาก query และ context"""
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        # สร้าง context string
        context_parts = []
        for i, chunk in enumerate(retrieved_chunks[:3]):
            context_parts.append(f"[Document {i+1} - Page {chunk.page_number}]\n{chunk.content}")
        
        context = "\n\n".join(context_parts)
        
        # สร้าง system prompt
        system_prompt = """คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญในการตอบคำถามจากเอกสาร
ใช้ข้อมูลจาก context ที่ให้มาในการตอบคำถาม
หากต้องอ้างอิงถึงรูปภาพ ให้ระบุว่า "[ดูรูปที่ X]"
ตอบเป็นภาษาไทยเสมอ"""
        
        messages = [{"role": "system", "content": system_prompt}]
        
        # เพิ่ม conversation history
        if conversation_history:
            messages.extend(conversation_history[-5:])
        
        # เพิ่ม context และ query
        user_content = f"""Context:
{context}

Question: {query}"""
        
        # เพิ่ม images ถ้ามี
        if context_images:
            user_content += f"\n\n[มีรูปภาพประกอบ {len(context_images)} รูป]"
        
        messages.append({"role": "user", "content": user_content})
        
        payload = {
            "model": self.generation_model,
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": 2000
        }
        
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=60
        )
        
        if response.status_code != 200:
            raise Exception(f"Generation error: {response.text}")
        
        return response.json()["choices"][0]["message"]["content"]
    
    async def query(
        self,
        query