บทนำ

ผมเพิ่งเริ่มศึกษาเรื่อง RAG (Retrieval-Augmented Generation) เมื่อสองเดือนก่อน และเจอปัญหาใหญ่ที่สุดคือ ข้อมูลที่โมเดล AI ตอบนั้นมันเก่ามาก ๆ บางครั้งตอบข้อมูลที่ผิดพลาดเพราะข้อมูลฐาน knowledge base ยังไม่อัพเดต วันนี้ผมจะมาสอนเพื่อน ๆ ที่เพิ่งเริ่มต้นอย่างละเอียดที่สุดครับ RAG คือเทคนิคที่ช่วยให้ AI โมเดลสามารถค้นหาข้อมูลจากฐานความรู้ของเราก่อนแล้วค่อยตอบ แทนที่จะตอบจากความรู้ที่โมเดลเคยเรียนมาเพียงอย่างเดียว ทำให้คำตอบมีความถูกต้องและตรงกับข้อมูลปัจจุบันมากขึ้น

ปัญหาที่ RAG แบบเดิมมี

เมื่อก่อนเวลาเราสร้าง RAG เราจะต้องนำเอกสารทั้งหมดมาประมวลผลครั้งเดียว แล้วสร้าง index หรือดัชนีค้นหาขึ้นมา แต่ปัญหาคือ - เมื่อมีเอกสารใหม่เพิ่มเข้ามา เราต้องสร้าง index ใหม่ทั้งหมด - ใช้เวลานานมากถ้ามีเอกสารจำนวนมาก - ทรัพยากรเซิร์ฟเวอร์สูงมากในตอน re-index - ข้อมูลล้าสมัยเร็วเมื่อมีการอัพเดตบ่อย

วิธีแก้: Incremental Index Update

Incremental Update คือการอัพเดตเฉพาะส่วนที่มีการเปลี่ยนแปลง ไม่ต้องทำใหม่ทั้งหมด วิธีนี้ช่วยประหยัดเวลาและทรัพยากรได้มหาศาล

หลักการทำงานของ Incremental Update

1. **เก็บ Hash ของเอกสาร** - สร้างรหัสเฉพาะจากเนื้อหาเอกสาร 2. **เปรียบเทียบกับฐานข้อมูล** - ถ้า Hash เปลี่ยนแปลง แสดงว่าเอกสารถูกแก้ไข 3. **อัพเดตเฉพาะส่วนที่เปลี่ยน** - ลบข้อมูลเก่าและเพิ่มข้อมูลใหม่เข้าไป 4. **เก็บประวัติการเปลี่ยนแปลง** - บันทึกเวลาและรายละเอียดการแก้ไข

การติดตั้งและใช้งานทีละขั้นตอน

ขั้นตอนที่ 1: ติดตั้งโปรแกรมที่จำเป็น

ก่อนอื่นเราต้องติดตั้ง Python และไลบรารีต่าง ๆ ก่อนครับ สำหรับมือใหม่แนะนำให้ติดตั้ง Python 3.10 ขึ้นไป
pip install openai faiss-cpu numpy pandas python-dotenv langchain-community
ไลบรารีเหล่านี้มีหน้าที่ดังนี้ - **openai** - สำหรับเชื่อมต่อกับ AI API - **faiss-cpu** - สำหรับค้นหาข้อมูลความเร็วสูง - **numpy** และ **pandas** - สำหรับจัดการข้อมูล - **langchain-community** - เครื่องมือช่วยสร้าง RAG

ขั้นตอนที่ 2: สร้างไฟล์ config

สร้างไฟล์ชื่อ .env ในโฟลเดอร์โปรเจกต์ของเรา
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
INDEX_STORAGE_PATH=./index_data
DOCUMENT_TRACKER_PATH=./document_tracker.json
สำหรับผู้ที่ยังไม่มี API Key สามารถสมัครที่นี่ได้เลยครับ HolySheep AI มีความเร็วในการตอบสนองต่ำกว่า 50 มิลลิวินาที และราคาถูกกว่าบริการอื่นมากถึง 85 เปอร์เซ็นต์

ขั้นตอนที่ 3: สร้างระบบ Incremental Index

