บทความนี้จะพาท่านสร้างระบบ RAG (Retrieval-Augmented Generation) ตั้งแต่เริ่มต้นจนถึงระดับ Production โดยครอบคลุมทุกขั้นตอนตั้งแต่การ parse เอกสาร การแปลงเป็น vector การค้นหาแบบ semantic ไปจนถึงการ generate คำตอบด้วย LLM พร้อมโค้ดที่พร้อมใช้งานจริงและ benchmark ประสิทธิภาพ

สถาปัตยกรรมโดยรวมของระบบ RAG

ก่อนลงมือเขียนโค้ด มาทำความเข้าใจสถาปัตยกรรมของระบบ RAG กันก่อน ระบบ RAG ประกอบด้วย 4 ส่วนหลักที่เชื่อมต่อกัน:

สำหรับ API LLM เราจะใช้ HolySheep AI ซึ่งมีความเร็วต่ำกว่า 50ms และราคาประหยัดกว่า 85% เมื่อเทียบกับ OpenAI

การติดตั้ง Dependencies และ Configuration

pip install langchain langchain-community langchain-huggingface
pip install unstructured pypdf python-docx python-pptx
pip install faiss-cpu sentence-transformers pydantic
pip install httpx tiktoken
# config.py
import os
from typing import Literal

HolySheep AI Configuration

HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY") HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"

Model Configuration

EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" LLM_MODEL = "gpt-4.1" # หรือ deepseek-v3.2 สำหรับประหยัดต้นทุน

Vector Store Configuration

INDEX_TYPE: Literal["faiss", "chroma", "milvus"] = "faiss" EMBEDDING_DIMENSION = 384

Chunking Configuration

CHUNK_SIZE = 512 CHUNK_OVERLAP = 64

Retrieval Configuration

TOP_K = 5 SIMILARITY_THRESHOLD = 0.7

Document Parsing Pipeline — รองรับทุกรูปแบบไฟล์

ขั้นตอนแรกคือการ parse เอกสารให้เป็น plain text ที่สามารถ process ต่อได้ ระบบนี้รองรับ PDF, DOCX, PPTX, Markdown และ HTML

# document_parser.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import re

@dataclass
class Document:
    """โครงสร้างข้อมูลสำหรับเอกสารที่ถูก parse แล้ว"""
    page_content: str
    metadata: dict
    
    def __repr__(self):
        preview = self.page_content[:100].replace('\n', ' ')
        return f"Document(content='{preview}...', metadata={self.metadata})"


class BaseDocumentParser(ABC):
    """Abstract base class สำหรับ document parsers"""
    
    @abstractmethod
    def parse(self, file_path: str) -> List[Document]:
        pass
    
    def clean_text(self, text: str) -> str:
        """ทำความสะอาด text ด้วยกฎพื้นฐาน"""
        text = re.sub(r'\s+', ' ', text)  # ลบ whitespaces ซ้ำ
        text = re.sub(r'[^\w\s\u0E00-\u0E7F.,!?()-]', '', text)  # รองรับภาษาไทย
        return text.strip()


class PDFParser(BaseDocumentParser):
    """Parser สำหรับไฟล์ PDF"""
    
    def __init__(self):
        from pypdf import PdfReader
        self.reader_class = PdfReader
    
    def parse(self, file_path: str) -> List[Document]:
        reader = self.reader_class(file_path)
        documents = []
        
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text()
            if text:
                text = self.clean_text(text)
                documents.append(Document(
                    page_content=text,
                    metadata={
                        "source": file_path,
                        "page": page_num + 1,
                        "total_pages": len(reader.pages),
                        "file_type": "pdf"
                    }
                ))
        
        return documents


