ในฐานะ Senior AI Engineer ที่ผ่านการสร้าง AI Agent มาหลายสิบโปรเจกต์ ผมเคยเจอปัญหาคลาสสิกที่ทุกทีมต้องเจอ: ทำอย่างไรให้ AI Agent จำสถานะการสนทนาได้ตลอดทั้งเซสชัน จัดการ error ได้อย่างมีประสิทธิภาพ และ deploy ขึ้น production ได้โดยไม่ต้องมานั่ง debug ทุกวัน

วันนี้ผมจะเล่าถึง LangGraph ที่มีดาวบน GitHub ถึง 90,000 ดาว ว่าทำไมมันถึงเปลี่ยนวิธีคิดเรื่อง AI Agent workflow ไปตลอดกาล และจะแชร์โค้ดจริงที่ใช้งานได้จากประสบการณ์ตรงของผม

บทนำ: ทำไม LangGraph ถึงเปลี่ยนเกม

ถ้าคุณเคยใช้ LangChain มาก่อน คุณจะรู้ว่ามันยอดเยี่ยมสำหรับ prototyping แต่พอต้องเอาไปใช้จริงใน production กลับเจอปัญหาเรื่อง state management ที่ยุ่งยาก ตัว LangGraph ออกมาเพื่อแก้ปัญหานี้โดยเฉพาะ ด้วยแนวคิด "graph-based" ที่ทำให้ workflow ของ AI Agent อ่านง่าย ดูแลง่าย และ debug ได้ละเอียด

ในโปรเจกต์ล่าสุดของผม ทีมต้องสร้าง Enterprise RAG System ที่รองรับเอกสารภาษาไทยหลายพันฉบับ รองรับ multi-turn conversation และ track ประวัติการค้นหาของผู้ใช้แต่ละคน ซึ่งถ้าใช้วิธีเดิมที่เราเคยทำ ต้องเขียน spaghetti code นับพันบรรทัด แต่พอหันมาใช้ LangGraph โค้ดลดลงเหลือแค่ 30% และ maintainability ดีขึ้นมาก

Core Concepts: เข้าใจ LangGraph อย่างถ่องแท้

ก่อนจะเข้าสู่โค้ด มาทำความเข้าใจ concept หลักของ LangGraph กันก่อน

1. State Graph

State คือ dictionary ที่เก็บข้อมูลทั้งหมดที่ flows ผ่าน graph ของเรา แต่ละ node สามารถอ่านและเขียน state ได้

2. Nodes และ Edges

Nodes คือ functions ที่ทำงานบางอย่าง (เช่น ค้นหา, ตอบคำถาม, บันทึก) ส่วน Edges คือ connections ที่กำหนดว่า flow จะไปที่ไหนต่อ

3. Conditional Edges

Edges ที่มีเงื่อนไข ทำให้เราสามารถ branch logic ได้ตามสถานะปัจจุบัน

โครงสร้างโปรเจกต์ Enterprise RAG ด้วย LangGraph

ในโปรเจกต์ที่ผมทำ ระบบ RAG ต้องทำงานดังนี้:

  1. Query Analysis — วิเคราะห์คำถามของผู้ใช้ ตรวจสอบว่าเป็นคำถามใหม่หรือต่อเนื่องจากเซสชันเดิม
  2. Retrieval — ค้นหาเอกสารที่เกี่ยวข้องจาก vector database
  3. Grading — ประเมินความเกี่ยวข้องของเอกสารที่ค้นหาได้
  4. Generation — สร้างคำตอบจากเอกสารที่เกี่ยวข้อง
  5. Session Tracking — บันทึกประวัติการสนทนา

การตั้งค่า HolySheep API Client

สำหรับ AI API ผมเลือกใช้ HolySheep AI เพราะอัตราเพียง ¥1=$1 ซึ่งประหยัดได้ถึง 85%+ เมื่อเทียบกับที่อื่น ความหน่วงต่ำกว่า 50ms และราคาต่อล้าน token ถูกมาก เช่น DeepSeek V3.2 เพียง $0.42/MTok หรือ Gemini 2.5 Flash $2.50/MTok

# requirements.txt
langgraph==0.2.45
langchain-core==0.3.24
langchain-community==0.3.12
chromadb==0.5.18
openai==1.58.1
httpx==0.28.1

ติดตั้งด้วย: pip install -r requirements.txt

# config.py - การตั้งค่า HolySheep API
import os
from openai import OpenAI

ตั้งค่า HolySheep API - base_url ต้องเป็น https://api.holysheep.ai/v1

HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY") HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"

สร้าง client สำหรับ AI Chat

chat_client = OpenAI( api_key=HOLYSHEEP_API_KEY, base_url=HOLYSHEEP_BASE_URL, timeout=30.0, max_retries=3 )

สร้าง client สำหรับ Embeddings

embedding_client = OpenAI( api_key=HOLYSHEEP_API_KEY, base_url=HOLYSHEEP_BASE_URL, timeout=30.0, max_retries=3 )

กำหนดโมเดลที่ใช้

CHAT_MODEL = "gpt-4.1" # $8/MTok - เหมาะสำหรับงาน complex reasoning EMBEDDING_MODEL = "text-embedding-3-small" # สำหรับ embedding

ตั้งค่า Vector Database

CHROMA_PERSIST_DIR = "./chroma_db" COLLECTION_NAME = "enterprise_docs"

สร้าง State Schema และ Graph Definition

# state_schema.py - กำหนด State ของ RAG System
from typing import TypedDict, Annotated, Sequence, Optional
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator

class RAGState(TypedDict):
    """State schema สำหรับ Enterprise RAG Graph"""
    
    # ประวัติการสนทนาทั้งหมด
    messages: Annotated[Sequence[BaseMessage], operator.add]
    
    # คำถามปัจจุบัน
    question: str
    
    # เอกสารที่ค้นหาได้
    documents: list
    
    # เอกสารที่ผ่านการกรอง (relevance > threshold)
    relevant_docs: list
    
    # คำตอบสุดท้าย
    answer: str
    
    # คะแนนความมั่นใจ
    confidence_score: float
    
    # Session ID สำหรับ track ประวัติ
    session_id: str
    
    # Flag สำหรับตรวจสอบว่าเป็นคำถามต่อเนื่องหรือไม่
    is_follow_up: bool
    
    # Error message ถ้ามีปัญหา
    error: Optional[str]
    
    # Metadata ของ user
    user_id: str

class ConfigState(TypedDict):
    """Config สำหรับ thread/runner"""
    session_id: str
    user_id: str
    language: str  # "th" หรือ "en"

Implementing Nodes ทั้งหมด

# nodes.py - Implementation ของแต่ละ Node
from typing import Literal
from config import chat_client, embedding_client, CHAT_MODEL
from state_schema import RAGState, ConfigState
from langchain_core.messages import HumanMessage, AIMessage
import json

========== Node 1: Query Analysis ==========

def analyze_query(state: RAGState) -> RAGState: """วิเคราะห์คำถาม - ตรวจสอบว่าเป็นคำถามใหม่หรือต่อเนื่อง""" messages = state["messages"] question = state["question"] # สร้าง prompt สำหรับวิเคราะห์ analysis_prompt = f"""วิเคราะห์คำถามต่อไปนี้: คำถาม: {question} ประวัติการสนทนาล่าสุด: {chr(10).join([f"{m.type}: {m.content}" for m in messages[-4:]]) if messages else "ไม่มีประวัติ"} ตอบกลับเป็น JSON ดังนี้: {{ "is_follow_up": true/false, "reason": "เหตุผล", "context_needed": "บริบทจากประวัติที่ต้องใช้" }}""" try: response = chat_client.chat.completions.create( model=CHAT_MODEL, messages=[{"role": "user", "content": analysis_prompt}], temperature=0.1, response_format={"type": "json_object"} ) result = json.loads(response.choices[0].message.content) return { **state, "is_follow_up": result.get("is_follow_up", False), "error": None } except Exception as e: return {**state, "error": f"Query analysis failed: {str(e)}"}

