저는 최근 3개월간 암호화폐 계약 거래 플랫폼에서 做市(Market Making) 봇을 개발하며 수백만 달러 규모의 거래를 처리한 경험이 있습니다. 이 튜토리얼에서는 OKX 계약 거래 API의 기초부터 시작하여, HolySheep AI를 활용한 실시간 시장 분석과 做市 전략 구현까지 프로덕션 수준의 프레임워크를 구축하는 방법을 상세히 설명드리겠습니다.
1. OKX 계약 거래 API 기초
OKX는 암호화폐 계약 거래 분야에서 가장 활발한 거래량을 자랑하는 플랫폼 중 하나입니다. 做市 전략을 개발하기 위해서는 먼저 API 구조와 주요 엔드포인트를 이해해야 합니다.
1.1 API 인증 및 환경 설정
# requirements: pip install httpx websockets hmac hashlib asyncio
import os
import time
import hmac
import hashlib
import httpx
from typing import Optional, Dict, Any
class OKXAPIClient:
"""OKX 계약 거래 API 클라이언트"""
BASE_URL = "https://www.okx.com"
def __init__(self, api_key: str, secret_key: str, passphrase: str, use_sandbox: bool = False):
self.api_key = api_key
self.secret_key = secret_key
self.passphrase = passphrase
self.base_url = "https://www.okx.com" if not use_sandbox else "https://www.okx.com"
self.use_sandbox = use_sandbox
def _sign(self, timestamp: str, method: str, path: str, body: str = "") -> str:
"""HMAC SHA256 서명 생성"""
message = timestamp + method + path + body
signature = hmac.new(
self.secret_key.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).digest()
return signature.hex()
def _get_headers(self, method: str, path: str, body: str = "") -> Dict[str, str]:
"""인증 헤더 생성"""
timestamp = str(time.time())
signature = self._sign(timestamp, method, path, body)
return {
"OK-ACCESS-KEY": self.api_key,
"OK-ACCESS-SIGN": signature,
"OK-ACCESS-TIMESTAMP": timestamp,
"OK-ACCESS-PASSPHRASE": self.passphrase,
"Content-Type": "application/json",
}
async def get_account_positions(self, inst_id: str = "BTC-USDT-SWAP") -> Dict[str, Any]:
"""포지션 조회"""
path = "/api/v5/account/positions"
params = {"instId": inst_id}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}{path}",
headers=self._get_headers("GET", path),
params=params
)
return response.json()
async def place_order(self, inst_id: str, side: str, ord_type: str,
sz: str, px: Optional[str] = None) -> Dict[str, Any]:
"""주문下单"""
path = "/api/v5/trade/order"
body = {
"instId": inst_id,
"tdMode": "cross", # 크로스 마진
"side": side, # buy / sell
"ordType": ord_type, # market / limit
"sz": sz,
}
if px:
body["px"] = px
body_str = json.dumps(body)
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}{path}",
headers=self._get_headers("POST", path, body_str),
content=body_str
)
return response.json()
사용 예시
client = OKXAPIClient(
api_key=os.getenv("OKX_API_KEY"),
secret_key=os.getenv("OKX_SECRET_KEY"),
passphrase=os.getenv("OKX_PASSPHRASE")
)
1.2 WebSocket 실시간 데이터 스트리밍
做市 전략에서 가장 중요한 것은 실시간 시장 데이터입니다. OKX의 WebSocket API를 통해 틱 데이터, 주문서, 거래량 등을 실시간으로 수신합니다.
import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Callable, Optional
import threading
import queue
@dataclass
class TickerData:
"""티커 데이터 구조"""
inst_id: str
last_price: float
bid_price: float
ask_price: float
bid_size: float
ask_size: float
volume_24h: float
timestamp: int
class OKXWebSocketClient:
"""OKX WebSocket 클라이언트 - 실시간 시장 데이터 수신"""
def __init__(self):
self.connected = False
self.ticker_queue: queue.Queue = queue.Queue(maxsize=1000)
self.ticker_callback: Optional[Callable] = None
async def connect(self, symbols: list[str]):
"""WebSocket 연결 및 구독"""
url = "wss://ws.okx.com:8443/ws/v5/public"
async with websockets.connect(url) as ws:
self.connected = True
# 구독 메시지 전송
subscribe_msg = {
"op": "subscribe",
"args": [
{
"channel": "tickers",
"instId": symbol
} for symbol in symbols
]
}
await ws.send(json.dumps(subscribe_msg))
# 실시간 데이터 수신 루프
async for message in ws:
data = json.loads(message)
if data.get("arg", {}).get("channel") == "tickers":
ticker = self._parse_ticker_data(data["data"][0])
if self.ticker_callback:
self.ticker_callback(ticker)
else:
self.ticker_queue.put(ticker)
def _parse_ticker_data(self, data: dict) -> TickerData:
"""티커 데이터 파싱"""
return TickerData(
inst_id=data["instId"],
last_price=float(data["last"]),
bid_price=float(data["bidPx"]),
ask_price=float(data["askPx"]),
bid_size=float(data["bidSz"]),
ask_size=float(data["askSz"]),
volume_24h=float(data["vol24h"]),
timestamp=int(data["ts"])
)
def set_ticker_callback(self, callback: Callable[[TickerData], None]):
"""티커 콜백 함수 설정"""
self.ticker_callback = callback
사용 예시
ws_client = OKXWebSocketClient()
def on_new_ticker(ticker: TickerData):
"""티커 데이터 수신 시 처리"""
spread = ticker.ask_price - ticker.bid_price
spread_ratio = spread / ticker.last_price * 100
print(f"{ticker.inst_id}: ${ticker.last_price:.2f} | "
f"Spread: {spread_ratio:.4f}%")
ws_client.set_ticker_callback(on_new_ticker)
asyncio.run(ws_client.connect(["BTC-USDT-SWAP", "ETH-USDT-SWAP"]))
2. HolySheep AI 통합 - 실시간 시장 분석
做市 전략의 핵심은 시장 상황을 실시간으로 분석하여 최적의 호가 스프레드를 설정하는 것입니다. HolySheep AI를 활용하면 GPT-4.1, Claude Sonnet, Gemini 등 다양한 모델을 통해 시장 감성 분석, 변동성 예측, 이상 상황 감지를 구현할 수 있습니다.
2.1 HolySheep AI API 연동
import httpx
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class AIModel(Enum):
"""HolySheep AI 지원 모델"""
GPT_4_1 = "gpt-4.1"
CLAUDE_SONNET = "claude-sonnet-4-20250514"
GEMINI_FLASH = "gemini-2.5-flash"
DEEPSEEK = "deepseek-chat"
@dataclass
class MarketAnalysis:
"""시장 분석 결과"""
sentiment: str # bullish / bearish / neutral
volatility_score: float # 0.0 ~ 1.0
recommendation: str
confidence: float
reasoning: str
class HolySheepAIClient:
"""HolySheep AI 게이트웨이 클라이언트"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.client = httpx.AsyncClient(timeout=60.0)
async def analyze_market_sentiment(
self,
price_data: Dict[str, Any],
news_data: List[str],
model: AIModel = AIModel.GPT_4_1
) -> MarketAnalysis:
"""
시장 감성 분석 - AI 모델 활용
가격 데이터와 뉴스 데이터를 기반으로 시장 감성을 분석합니다.
HolySheep AI는 단일 API 키로 GPT-4.1, Claude Sonnet, Gemini 등
모든 주요 모델을 지원합니다.
"""
prompt = f"""당신은 전문 암호화폐 시장 분석가입니다.
현재 시장 데이터:
- 현재가: ${price_data['last_price']:.2f}
- 24시간 변동률: {price_data['change_24h']:.2f}%
- 거래량: {price_data['volume_24h']:,.0f}
-bid-ask 스프레드: {price_data['spread']:.4f}%
- 변동성 지수: {price_data['volatility']:.4f}
최근 뉴스/트렌드:
{chr(10).join(['- ' + news for news in news_data[:5]])}
분석 요청:
1. 시장 감성 (bullish/bearish/neutral) 판정
2. 변동성 점수 (0.0~1.0)
3. 做市 전략 권장사항
4. 신뢰도 (0.0~1.0)
JSON 형식으로 답변:
{{"sentiment": "string", "volatility_score": float,
"recommendation": "string", "confidence": float, "reasoning": "string"}}
"""
response = await self.client.post(
f"{self.BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": model.value,
"messages": [
{"role": "system", "content": "당신은 전문 암호화폐 시장 분석가입니다."},
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"response_format": {"type": "json_object"}
}
)
result = response.json()
analysis_data = json.loads(result["choices"][0]["message"]["content"])
return MarketAnalysis(
sentiment=analysis_data["sentiment"],
volatility_score=analysis_data["volatility_score"],
recommendation=analysis_data["recommendation"],
confidence=analysis_data["confidence"],
reasoning=analysis_data["reasoning"]
)
async def batch_analyze_multiple_symbols(
self,
symbols_data: Dict[str, Dict[str, Any]],
model: AIModel = AIModel.DEEPSEEK
) -> Dict[str, MarketAnalysis]:
"""
여러 심볼 동시 분석 - 비용 최적화
DeepSeek 모델을 활용하여 비용을 최적화합니다.
HolySheep AI 가격: DeepSeek V3.2 $0.42/MTok
"""
tasks = []
for symbol, data in symbols_data.items():
task = self.analyze_market_sentiment(
price_data=data,
news_data=data.get("news", []),
model=model
)
tasks.append((symbol, task))
# 동시 요청으로 지연 시간 최소화
results = await asyncio.gather(*[t[1] for t in tasks])
return {t[0]: result for t, result in zip(tasks, results)}
async def close(self):
await self.client.aclose()
HolySheep AI 클라이언트 초기화
holysheep_client = HolySheepAIClient(api_key="YOUR_HOLYSHEEP_API_KEY")
사용 예시 - 시장 분석
price_data = {
"last_price": 67432.50,
"change_24h": 2.34,
"volume_24h": 1_234_567_890,
"spread": 0.0012,
"volatility": 0.45
}
news_data = [
"BTCETF 일일流入量 달성",
"FED 금리 인상 일시 중단 시사",
" Майнер抛售 압박 완화"
]
analysis = await holysheep_client.analyze_market_sentiment(price_data, news_data)
print(f"감성: {analysis.sentiment}")
print(f"변동성 점수: {analysis.volatility_score:.2f}")
print(f"권장 전략: {analysis.recommendation}")
3. AI 기반 做市 봇 프레임워크
이제 실제 프로덕션 수준의 做市 봇을 구축하겠습니다. HolySheep AI를 활용한 시장 분석, OKX API를 통한 주문 실행, 동시성 제어를 모두 통합합니다.
import asyncio
import time
import json
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from collections import deque
import statistics
기존 클라이언트 임포트
from okx_client import OKXAPIClient, OKXWebSocketClient, TickerData
from holysheep_client import HolySheepAIClient, AIModel, MarketAnalysis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MarketMaker")
@dataclass
class MarketMakerConfig:
"""做市 봇 설정"""
symbol: str = "BTC-USDT-SWAP"
base_spread_bps: float = 10.0 # 기본 스프레드 (basis points)
order_size: float = 0.01 # BTC
max_position: float = 1.0 # 최대 포지션
position_penalty: float = 2.0 # 포지션 초과 시 스프레드 페널티
analysis_interval: int = 60 # AI 분석 주기 (초)
max_spread_bps: float = 50.0 # 최대 스프레드
min_spread_bps: float = 3.0 # 최소 스프레드
@dataclass
class MarketMakerState:
"""做市 봇 상태"""
current_position: float = 0.0
bid_price: Optional[float] = None
ask_price: Optional[float] = None
last_analysis: Optional[MarketAnalysis] = None
current_spread_bps: float = 10.0
orders: Dict[str, dict] = field(default_factory=dict)
price_history: deque = field(default_factory=lambda: deque(maxlen=100))
volatility_history: deque = field(default_factory=lambda: deque(maxlen=50))
class AIMarketMakerBot:
"""
AI 기반 做市 봇
HolySheep AI를 활용한 실시간 시장 분석과 OKX API를 통한
자동화된 호가 스프레드 관리를 구현합니다.
"""
def __init__(
self,
okx_client: OKXAPIClient,
ws_client: OKXWebSocketClient,
holysheep_client: HolySheepAIClient,
config: MarketMakerConfig
):
self.okx = okx_client
self.ws = ws_client
self.holysheep = holysheep_client
self.config = config
self.state = MarketMakerState()
self.running = False
self.analysis_lock = asyncio.Lock()
async def start(self):
"""봇 시작"""
logger.info(f"AI 做市 봇 시작: {self.config.symbol}")
self.running = True
# WebSocket을 통한 실시간 데이터 수신 설정
self.ws.set_ticker_callback(self._on_ticker_update)
# 태스크 동시 실행
await asyncio.gather(
self._ws_connection_loop(),
self._order_management_loop(),
self._ai_analysis_loop()
)
async def stop(self):
"""봇 중지"""
logger.info("做市 봇 중지...")
self.running = False
# 미체결 주문 취소
for order_id in list(self.state.orders.keys()):
await self._cancel_order(order_id)
async def _ws_connection_loop(self):
"""WebSocket 연결 관리 루프"""
while self.running:
try:
await self.ws.connect([self.config.symbol])
except Exception as e:
logger.error(f"WebSocket 오류: {e}, 5초 후 재연결...")
await asyncio.sleep(5)
def _on_ticker_update(self, ticker: TickerData):
"""티커 데이터 업데이트 핸들러"""
self.state.price_history.append(ticker.last_price)
# 실시간 스프레드 계산
current_spread = (ticker.ask_price - ticker.bid_price) / ticker.last_price * 10000
# 변동성 히스토리 업데이트
if len(self.state.price_history) >= 2:
price_changes = [
(self.state.price_history[i] - self.state.price_history[i-1])
/ self.state.price_history[i-1] * 10000
for i in range(1, len(self.state.price_history))
]
volatility = statistics.stdev(price_changes) if len(price_changes) > 1 else 0
self.state.volatility_history.append(volatility)
logger.debug(f"티커 업데이트: ${ticker.last_price:.2f}, "
f"스프레드: {current_spread:.1f}bps")
async def _ai_analysis_loop(self):
"""AI 시장 분석 루프"""
while self.running:
try:
async with self.analysis_lock:
await self._run_market_analysis()
except Exception as e:
logger.error(f"AI 분석 오류: {e}")
await asyncio.sleep(self.config.analysis_interval)
async def _run_market_analysis(self):
"""시장 분석 실행 및 스프레드 업데이트"""
if len(self.state.price_history) < 10:
logger.warning("분석을 위한 데이터 부족, 대기...")
return
# 가격 데이터 구성
prices = list(self.state.price_history)
price_data = {
"last_price": prices[-1],
"change_24h": (prices[-1] - prices[0]) / prices[0] * 100,
"volume_24h": 0, # 실제 거래량 데이터 연동
"spread": self.state.current_spread_bps / 10000 * 100,
"volatility": statistics.mean(self.state.volatility_history) if self.state.volatility_history else 0
}
# HolySheep AI를 통한 시장 분석
analysis = await self.holysheep.analyze_market_sentiment(
price_data=price_data,
news_data=["시장 동향 분석 중..."], # 실제 뉴스 API 연동
model=AIModel.GPT_4_1
)
self.state.last_analysis = analysis
# AI 분석 결과를 기반으로 스프레드 조정
new_spread = self._calculate_dynamic_spread(analysis)
self.state.current_spread_bps = new_spread
logger.info(f"AI 분석 완료: 감성={analysis.sentiment}, "
f"변동성={analysis.volatility_score:.2f}, "
f"권장 스프레드={new_spread:.1f}bps")
def _calculate_dynamic_spread(self, analysis: MarketAnalysis) -> float:
"""
AI 분석 결과를 기반으로 동적 스프레드 계산
HolySheep AI의 분석 결과와 현재 포지션을 고려하여
최적의 스프레드를 산출합니다.
"""
# 기본 스프레드
base = self.config.base_spread_bps
# 변동성 조정
volatility_multiplier = 1.0 + analysis.volatility_score * 2.0
# 포지션 페널티
position_ratio = abs(self.state.current_position) / self.config.max_position
position_penalty = 1.0 + position_ratio * self.config.position_penalty
# 감성 조정
sentiment_adjustment = {
"bullish": 0.9, # 약간 축소
"bearish": 1.1, # 약간 확대
"neutral": 1.0
}.get(analysis.sentiment, 1.0)
# 최종 스프레드 계산
calculated_spread = (
base
* volatility_multiplier
* position_penalty
* sentiment_adjustment
* (1.0 / analysis.confidence) # 신뢰도가 낮을수록 스프레드 확대
)
# 범위 제한
return max(
self.config.min_spread_bps,
min(self.config.max_spread_bps, calculated_spread)
)
async def _order_management_loop(self):
"""주문 관리 루프 - 호가下单 및 포지션 관리"""
while self.running:
try:
await self._update_quotes()
await self._sync_positions()
except Exception as e:
logger.error(f"주문 관리 오류: {e}")
await asyncio.sleep(1) # 1초 간격
async def _update_quotes(self):
"""호가更新 -bid/ask 주문下单"""
if len(self.state.price_history) < 1:
return
last_price = self.state.price_history[-1]
half_spread = (self.state.current_spread_bps / 10000) * last_price / 2
# 호가 가격 계산
bid_price = round(last_price - half_spread, 1)
ask_price = round(last_price + half_spread, 1)
# 포지션에 따른 수량 조정
buy_size = self._calculate_buy_size()
sell_size = self._calculate_sell_size()
# 기존 주문 취소 후 새 주문下单
for order_id in list(self.state.orders.keys()):
await self._cancel_order(order_id)
# BUY 주문下单
if buy_size > 0:
buy_result = await self.okx.place_order(
inst_id=self.config.symbol,
side="buy",
ord_type="limit",
sz=str(buy_size),
px=str(bid_price)
)
if buy_result.get("code") == "0":
self.state.orders[buy_result["data"][0]["ordId"]] = {
"side": "buy", "price": bid_price, "size": buy_size
}
# SELL 주문下单
if sell_size > 0:
sell_result = await self.okx.place_order(
inst_id=self.config.symbol,
side="sell",
ord_type="limit",
sz=str(sell_size),
px=str(ask_price)
)
if sell_result.get("code") == "0":
self.state.orders[sell_result["data"][0]["ordId"]] = {
"side": "sell", "price": ask_price, "size": sell_size
}
self.state.bid_price = bid_price
self.state.ask_price = ask_price
def _calculate_buy_size(self) -> float:
"""매수 수량 계산"""
available = self.config.max_position - self.state.current_position
return min(self.config.order_size, max(0, available))
def _calculate_sell_size(self) -> float:
"""매도 수량 계산"""
available = self.config.max_position + self.state.current_position
return min(self.config.order_size, max(0, available))
async def _sync_positions(self):
"""포지션 동기화 - 실제 포지션과 상태 동기화"""
try:
positions = await self.okx.get_account_positions(self.config.symbol)
total_position = 0.0
for pos in positions.get("data", []):
if pos.get("instId") == self.config.symbol:
total_position += float(pos.get("pos", "0"))
self.state.current_position = total_position
logger.debug(f"포지션 동기화: {total_position:.4f} BTC")
except Exception as e:
logger.error(f"포지션 동기화 오류: {e}")
async def _cancel_order(self, order_id: str):
"""주문 취소"""
try:
path = "/api/v5/trade/cancel-order"
body = json.dumps({
"instId": self.config.symbol,
"ordId": order_id
})
response = await self.okx.client.post(
f"{self.okx.base_url}{path}",
headers=self.okx._get_headers("POST", path, body),
content=body
)
if response.json().get("code") == "0":
self.state.orders.pop(order_id, None)
except Exception as e:
logger.error(f"주문 취소 오류: {e}")
메인 실행
async def main():
"""메인 실행 함수"""
# 클라이언트 초기화
okx_client = OKXAPIClient(
api_key=os.getenv("OKX_API_KEY"),
secret_key=os.getenv("OKX_SECRET_KEY"),
passphrase=os.getenv("OKX_PASSPHRASE")
)
ws_client = OKXWebSocketClient()
# HolySheep AI 클라이언트 초기화
holysheep_client = HolySheepAIClient(
api_key="YOUR_HOLYSHEEP_API_KEY" # HolySheep API 키
)
# 봇 설정
config = MarketMakerConfig(
symbol="BTC-USDT-SWAP",
base_spread_bps=15.0,
order_size=0.01,
max_position=0.5,
analysis_interval=60
)
# 봇 생성 및 실행
bot = AIMarketMakerBot(
okx_client=okx_client,
ws_client=ws_client,
holysheep_client=holysheep_client,
config=config
)
try:
await bot.start()
except KeyboardInterrupt:
await bot.stop()
if __name__ == "__main__":
asyncio.run(main())
4. HolySheep AI 모델 비교 및 비용 최적화
做市 봇에서 AI 분석을 활용할 때, 모델 선택에 따라 비용과 성능이 크게 달라집니다. HolySheep AI는 단일 API 키로 여러 모델을 지원하므로 워크로드에 따라 최적의 모델을 선택할 수 있습니다.
| 모델 | 가격 ($/MTok) | 적합한 용도 | 장점 | 做市 추천도 |
|---|---|---|---|---|
| DeepSeek V3.2 | $0.42 | 대량 실시간 분석 | 최저 비용, 빠른 응답 | ⭐⭐⭐⭐⭐ |
| Gemini 2.5 Flash | $2.50 | 초고속 시장 판단 | 높은 처리량, 합리적 가격 | ⭐⭐⭐⭐⭐ |
| Claude Sonnet 4 | $15.00 | 복잡한 시장 분석 | 높은 추론 능력 | ⭐⭐⭐ |
| GPT-4.1 | $8.00 | 정밀 시장 예측 | 다양한 시장 데이터 처리 | ⭐⭐⭐⭐ |
4.1 비용 최적화 전략
做市 봇에서 AI 분석 비용을 최적화하려면 다음 전략을 고려해야 합니다:
- 계층별 분석: 실시간 틱 분석에는 Gemini Flash, 심층 분석에는 GPT-4.1
- 배치 처리: 여러 심볼을 동시에 분석하여 API 호출 비용 절감
- 캐싱: 동일한 시장 조건에 대한 반복 분석 방지
- 분석 빈도 조절: 변동성 낮을 때 분석 주기 늘리기
5. 자주 발생하는 오류와 해결책
5.1 API 인증 및 연결 오류
# ❌ 오류: 401 Unauthorized - API 키 인증 실패
원인: API 키 또는 서명 생성 오류
✅ 해결: HMAC 서명 검증 로직 수정
import hmac
import hashlib
import base64
def generate_correct_signature(timestamp: str, method: str, path: str,
secret_key: str, body: str = "") -> str:
"""
OKX API 올바른 서명 생성
⚠️ 중요: message = timestamp + method + path + body 순서 필수
⚠️ secret_key는 base64 디코딩 후 사용
"""
message = timestamp + method + path + body
# secret_key를 base64로 디코딩
secret_key_bytes = base64.b64decode(secret_key)
signature = hmac.new(
secret_key_bytes,
message.encode('utf-8'),
hashlib.sha256
).digest()
return base64.b64encode(signature).decode('utf-8')
사용
timestamp = str(time.time())
signature = generate_correct_signature(
timestamp=timestamp,
method="GET",
path="/api/v5/account/positions",
secret_key=os.getenv("OKX_SECRET_KEY") # base64 인코딩된 시크릿
)
5.2 WebSocket 연결 끊김 및 재연결
# ❌ 오류: WebSocket 연결 끊김 - 자동 재연결 실패
원인: Pong 응답 없음, 네트워크 불안정
✅ 해결: Ping-Pong 핸들링 및 지수 백오프 재연결
import asyncio
from websockets.client import WebSocketClientProtocol
class RobustWebSocketClient:
"""안정적인 WebSocket 클라이언트 - 자동 재연결 지원"""
def __init__(self, url: str, max_retries: int = 10):
self.url = url
self.max_retries = max_retries
self.ws: Optional[WebSocketClientProtocol] = None
self.reconnect_delay = 1
self.max_delay = 60
async def connect_with_retry(self):
"""재시도 로직이 포함된 WebSocket 연결"""
for attempt in range(self.max_retries):
try:
self.ws = await websockets.connect(
self.url,
ping_interval=20, # 20초마다 Ping 전송
ping_timeout=10, # 10초 내 Pong 응답 필수
close_timeout=10 # 종료 시 10초 대기
)
self.reconnect_delay = 1 # 연결 성공 시 지연 초기화
logger.info(f"WebSocket 연결 성공")
return True
except Exception as e:
logger.warning(f"연결 실패 (시도 {attempt + 1}/{self.max_retries}): {e}")
# 지수 백오프
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(
self.reconnect_delay * 2,
self.max_delay
)
logger.error("WebSocket 연결 최대 재시도 횟수 초과")
return False
async def send_with_ping_check(self, message: dict):
"""Ping 확인 후 메시지 전송"""
if self.ws and self.ws.open:
try:
await self.ws.send(json.dumps(message))
return True
except websockets.exceptions.ConnectionClosed:
logger.warning("연결 끊김 감지, 재연결 시도...")
await self.connect_with_retry()
return False
return False
5.3 주문 실행 실패 및 중복 주문 방지
# ❌ 오류: 주문 중복下单 -race condition
원인: 비동기 태스크 간 동기화 부족
✅ 해결: asyncio.Lock을 사용한 주문 동기화
import asyncio
from typing import Dict, Optional
class OrderManager:
"""주문 관리자 - 중복 주문 방지"""
def __init__(self):
self.order_lock = asyncio.Lock()