ผมจะอธิบายโค้ดทีละส่วนเพื่อให้เพื่อน ๆ เข้าใจได้ง่ายที่สุด
import os
import json
import hashlib
import numpy as np
from datetime import datetime
from typing import List, Dict, Tuple
from pathlib import Path
import faiss
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class IncrementalRAGIndexer:
    """
    คลาสสำหรับจัดการ RAG Index แบบอัพเดตทีละส่วน
    ออกแบบมาสำหรับมือใหม่ที่ยังไม่มีประสบการณ์
    """
    
    def __init__(self, base_url: str, api_key: str, 
                 index_path: str, tracker_path: str):
        """
        กำหนดค่าเริ่มต้นสำหรับระบบ
        base_url: ที่อยู่ API ของ HolySheep
        api_key: รหัส API ส่วนตัว
        index_path: ที่เก็บไฟล์ index
        tracker_path: ที่เก็บประวัติการเปลี่ยนแปลง
        """
        self.base_url = base_url
        self.api_key = api_key
        self.index_path = Path(index_path)
        self.tracker_path = Path(tracker_path)
        self.dimension = 1536  # ขนาด vector สำหรับ text-embedding-ada-002
        
        # สร้างโฟลเดอร์ถ้ายังไม่มี
        self.index_path.mkdir(parents=True, exist_ok=True)
        
        # โหลดประวัติการเปลี่ยนแปลง
        self.document_tracker = self._load_tracker()
        
        # โหลด index ที่มีอยู่
        self.index = self._load_or_create_index()
    
    def _load_tracker(self) -> Dict:
        """โหลดประวัติการเปลี่ยนแปลงจากไฟล์"""
        if self.tracker_path.exists():
            with open(self.tracker_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}
    
    def _save_tracker(self):
        """บันทึกประวัติการเปลี่ยนแปลงลงไฟล์"""
        with open(self.tracker_path, 'w', encoding='utf-8') as f:
            json.dump(self.document_tracker, f, ensure_ascii=False, indent=2)
    
    def _calculate_hash(self, content: str) -> str:
        """สร้างรหัสเฉพาะจากเนื้อหาเอกสาร"""
        return hashlib.md5(content.encode('utf-8')).hexdigest()
    
    def _load_or_create_index(self) -> faiss.Index:
        """โหลด index เดิมหรือสร้างใหม่ถ้ายังไม่มี"""
        index_file = self.index_path / "faiss.index"
        
        if index_file.exists():
            return faiss.read_index(str(index_file))
        
        # สร้าง index ใหม่แบบความคล้ายครง
        index = faiss.IndexFlatL2(self.dimension)
        return index
    
    def _save_index(self):
        """บันทึก index ลงไฟล์"""
        index_file = self.index_path / "faiss.index"
        faiss.write_index(self.index, str(index_file))
คำอธิบายโค้ดข้างบน - **คลาส IncrementalRAGIndexer** เป็นตัวจัดการหลักของระบบ - **_calculate_hash** จะสร้างรหัสเฉพาะจากเนื้อหาทุกครั้งที่มีการแก้ไข - **faiss.IndexFlatL2** คือวิธีค้นหาข้อมูลที่เร็วมาก

ขั้นตอนที่ 4: สร้างฟังก์ชันอัพเดตข้อมูล

import requests

class EmbeddingGenerator:
    """
    คลาสสำหรับสร้าง vector embedding จาก HolySheep API
    ผมเลือกใช้ HolySheep เพราะมีความเร็วต่ำกว่า 50ms 
    และราคาถูกมากเมื่อเทียบกับ OpenAI
    """
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.model = "text-embedding-ada-002"
    
    def get_embedding(self, text: str) -> List[float]:
        """
        สร้าง embedding vector จากข้อความ
        คืนค่าเป็น list ของตัวเลขทศนิยม
        """
        url = f"{self.base_url}/embeddings"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "input": text
        }
        
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()
        
        data = response.json()
        return data["data"][0]["embedding"]
    
    def get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """
        สร้าง embedding หลายข้อความพร้อมกัน
        ช่วยประหยัดเวลาและลดจำนวน API call
        """
        url = f"{self.base_url}/embeddings"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "input": texts
        }
        
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()
        
        data = response.json()
        # เรียงลำดับตาม input ที่ส่งไป
        embeddings = [item["embedding"] for item in data["data"]]
        return embeddings
