บทนำ
ผมเพิ่งเริ่มศึกษาเรื่อง RAG (Retrieval-Augmented Generation) เมื่อสองเดือนก่อน และเจอปัญหาใหญ่ที่สุดคือ ข้อมูลที่โมเดล AI ตอบนั้นมันเก่ามาก ๆ บางครั้งตอบข้อมูลที่ผิดพลาดเพราะข้อมูลฐาน knowledge base ยังไม่อัพเดต วันนี้ผมจะมาสอนเพื่อน ๆ ที่เพิ่งเริ่มต้นอย่างละเอียดที่สุดครับ
RAG คือเทคนิคที่ช่วยให้ AI โมเดลสามารถค้นหาข้อมูลจากฐานความรู้ของเราก่อนแล้วค่อยตอบ แทนที่จะตอบจากความรู้ที่โมเดลเคยเรียนมาเพียงอย่างเดียว ทำให้คำตอบมีความถูกต้องและตรงกับข้อมูลปัจจุบันมากขึ้น
ปัญหาที่ RAG แบบเดิมมี
เมื่อก่อนเวลาเราสร้าง RAG เราจะต้องนำเอกสารทั้งหมดมาประมวลผลครั้งเดียว แล้วสร้าง index หรือดัชนีค้นหาขึ้นมา แต่ปัญหาคือ
- เมื่อมีเอกสารใหม่เพิ่มเข้ามา เราต้องสร้าง index ใหม่ทั้งหมด
- ใช้เวลานานมากถ้ามีเอกสารจำนวนมาก
- ทรัพยากรเซิร์ฟเวอร์สูงมากในตอน re-index
- ข้อมูลล้าสมัยเร็วเมื่อมีการอัพเดตบ่อย
วิธีแก้: Incremental Index Update
Incremental Update คือการอัพเดตเฉพาะส่วนที่มีการเปลี่ยนแปลง ไม่ต้องทำใหม่ทั้งหมด วิธีนี้ช่วยประหยัดเวลาและทรัพยากรได้มหาศาล
หลักการทำงานของ Incremental Update
1. **เก็บ Hash ของเอกสาร** - สร้างรหัสเฉพาะจากเนื้อหาเอกสาร
2. **เปรียบเทียบกับฐานข้อมูล** - ถ้า Hash เปลี่ยนแปลง แสดงว่าเอกสารถูกแก้ไข
3. **อัพเดตเฉพาะส่วนที่เปลี่ยน** - ลบข้อมูลเก่าและเพิ่มข้อมูลใหม่เข้าไป
4. **เก็บประวัติการเปลี่ยนแปลง** - บันทึกเวลาและรายละเอียดการแก้ไข
การติดตั้งและใช้งานทีละขั้นตอน
ขั้นตอนที่ 1: ติดตั้งโปรแกรมที่จำเป็น
ก่อนอื่นเราต้องติดตั้ง Python และไลบรารีต่าง ๆ ก่อนครับ สำหรับมือใหม่แนะนำให้ติดตั้ง Python 3.10 ขึ้นไป
pip install openai faiss-cpu numpy pandas python-dotenv langchain-community
ไลบรารีเหล่านี้มีหน้าที่ดังนี้
- **openai** - สำหรับเชื่อมต่อกับ AI API
- **faiss-cpu** - สำหรับค้นหาข้อมูลความเร็วสูง
- **numpy** และ **pandas** - สำหรับจัดการข้อมูล
- **langchain-community** - เครื่องมือช่วยสร้าง RAG
ขั้นตอนที่ 2: สร้างไฟล์ config
สร้างไฟล์ชื่อ
.env ในโฟลเดอร์โปรเจกต์ของเรา
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
INDEX_STORAGE_PATH=./index_data
DOCUMENT_TRACKER_PATH=./document_tracker.json
สำหรับผู้ที่ยังไม่มี API Key สามารถ
สมัครที่นี่ได้เลยครับ HolySheep AI มีความเร็วในการตอบสนองต่ำกว่า 50 มิลลิวินาที และราคาถูกกว่าบริการอื่นมากถึง 85 เปอร์เซ็นต์
ขั้นตอนที่ 3: สร้างระบบ Incremental Index
ผมจะอธิบายโค้ดทีละส่วนเพื่อให้เพื่อน ๆ เข้าใจได้ง่ายที่สุด
import os
import json
import hashlib
import numpy as np
from datetime import datetime
from typing import List, Dict, Tuple
from pathlib import Path
import faiss
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
class IncrementalRAGIndexer:
"""
คลาสสำหรับจัดการ RAG Index แบบอัพเดตทีละส่วน
ออกแบบมาสำหรับมือใหม่ที่ยังไม่มีประสบการณ์
"""
def __init__(self, base_url: str, api_key: str,
index_path: str, tracker_path: str):
"""
กำหนดค่าเริ่มต้นสำหรับระบบ
base_url: ที่อยู่ API ของ HolySheep
api_key: รหัส API ส่วนตัว
index_path: ที่เก็บไฟล์ index
tracker_path: ที่เก็บประวัติการเปลี่ยนแปลง
"""
self.base_url = base_url
self.api_key = api_key
self.index_path = Path(index_path)
self.tracker_path = Path(tracker_path)
self.dimension = 1536 # ขนาด vector สำหรับ text-embedding-ada-002
# สร้างโฟลเดอร์ถ้ายังไม่มี
self.index_path.mkdir(parents=True, exist_ok=True)
# โหลดประวัติการเปลี่ยนแปลง
self.document_tracker = self._load_tracker()
# โหลด index ที่มีอยู่
self.index = self._load_or_create_index()
def _load_tracker(self) -> Dict:
"""โหลดประวัติการเปลี่ยนแปลงจากไฟล์"""
if self.tracker_path.exists():
with open(self.tracker_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def _save_tracker(self):
"""บันทึกประวัติการเปลี่ยนแปลงลงไฟล์"""
with open(self.tracker_path, 'w', encoding='utf-8') as f:
json.dump(self.document_tracker, f, ensure_ascii=False, indent=2)
def _calculate_hash(self, content: str) -> str:
"""สร้างรหัสเฉพาะจากเนื้อหาเอกสาร"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def _load_or_create_index(self) -> faiss.Index:
"""โหลด index เดิมหรือสร้างใหม่ถ้ายังไม่มี"""
index_file = self.index_path / "faiss.index"
if index_file.exists():
return faiss.read_index(str(index_file))
# สร้าง index ใหม่แบบความคล้ายครง
index = faiss.IndexFlatL2(self.dimension)
return index
def _save_index(self):
"""บันทึก index ลงไฟล์"""
index_file = self.index_path / "faiss.index"
faiss.write_index(self.index, str(index_file))
คำอธิบายโค้ดข้างบน
- **คลาส IncrementalRAGIndexer** เป็นตัวจัดการหลักของระบบ
- **_calculate_hash** จะสร้างรหัสเฉพาะจากเนื้อหาทุกครั้งที่มีการแก้ไข
- **faiss.IndexFlatL2** คือวิธีค้นหาข้อมูลที่เร็วมาก
ขั้นตอนที่ 4: สร้างฟังก์ชันอัพเดตข้อมูล
import requests
class EmbeddingGenerator:
"""
คลาสสำหรับสร้าง vector embedding จาก HolySheep API
ผมเลือกใช้ HolySheep เพราะมีความเร็วต่ำกว่า 50ms
และราคาถูกมากเมื่อเทียบกับ OpenAI
"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.model = "text-embedding-ada-002"
def get_embedding(self, text: str) -> List[float]:
"""
สร้าง embedding vector จากข้อความ
คืนค่าเป็น list ของตัวเลขทศนิยม
"""
url = f"{self.base_url}/embeddings"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"input": text
}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["data"][0]["embedding"]
def get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""
สร้าง embedding หลายข้อความพร้อมกัน
ช่วยประหยัดเวลาและลดจำนวน API call
"""
url = f"{self.base_url}/embeddings"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"input": texts
}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
# เรียงลำดับตาม input ที่ส่งไป
embeddings = [item["embedding"] for item in data["data"]]
return embeddings
ข้อดีของ HolySheep API ที่ผมเจอคือความเร็วในการตอบสนองที่ต่ำกว่า 50 มิลลิวินาทีจริง ๆ และรองรับ batch request ที่ช่วยให้ประหยัดค่าใช้จ่ายได้มาก
ขั้นตอนที่ 5: สร้างระบบอัพเดตอัตโนมัติ
import time
from threading import Thread
class IncrementalUpdateManager:
"""
ตัวจัดการการอัพเดตแบบ incremental
จะตรวจสอบการเปลี่ยนแปลงและอัพเดตอัตโนมัติ
"""
def __init__(self, indexer: IncrementalRAGIndexer,
embedding_generator: EmbeddingGenerator):
self.indexer = indexer
self.embedder = embedding_generator
self.update_queue = []
self.is_running = False
def check_document_changes(self, documents: List[Dict]) -> List[Dict]:
"""
ตรวจสอบว่าเอกสารใดมีการเปลี่ยนแปลงบ้าง
documents: list ของ dict ที่มี 'id', 'content', 'metadata'
return: list ของเอกสารที่ถูกแก้ไขหรือเพิ่มใหม่
"""
changed_docs = []
for doc in documents:
doc_id = doc.get('id')
content = doc.get('content', '')
new_hash = self.indexer._calculate_hash(content)
if doc_id not in self.indexer.document_tracker:
# เอกสารใหม่
changed_docs.append({
**doc,
'action': 'add',
'hash': new_hash
})
elif self.indexer.document_tracker[doc_id]['hash'] != new_hash:
# เอกสารที่ถูกแก้ไข
changed_docs.append({
**doc,
'action': 'update',
'hash': new_hash
})
return changed_docs
def process_updates(self, documents: List[Dict]):
"""
ประมวลผลการอัพเดตทั้งหมด
ลบข้อมูลเก่าที่เปลี่ยนแปลงและเพิ่มข้อมูลใหม่
"""
changed_docs = self.check_document_changes(documents)
if not changed_docs:
print("ไม่มีการเปลี่ยนแปลงที่ต้องอัพเดต")
return
print(f"พบการเปลี่ยนแปลง {len(changed_docs)} รายการ")
for doc in changed_docs:
doc_id = doc['id']
content = doc['content']
action = doc['action']
new_hash = doc['hash']
if action == 'update':
# ลบข้อมูลเก่าออกก่อน
self._remove_document(doc_id)
# แบ่งเอกสารเป็นส่วนเล็ก ๆ
chunks = self._split_into_chunks(content)
# สร้าง embedding และเพิ่มเข้า index
embeddings = self.embedder.get_embeddings_batch(chunks)
for chunk, embedding in zip(chunks, embeddings):
self.indexer.add_to_index(
chunk,
embedding,
doc_id,
doc.get('metadata', {})
)
# บันทึกประวัติการเปลี่ยนแปลง
self.indexer.document_tracker[doc_id] = {
'hash': new_hash,
'last_updated': datetime.now().isoformat(),
'chunk_count': len(chunks)
}
# บันทึกการเปลี่ยนแปลง
self.indexer._save_tracker()
self.indexer._save_index()
print(f"อัพเดตเสร็จสิ้น {len(changed_docs)} รายการ")
def _split_into_chunks(self, text: str, chunk_size: int = 500) -> List[str]:
"""
แบ่งเอกสารยาวเป็นส่วนเล็ก ๆ
chunk_size: จำนวนตัวอักษรต่อส่วน
"""
# แบ่งตามย่อหน้าก่อน
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง