ในโลกของการเทรดคริปโตอัตโนมัติ การพัฒนาระบบ Market Making บน OKX เป็นทักษะที่มีความต้องการสูง แต่นักพัฒนาหลายคนมักประสบปัญหาการเชื่อมต่อที่ล้มเหลว โดยเฉพาะ ConnectionError: timeout ที่เกิดขึ้นบ่อยครั้งเมื่อ API ของ OKX มี latency สูงในช่วงตลาด volatile บทความนี้จะพาคุณแก้ปัญหาจริงที่พบบ่อย พร้อมสร้างกรอบการพัฒนา Market Making Strategy ที่พร้อมใช้งานจริง

ข้อกำหนดเบื้องต้นและการตั้งค่าสภาพแวดล้อม

ก่อนเริ่มพัฒนา คุณต้องมีบัญชี OKX ที่ผ่านการยืนยัน KYC แล้ว และได้รับ API Key จาก OKX Developer Portal โดยต้องเลือกสิทธิ์ Trade และ Read Only สำหรับ Market Making

# ติดตั้ง dependencies ที่จำเป็น
pip install okx-sdk pandas numpy websockets

โครงสร้างโปรเจกต์

okx-market-maker/ ├── config/ │ └── settings.py ├── core/ │ ├── okx_client.py │ ├── order_manager.py │ └── market_maker.py ├── strategies/ │ └── base_strategy.py ├── utils/ │ └── helpers.py ├── main.py └── requirements.txt

การเชื่อมต่อ OKX API พื้นฐาน

การเชื่อมต่อ OKX REST API ต้องระวังเรื่อง Signature ที่ต้องสร้างจาก timestamp + method + request_path + body โดยใช้ HMAC SHA256 หาก signature ไม่ถูกต้องจะได้รับ 401 Unauthorized ทันที

import hmac
import base64
import time
import requests
from typing import Dict, Optional

class OKXClient:
    BASE_URL = "https://www.okx.com"
    
    def __init__(self, api_key: str, secret_key: str, passphrase: str, use_sandbox: bool = False):
        self.api_key = api_key
        self.secret_key = secret_key
        self.passphrase = passphrase
        self.base_url = "https://www.okx.com" if not use_sandbox else "https://www.okx.com"
        
    def _sign(self, timestamp: str, method: str, request_path: str, body: str = "") -> str:
        """สร้าง signature สำหรับ OKX API authentication"""
        message = timestamp + method + request_path + body
        mac = hmac.new(
            self.secret_key.encode('utf-8'),
            message.encode('utf-8'),
            digestmod='sha256'
        )
        return base64.b64encode(mac.digest()).decode('utf-8')
    
    def _get_headers(self, method: str, request_path: str, body: str = "") -> Dict[str, str]:
        """สร้าง headers พร้อม signature"""
        timestamp = str(time.time())
        signature = self._sign(timestamp, method, request_path, body)
        
        return {
            'OK-ACCESS-KEY': self.api_key,
            'OK-ACCESS-SIGN': signature,
            'OK-ACCESS-TIMESTAMP': timestamp,
            'OK-ACCESS-PASSPHRASE': self.passphrase,
            'Content-Type': 'application/json',
            'x-simulated-trading': '1' if self.base_url != "https://www.okx.com" else '0'
        }
    
    def get_account_balance(self) -> Dict:
        """ดึงข้อมูลยอดเงินในบัญชี"""
        request_path = "/api/v5/account/balance"
        headers = self._get_headers("GET", request_path)
        
        try:
            response = requests.get(
                self.base_url + request_path,
                headers=headers,
                timeout=10  # Timeout 10 วินาที
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            raise ConnectionError("Connection timeout - API server may be overloaded")
        except requests.exceptions.RequestException as e:
            if response.status_code == 401:
                raise ConnectionError("401 Unauthorized - Invalid API credentials or signature")
            raise ConnectionError(f"Request failed: {str(e)}")
    
    def place_order(self, inst_id: str, td_mode: str, side: str, 
                   ord_type: str, sz: str, px: Optional[str] = None) -> Dict:
        """วางคำสั่งซื้อขาย"""
        request_path = "/api/v5/trade/order"
        body = {
            "instId": inst_id,
            "tdMode": td_mode,
            "clOrdId": f"mm_{int(time.time() * 1000)}",  # Unique order ID
            "side": side,
            "ordType": ord_type,
            "sz": sz,
        }
        if px:
            body["px"] = px
            
        import json
        body_str = json.dumps(body)
        headers = self._get_headers("POST", request_path, body_str)
        
        try:
            response = requests.post(
                self.base_url + request_path,
                headers=headers,
                data=body_str,
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            raise ConnectionError("Order placement timeout - retry with exponential backoff")
        except requests.exceptions.RequestException as e:
            if response.status_code == 401:
                raise ConnectionError("401 Unauthorized - Check API credentials")
            if response.status_code == 51415:
                raise ConnectionError("Order rate limit exceeded - reduce order frequency")
            raise ConnectionError(f"Order failed: {str(e)}")

กรอบการพัฒนา Market Making Strategy

ระบบ Market Making ที่ดีต้องจัดการหลายส่วนพร้อมกัน ได้แก่ การดึง Order Book สด การคำนวณ Spread อัตโนมัติ และการจัดการ Position รวมถึงการตั้ง Stop Loss อัตโนมัติ

import asyncio
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from collections import deque

@dataclass
class OrderBookLevel:
    price: float
    size: float
    
@dataclass
class MarketData:
    symbol: str
    best_bid: float
    best_ask: float
    mid_price: float
    spread_bps: float
    timestamp: float
    
@dataclass
class Position:
    symbol: str
    long_qty: float
    short_qty: float
    unrealized_pnl: float
    entry_price: float
    
@dataclass
class MMConfig:
    symbol: str
    base_spread_bps: float = 20.0        # Spread พื้นฐาน 20 bps
    order_size: float = 0.01             # ขนาดคำสั่งต่อครั้ง
    max_position: float = 1.0            # Position สูงสุดต่อด้าน
    inventory_skew: float = 0.0          # Inventory skew factor
    refresh_interval_ms: int = 500        # รีเฟรชทุก 500ms

class MarketMaker:
    def __init__(self, okx_client, config: MMConfig):
        self.client = okx_client
        self.config = config
        self.positions: Dict[str, Position] = {}
        self.active_orders: List[str] = []
        self.order_book_history: deque = deque(maxlen=100)
        
    def calculate_optimal_spread(self, market_data: MarketData) -> tuple:
        """คำนวณ Spread ที่เหมาะสมตามสภาพตลาด"""
        base_spread = self.config.base_spread_bps
        
        # Adjust spread ตาม volatility
        spread_adjustment = 1.0
        if len(self.order_book_history) > 5:
            prices = [ob.mid_price for ob in self.order_book_history]
            volatility = max(prices) / min(prices) - 1
            spread_adjustment = 1 + (volatility * 100)  # Scale with volatility
            
        # Inventory skew adjustment
        net_position = self.get_net_position()
        skew_factor = 1 + (net_position / self.config.max_position) * self.config.inventory_skew
        
        total_spread = base_spread * spread_adjustment * skew_factor
        
        half_spread = total_spread / 2
        bid_price = market_data.mid_price * (1 - half_spread / 10000)
        ask_price = market_data.mid_price * (1 + half_spread / 10000)
        
        return bid_price, ask_price
    
    def get_net_position(self) -> float:
        """คำนวณ Net Position"""
        if self.config.symbol not in self.positions:
            return 0.0
        pos = self.positions[self.config.symbol]
        return pos.long_qty - pos.short_qty
    
    async def place_bid_ask_orders(self, market_data: MarketData):
        """วางคำสั่ง Bid และ Ask พร้อมกัน"""
        # Cancel existing orders
        await self.cancel_all_orders()
        
        # Calculate optimal prices
        bid_price, ask_price = self.calculate_optimal_spread(market_data)
        
        # Check position limits
        net_pos = self.get_net_position()
        
        try:
            # Place Bid order (ซื้อ)
            if net_pos < self.config.max_position:
                bid_result = self.client.place_order(
                    inst_id=self.config.symbol,
                    td_mode="cross",
                    side="buy",
                    ord_type="limit",
                    sz=str(self.config.order_size),
                    px=str(bid_price)
                )
                if bid_result.get('code') == '0':
                    self.active_orders.append(bid_result['data'][0]['clOrdId'])
                    
            # Place Ask order (ขาย)
            if net_pos > -self.config.max_position:
                ask_result = self.client.place_order(
                    inst_id=self.config.symbol,
                    td_mode="cross",
                    side="sell",
                    ord_type="limit",
                    sz=str(self.config.order_size),
                    px=str(ask_price)
                )
                if ask_result.get('code') == '0':
                    self.active_orders.append(ask_result['data'][0]['clOrdId'])
                    
        except ConnectionError as e:
            print(f"Connection error, will retry: {e}")
            await asyncio.sleep(1)  # Retry หลัง 1 วินาที
            
    async def cancel_all_orders(self):
        """ยกเลิกคำสั่งที่รอดำเนินการทั้งหมด"""
        for order_id in self.active_orders:
            try:
                self.client.cancel_order(self.config.symbol, order_id)
            except Exception as e:
                print(f"Failed to cancel order {order_id}: {e}")
        self.active_orders = []
        
    async def run(self, websocket_client):
        """Main loop สำหรับ Market Making"""
        while True:
            try:
                # รับ market data จาก websocket
                market_data = await websocket_client.get_market_data(self.config.symbol)
                
                # Update order book history
                self.order_book_history.append(market_data)
                
                # Place new orders
                await self.place_bid_ask_orders(market_data)
                
                # Update positions
                self.update_positions()
                
                # Check risk limits
                self.check_risk_limits()
                
                await asyncio.sleep(self.config.refresh_interval_ms / 1000)
                
            except Exception as e:
                print(f"Error in market maker loop: {e}")
                await asyncio.sleep(5)

การใช้งาน WebSocket สำหรับ Real-time Data

สำหรับ Market Making ที่มีความเร็วสูง การใช้ WebSocket เป็นสิ่งจำเป็น เพราะ REST API มี latency สูงเกินไป ต่อไปนี้คือตัวอย่าง WebSocket client ที่ใช้งานได้จริง

import websockets
import asyncio
import json
import hmac
import base64
import time
from typing import Callable, Optional

class OKXWebSocketClient:
    def __init__(self, api_key: str, secret_key: str, passphrase: str):
        self.api_key = api_key
        self.secret_key = secret_key
        self.passphrase = passphrase
        self.ws = None
        self.subscriptions = []
        
    def _generate_signature(self) -> dict:
        """สร้าง WebSocket authentication signature"""
        timestamp = str(time.time())
        message = timestamp + 'GET' + '/users/self/verify'
        mac = hmac.new(
            self.secret_key.encode('utf-8'),
            message.encode('utf-8'),
            digestmod='sha256'
        )
        signature = base64.b64encode(mac.digest()).decode('utf-8')
        
        return {
            'op': 'login',
            'args': [{
                'apiKey': self.api_key,
                'passphrase': self.passphrase,
                'timestamp': timestamp,
                'sign': signature
            }]
        }
    
    async def connect(self, url: str = "wss://ws.okx.com:8443/ws/v5/business"):
        """เชื่อมต่อ WebSocket"""
        self.ws = await websockets.connect(url, ping_interval=None)
        
        # Login
        login_msg = self._generate_signature()
        await self.ws.send(json.dumps(login_msg))
        
        response = await self.ws.recv()
        result = json.loads(response)
        
        if result.get('event') == 'login' and result.get('code') == '0':
            print("WebSocket login successful")
            return True
        else:
            print(f"WebSocket login failed: {result}")
            return False
    
    async def subscribe(self, channel: str, inst_type: str = "FUTURES"):
        """Subscribe ไปยัง channel ที่ต้องการ"""
        subscribe_msg = {
            'op': 'subscribe',
            'args': [{
                'channel': channel,
                'instType': inst_type
            }]
        }
        
        await self.ws.send(json.dumps(subscribe_msg))
        response = await self.ws.recv()
        result = json.loads(response)
        
        if result.get('code') == '0':
            self.subscriptions.append(channel)
            print(f"Subscribed to {channel}")
            return True
        return False
    
    async def subscribe_orderbook(self, inst_id: str, depth: str = "L1"):
        """Subscribe orderbook data"""
        subscribe_msg = {
            'op': 'subscribe',
            'args': [{
                'channel': f'books{depth}',
                'instId': inst_id
            }]
        }
        
        await self.ws.send(json.dumps(subscribe_msg))
        
    async def listen(self, callback: Callable):
        """ฟัง messages และเรียก callback"""
        async for message in self.ws:
            data = json.loads(message)
            if 'data' in data:
                await callback(data['data'])
            elif 'event' in data and data['event'] == 'error':
                print(f"WebSocket error: {data}")
                
    async def get_market_data(self, inst_id: str) -> Optional[MarketData]:
        """Get latest market data"""
        if not self.ws:
            return None
            
        # Subscribe ถ้ายังไม่ได้ subscribe
        if f'booksL1-{inst_id}' not in self.subscriptions:
            await self.subscribe_orderbook(inst_id)
            
        # Wait for next message
        try:
            message = await asyncio.wait_for(
                self.ws.recv(),
                timeout=5.0
            )
            data = json.loads(message)
            
            if 'data' in data:
                orderbook = data['data'][0]
                best_bid = float(orderbook['bids'][0][0])
                best_ask = float(orderbook['asks'][0][0])
                mid_price = (best_bid + best_ask) / 2
                spread_bps = (best_ask - best_bid) / mid_price * 10000
                
                return MarketData(
                    symbol=inst_id,
                    best_bid=best_bid,
                    best_ask=best_ask,
                    mid_price=mid_price,
                    spread_bps=spread_bps,
                    timestamp=time.time()
                )
        except asyncio.TimeoutError:
            return None
            
        return None

การบูรณาการ AI สำหรับ Market Making ที่ชาญฉลาด

ในการพัฒนา Market Making Strategy ที่ซับซ้อน คุณสามารถใช้ AI เพื่อวิเคราะห์ Market Sentiment จากข่าวและ Social Media แล้วปรับ Spread อัตโนมัติ ตัวอย่างเช่น เมื่อ AI ตรวจพบ Fear & Greed Index สูง ระบบจะขยาย Spread เพื่อลดความเสี่ยง

import requests
from typing import Dict, Optional

class MarketSentimentAnalyzer:
    """
    ใช้ AI API เพื่อวิเคราะห์ Market Sentiment
    ผ่าน HolySheep AI - API ราคาประหยัด รองรับหลาย models
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        
    def analyze_sentiment(self, news_text: str, symbols: list) -> Dict:
        """
        วิเคราะห์ Sentiment จากข้อความข่าว
        ใช้ DeepSeek V3.2 ซึ่งราคาถูกมาก ($0.42/MTok)
        """
        prompt = f"""Analyze the market sentiment for these trading symbols: {', '.join(symbols)}
        
News content: {news_text}

Respond with JSON format:
{{
    "sentiment": "bullish|bearish|neutral",
    "confidence": 0.0-1.0,
    "risk_level": "low|medium|high",
    "recommended_spread_adjustment": -50 to +50 (percentage)
}}

Only respond with the JSON, no additional text."""

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": "deepseek-v3.2",
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.3,  # Low temperature for consistent analysis
            "max_tokens": 200
        }
        
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=10
            )
            response.raise_for_status()
            result = response.json()
            
            # Parse AI response
            content = result['choices'][0]['message']['content']
            # Extract JSON from response
            import json
            import re
            json_match = re.search(r'\{.*\}', content, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
            return {"sentiment": "neutral", "confidence": 0.5, "risk_level": "medium", "recommended_spread_adjustment": 0}
            
        except requests.exceptions.Timeout:
            return {"error": "AI API timeout - using default values"}
        except Exception as e:
            return {"error": str(e)}
    
    def generate_risk_report(self, positions: Dict, market_data: Dict) -> str:
        """
        สร้าง Risk Report อัตโนมัติ
        ใช้ GPT-4.1 สำหรับรายงานที่ละเอียด
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        prompt = f"""Generate a concise market making risk report based on:

Current Positions:
{json.dumps(positions, indent=2)}

Market Data:
{json.dumps(market_data, indent=2)}

Provide a brief analysis with risk warnings if any."""
        
        payload = {
            "model": "gpt-4.1",
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.5,
            "max_tokens": 500
        }
        
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=15
            )
            response.raise_for_status()
            result = response.json()
            return result['choices'][0]['message']['content']
        except Exception as e:
            return f"Failed to generate report: {e}"

ข้อผิดพลาดที่พบบ่อยและวิธีแก้ไข

1. Error 401 Unauthorized - Signature ไม่ถูกต้อง

สาเหตุ: Signature ที่สร้างไม่ตรงกับที่ OKX คำนวณ เกิดจาก timestamp ไม่ตรงกัน หรือ body string ไม่ตรงกับที่ส่งจริง

# วิธีแก้ไข: ตรวจสอบ timestamp format และใช้ exact body string

import time

def get_timestamp() -> str:
    """ใช้ timestamp ที่มี precision ถึง milliseconds"""
    return str(time.time())

ตรวจสอบว่า body string ตรงกันทุกตัวอักษร

import json def correct_signature_generation(): # ✅ ถูกต้อง: ใช้ exact JSON string body_dict = {"instId": "BTC-USDT-SWAP", "sz": "0.01"} body_string = json.dumps(body_dict, separators=(',', ':')) # body_string = '{"instId":"BTC-USDT-SWAP","sz":"0.01"}' # ❌ ผิด: ใช้ string ที่มี spaces # wrong_body = '{"instId": "BTC-USDT-SWAP", "sz": "0.01"}' timestamp = get_timestamp() message = f"{timestamp}GET/api/v5/trade/orders-pending{body_string}" # Sign message signature = hmac.new( SECRET_KEY.encode(), message.encode(), hashlib.sha256 ).digest() return base64.b64encode(signature).decode()

2. Connection Timeout ในช่วง High Volatility

สาเหตุ: OKX API มี latency สูงมากในช่วงตลาดเคลื่อนไหวรุนแรง ทำให้ timeout เกิดขึ้นบ่อยครั้ง โดยเฉพาะ WebSocket reconnect

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientOKXClient:
    """Client ที่จัดการ retry อัตโนมัติ"""
    
    def __init__(self, base_client):
        self.client = base_client
        
    async def resilient_request(self, func, max_retries=5, *args, **kwargs):
        """Retry request ด้วย exponential backoff"""
        for attempt in range(max_retries):
            try:
                return await func(*args, **kwargs)
            except ConnectionError as e:
                wait_time = min(2 ** attempt, 30)  # Max 30 seconds
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
                
                # หลัง retry ให้ตรวจสอบ connection
                if attempt == 0:
                    await self._verify_connection()
                    
        raise ConnectionError(f"All {max_retries} attempts failed")
    
    async def _verify_connection(self):
        """ตรวจสอบ connection ก่อน retry"""
        try:
            # Ping OKX API
            response = await self.client.health_check()
            if not response.get('success'):
                print("API server may be overloaded")
        except:
            print("Connection verification failed")

ใช้งาน

resilient_client = ResilientOKXClient(okx_client) async def safe_place_order(*args): return await resilient_client.resilient_request( okx_client.place_order, *args )

3. Order Rate Limit Exceeded (Error 51415)

สาเหตุ: OKX มี rate limit ต่อวินาที หากส่งคำสั่งเกินจะได้รับ error 51415 ทันที โดยเฉพาะเมื่อใช้ Market Making ที่วางคำสั่งบ่อยมาก

import asyncio
from collections import deque
import time

class RateLimiter:
    """Rate limiter สำหรับ OKX API"""
    
    def __init__(self, max_requests_per_second: int = 10):
        self.max_rps = max_requests_per_second
        self.requests = deque()
        self._lock = asyncio.Lock()
        
    async def acquire(self):
        """รอจนกว่าจะสามารถส่ง request ได้"""
        async with self._lock:
            now = time.time()
            
            # ลบ requests ที่เก่ากว่า 1 วินาที
            while self.requests and self.requests[0] < now - 1:
                self.requests.popleft()
                
            if len(self.requests) >= self.max_rps:
                # รอจนกว่าจะมี slot ว่าง
                sleep_time = 1 - (now - self.requests[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                    # ลบ request เก่า
                    while self.requests and self.requests[0] < time.time() - 1:
                        self.requests.popleft()
            
            # เพิ่ม request ใหม่
            self.requests.append(time.time())
            
    async def request_with_limit(self, func, *args, **kwargs):
        """Execute request พร้อม rate limiting"""
        await self.acquire()
        return await func(*args, **kwargs)

ใช้งาน

rate_limiter = RateLimiter(max_requests_per_second=8) # เผื่อ buffer class ThrottledMarketMaker(MarketMaker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rate_limiter = RateLimiter(max_requests_per_second=8) async def place_bid_ask_orders(self, market_data): """วางคำสั่งพร้อม rate limiting""" try: # Place orders through rate limiter await self.rate_limiter.request_with_limit( self._execute_orders, market_data ) except Exception as e: if "51415" in str(e): print("Rate limit hit - reducing order frequency")