========== Node 2: Retrieval ==========

def retrieve_documents(state: RAGState) -> RAGState: """ค้นหาเอกสารจาก vector database""" from chromadb import ChromaClient from config import CHROMA_PERSIST_DIR, COLLECTION_NAME question = state["question"] is_follow_up = state["is_follow_up"] messages = state["messages"] # ถ้าเป็นคำถามต่อเนื่อง รวม context จากประวัติ search_query = question if is_follow_up and messages: context_summary = f"ประเด็นก่อนหน้า: {messages[-2].content if len(messages) >= 2 else ''}" search_query = f"{context_summary}\n\n{question}" try: # สร้าง embedding จาก HolySheep embedding_response = embedding_client.embeddings.create( model="text-embedding-3-small", input=search_query ) query_embedding = embedding_response.data[0].embedding # ค้นหาใน ChromaDB client = ChromaClient(path=CHROMA_PERSIST_DIR) collection = client.get_collection(name=COLLECTION_NAME) results = collection.query( query_embeddings=[query_embedding], n_results=10, include=["documents", "metadatas"] ) documents = [] if results and results.get("documents"): for i, doc in enumerate(results["documents"][0]): documents.append({ "content": doc, "metadata": results["metadatas"][0][i] if results.get("metadatas") else {}, "score": results.get("distances", [[1.0]])[0][i] }) return { **state, "documents": documents, "error": None } except Exception as e: return {**state, "documents": [], "error": f"Retrieval failed: {str(e)}"}

========== Node 3: Document Grading ==========

def grade_documents(state: RAGState) -> RAGState: """ประเมินความเกี่ยวข้องของเอกสาร""" question = state["question"] documents = state["documents"] if not documents: return {**state, "relevant_docs": [], "confidence_score": 0.0} grading_prompt = f"""ประเมินความเกี่ยวข้องของเอกสารต่อไปนี้กับคำถาม: "{question}" เอกสาร: {chr(10).join([f"[{i+1}] {doc['content'][:500]}..." for i, doc in enumerate(documents)])} สำหรับแต่ละเอกสาร ให้คะแนน relevance 1-10 และตอบเป็น JSON: {{ "grades": [ {{"index": 1, "score": 8, "reason": "เหตุผล"}}, ... ], "overall_confidence": 0.85 }}""" try: response = chat_client.chat.completions.create( model=CHAT_MODEL, messages=[{"role": "user", "content": grading_prompt}], temperature=0.1, response_format={"type": "json_object"} ) result = json.loads(response.choices[0].message.content) grades = {g["index"]: g["score"] for g in result.get("grades", [])} # กรองเอกสารที่คะแนน >= 6 relevant_docs = [ doc for i, doc in enumerate(documents) if grades.get(i+1, 0) >= 6 ] return { **state, "relevant_docs": relevant_docs, "confidence_score": result.get("overall_confidence", 0.0), "error": None } except Exception as e: return {**state, "error": f"Grading failed: {str(e)}"}

========== Node 4: Answer Generation ==========

def generate_answer(state: RAGState) -> RAGState: """สร้างคำตอบจากเอกสารที่เกี่ยวข้อง""" question = state["question"] relevant_docs = state["relevant_docs"] session_id = state["session_id"] if not relevant_docs: answer = "ขออภัย ฉันไม่พบเอกสารที่เกี่ยวข้องกับคำถามของคุณในฐานข้อมูล" return { **state, "answer": answer, "messages": state["messages"] + [AIMessage(content=answer)] } # สร้าง context จาก relevant docs context = "\n\n".join([f"เอกสารที่ {i+1}:\n{doc['content']}" for i, doc in enumerate(relevant_docs)]) generation_prompt = f"""คุณเป็นผู้ช่วย AI ที่ตอบคำถามจากเอกสารองค์กร Session ID: {session_id} เอกสารที่เกี่ยวข้อง: {context} คำถาม: {question} คำแนะนำ: 1. ตอบเป็นภาษาไทย 2. อ้างอิงแหล่งที่มาจากเอกสารที่ให้มา 3. ถ้าไม่แน่ใจ ให้บอกว่าไม่มีข้อมูลในเอกสาร 4. ความมั่นใจ: {state['confidence_score']:.0%}""" try: response = chat_client.chat.completions.create( model=CHAT_MODEL, messages=[{"role": "user", "content": generation_prompt}], temperature=0.3, max_tokens=2000 ) answer = response.choices[0].message.content return { **state, "answer": answer, "messages": state["messages"] + [AIMessage(content=answer)], "error": None } except Exception as e: return {**state, "error": f"Generation failed: {str(e)}"}

========== Node 5: Track Session ==========

