Last week, I spent three hours debugging a ConnectionError: timeout after 30000ms that was blocking our production data pipeline. The culprit? Our MCP server was trying to connect to a read replica that had silently gone offline. Since then, I've built a bulletproof architecture for MCP Server + PostgreSQL integration that I want to share with you.

In this tutorial, you'll learn how to build a custom Model Context Protocol (MCP) Server that enables AI assistants to query your PostgreSQL databases with enterprise-grade reliability. We'll use HolySheep AI for the LLM backend—at $1 per dollar (vs industry average ¥7.3), that's 85%+ savings with sub-50ms latency and WeChat/Alipay support.

What is MCP and Why Does It Matter for Database Queries?

The Model Context Protocol (MCP) is an open standard that lets AI assistants connect to external data sources and tools. Rather than building fragile prompt-engineered SQL generators, MCP provides structured, type-safe tool definitions that AI models can invoke reliably.

With a custom MCP Server for PostgreSQL, you get:

Architecture Overview

Our solution has three components:


Project structure

mcp-postgres-server/ ├── server.py # Main MCP server implementation ├── config.yaml # Database and API configuration ├── requirements.txt # Python dependencies └── test_connection.py # Integration tests

Prerequisites

Step 1: Install Dependencies


Create virtual environment

python3 -m venv mcp-env source mcp-env/bin/activate

Install required packages

pip install --upgrade pip pip install \ mcp[cli] \ psycopg2-binary \ asyncpg \ sqlalchemy \ pyyaml \ httpx \ pydantic

Verify installation

python -c "import mcp; print(f'MCP SDK version: {mcp.__version__}')"

Step 2: Configure Your Environment

Create a config.yaml file with your database credentials. Never commit this file to version control—use environment variables in production.


config.yaml

database: host: "your-postgres-host.example.com" port: 5432 database: "production_db" username: "mcp_service_account" # Use environment variable: os.environ.get('DB_PASSWORD') password: "${DB_PASSWORD}" pool_size: 10 max_overflow: 20 pool_timeout: 30 connection_timeout: 5000 # milliseconds holysheep: base_url: "https://api.holysheep.ai/v1" api_key: "${HOLYSHEEP_API_KEY}" model: "gpt-4.1" # $8/MTok input, $8/MTok output temperature: 0.1 max_tokens: 1000 mcp: host: "0.0.0.0" port: 8765 debug: false security: max_rows_returned: 1000 allowed_schemas: ["public", "analytics"] blocked_tables: ["users", "passwords", "api_keys"]

Step 3: Implement the MCP Server

Here's the complete MCP server implementation with connection pooling, retry logic, and comprehensive error handling:


"""
Custom MCP Server for PostgreSQL Database Queries
Powered by HolySheep AI (https://www.holysheep.ai)
"""

import os
import json
import asyncio
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager

import asyncpg
import httpx
import yaml
from pydantic import BaseModel
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server

Load configuration

def load_config() -> dict: config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') with open(config_path) as f: config = yaml.safe_load(f) # Substitute environment variables for section in config: for key, value in config[section].items(): if isinstance(value, str) and value.startswith('${') and value.endswith('}'): env_var = value[2:-1] config[section][key] = os.environ.get(env_var, '') return config CONFIG = load_config()

Initialize MCP Server

server = Server("postgres-mcp-server") class DatabaseConnectionManager: """Manages PostgreSQL connection pool with automatic reconnection.""" def __init__(self, config: dict): self.config = config['database'] self.pool: Optional[asyncpg.Pool] = None async def connect(self): """Initialize connection pool.""" if self.pool is None: self.pool = await asyncpg.create_pool( host=self.config['host'], port=self.config['port'], user=self.config['username'], password=self.config['password'], database=self.config['database'], min_size=2, max_size=self.config['pool_size'], command_timeout=self.config['connection_timeout'] / 1000, ) print(f"✅ Connected to PostgreSQL at {self.config['host']}:{self.config['port']}") async def disconnect(self): """Close all connections gracefully.""" if self.pool: await self.pool.close() print("🔌 Database connections closed") @asynccontextmanager async def acquire(self): """Context manager for acquiring connection from pool.""" async with self.pool.acquire() as connection: yield connection async def execute_query( self, query: str, params: Optional[tuple] = None, max_rows: int = 1000 ) -> Dict[str, Any]: """ Execute a read query with timeout and row limits. Returns: {"success": bool, "data": list, "row_count": int, "execution_time_ms": float} """ import time start_time = time.perf_counter() try: async with self.acquire() as conn: # Validate query is SELECT only normalized_query = query.strip().upper() if not normalized_query.startswith('SELECT'): return { "success": False, "error": "Only SELECT queries are allowed for security", "data": [], "row_count": 0, "execution_time_ms": 0 } # Execute with row limit rows = await conn.fetch( f"{query} LIMIT {max_rows}", *params if params else [] ) execution_time = (time.perf_counter() - start_time) * 1000 return { "success": True, "data": [dict(row) for row in rows], "row_count": len(rows), "execution_time_ms": round(execution_time, 2), "columns": [desc[0] for desc in rows[0].get_columns()] if rows else [] } except asyncpg.exceptions.ConnectionDoesNotExistError as e: # Handle connection timeout - attempt reconnection print(f"⚠️ Connection error, reconnecting: {e}") await self.connect() return { "success": False, "error": "Connection timeout. Please retry.", "data": [], "row_count": 0, "execution_time_ms": (time.perf_counter() - start_time) * 1000 } except Exception as e: execution_time = (time.perf_counter() - start_time) * 1000 return { "success": False, "error": str(e), "data": [], "row_count": 0, "execution_time_ms": round(execution_time, 2) }