class DocxParser(BaseDocumentParser):
    """Parser สำหรับไฟล์ Word (.docx)"""
    
    def parse(self, file_path: str) -> List[Document]:
        from docx import Document as DocxDocument
        doc = DocxDocument(file_path)
        documents = []
        
        full_text = []
        for para in doc.paragraphs:
            if para.text.strip():
                full_text.append(para.text)
        
        # รวม paragraphs เป็น sections
        content = '\n'.join(full_text)
        content = self.clean_text(content)
        
        documents.append(Document(
            page_content=content,
            metadata={
                "source": file_path,
                "file_type": "docx",
                "paragraph_count": len(full_text)
            }
        ))
        
        return documents


class MarkdownParser(BaseDocumentParser):
    """Parser สำหรับไฟล์ Markdown"""
    
    def parse(self, file_path: str) -> List[Document]:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # แบ่งตาม headers
        sections = re.split(r'\n(?=#)', content)
        documents = []
        
        for i, section in enumerate(sections):
            if section.strip():
                documents.append(Document(
                    page_content=self.clean_text(section),
                    metadata={
                        "source": file_path,
                        "section": i,
                        "file_type": "markdown"
                    }
                ))
        
        return documents


class DocumentParserFactory:
    """Factory สำหรับสร้าง parser ตามประเภทไฟล์"""
    
    _parsers = {
        '.pdf': PDFParser,
        '.docx': DocxParser,
        '.doc': DocxParser,  # ใช้ docx parser เดียวกัน
        '.md': MarkdownParser,
        '.txt': MarkdownParser,  # ใช้ markdown parser เหมือนกัน
    }
    
    @classmethod
    def get_parser(cls, file_path: str) -> BaseDocumentParser:
        ext = '.' + file_path.rsplit('.', 1)[-1].lower()
        parser_class = cls._parsers.get(ext)
        
        if not parser_class:
            raise ValueError(f"ไม่รองรับไฟล์ประเภท {ext}")
        
        return parser_class()
    
    @classmethod
    def parse_file(cls, file_path: str) -> List[Document]:
        parser = cls.get_parser(file_path)
        return parser.parse(file_path)

Chunking Strategy — กลยุทธ์การแบ่งเอกสาร

การ chunk เอกสารเป็นส่วนสำคัญมากสำหรับ RAG เพราะต้อง平衡ระหว่าง context และ relevance

# chunking.py
from typing import List, Callable, Optional
from dataclasses import dataclass
import re

@dataclass
class Chunk:
    """โครงสร้างข้อมูลสำหรับ text chunk"""
    content: str
    chunk_id: str
    metadata: dict
    token_count: int


class ChunkingStrategy:
    """Base class สำหรับ chunking strategies"""
    
    def __init__(self, chunk_size: int = 512, overlap: int = 64):
        self.chunk_size = chunk_size
        self.overlap = overlap
    
    def count_tokens(self, text: str) -> int:
        """นับ token โดยประมาณ (ใช้ tiktoken จะแม่นยำกว่า)"""
        # วิธีง่าย: 1 token ≈ 4 characters สำหรับภาษาอังกฤษ
        # สำหรับภาษาไทย: 1 token ≈ 2-3 characters
        thai_chars = len(re.findall(r'[\u0E00-\u0E7F]', text))
        other_chars = len(text) - thai_chars
        return int(thai_chars / 2.5 + other_chars / 4)
    
    def create_chunk(self, content: str, chunk_id: str, metadata: dict) -> Chunk:
        return Chunk(
            content=content,
            chunk_id=chunk_id,
            metadata=metadata,
            token_count=self.count_tokens(content)
        )