def track_session(state: RAGState) -> RAGState: """บันทึกประวัติการสนทนา""" import sqlite3 from datetime import datetime session_id = state["session_id"] user_id = state["user_id"] question = state["question"] answer = state["answer"] confidence = state["confidence_score"] try: conn = sqlite3.connect("sessions.db") cursor = conn.cursor() cursor.execute(""" INSERT INTO conversations (session_id, user_id, question, answer, confidence, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, (session_id, user_id, question, answer, confidence, datetime.now().isoformat())) conn.commit() conn.close() return {**state, "error": None} except Exception as e: return {**state, "error": f"Session tracking failed: {str(e)}"}

สร้าง Graph และ Wiring ทั้งหมด

# rag_graph.py - สร้าง LangGraph
from langgraph.graph import StateGraph, END
from state_schema import RAGState, ConfigState
from nodes import (
    analyze_query, 
    retrieve_documents, 
    grade_documents, 
    generate_answer,
    track_session
)

def create_rag_graph():
    """สร้าง RAG workflow graph"""
    
    # เริ่มต้น graph ด้วย state schema
    workflow = StateGraph(RAGState)
    
    # เพิ่ม nodes ทั้งหมด
    workflow.add_node("analyze", analyze_query)
    workflow.add_node("retrieve", retrieve_documents)
    workflow.add_node("grade", grade_documents)
    workflow.add_node("generate", generate_answer)
    workflow.add_node("track", track_session)
    
    # กำหนด entry point
    workflow.set_entry_point("analyze")
    
    # กำหนด edges
    workflow.add_edge("analyze", "retrieve")
    workflow.add_edge("retrieve", "grade")
    
    # Conditional edge: ถ้าไม่มี relevant docs ให้ generate answer ทันที
    workflow.add_conditional_edges(
        "grade",
        lambda state: "skip_generate" if not state.get("relevant_docs") else "continue",
        {
            "skip_generate": "track",
            "continue": "generate"
        }
    )
    
    workflow.add_edge("generate", "track")
    
    # จบ workflow
    workflow.add_edge("track", END)
    
    return workflow.compile()

========== การใช้งาน Graph ==========

def run_rag_query( question: str, session_id: str, user_id: str ) -> dict: """รัน RAG query ผ่าน LangGraph""" from langgraph.checkpoint.sqlite import SqliteSaver # สร้าง checkpointer สำหรับ state persistence checkpointer = SqliteSaver.from_conn_string(":memory:") # สร้าง graph พร้อม checkpointer graph = create_rag_graph() # สร้าง initial state initial_state = RAGState( messages=[HumanMessage(content=question)], question=question, documents=[], relevant_docs=[], answer="", confidence_score=0.0, session_id=session_id, is_follow_up=False, error=None, user_id=user_id ) # รัน graph config = {"configurable": {"thread_id": session_id}} result = graph.invoke(initial_state, config) return { "answer": result["answer"], "confidence": result["confidence_score"], "sources": len(result["relevant_docs"]), "error": result.get("error") }

ตัวอย่างการใช้งาน

if __name__ == "__main__": result = run_rag_query( question="นโยบายการลาพนักงานเป็นอย่างไร?", session_id="session_001", user_id="user_123" ) print(f"คำตอบ: {result['answer']}") print(f"ความมั่นใจ: {result['confidence']:.1%}") print(f"แหล่งอ้างอิง: {result['sources']} ฉบับ")

Deploy บน Production ด้วย Streaming

สำหรับ production deployment ผมแนะนำให้ implement streaming response เพื่อประสบการณ์ผู้ใช้ที่ดี

# server.py - FastAPI server with streaming
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uuid
from rag_graph import run_rag_query, create_rag_graph
import asyncio

app = FastAPI(title="Enterprise RAG API", version="1.0.0")

class QueryRequest(BaseModel):
    question: str
    session_id: str | None = None
    user_id: str

class QueryResponse(BaseModel):
    answer: str
    confidence: float
    sources: int

@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Endpoint สำหรับ query ปกติ"""
    
    session_id = request.session_id or str(uuid.uuid4())
    
    try:
        result = run_rag_query(
            question=request.question,
            session_id=session_id,
            user_id=request.user_id
        )
        
        if result.get("error"):
            raise HTTPException(status_code=500, detail=result["error"])
        
        return QueryResponse(
            answer=result["answer"],
            confidence=result["confidence"],
            sources=result["sources"]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/query/stream")
async def query_stream(request: QueryRequest):
    """Streaming endpoint สำหรับ real-time response"""
    
    session_id = request.session_id or str(uuid.uuid4())
    
    async def event_generator():
        from config import chat_client, CHAT_MODEL
        
        # ส่ง immediate acknowledgment
        yield f"data: {{'status': 'started', 'session_id': '{session_id}'}}\n\n"
        
        # รัน query
        result = run_rag_query(
            question=request.question,
            session_id=session_id,
            user_id=request.user_id
        )
        
        # Stream the answer word by word
        answer = result["answer"]
        words = answer.split()
        
        for i, word in enumerate(words):
            yield f"data: {{'word': '{word}', 'progress': {i/len(words):.2f}}}\n\n"
            await asyncio.sleep(0.05)  # Simulate streaming delay
        
        yield f"data: {{'status': 'done', 'confidence': {result['confidence']:.2f}}}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

@app.get("/api/health")
async def health():
    return {"status": "healthy", "model": CHAT_MODEL}

รันด้วย: uvicorn server:app --host 0.0.0.0 --port 8000

ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข

จากประสบการณ์ตรงที่ใช้งาน LangGraph มาหลายเดือน ผมรวบรวมข้อผิดพลาดที่พบบ่อยที่สุดมาฝาก

ข้อผิดพลาดที่ 1: State Not Persisting Between Nodes

# ❌ วิธีที่ผิด - State ถูก overwrite
def node_a(state):
    return {"messages": [...], "question": "new question"}

def node_b(state):
    # จะไม่เห็น messages จาก node_a เพราะ state ถูก reset
    return {"messages": [...]}  # WRONG!

✅ วิธีที่ถูก - ใช้ spread operator

def node_a(state): return {"question": "new question"} # ไม่ต้อง return messages def node_b(state): # state จะมีทั้ง messages (จากก่อนหน้า) และ question (จาก node_a) return {"answer": f"Question: {state['question']}"}

ข้อผิดพลาดที่ 2: HolySheep API Timeout

# ❌ วิธีที่ผิด - ไม่มี retry logic
response = chat_client.chat.completions.create(
    model="gpt-4.1",
    messages=messages
)