ในฐานะวิศวกรที่ทำงานด้าน AI Agent มาหลายปี ผมเชื่อว่า ReAct (Reasoning + Acting) เป็นรูปแบบที่สำคัญที่สุดในการสร้าง LLM-powered Agent ที่เชื่อถือได้ ในบทความนี้ผมจะแชร์ประสบการณ์ตรงในการ implement ReAct Agent ด้วย Python พร้อมโค้ด production-ready และ optimization ที่ใช้ได้จริง

ReAct Agent คืออะไร

ReAct เป็นรูปแบบการทำงานของ Agent ที่ผสมผสาน Reasoning (การคิดวิเคราะห์) และ Acting (การกระทำ) เข้าด้วยกัน โดย LLM จะทำหน้าที่คิดก่อนว่าควรทำอะไรต่อไป แล้วสั่งการ tool ให้ทำงาน แล้วนำผลลัพธ์กลับมาคิดอีกครั้งจนได้คำตอบสุดท้าย

สถาปัตยกรรมหลักของ ReAct Agent

จากประสบการณ์การ implement หลายโปรเจกต์ สถาปัตยกรรมที่ดีประกอบด้วย 4 ส่วนหลัก:

การติดตั้งและ Setup

pip install openai httpx aiofiles pydantic tenacity

Implementation ฉบับเต็ม

import os
import json
import httpx
from typing import TypedDict, List, Optional, Literal
from dataclasses import dataclass, field
from tenacity import retry, stop_after_attempt, wait_exponential

Configuration — ใช้ HolySheep AI API

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY") @dataclass class Tool: """โครงสร้าง Tool สำหรับ Agent""" name: str description: str parameters: dict function: callable @dataclass class ReActMessage: """โครงสร้าง message ใน ReAct loop""" role: str content: str tool_calls: Optional[List[dict]] = None tool_results: Optional[List[dict]] = None @dataclass class ReActAgent: """ReAct Agent core implementation""" tools: List[Tool] = field(default_factory=list) max_iterations: int = 10 model: str = "gpt-4.1" def __post_init__(self): self.conversation_history: List[ReActMessage] = [] self.client = httpx.AsyncClient(timeout=60.0) def register_tool(self, name: str, description: str, parameters: dict, function: callable): """ลงทะเบียน tool ให้ Agent ใช้งาน""" tool = Tool( name=name, description=description, parameters=parameters, function=function ) self.tools.append(tool) return self def _build_system_prompt(self) -> str: """สร้าง system prompt พร้อม tool descriptions""" tool_descriptions = "\n".join([ f"- {t.name}: {t.description} (params: {list(t.parameters.get('properties', {}).keys())})" for t in self.tools ]) return f"""คุณเป็น ReAct Agent ที่คิดอย่างมีเหตุผลก่อนทำการ มีเครื่องมือดังนี้ให้ใช้งาน: {tool_descriptions} ห้ามคิดว่าตัวเองรู้ทุกอย่าง — ต้องใช้ tool เมื่อต้องการข้อมูลจริง""" @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def _call_llm(self, messages: List[dict]) -> dict: """เรียก LLM ผ่าน HolySheep API พร้อม retry logic""" async with self.client as client: response = await client.post( f"{BASE_URL}/chat/completions", headers={ "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" }, json={ "model": self.model, "messages": messages, "temperature": 0.1, "max_tokens": 2000 } ) response.raise_for_status() return response.json() def _parse_tool_calls(self, content: str) -> List[dict]: """แยกวิเคราะห์ tool calls จาก LLM response""" import re pattern = r'(.*?)' matches = re.findall(pattern, content, re.DOTALL) tool_calls = [] for match in matches: try: tool_data = json.loads(match) tool_calls.append(tool_data) except json.JSONDecodeError: continue return tool_calls async def execute_tool(self, tool_name: str, arguments: dict) -> str: """Execute tool และ return ผลลัพธ์""" for tool in self.tools: if tool.name == tool_name: try: result = await tool.function(**arguments) return json.dumps(result, ensure_ascii=False) except Exception as e: return f"Error: {str(e)}" return f"Tool '{tool_name}' not found" async def run(self, user_query: str) -> dict: """Main ReAct loop""" messages = [ {"role": "system", "content": self._build_system_prompt()} ] for iteration in range(self.max_iterations): # 1. ส่ง query ให้ LLM คิด messages.append({"role": "user", "content": user_query}) response = await self._call_llm(messages) assistant_message = response["choices"][0]["message"]["content"] # 2. ตรวจสอบว่ามี tool calls หรือไม่ tool_calls = self._parse_tool_calls(assistant_message) if not tool_calls: # ไม่มี tool calls = ได้คำตอบสุดท้าย return { "status": "completed", "answer": assistant_message, "iterations": iteration + 1, "history": messages } # 3. Execute tools tool_results = [] for call in tool_calls: tool_name = call.get("name") arguments = call.get("arguments", {}) result = await self.execute_tool(tool_name, arguments) tool_results.append({ "tool": tool_name, "result": result }) messages.append({ "role": "tool", "content": f"[{tool_name}] {result}" }) # 4. สร้าง context สำหรับรอบถัดไป user_query = f"ผลการทำงาน: {json.dumps(tool_results, ensure_ascii=False)}\nคิดต่ออย่างไร?" return { "status": "max_iterations_reached", "iterations": self.max_iterations, "history": messages } async def close(self): await self.client.aclose()

==== ตัวอย่างการใช้งาน ====

async def search_web(query: str) -> dict: """ตัวอย่าง Tool: ค้นหาข้อมูลจากเว็บ""" # Implement จริงจะใช้ search API return {"query": query, "results": ["result1", "result2"]} async def calculate(expression: str) -> dict: """ตัวอย่าง Tool: คำนวณทางคณิตศาสตร์""" import ast result = eval(expression) return {"expression": expression, "result": result} async def main(): # สร้าง Agent และลงทะเบียน tools agent = ReActAgent( max_iterations=5, model="gpt-4.1" ) agent.register_tool( name="search_web", description="ค้นหาข้อมูลจากอินเทอร์เน็ต", parameters={ "type": "object", "properties": { "query": {"type": "string", "description": "คำค้นหา"} } }, function=search_web ) agent.register_tool( name="calculate", description="คำนวณนิพจน์ทางคณิตศาสตร์", parameters={ "type": "object", "properties": { "expression": {"type": "string", "description": "นิพจน์ เช่น 2+2*3"} } }, function=calculate ) # Run agent result = await agent.run( "ถ้าผมมีเงิน 1000 บาท แล้วซื้อของราคา 234.50 บาท เหลือเงินเท่าไหร่ แล้วถ้าซื้อเพิ่มอีก 2 件 ราคาชิ้นละ 150 บาท จะเหลือเท่าไหร่" ) print(f"Status: {result['status']}") print(f"Iterations: {result['iterations']}") print(f"Answer: {result.get('answer', 'N/A')}") await agent.close() if __name__ == "__main__": import asyncio asyncio.run(main())

การควบคุม Concurrency และ Rate Limiting

ใน production environment การจัดการ concurrent requests อย่างเหมาะสมมีผลต่อประสิทธิภาพอย่างมาก ผมแนะนำใช้ Semaphore ร่วมกับ circuit breaker pattern

import asyncio
import time
from typing import Optional
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class RateLimiter:
    """Token bucket rate limiter สำหรับ API calls"""
    max_requests_per_minute: int = 60
    max_tokens_per_minute: int = 100000
    
    def __post_init__(self):
        self.request_times: list = []
        self.token_counts: list = []
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1000):
        """รอจนกว่าจะได้ permission สำหรับ request"""
        async with self._lock:
            now = time.time()
            # Clean up เก่ากว่า 1 นาที
            self.request_times = [t for t in self.request_times if now - t < 60]
            self.token_counts = [t for t, _ in zip(self.token_counts, self.request_times) 
                               if now - t < 60]
            
            total_tokens = sum(self.token_counts)
            
            # ตรวจสอบ rate limits
            if len(self.request_times) >= self.max_requests_per_minute:
                wait_time = 60 - (now - self.request_times[0])
                raise RateLimitError(f"Rate limit reached, wait {wait_time:.1f}s")
            
            if total_tokens + tokens > self.max_tokens_per_minute:
                wait_time = 60 - (now - self.request_times[0])
                raise RateLimitError(f"Token limit reached, wait {wait_time:.1f}s")
            
            # อนุญาต request
            self.request_times.append(now)
            self.token_counts.append(tokens)
    
    def get_remaining_capacity(self) -> dict:
        """ดู capacity ที่เหลืออยู่"""
        now = time.time()
        recent_requests = [t for t in self.request_times if now - t < 60]
        recent_tokens = sum(self.token_counts[:len(recent_requests)])
        return {
            "requests_remaining": self.max_requests_per_minute - len(recent_requests),
            "tokens_remaining": self.max_tokens_per_minute - recent_tokens
        }


class CircuitBreaker:
    """Circuit breaker pattern สำหรับ handle API failures"""
    
    def __init__(self, failure_threshold: int = 5, 
                 recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half-open
        self._lock = asyncio.Lock()
    
    async def call(self, func, *args, **kwargs):
        async with self._lock:
            if self.state == "open":
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = "half-open"
                else:
                    raise CircuitOpenError("Circuit breaker is open")
        
        try:
            result = await func(*args, **kwargs)
            async with self._lock:
                self.failure_count = 0
                self.state = "closed"
            return result
        except Exception as e:
            async with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = "open"
            raise


class ConcurrentAgentPool:
    """Pool สำหรับรัน multiple agentsพร้อมกัน"""
    
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(
            max_requests_per_minute=60,
            max_tokens_per_minute=100000
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60.0
        )
    
    async def run_with_semaphore(self, agent, query: str) -> dict:
        async with self.semaphore:
            await self.rate_limiter.acquire(tokens=1500)
            
            async def guarded_run():
                return await agent.run(query)
            
            result = await self.circuit_breaker.call(guarded_run)
            return result
    
    async def run_batch(self, queries: List[str], 
                       agent_factory) -> List[dict]:
        """รันหลาย queries พร้อมกันอย่างปลอดภัย"""
        tasks = []
        for query in queries:
            agent = agent_factory()
            task = self.run_with_semaphore(agent, query)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


==== Benchmark Code ====

async def benchmark_throughput(): """วัดประสิทธิภาพ throughput ของ system""" import statistics pool = ConcurrentAgentPool(max_concurrent=5) latencies = [] async def dummy_agent(): await asyncio.sleep(0.1) # Simulate API call return {"status": "ok", "latency_ms":