Global database manager instance

db_manager = DatabaseConnectionManager(CONFIG) class SQLGenerator: """Uses HolySheep AI to convert natural language to SQL.""" def __init__(self, config: dict): self.config = config['holysheep'] self.api_key = self.config['api_key'] self.base_url = self.config['base_url'] async def generate_sql( self, natural_language: str, schema_context: str ) -> str: """ Convert natural language query to SQL using HolySheep AI. """ system_prompt = f"""You are a PostgreSQL expert. Convert the user's natural language query to a valid SQL SELECT statement. Database Schema: {schema_context} Rules: 1. Only generate SELECT statements (no INSERT, UPDATE, DELETE, DROP) 2. Use proper JOINs when needed 3. Include appropriate WHERE clauses 4. Add LIMIT clause to prevent large result sets (max 100 rows) 5. Use table aliases if needed 6. Return ONLY the SQL query, no explanations Example: User: "Show me top 5 customers by revenue" SQL: SELECT customer_name, SUM(amount) as total_revenue FROM customers c JOIN orders o ON c.id = o.customer_id GROUP BY customer_name ORDER BY total_revenue DESC LIMIT 5; """ async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{self.base_url}/chat/completions", headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }, json={ "model": self.config['model'], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": natural_language} ], "temperature": self.config['temperature'], "max_tokens": self.config['max_tokens'] } ) if response.status_code != 200: raise Exception(f"HolySheep API error: {response.status_code} - {response.text}") data = response.json() return data['choices'][0]['message']['content'].strip() sql_generator = SQLGenerator(CONFIG)

MCP Tool Definitions

@server.list_tools() async def list_tools() -> List[Tool]: """Define available MCP tools.""" return [ Tool( name="get_database_schema", description="Get the schema of the PostgreSQL database including tables, columns, and relationships", inputSchema={ "type": "object", "properties": { "schema_filter": { "type": "string", "description": "Filter by schema name (default: 'public')" } } } ), Tool( name="execute_sql_query", description="Execute a SELECT query on the PostgreSQL database", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The SQL SELECT query to execute" } }, "required": ["query"] } ), Tool( name="natural_language_to_sql", description="Convert a natural language question to a SQL query, then execute it", inputSchema={ "type": "object", "properties": { "question": { "type": "string", "description": "Natural language question about the database" }, "auto_execute": { "type": "boolean", "description": "Whether to automatically execute the generated SQL (default: true)" } }, "required": ["question"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> List[TextContent]: """Handle tool calls from MCP clients.""" if name == "get_database_schema": return await handle_get_schema(arguments) elif name == "execute_sql_query": return await handle_execute_query(arguments) elif name == "natural_language_to_sql": return await handle_nl_to_sql(arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] async def handle_get_schema(arguments: dict) -> List[TextContent]: """Retrieve database schema information.""" schema_filter = arguments.get('schema_filter', 'public') max_rows = CONFIG['security']['max_rows_returned'] query = """ SELECT t.table_schema, t.table_name, c.column_name, c.data_type, c.is_nullable, c.column_default, CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key FROM information_schema.tables t JOIN information_schema.columns c ON t.table_name = c.table_name AND t.table_schema = c.table_schema LEFT JOIN ( SELECT ku.table_schema, ku.table_name, ku.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage ku ON tc.constraint_name = ku.constraint_name WHERE tc.constraint_type = 'PRIMARY KEY' ) pk ON c.table_schema = pk.table_schema AND c.table_name = pk.table_name AND c.column_name = pk.column_name WHERE t.table_schema = $1 ORDER BY t.table_name, c.ordinal_position LIMIT $2 """ result = await db_manager.execute_query(query, (schema_filter, max_rows)) if result['success']: schema_text = json.dumps(result['data'], indent=2) return [TextContent(type="text", text=f"📊 Database Schema ({result['row_count']} columns):\n``\n{schema_text}\n``")] else: return [TextContent(type="text", text=f"❌ Error: {result['error']}")] async def handle_execute_query(arguments: dict) -> List[TextContent]: """Execute a raw SQL query.""" query = arguments['query'] max_rows = CONFIG['security']['max_rows_returned'] result = await db_manager.execute_query(query, max_rows=max_rows) if result['success']: data_preview = json.dumps(result['data'][:10], indent=2) # Preview first 10 rows return [TextContent( type="text", text=f"✅ Query executed successfully\n" f"📈 Rows returned: {result['row_count']}\n" f"⏱️ Execution time: {result['execution_time_ms']}ms\n\n" f"Results (showing first 10 rows):\n``json\n{data_preview}\n``" )] else: return [TextContent(type="text", text=f"❌ Query failed: {result['error']}")] async def handle_nl_to_sql(arguments: dict) -> List[TextContent]: """Convert natural language to SQL and execute.""" question = arguments['question'] auto_execute = arguments.get('auto_execute', True) # First, get schema context schema_result = await db_manager.execute_query( """ SELECT t.table_schema, t.table_name, c.column_name, c.data_type FROM information_schema.tables t JOIN information_schema.columns c ON t.table_name = c.table_name AND t.table_schema = c.table_schema WHERE t.table_schema IN ('public', 'analytics') ORDER BY t.table_name, c.ordinal_position """, max_rows=500 ) if not schema_result['success']: return [TextContent(type="text", text=f"❌ Failed to retrieve schema: {schema_result['error']}")] # Build schema context schema_context = json.dumps(schema_result['data'], indent=2) try: # Generate SQL using HolySheep AI generated_sql = await sql_generator.generate_sql(question, schema_context) if not auto_execute: return [TextContent( type="text", text=f"🤖 Generated SQL:\n``sql\n{generated_sql}\n`\n\nSet auto_execute: true` to run this query." )] # Execute the generated query result = await db_manager.execute_query(generated_sql) if result['success']: data_preview = json.dumps(result['data'][:10], indent=2) return [TextContent( type="text", text=f"🤖 Generated SQL:\n``sql\n{generated_sql}\n``\n\n" f"✅ Query executed successfully\n" f"📈 Rows returned: {result['row_count']}\n" f"⏱️ Execution time: {result['execution_time_ms']}ms\n\n" f"Results:\n``json\n{data_preview}\n``" )] else: return [TextContent( type="text", text=f"🤖 Generated SQL:\n``sql\n{generated_sql}\n``\n\n❌ Execution failed: {result['error']}" )] except Exception as e: return [TextContent(type="text", text=f"❌ Error: {str(e)}")] async def main(): """Main entry point for MCP server.""" print("🚀 Starting MCP PostgreSQL Server...") print(f"📡 Endpoint: http://{CONFIG['mcp']['host']}:{CONFIG['mcp']['port']}") print(f"🤖 AI Backend: {CONFIG['holysheep']['base_url']}") # Initialize database connection await db_manager.connect() # Run server async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

Step 4: Test Your MCP Server

Before deploying, create comprehensive tests to verify the integration:


"""
Integration tests for MCP PostgreSQL Server
Run with: pytest test_connection.py -v
"""

import os
import asyncio
import pytest
from unittest.mock import AsyncMock, patch

Set environment variables for testing

os.environ['DB_PASSWORD'] = 'test_password' os.environ['HOLYSHEEP_API_KEY'] = 'test_key_12345' from server import DatabaseConnectionManager, SQLGenerator, CONFIG, db_manager class TestDatabaseConnectionManager: """Test database connection and query execution.""" @pytest.fixture def db_config(self): return { 'database': { 'host': 'localhost', 'port': 5432, 'database': 'test_db', 'username': 'test_user', 'password': 'test_pass', 'pool_size': 5, 'max_overflow': 10, 'pool_timeout': 30, 'connection_timeout': 5000 } } @pytest.mark.asyncio async def test_connection_timeout_handling(self, db_config): """ Test that connection timeouts are handled gracefully. This was the error we started with: ConnectionError: timeout after 30000ms """ manager = DatabaseConnectionManager(db_config) # Simulate timeout error with patch.object(manager, 'pool', None): result = await manager.execute_query("SELECT 1") # Should return error dict, not raise exception assert isinstance(result, dict) assert 'error' in result or result['success'] in [True, False] @pytest.mark.asyncio async def test_sql_injection_prevention(self, db_config): """Verify that only SELECT queries are allowed.""" manager = DatabaseConnectionManager(db_config) # These should be blocked blocked_queries = [ "DROP TABLE users;", "DELETE FROM orders WHERE id = 1;", "UPDATE products SET price = 0;", "INSERT INTO logs (msg) VALUES ('hack');", "'; DROP TABLE users; --" ] for query in blocked_queries: result = await manager.execute_query(query) assert result['success'] == False assert 'Only SELECT queries are allowed' in result['error'] class TestSQLGenerator: """Test HolySheep AI integration for SQL generation.""" @pytest.fixture def sql_gen_config(self): return { 'holysheep': { 'base_url': 'https://api.holysheep.ai/v1', 'api_key': 'test_key', 'model': 'gpt-4.1', 'temperature': 0.1, 'max_tokens': 1000 } } @pytest.mark.asyncio async def test_happy_path_sql_generation(self, sql_gen_config): """Test successful SQL generation with mocked API.""" generator = SQLGenerator(sql_gen_config) mock_response = { 'choices': [{ 'message': { 'content': 'SELECT customer_name, SUM(amount) as revenue FROM customers GROUP BY customer_name ORDER BY revenue DESC LIMIT 10;' } }] } with patch('httpx.AsyncClient.post') as mock_post: mock_post.return_value.__aenter__.return_value.status_code = 200 mock_post.return_value.__aenter__.return_value.json.return_value = mock_response