บทความนี้จะพาท่านสร้างระบบ RAG (Retrieval-Augmented Generation) ตั้งแต่เริ่มต้นจนถึงระดับ Production โดยครอบคลุมทุกขั้นตอนตั้งแต่การ parse เอกสาร การแปลงเป็น vector การค้นหาแบบ semantic ไปจนถึงการ generate คำตอบด้วย LLM พร้อมโค้ดที่พร้อมใช้งานจริงและ benchmark ประสิทธิภาพ
สถาปัตยกรรมโดยรวมของระบบ RAG
ก่อนลงมือเขียนโค้ด มาทำความเข้าใจสถาปัตยกรรมของระบบ RAG กันก่อน ระบบ RAG ประกอบด้วย 4 ส่วนหลักที่เชื่อมต่อกัน:
- Ingestion Pipeline — รับเอกสารเข้ามา ทำความสะอาด parse แบ่ง chunk และสร้าง embedding
- Vector Store — จัดเก็บ vectors และ metadata ในรูปแบบที่ค้นหาได้เร็ว
- Retrieval Engine — ค้นหา documents ที่เกี่ยวข้องจาก query
- Generation Module — ส่ง context ที่ retrieve มาให้ LLM สร้างคำตอบ
สำหรับ API LLM เราจะใช้ HolySheep AI ซึ่งมีความเร็วต่ำกว่า 50ms และราคาประหยัดกว่า 85% เมื่อเทียบกับ OpenAI
การติดตั้ง Dependencies และ Configuration
pip install langchain langchain-community langchain-huggingface
pip install unstructured pypdf python-docx python-pptx
pip install faiss-cpu sentence-transformers pydantic
pip install httpx tiktoken
# config.py
import os
from typing import Literal
HolySheep AI Configuration
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
Model Configuration
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL = "gpt-4.1" # หรือ deepseek-v3.2 สำหรับประหยัดต้นทุน
Vector Store Configuration
INDEX_TYPE: Literal["faiss", "chroma", "milvus"] = "faiss"
EMBEDDING_DIMENSION = 384
Chunking Configuration
CHUNK_SIZE = 512
CHUNK_OVERLAP = 64
Retrieval Configuration
TOP_K = 5
SIMILARITY_THRESHOLD = 0.7
Document Parsing Pipeline — รองรับทุกรูปแบบไฟล์
ขั้นตอนแรกคือการ parse เอกสารให้เป็น plain text ที่สามารถ process ต่อได้ ระบบนี้รองรับ PDF, DOCX, PPTX, Markdown และ HTML
# document_parser.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import re
@dataclass
class Document:
"""โครงสร้างข้อมูลสำหรับเอกสารที่ถูก parse แล้ว"""
page_content: str
metadata: dict
def __repr__(self):
preview = self.page_content[:100].replace('\n', ' ')
return f"Document(content='{preview}...', metadata={self.metadata})"
class BaseDocumentParser(ABC):
"""Abstract base class สำหรับ document parsers"""
@abstractmethod
def parse(self, file_path: str) -> List[Document]:
pass
def clean_text(self, text: str) -> str:
"""ทำความสะอาด text ด้วยกฎพื้นฐาน"""
text = re.sub(r'\s+', ' ', text) # ลบ whitespaces ซ้ำ
text = re.sub(r'[^\w\s\u0E00-\u0E7F.,!?()-]', '', text) # รองรับภาษาไทย
return text.strip()
class PDFParser(BaseDocumentParser):
"""Parser สำหรับไฟล์ PDF"""
def __init__(self):
from pypdf import PdfReader
self.reader_class = PdfReader
def parse(self, file_path: str) -> List[Document]:
reader = self.reader_class(file_path)
documents = []
for page_num, page in enumerate(reader.pages):
text = page.extract_text()
if text:
text = self.clean_text(text)
documents.append(Document(
page_content=text,
metadata={
"source": file_path,
"page": page_num + 1,
"total_pages": len(reader.pages),
"file_type": "pdf"
}
))
return documents
class DocxParser(BaseDocumentParser):
"""Parser สำหรับไฟล์ Word (.docx)"""
def parse(self, file_path: str) -> List[Document]:
from docx import Document as DocxDocument
doc = DocxDocument(file_path)
documents = []
full_text = []
for para in doc.paragraphs:
if para.text.strip():
full_text.append(para.text)
# รวม paragraphs เป็น sections
content = '\n'.join(full_text)
content = self.clean_text(content)
documents.append(Document(
page_content=content,
metadata={
"source": file_path,
"file_type": "docx",
"paragraph_count": len(full_text)
}
))
return documents
class MarkdownParser(BaseDocumentParser):
"""Parser สำหรับไฟล์ Markdown"""
def parse(self, file_path: str) -> List[Document]:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# แบ่งตาม headers
sections = re.split(r'\n(?=#)', content)
documents = []
for i, section in enumerate(sections):
if section.strip():
documents.append(Document(
page_content=self.clean_text(section),
metadata={
"source": file_path,
"section": i,
"file_type": "markdown"
}
))
return documents
class DocumentParserFactory:
"""Factory สำหรับสร้าง parser ตามประเภทไฟล์"""
_parsers = {
'.pdf': PDFParser,
'.docx': DocxParser,
'.doc': DocxParser, # ใช้ docx parser เดียวกัน
'.md': MarkdownParser,
'.txt': MarkdownParser, # ใช้ markdown parser เหมือนกัน
}
@classmethod
def get_parser(cls, file_path: str) -> BaseDocumentParser:
ext = '.' + file_path.rsplit('.', 1)[-1].lower()
parser_class = cls._parsers.get(ext)
if not parser_class:
raise ValueError(f"ไม่รองรับไฟล์ประเภท {ext}")
return parser_class()
@classmethod
def parse_file(cls, file_path: str) -> List[Document]:
parser = cls.get_parser(file_path)
return parser.parse(file_path)
Chunking Strategy — กลยุทธ์การแบ่งเอกสาร
การ chunk เอกสารเป็นส่วนสำคัญมากสำหรับ RAG เพราะต้อง平衡ระหว่าง context และ relevance
# chunking.py
from typing import List, Callable, Optional
from dataclasses import dataclass
import re
@dataclass
class Chunk:
"""โครงสร้างข้อมูลสำหรับ text chunk"""
content: str
chunk_id: str
metadata: dict
token_count: int
class ChunkingStrategy:
"""Base class สำหรับ chunking strategies"""
def __init__(self, chunk_size: int = 512, overlap: int = 64):
self.chunk_size = chunk_size
self.overlap = overlap
def count_tokens(self, text: str) -> int:
"""นับ token โดยประมาณ (ใช้ tiktoken จะแม่นยำกว่า)"""
# วิธีง่าย: 1 token ≈ 4 characters สำหรับภาษาอังกฤษ
# สำหรับภาษาไทย: 1 token ≈ 2-3 characters
thai_chars = len(re.findall(r'[\u0E00-\u0E7F]', text))
other_chars = len(text) - thai_chars
return int(thai_chars / 2.5 + other_chars / 4)
def create_chunk(self, content: str, chunk_id: str, metadata: dict) -> Chunk:
return Chunk(
content=content,
chunk_id=chunk_id,
metadata=metadata,
token_count=self.count_tokens(content)
)
class RecursiveCharacterChunker(ChunkingStrategy):
"""Recursive character-based chunking — แบ่งตาม separators"""
def __init__(
self,
chunk_size: int = 512,
overlap: int = 64,
separators: List[str] = None
):
super().__init__(chunk_size, overlap)
self.separators = separators or ['\n\n', '\n', '. ', ' ', '']
def chunk(self, text: str, metadata: dict) -> List[Chunk]:
chunks = []
chunk_id_counter = 0
# Split by paragraphs first
paragraphs = text.split('\n\n')
current_chunk = ""
for para in paragraphs:
# ถ้า paragraph ใหญ่กว่า chunk_size ให้ split ต่อ
if self.count_tokens(current_chunk + para) > self.chunk_size:
if current_chunk:
chunks.append(self.create_chunk(
current_chunk.strip(),
f"chunk_{chunk_id_counter}",
{**metadata, "chunk_index": chunk_id_counter}
))
chunk_id_counter += 1
# Keep overlap
words = current_chunk.split()
overlap_words = ' '.join(words[-self.overlap // 4:])
current_chunk = overlap_words + para
else:
# Paragraph too big, split by smaller separators
sub_chunks = self._split_big_paragraph(para, metadata, chunk_id_counter)
chunks.extend(sub_chunks)
chunk_id_counter += len(sub_chunks)
current_chunk = ""
else:
current_chunk += '\n\n' + para if current_chunk else para
# Add remaining chunk
if current_chunk.strip():
chunks.append(self.create_chunk(
current_chunk.strip(),
f"chunk_{chunk_id_counter}",
{**metadata, "chunk_index": chunk_id_counter}
))
return chunks
def _split_big_paragraph(
self,
para: str,
metadata: dict,
start_id: int
) -> List[Chunk]:
chunks = []
current = ""
chunk_id = start_id
for separator in self.separators[1:]: # Skip empty separator
if separator in para:
parts = para.split(separator)
for part in parts:
if self.count_tokens(current + part) <= self.chunk_size:
current += separator + part if current else part
else:
if current:
chunks.append(self.create_chunk(
current.strip(),
f"chunk_{chunk_id}",
{**metadata, "chunk_index": chunk_id}
))
chunk_id += 1
current = part
break
if current.strip():
chunks.append(self.create_chunk(
current.strip(),
f"chunk_{chunk_id}",
{**metadata, "chunk_index": chunk_id}
))
return chunks
class SemanticChunker(ChunkingStrategy):
"""Semantic chunking — ใช้ sentence similarity แบ่ง chunk"""
def __init__(self, chunk_size: int = 512, overlap: int = 64):
super().__init__(chunk_size, overlap)
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer('all-MiniLM-L6-v2')
def chunk(self, text: str, metadata: dict) -> List[Chunk]:
# Split into sentences first
sentences = re.split(r'(?<=[.!?])\s+', text)
chunks = []
if not sentences:
return [self.create_chunk(text, "chunk_0", metadata)]
# Calculate embeddings
embeddings = self.model.encode(sentences)
current_chunk = [sentences[0]]
current_tokens = self.count_tokens(sentences[0])
for i in range(1, len(sentences)):
sentence_tokens = self.count_tokens(sentences[i])
# Check if adding this sentence exceeds chunk size
if current_tokens + sentence_tokens > self.chunk_size:
# Create chunk
chunk_content = ' '.join(current_chunk)
chunks.append(self.create_chunk(
chunk_content,
f"chunk_{len(chunks)}",
{**metadata, "chunk_index": len(chunks)}
))
# Start new chunk with overlap
overlap_count = max(1, len(current_chunk) // 4)
current_chunk = current_chunk[-overlap_count:] + [sentences[i]]
current_tokens = sum(self.count_tokens(s) for s in current_chunk)
else:
current_chunk.append(sentences[i])
current_tokens += sentence_tokens
# Add last chunk
if current_chunk:
chunks.append(self.create_chunk(
' '.join(current_chunk),
f"chunk_{len(chunks)}",
{**metadata, "chunk_index": len(chunks)}
))
return chunks
Embedding และ Vectorization — สร้าง Vector Store
ขั้นตอนนี้แปลง text chunks เป็น vectors