ข้อดีของ HolySheep API ที่ผมเจอคือความเร็วในการตอบสนองที่ต่ำกว่า 50 มิลลิวินาทีจริง ๆ และรองรับ batch request ที่ช่วยให้ประหยัดค่าใช้จ่ายได้มาก

ขั้นตอนที่ 5: สร้างระบบอัพเดตอัตโนมัติ

import time
from threading import Thread

class IncrementalUpdateManager:
    """
    ตัวจัดการการอัพเดตแบบ incremental
    จะตรวจสอบการเปลี่ยนแปลงและอัพเดตอัตโนมัติ
    """
    
    def __init__(self, indexer: IncrementalRAGIndexer, 
                 embedding_generator: EmbeddingGenerator):
        self.indexer = indexer
        self.embedder = embedding_generator
        self.update_queue = []
        self.is_running = False
    
    def check_document_changes(self, documents: List[Dict]) -> List[Dict]:
        """
        ตรวจสอบว่าเอกสารใดมีการเปลี่ยนแปลงบ้าง
        documents: list ของ dict ที่มี 'id', 'content', 'metadata'
        return: list ของเอกสารที่ถูกแก้ไขหรือเพิ่มใหม่
        """
        changed_docs = []
        
        for doc in documents:
            doc_id = doc.get('id')
            content = doc.get('content', '')
            new_hash = self.indexer._calculate_hash(content)
            
            if doc_id not in self.indexer.document_tracker:
                # เอกสารใหม่
                changed_docs.append({
                    **doc,
                    'action': 'add',
                    'hash': new_hash
                })
            elif self.indexer.document_tracker[doc_id]['hash'] != new_hash:
                # เอกสารที่ถูกแก้ไข
                changed_docs.append({
                    **doc,
                    'action': 'update',
                    'hash': new_hash
                })
        
        return changed_docs
    
    def process_updates(self, documents: List[Dict]):
        """
        ประมวลผลการอัพเดตทั้งหมด
        ลบข้อมูลเก่าที่เปลี่ยนแปลงและเพิ่มข้อมูลใหม่
        """
        changed_docs = self.check_document_changes(documents)
        
        if not changed_docs:
            print("ไม่มีการเปลี่ยนแปลงที่ต้องอัพเดต")
            return
        
        print(f"พบการเปลี่ยนแปลง {len(changed_docs)} รายการ")
        
        for doc in changed_docs:
            doc_id = doc['id']
            content = doc['content']
            action = doc['action']
            new_hash = doc['hash']
            
            if action == 'update':
                # ลบข้อมูลเก่าออกก่อน
                self._remove_document(doc_id)
            
            # แบ่งเอกสารเป็นส่วนเล็ก ๆ
            chunks = self._split_into_chunks(content)
            
            # สร้าง embedding และเพิ่มเข้า index
            embeddings = self.embedder.get_embeddings_batch(chunks)
            
            for chunk, embedding in zip(chunks, embeddings):
                self.indexer.add_to_index(
                    chunk, 
                    embedding, 
                    doc_id, 
                    doc.get('metadata', {})
                )
            
            # บันทึกประวัติการเปลี่ยนแปลง
            self.indexer.document_tracker[doc_id] = {
                'hash': new_hash,
                'last_updated': datetime.now().isoformat(),
                'chunk_count': len(chunks)
            }
        
        # บันทึกการเปลี่ยนแปลง
        self.indexer._save_tracker()
        self.indexer._save_index()
        
        print(f"อัพเดตเสร็จสิ้น {len(changed_docs)} รายการ")
    
    def _split_into_chunks(self, text: str, chunk_size: int = 500) -> List[str]:
        """
        แบ่งเอกสารยาวเป็นส่วนเล็ก ๆ
        chunk_size: จำนวนตัวอักษรต่อส่วน
        """
        # แบ่งตามย่อหน้าก่อน
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = ""
        
        for para in paragraphs