ในยุคที่ AI กำลังเปลี่ยนแปลงวิธีการทำงานของนักพัฒนา การสร้าง MCP Server (Model Context Protocol Server) ที่เชื่อมต่อกับฐานข้อมูล PostgreSQL เป็นทักษะที่จำเป็นอย่างยิ่ง ในบทความนี้ผมจะแชร์ประสบการณ์ตรงจากการพัฒนาระบบที่ใช้งานจริงในโปรเจกต์ E-commerce ขนาดใหญ่ ที่ต้องจัดการข้อมูลลูกค้ากว่า 2 ล้านรายการ
ทำไมต้องสร้าง MCP Server สำหรับ PostgreSQL
ในกรณีศึกษาของระบบ E-commerce ที่ผมเคยพัฒนา ทีมต้องการให้ AI สามารถ:
- ค้นหาข้อมูลลูกค้าจากฐานข้อมูล PostgreSQL แบบเรียลไทม์
- วิเคราะห์พฤติกรรมการซื้อของลูกค้าโดยอัตโนมัติ
- ตอบคำถามเกี่ยวกับสถานะออเดอร์โดยดึงข้อมูลจากฐานข้อมูล
- สร้างรายงานยอดขายแบบอัตโนมัติ
การใช้ MCP Server ช่วยให้ AI สามารถเข้าถึงข้อมูลจาก PostgreSQL ได้อย่างปลอดภัยและมีประสิทธิภาพ รวมถึงค่าใช้จ่ายที่ประหยัดมากด้วย HolySheep ที่มีราคาเริ่มต้นเพียง $0.42/MTok สำหรับ DeepSeek V3.2
การติดตั้งและตั้งค่า Environment
ก่อนเริ่มต้น ตรวจสอบให้แน่ใจว่าคุณมี Python 3.10 ขึ้นไปและได้ติดตั้ง dependencies ที่จำเป็นแล้ว
# สร้าง virtual environment
python -m venv mcp-postgres-env
source mcp-postgres-env/bin/activate # Linux/Mac
mcp-postgres-env\Scripts\activate # Windows
ติดตั้ง dependencies
pip install psycopg2-binary asyncpg mcp-server python-dotenv fastapi
สร้าง MCP Server สำหรับ PostgreSQL
โครงสร้างหลักของ MCP Server ที่ผมใช้งานจริงมีดังนี้ โค้ดนี้รองรับทั้ง synchronous และ asynchronous operations
import os
import json
from typing import Any, Optional
from dataclasses import dataclass
import psycopg2
from psycopg2.extras import RealDictCursor
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
@dataclass
class PostgreSQLConfig:
host: str
port: int
database: str
user: str
password: str
class PostgreSQLMCPServer:
def __init__(self, config: PostgreSQLConfig):
self.config = config
self.server = Server("postgresql-mcp-server")
self._register_tools()
def _register_tools(self):
"""ลงทะเบียนเครื่องมือที่ AI สามารถใช้ได้"""
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="pg_query",
description="Execute a SELECT query on PostgreSQL database",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL SELECT query"},
"params": {"type": "array", "description": "Query parameters"}
},
"required": ["query"]
}
),
Tool(
name="pg_get_table_info",
description="Get table structure and row count",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string"}
},
"required": ["table_name"]
}
),
Tool(
name="pg_get_customers",
description="Search customers by name or email",
inputSchema={
"type": "object",
"properties": {
"search_term": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["search_term"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
if name == "pg_query":
return await self._execute_query(
arguments.get("query"),
arguments.get("params", [])
)
elif name == "pg_get_table_info":
return await self._get_table_info(arguments.get("table_name"))
elif name == "pg_get_customers":
return await self._get_customers(
arguments.get("search_term"),
arguments.get("limit", 10)
)
raise ValueError(f"Unknown tool: {name}")
def _get_connection(self):
"""สร้างการเชื่อมต่อฐานข้อมูล"""
return psycopg2.connect(
host=self.config.host,
port=self.config.port,
database=self.config.database,
user=self.config.user,
password=self.config.password
)
async def _execute_query(self, query: str, params: list) -> list[TextContent]:
"""รัน SQL query และคืนผลลัพธ์"""
try:
conn = self._get_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute(query, params)
if query.strip().upper().startswith("SELECT"):
rows = cursor.fetchall()
result = json.dumps([dict(row) for row in rows], default=str, indent=2)
else:
conn.commit()
result = json.dumps({"affected_rows": cursor.rowcount})
cursor.close()
conn.close()
return [TextContent(type="text", text=result)]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _get_table_info(self, table_name: str) -> list[TextContent]:
"""ดึงข้อมูลโครงสร้างตาราง"""
query = """
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = %s
ORDER BY ordinal_position
"""
return await self._execute_query(query, [table_name])
async def _get_customers(self, search_term: str, limit: int) -> list[TextContent]:
"""ค้นหาลูกค้าตามชื่อหรืออีเมล"""
query = """
SELECT id, name, email, created_at
FROM customers
WHERE name ILIKE %s OR email ILIKE %s
LIMIT %s
"""
search_pattern = f"%{search_term}%"
return await self._execute_query(query, [search_pattern, search_pattern, limit])
async def run(self):
"""เริ่มต้น MCP Server"""
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
การใช้งาน
if __name__ == "__main__":
config = PostgreSQLConfig(
host=os.getenv("PG_HOST", "localhost"),
port=int(os.getenv("PG_PORT", "5432")),
database=os.getenv("PG_DATABASE", "ecommerce"),
user=os.getenv("PG_USER", "admin"),
password=os.getenv("PG_PASSWORD", "")
)
server = PostgreSQLMCPServer(config)
asyncio.run(server.run())
การเชื่อมต่อกับ HolySheep AI
หลังจากสร้าง MCP Server แล้ว ต่อไปคือการเชื่อมต่อกับ HolySheep AI เพื่อใช้ AI วิเคราะห์ข้อมูล โดย HolySheep มีความเร็วในการตอบสนองน้อยกว่า 50ms พร้อมรองรับหลายโมเดล
import os
from openai import OpenAI
เชื่อมต่อกับ HolySheep AI
client = OpenAI(
api_key=os.getenv("HOLYSHEEP_API_KEY"), # YOUR_HOLYSHEEP_API_KEY
base_url="https://api.holysheep.ai/v1" # ห้ามใช้ api.openai.com
)
def chat_with_database(question: str, mcp_server_process):
"""ส่งคำถามไปยัง AI พร้อมเข้าถึงฐานข้อมูลผ่าน MCP"""
response = client.chat.completions.create(
model="claude-sonnet-4.5", # หรือ gpt-4.1, gemini-2.5-flash
messages=[
{
"role": "system",
"content": """คุณเป็น AI Assistant ที่สามารถเข้าถึงฐานข้อมูล PostgreSQL
ใช้เครื่องมือ pg_query, pg_get_customers, pg_get_table_info เพื่อดึงข้อมูล
ตอบเป็นภาษาไทยและแสดงข้อมูลในรูปแบบที่อ่านง่าย"""
},
{
"role": "user",
"content": question
}
],
tools=[
{
"type": "function",
"function": {
"name": "pg_query",
"description": "Execute a SELECT query on PostgreSQL database",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"params": {"type": "array"}
}
}
}
},
{
"type": "function",
"function": {
"name": "pg_get_customers",
"description": "Search customers by name or email",
"parameters": {
"type": "object",
"properties": {
"search_term": {"type": "string"},
"limit": {"type": "integer"}
}
}
}
}
],
tool_choice="auto"
)
return response.choices[0].message
ตัวอย่างการใช้งาน
if __name__ == "__main__":
os.environ["HOLYSHEEP_API_KEY"] = "YOUR_HOLYSHEEP_API_KEY"
# ค้นหาข้อมูลลูกค้าที่มียอดสั่งซื้อสูงสุด
result = chat_with_database(
"แสดงลูกค้าที่มียอดสั่งซื้อสูงสุด 5 อันดับแรก"
)
print(result.content)
# วิเคราะห์ยอดขายรายเดือน
result = chat_with_database(
"สรุปยอดขายเดือนนี้เทียบกับเดือนที่แล้ว"
)
print(result.content)
ตัวอย่างการใช้งานจริง: ระบบ Customer Service AI
ในโปรเจกต์จริงที่ผมพัฒนา ระบบนี้ช่วยให้ทีม Customer Service ตอบคำถามลูกค้าได้รวดเร็วขึ้น 70% โดย AI สามารถ:
- ตรวจสอบสถานะออเดอร์โดยดึงข้อมูลจากตาราง orders
- ค้นหาประวัติการสั่งซื้อของลูกค้าจากตาราง order_items
- ตรวจสอบข้อมูลจัดส่งจากตาราง shipments
- วิเคราะห์ปัญหาที่พบบ่อยและเสนอวิธีแก้
ค่าใช้จ่ายรายเดือนสำหรับระบบนี้อยู่ที่ประมาณ $150/เดือน หากใช้ Claude Sonnet 4.5 กับ HolySheep ซึ่งราคาอยู่ที่ $15/MTok เทียบกับ $30+ หากใช้ผู้ให้บริการอื่น ทำให้ประหยัดได้มากกว่า 50%
ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข
1. ปัญหา: Connection Refused หรือ Timeout
สาเหตุ: ฐานข้อมูล PostgreSQL ไม่ได้รับอนุญาตให้รับการเชื่อมต่อจากภายนอก หรือ Firewall บล็อก port
# แก้ไข: ตรวจสอบไฟล์ pg_hba.conf
เพิ่มบรรทัดนี้เพื่ออนุญาตการเชื่อมต่อจาก IP ของ MCP Server
host all all 192.168.1.100/32 md5
แก้ไข: ตรวจสอบไฟล์ postgresql.conf
listen_addresses = '*' # เปลี่ยนจาก 'localhost'
port = 5432
แก้ไข: Restart PostgreSQL
sudo systemctl restart postgresql
2. ปัญหา: คืนค่าเป็น Empty Array แม้ว่าข้อมูลมีอยู่จริง
สาเหตุ: Transaction ถูก rollback โดย