ในฐานะวิศวกรที่ทำงานด้าน AI Agent มาหลายปี ผมเชื่อว่า ReAct (Reasoning + Acting) เป็นรูปแบบที่สำคัญที่สุดในการสร้าง LLM-powered Agent ที่เชื่อถือได้ ในบทความนี้ผมจะแชร์ประสบการณ์ตรงในการ implement ReAct Agent ด้วย Python พร้อมโค้ด production-ready และ optimization ที่ใช้ได้จริง
ReAct Agent คืออะไร
ReAct เป็นรูปแบบการทำงานของ Agent ที่ผสมผสาน Reasoning (การคิดวิเคราะห์) และ Acting (การกระทำ) เข้าด้วยกัน โดย LLM จะทำหน้าที่คิดก่อนว่าควรทำอะไรต่อไป แล้วสั่งการ tool ให้ทำงาน แล้วนำผลลัพธ์กลับมาคิดอีกครั้งจนได้คำตอบสุดท้าย
สถาปัตยกรรมหลักของ ReAct Agent
จากประสบการณ์การ implement หลายโปรเจกต์ สถาปัตยกรรมที่ดีประกอบด้วย 4 ส่วนหลัก:
- Prompt Manager — จัดการ template และ context
- Tool Registry — ลงทะเบียนและ execute tools
- Memory Module — เก็บ conversation history และ intermediate steps
- Loop Controller — ควบคุม max iterations และ stop conditions
การติดตั้งและ Setup
pip install openai httpx aiofiles pydantic tenacity
Implementation ฉบับเต็ม
import os
import json
import httpx
from typing import TypedDict, List, Optional, Literal
from dataclasses import dataclass, field
from tenacity import retry, stop_after_attempt, wait_exponential
Configuration — ใช้ HolySheep AI API
BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
@dataclass
class Tool:
"""โครงสร้าง Tool สำหรับ Agent"""
name: str
description: str
parameters: dict
function: callable
@dataclass
class ReActMessage:
"""โครงสร้าง message ใน ReAct loop"""
role: str
content: str
tool_calls: Optional[List[dict]] = None
tool_results: Optional[List[dict]] = None
@dataclass
class ReActAgent:
"""ReAct Agent core implementation"""
tools: List[Tool] = field(default_factory=list)
max_iterations: int = 10
model: str = "gpt-4.1"
def __post_init__(self):
self.conversation_history: List[ReActMessage] = []
self.client = httpx.AsyncClient(timeout=60.0)
def register_tool(self, name: str, description: str,
parameters: dict, function: callable):
"""ลงทะเบียน tool ให้ Agent ใช้งาน"""
tool = Tool(
name=name,
description=description,
parameters=parameters,
function=function
)
self.tools.append(tool)
return self
def _build_system_prompt(self) -> str:
"""สร้าง system prompt พร้อม tool descriptions"""
tool_descriptions = "\n".join([
f"- {t.name}: {t.description} (params: {list(t.parameters.get('properties', {}).keys())})"
for t in self.tools
])
return f"""คุณเป็น ReAct Agent ที่คิดอย่างมีเหตุผลก่อนทำการ
มีเครื่องมือดังนี้ให้ใช้งาน:
{tool_descriptions}
ห้ามคิดว่าตัวเองรู้ทุกอย่าง — ต้องใช้ tool เมื่อต้องการข้อมูลจริง"""
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def _call_llm(self, messages: List[dict]) -> dict:
"""เรียก LLM ผ่าน HolySheep API พร้อม retry logic"""
async with self.client as client:
response = await client.post(
f"{BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"messages": messages,
"temperature": 0.1,
"max_tokens": 2000
}
)
response.raise_for_status()
return response.json()
def _parse_tool_calls(self, content: str) -> List[dict]:
"""แยกวิเคราะห์ tool calls จาก LLM response"""
import re
pattern = r'(.*?) '
matches = re.findall(pattern, content, re.DOTALL)
tool_calls = []
for match in matches:
try:
tool_data = json.loads(match)
tool_calls.append(tool_data)
except json.JSONDecodeError:
continue
return tool_calls
async def execute_tool(self, tool_name: str, arguments: dict) -> str:
"""Execute tool และ return ผลลัพธ์"""
for tool in self.tools:
if tool.name == tool_name:
try:
result = await tool.function(**arguments)
return json.dumps(result, ensure_ascii=False)
except Exception as e:
return f"Error: {str(e)}"
return f"Tool '{tool_name}' not found"
async def run(self, user_query: str) -> dict:
"""Main ReAct loop"""
messages = [
{"role": "system", "content": self._build_system_prompt()}
]
for iteration in range(self.max_iterations):
# 1. ส่ง query ให้ LLM คิด
messages.append({"role": "user", "content": user_query})
response = await self._call_llm(messages)
assistant_message = response["choices"][0]["message"]["content"]
# 2. ตรวจสอบว่ามี tool calls หรือไม่
tool_calls = self._parse_tool_calls(assistant_message)
if not tool_calls:
# ไม่มี tool calls = ได้คำตอบสุดท้าย
return {
"status": "completed",
"answer": assistant_message,
"iterations": iteration + 1,
"history": messages
}
# 3. Execute tools
tool_results = []
for call in tool_calls:
tool_name = call.get("name")
arguments = call.get("arguments", {})
result = await self.execute_tool(tool_name, arguments)
tool_results.append({
"tool": tool_name,
"result": result
})
messages.append({
"role": "tool",
"content": f"[{tool_name}] {result}"
})
# 4. สร้าง context สำหรับรอบถัดไป
user_query = f"ผลการทำงาน: {json.dumps(tool_results, ensure_ascii=False)}\nคิดต่ออย่างไร?"
return {
"status": "max_iterations_reached",
"iterations": self.max_iterations,
"history": messages
}
async def close(self):
await self.client.aclose()
==== ตัวอย่างการใช้งาน ====
async def search_web(query: str) -> dict:
"""ตัวอย่าง Tool: ค้นหาข้อมูลจากเว็บ"""
# Implement จริงจะใช้ search API
return {"query": query, "results": ["result1", "result2"]}
async def calculate(expression: str) -> dict:
"""ตัวอย่าง Tool: คำนวณทางคณิตศาสตร์"""
import ast
result = eval(expression)
return {"expression": expression, "result": result}
async def main():
# สร้าง Agent และลงทะเบียน tools
agent = ReActAgent(
max_iterations=5,
model="gpt-4.1"
)
agent.register_tool(
name="search_web",
description="ค้นหาข้อมูลจากอินเทอร์เน็ต",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "คำค้นหา"}
}
},
function=search_web
)
agent.register_tool(
name="calculate",
description="คำนวณนิพจน์ทางคณิตศาสตร์",
parameters={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "นิพจน์ เช่น 2+2*3"}
}
},
function=calculate
)
# Run agent
result = await agent.run(
"ถ้าผมมีเงิน 1000 บาท แล้วซื้อของราคา 234.50 บาท เหลือเงินเท่าไหร่ แล้วถ้าซื้อเพิ่มอีก 2 件 ราคาชิ้นละ 150 บาท จะเหลือเท่าไหร่"
)
print(f"Status: {result['status']}")
print(f"Iterations: {result['iterations']}")
print(f"Answer: {result.get('answer', 'N/A')}")
await agent.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
การควบคุม Concurrency และ Rate Limiting
ใน production environment การจัดการ concurrent requests อย่างเหมาะสมมีผลต่อประสิทธิภาพอย่างมาก ผมแนะนำใช้ Semaphore ร่วมกับ circuit breaker pattern
import asyncio
import time
from typing import Optional
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class RateLimiter:
"""Token bucket rate limiter สำหรับ API calls"""
max_requests_per_minute: int = 60
max_tokens_per_minute: int = 100000
def __post_init__(self):
self.request_times: list = []
self.token_counts: list = []
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1000):
"""รอจนกว่าจะได้ permission สำหรับ request"""
async with self._lock:
now = time.time()
# Clean up เก่ากว่า 1 นาที
self.request_times = [t for t in self.request_times if now - t < 60]
self.token_counts = [t for t, _ in zip(self.token_counts, self.request_times)
if now - t < 60]
total_tokens = sum(self.token_counts)
# ตรวจสอบ rate limits
if len(self.request_times) >= self.max_requests_per_minute:
wait_time = 60 - (now - self.request_times[0])
raise RateLimitError(f"Rate limit reached, wait {wait_time:.1f}s")
if total_tokens + tokens > self.max_tokens_per_minute:
wait_time = 60 - (now - self.request_times[0])
raise RateLimitError(f"Token limit reached, wait {wait_time:.1f}s")
# อนุญาต request
self.request_times.append(now)
self.token_counts.append(tokens)
def get_remaining_capacity(self) -> dict:
"""ดู capacity ที่เหลืออยู่"""
now = time.time()
recent_requests = [t for t in self.request_times if now - t < 60]
recent_tokens = sum(self.token_counts[:len(recent_requests)])
return {
"requests_remaining": self.max_requests_per_minute - len(recent_requests),
"tokens_remaining": self.max_tokens_per_minute - recent_tokens
}
class CircuitBreaker:
"""Circuit breaker pattern สำหรับ handle API failures"""
def __init__(self, failure_threshold: int = 5,
recovery_timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time: Optional[float] = None
self.state = "closed" # closed, open, half-open
self._lock = asyncio.Lock()
async def call(self, func, *args, **kwargs):
async with self._lock:
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
else:
raise CircuitOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
async with self._lock:
self.failure_count = 0
self.state = "closed"
return result
except Exception as e:
async with self._lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
class ConcurrentAgentPool:
"""Pool สำหรับรัน multiple agentsพร้อมกัน"""
def __init__(self, max_concurrent: int = 5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.rate_limiter = RateLimiter(
max_requests_per_minute=60,
max_tokens_per_minute=100000
)
self.circuit_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=60.0
)
async def run_with_semaphore(self, agent, query: str) -> dict:
async with self.semaphore:
await self.rate_limiter.acquire(tokens=1500)
async def guarded_run():
return await agent.run(query)
result = await self.circuit_breaker.call(guarded_run)
return result
async def run_batch(self, queries: List[str],
agent_factory) -> List[dict]:
"""รันหลาย queries พร้อมกันอย่างปลอดภัย"""
tasks = []
for query in queries:
agent = agent_factory()
task = self.run_with_semaphore(agent, query)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
==== Benchmark Code ====
async def benchmark_throughput():
"""วัดประสิทธิภาพ throughput ของ system"""
import statistics
pool = ConcurrentAgentPool(max_concurrent=5)
latencies = []
async def dummy_agent():
await asyncio.sleep(0.1) # Simulate API call
return {"status": "ok", "latency_ms":