class RecursiveCharacterChunker(ChunkingStrategy):
    """Recursive character-based chunking — แบ่งตาม separators"""
    
    def __init__(
        self,
        chunk_size: int = 512,
        overlap: int = 64,
        separators: List[str] = None
    ):
        super().__init__(chunk_size, overlap)
        self.separators = separators or ['\n\n', '\n', '. ', ' ', '']
    
    def chunk(self, text: str, metadata: dict) -> List[Chunk]:
        chunks = []
        chunk_id_counter = 0
        
        # Split by paragraphs first
        paragraphs = text.split('\n\n')
        current_chunk = ""
        
        for para in paragraphs:
            # ถ้า paragraph ใหญ่กว่า chunk_size ให้ split ต่อ
            if self.count_tokens(current_chunk + para) > self.chunk_size:
                if current_chunk:
                    chunks.append(self.create_chunk(
                        current_chunk.strip(),
                        f"chunk_{chunk_id_counter}",
                        {**metadata, "chunk_index": chunk_id_counter}
                    ))
                    chunk_id_counter += 1
                    
                    # Keep overlap
                    words = current_chunk.split()
                    overlap_words = ' '.join(words[-self.overlap // 4:])
                    current_chunk = overlap_words + para
                else:
                    # Paragraph too big, split by smaller separators
                    sub_chunks = self._split_big_paragraph(para, metadata, chunk_id_counter)
                    chunks.extend(sub_chunks)
                    chunk_id_counter += len(sub_chunks)
                    current_chunk = ""
            else:
                current_chunk += '\n\n' + para if current_chunk else para
        
        # Add remaining chunk
        if current_chunk.strip():
            chunks.append(self.create_chunk(
                current_chunk.strip(),
                f"chunk_{chunk_id_counter}",
                {**metadata, "chunk_index": chunk_id_counter}
            ))
        
        return chunks
    
    def _split_big_paragraph(
        self,
        para: str,
        metadata: dict,
        start_id: int
    ) -> List[Chunk]:
        chunks = []
        current = ""
        chunk_id = start_id
        
        for separator in self.separators[1:]:  # Skip empty separator
            if separator in para:
                parts = para.split(separator)
                for part in parts:
                    if self.count_tokens(current + part) <= self.chunk_size:
                        current += separator + part if current else part
                    else:
                        if current:
                            chunks.append(self.create_chunk(
                                current.strip(),
                                f"chunk_{chunk_id}",
                                {**metadata, "chunk_index": chunk_id}
                            ))
                            chunk_id += 1
                        current = part
                break
        
        if current.strip():
            chunks.append(self.create_chunk(
                current.strip(),
                f"chunk_{chunk_id}",
                {**metadata, "chunk_index": chunk_id}
            ))
        
        return chunks


class SemanticChunker(ChunkingStrategy):
    """Semantic chunking — ใช้ sentence similarity แบ่ง chunk"""
    
    def __init__(self, chunk_size: int = 512, overlap: int = 64):
        super().__init__(chunk_size, overlap)
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
    
    def chunk(self, text: str, metadata: dict) -> List[Chunk]:
        # Split into sentences first
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        
        if not sentences:
            return [self.create_chunk(text, "chunk_0", metadata)]
        
        # Calculate embeddings
        embeddings = self.model.encode(sentences)
        
        current_chunk = [sentences[0]]
        current_tokens = self.count_tokens(sentences[0])
        
        for i in range(1, len(sentences)):
            sentence_tokens = self.count_tokens(sentences[i])
            
            # Check if adding this sentence exceeds chunk size
            if current_tokens + sentence_tokens > self.chunk_size:
                # Create chunk
                chunk_content = ' '.join(current_chunk)
                chunks.append(self.create_chunk(
                    chunk_content,
                    f"chunk_{len(chunks)}",
                    {**metadata, "chunk_index": len(chunks)}
                ))
                
                # Start new chunk with overlap
                overlap_count = max(1, len(current_chunk) // 4)
                current_chunk = current_chunk[-overlap_count:] + [sentences[i]]
                current_tokens = sum(self.count_tokens(s) for s in current_chunk)
            else:
                current_chunk.append(sentences[i])
                current_tokens += sentence_tokens
        
        # Add last chunk
        if current_chunk:
            chunks.append(self.create_chunk(
                ' '.join(current_chunk),
                f"chunk_{len(chunks)}",
                {**metadata, "chunk_index": len(chunks)}
            ))
        
        return chunks

Embedding และ Vectorization — สร้าง Vector Store

ขั้นตอนนี้แปลง text chunks เป็น vectors