거래소 API를 처음 연동했을 때, 저는 ConnectionError: timeout after 30000ms 오류와 씨름했습니다. 1초에 100건의 주문서를 전송해야 하는 고주파 트레이딩 시스템이었는데, 동시 연결 수가 늘어나자마자 429 Too Many Requests와 503 Service Unavailable이 폭발적으로 증가했죠.
이 튜토리얼에서는 HolySheep AI의 인프라를 활용하여 거래소 API의 동시 연결 수 테스트를 체계적으로 수행하는 방법을 다룹니다. Python, Node.js, 그리고 Go 언어로 실제 검증된 스크립트를 제공하며, 흔히 발생하는 6가지 오류의 해결책까지 정리했습니다.
동시 연결 테스트의 중요성
암호화폐 거래소 API는 레이트 리밋(Rate Limit)으로 보호됩니다. Binance는 1200リクエスト/분, Coinbase Pro는 10リクエスト/초 제한을 두고 있죠. 이 한계를 정확히 파악하지 못하면:
- 정상 거래 중 갑자기 주문이 누락되는 현상
- API 키 밴 (영구 정지)
- 시장 변동성 증가 시 호가 스캔 실패
- 차트 데이터 갭으로 인한 자동매매 시스템 오작동
테스트 환경 구성
# Python 테스트 환경 설정
pip install asyncio aiohttp websockets httpx
프로젝트 구조
exchange-stress-test/
├── config.py # API 설정
├── test_sequential.py # 순차 테스트
├── test_concurrent.py # 동시 테스트
├── test_burst.py # 버스트 트래픽 테스트
├── monitor.py # 실시간 모니터링
└── report.py # 결과 분석
# config.py - 거래소 API 설정
import os
HolySheep AI 게이트웨이 사용 (모든 모델 통합 + 로컬 결제)
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
거래소별 API 설정
EXCHANGE_CONFIG = {
"binance": {
"api_key": os.getenv("BINANCE_API_KEY"),
"api_secret": os.getenv("BINANCE_API_SECRET"),
"base_url": "https://api.binance.com",
"rate_limit": 1200, # 요청/분
"weight_limit": 6100, # Binance 가중치 제한
},
"coinbase": {
"api_key": os.getenv("COINBASE_API_KEY"),
"api_secret": os.getenv("COINBASE_API_SECRET"),
"base_url": "https://api.exchange.coinbase.com",
"rate_limit": 10, # 요청/초
},
}
테스트 시나리오 설정
TEST_SCENARIOS = {
"light": {"concurrent": 10, "duration": 60, "target_rps": 5},
"medium": {"concurrent": 50, "duration": 120, "target_rps": 50},
"heavy": {"concurrent": 100, "duration": 300, "target_rps": 200},
"stress": {"concurrent": 500, "duration": 60, "target_rps": 500},
}
Python 비동기 동시 연결 테스트
# test_concurrent.py - 동시 연결 스트레스 테스트
import asyncio
import aiohttp
import time
import json
from datetime import datetime
from collections import defaultdict
from config import EXCHANGE_CONFIG, TEST_SCENARIOS
class ExchangeStressTester:
def __init__(self, exchange_name="binance"):
self.exchange = EXCHANGE_CONFIG[exchange_name]
self.results = {
"success": 0,
"failed": 0,
"errors": defaultdict(int),
"latencies": [],
"rate_limit_hits": 0,
}
self.start_time = None
self.end_time = None
async def make_request(self, session, endpoint, sem):
"""단일 API 요청 실행"""
async with sem: # 세마포어로 동시성 제어
url = f"{self.exchange['base_url']}{endpoint}"
headers = {"X-MBX-APIKEY": self.exchange["api_key"]}
start = time.time()
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
latency = (time.time() - start) * 1000 # ms 단위
self.results["latencies"].append(latency)
if response.status == 200:
self.results["success"] += 1
return await response.json()
elif response.status == 429:
self.results["rate_limit_hits"] += 1
self.results["errors"]["429_RateLimit"] += 1
# 레이트 리밋 도달 시 대기
retry_after = response.headers.get("Retry-After", 1)
await asyncio.sleep(float(retry_after))
elif response.status == 401:
self.results["errors"]["401_Unauthorized"] += 1
elif response.status == 502:
self.results["errors"]["502_BadGateway"] += 1
else:
self.results["errors"][f"HTTP_{response.status}"] += 1
self.results["failed"] += 1
return None
except asyncio.TimeoutError:
self.results["errors"]["TimeoutError"] += 1
self.results["failed"] += 1
except aiohttp.ClientConnectorError as e:
self.results["errors"]["ConnectionError"] += 1
self.results["failed"] += 1
except Exception as e:
self.results["errors"][type(e).__name__] += 1
self.results["failed"] += 1
async def run_concurrent_test(self, scenario="medium"):
"""동시 연결 테스트 실행"""
config = TEST_SCENARIOS[scenario]
concurrent = config["concurrent"]
duration = config["duration"]
self.start_time = datetime.now()
print(f"🚀 동시 연결 테스트 시작: {concurrent} 동시 연결, {duration}초")
sem = asyncio.Semaphore(concurrent) # 최대 동시 연결 수
async with aiohttp.ClientSession() as session:
tasks = []
end_time = time.time() + duration
request_count = 0
# 지속 시간 동안 요청 계속 생성
while time.time() < end_time:
# Binance 테스트용 엔드포인트
endpoints = [
"/api/v3/ticker/price",
"/api/v3/depth",
"/api/v3/trades",
]
for endpoint in endpoints:
task = asyncio.create_task(
self.make_request(session, endpoint, sem)
)
tasks.append(task)
request_count += 1
# 너무 많은 태스크 방지
if len(tasks) >= concurrent * 10:
await asyncio.gather(*tasks, return_exceptions=True)
tasks = []
await asyncio.sleep(1 / config["target_rps"]) # 목표 RPS 달성
# 남은 태스크 대기
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self.end_time = datetime.now()
return self.generate_report()
def generate_report(self):
"""결과 리포트 생성"""
duration = (self.end_time - self.start_time).total_seconds()
total = self.results["success"] + self.results["failed"]
latencies = sorted(self.results["latencies"])
p50 = latencies[len(latencies)//2] if latencies else 0
p95 = latencies[int(len(latencies)*0.95)] if latencies else 0
p99 = latencies[int(len(latencies)*0.99)] if latencies else 0
report = f"""
╔══════════════════════════════════════════════════════╗
║ 동시 연결 테스트 결과 리포트 ║
╠══════════════════════════════════════════════════════╣
║ 테스트 시간: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} ║
║ 소요 시간: {duration:.1f}초 ║
╠══════════════════════════════════════════════════════╣
║ 총 요청 수: {total:,} ║
║ 성공: {self.results["success"]:,} ({self.results["success"]/total*100:.1f}%) ║
║ 실패: {self.results["failed"]:,} ({self.results["failed"]/total*100:.1f}%) ║
║ 레이트 리밋 도달: {self.results["rate_limit_hits"]:,} ║
╠══════════════════════════════════════════════════════╣
║ 평균 RPS: {total/duration:.1f} ║
║ 지연 시간 P50: {p50:.1f}ms ║
║ 지연 시간 P95: {p95:.1f}ms ║
║ 지연 시간 P99: {p99:.1f}ms ║
╠══════════════════════════════════════════════════════╣
║ 오류 분포: ║
"""
for error, count in self.results["errors"].items():
report += f"║ {error}: {count:,} ║\n"
report += "╚══════════════════════════════════════════════════════╝"
return report
async def main():
tester = ExchangeStressTester("binance")
report = await tester.run_concurrent_test("medium")
print(report)
if __name__ == "__main__":
asyncio.run(main())
Node.js를 통한 실시간 WebSocket 스트레스 테스트
# test_websocket_stress.js - WebSocket 동시 연결 테스트
const WebSocket = require('ws');
class ExchangeWebSocketStressTest {
constructor(exchange = 'binance') {
this.exchange = exchange;
this.connections = new Map();
this.metrics = {
messages: 0,
errors: 0,
reconnects: 0,
latencies: [],
connectTimes: [],
};
}
createConnection(id, symbol = 'btcusdt') {
const streams = ${symbol}@ticker/${symbol}@depth/${symbol}@trade;
const url = this.exchange === 'binance'
? wss://stream.binance.com:9443/stream?streams=${streams}
: wss://ws.exchange.coinbase.com;
const startTime = Date.now();
const ws = new WebSocket(url);
ws.on('open', () => {
const connectTime = Date.now() - startTime;
this.metrics.connectTimes.push(connectTime);
console.log([${id}] WebSocket 연결됨 (${connectTime}ms));
});
ws.on('message', (data) => {
this.metrics.messages++;
try {
const msg = JSON.parse(data);
// 메시지 처리 로직
if (msg.stream && msg.stream.includes('@ticker')) {
this.processTicker(msg.data);
}
} catch (e) {
console.error([${id}] 메시지 파싱 오류:, e.message);
}
});
ws.on('error', (error) => {
this.metrics.errors++;
console.error([${id}] WebSocket 오류:, error.message);
});
ws.on('close', (code, reason) => {
console.log([${id}] 연결 종료: ${code} - ${reason});
this.connections.delete(id);
});
this.connections.set(id, ws);
return ws;
}
processTicker(data) {
// 티커 데이터 처리 (지연 시간 측정)
const latency = Date.now() - (data.E || Date.now());
this.metrics.latencies.push(latency);
}
async runStressTest(numConnections = 100, durationSeconds = 60) {
console.log(🔥 WebSocket 스트레스 테스트 시작: ${numConnections} 동시 연결);
const symbols = ['btcusdt', 'ethusdt', 'bnbusdt', 'adausdt', 'dogeusdt'];
// 동시 연결 생성
for (let i = 0; i < numConnections; i++) {
const symbol = symbols[i % symbols.length];
this.createConnection(conn-${i}, symbol);
// 동시 연결 폭증 방지
if (i % 10 === 0) {
await this.sleep(50);
}
}
// 모니터링 시작
const monitorInterval = setInterval(() => {
this.printStats();
}, 5000);
// 테스트 지속 시간 후 종료
await this.sleep(durationSeconds * 1000);
clearInterval(monitorInterval);
this.closeAll();
this.printFinalReport();
}
printStats() {
const active = this.connections.size;
const avgLatency = this.metrics.latencies.length > 0
? this.metrics.latencies.reduce((a, b) => a + b, 0) / this.metrics.latencies.length
: 0;
console.log(📊 [${new Date().toISOString()}]
+ 활성 연결: ${active} |
+ 수신 메시지: ${this.metrics.messages} |
+ 평균 지연: ${avgLatency.toFixed(2)}ms |
+ 오류: ${this.metrics.errors}
);
}
printFinalReport() {
const connectTimes = this.metrics.connectTimes.sort((a, b) => a - b);
const p50 = connectTimes[Math.floor(connectTimes.length * 0.5)];
const p95 = connectTimes[Math.floor(connectTimes.length * 0.95)];
const p99 = connectTimes[Math.floor(connectTimes.length * 0.99)];
console.log(`
╔══════════════════════════════════════════════════════╗
║ WebSocket 스트레스 테스트 최종 결과 ║
╠══════════════════════════════════════════════════════╣
║ 총 생성된 연결: ${this.connections.size + Object.keys(this.metrics).length} ║
║ 총 수신 메시지: ${this.metrics.messages.toLocaleString()} ║
║ 총 오류 발생: ${this.metrics.errors} ║
║ 재연결 시도: ${this.metrics.reconnects} ║
╠══════════════════════════════════════════════════════╣
║ 연결 시간 P50: ${p50}ms ║
║ 연결 시간 P95: ${p95}ms ║
║ 연결 시간 P99: ${p99}ms ║
╚══════════════════════════════════════════════════════╝
`);
}
closeAll() {
for (const [id, ws] of this.connections) {
ws.close(1000, '테스트 종료');
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 실행
const tester = new ExchangeWebSocketStressTest('binance');
tester.runStressTest(50, 30).catch(console.error);
버스트 트래픽 시뮬레이션
# test_burst.py - 버스트 트래픽 테스트 (일시적 트래픽 폭증)
import asyncio
import aiohttp
import random
import time
async def burst_traffic_test(base_url, api_key, burst_size=200, cycles=5):
"""
버스트 트래픽 테스트: 일시적으로 대량 요청을 보내는 시나리오
실제 시장 급변 상황(예:大口裁定 체결, 리플레이 공격)을 시뮬레이션
"""
results = {
"bursts": [],
"total_requests": 0,
"success_in_burst": 0,
"rate_limited": 0,
}
async with aiohttp.ClientSession() as session:
headers = {"X-MBX-APIKEY": api_key}
for cycle in range(cycles):
print(f"\n📈 버스트 #{cycle + 1} 시작 (사이즈: {burst_size})")
cycle_start = time.time()
# 버스트 내에서 동시 요청
tasks = []
for i in range(burst_size):
endpoint = random.choice([
"/api/v3/ticker/price?symbol=BTCUSDT",
"/api/v3/depth?symbol=BTCUSDT&limit=20",
"/api/v3/trades?symbol=BTCUSDT",
"/api/v3/klines?symbol=BTCUSDT&interval=1m&limit=1",
])
task = asyncio.create_task(
make_request(session, f"{base_url}{endpoint}", headers)
)
tasks.append(task)
# 모든 버스트 요청 동시 실행
cycle_results = await asyncio.gather(*tasks, return_exceptions=True)
cycle_duration = time.time() - cycle_start
success = sum(1 for r in cycle_results if r is True)
rate_limited = sum(1 for r in cycle_results if r == "rate_limited")
burst_result = {
"cycle": cycle + 1,
"duration_ms": cycle_duration * 1000,
"total_requests": burst_size,
"success": success,
"rate_limited": rate_limited,
"rps": burst_size / cycle_duration,
}
results["bursts"].append(burst_result)
results["total_requests"] += burst_size
results["success_in_burst"] += success
results["rate_limited"] += rate_limited
print(f" ✓ 완료: {burst_size} 요청, {cycle_duration*1000:.0f}ms, "
f"성공 {success}, 레이트 리밋 {rate_limited}")
# 버스트 사이 대기 시간
await asyncio.sleep(random.uniform(5, 15))
return results
async def make_request(session, url, headers):
try:
async with session.get(url, headers=headers,
timeout=aiohttp.ClientTimeout(total=5)) as resp:
if resp.status == 200:
return True
elif resp.status == 429:
return "rate_limited"
else:
return f"error_{resp.status}"
except asyncio.TimeoutError:
return "timeout"
except Exception:
return "connection_error"
실행 예시
if __name__ == "__main__":
base_url = "https://api.binance.com"
api_key = "YOUR_API_KEY"
results = asyncio.run(burst_traffic_test(base_url, api_key, burst_size=100, cycles=3))
print("\n" + "="*60)
print("버스트 트래픽 테스트 완료")
print("="*60)
자주 발생하는 오류 해결
1. ConnectionError: timeout after 30000ms
가장 흔한 오류입니다. 동시 연결 수가 많아지면 서버 응답이 지연됩니다.
# 해결 1: 타임아웃 증가 + 리트라이 로직 추가
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
async def robust_request(session, url, headers, max_retries=3):
for attempt in range(max_retries):
try:
async with session.get(
url,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30) # 30초로 증가
) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status == 429:
# 레이트 리밋 시指數 적 대기
await asyncio.sleep(2 ** attempt)
continue
else:
return None
except asyncio.TimeoutError:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt)
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt)
해결 2: 연결 풀 크기 최적화
connector = aiohttp.TCPConnector(
limit=100, # 동시 연결 수 제한
limit_per_host=50, # 호스트별 제한
ttl_dns_cache=300, # DNS 캐시 TTL
keepalive_timeout=30 # keep-alive 타임아웃
)
session = aiohttp.ClientSession(connector=connector)
2. 401 Unauthorized: Invalid API Key
API 키 인증 실패입니다. 여러 원인이 있을 수 있습니다.
# 해결: 서명 생성 로직 확인
import hmac
import hashlib
import time
import urllib.parse
def create_binance_signature(query_string, secret_key):
"""Binance API 서명 생성"""
return hmac.new(
secret_key.encode('utf-8'),
query_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
def create_signed_request(params, api_secret):
"""서명 포함 요청 파라미터 생성"""
params['timestamp'] = int(time.time() * 1000)
params['signature'] = create_binance_signature(
urllib.parse.urlencode(params),
api_secret
)
return params
사용 예시
params = {
'symbol': 'BTCUSDT',
'side': 'BUY',
'type': 'LIMIT',
'quantity': 0.001,
'price': 50000,
'timeInForce': 'GTC',
}
signed_params = create_signed_request(params, api_secret)
print(f"서명 생성 완료: {signed_params['signature'][:20]}...")
3. 429 Too Many Requests - 레이트 리밋 초과
# 해결: 레이트 리밋 감지 및自适应 대기 로직
import time
import asyncio
class RateLimitHandler:
def __init__(self, requests_per_minute=1200):
self.rpm_limit = requests_per_minute
self.request_times = []
self.lock = asyncio.Lock()
async def acquire(self):
"""레이트 리밋 범위 내에서 요청 허가"""
async with self.lock:
now = time.time()
# 1분 이상 지난 요청 제거
self.request_times = [t for t in self.request_times if now - t < 60]
# 레이트 리밋에 도달했는지 확인
if len(self.request_times) >= self.rpm_limit:
# 가장 오래된 요청이 완료될 때까지 대기
oldest = self.request_times[0]
wait_time = 60 - (now - oldest) + 0.1
print(f"⚠️ 레이트 리밋 도달, {wait_time:.1f}초 대기...")
await asyncio.sleep(wait_time)
return await self.acquire() # 재귀적으로 다시 확인
self.request_times.append(now)
return True
def check_retry_after(self, response_headers):
"""서버가 제공하는 Retry-After 헤더 확인"""
retry_after = response_headers.get('Retry-After') or \
response_headers.get('X-Spread-Retry-After')
if retry_after:
return float(retry_after)
return None
사용
rate_limiter = RateLimitHandler(requests_per_minute=1000)
async def limited_request(session, url, headers):
await rate_limiter.acquire()
async with session.get(url, headers=headers) as resp:
if resp.status == 429:
wait = rate_limiter.check_retry_after(resp.headers)
if wait:
await asyncio.sleep(wait)
return await limited_request(session, url, headers)
return resp
4. 502 Bad Gateway / 503 Service Unavailable
# 해결: 다중 거래소 페일오버 및 Circuit Breaker 패턴
import asyncio
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # 정상
OPEN = "open" # 차단
HALF_OPEN = "half_open" # 부분 허용
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
def record_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"⚡ 서킷 브레이커 OPEN: {self.failure_count}회 연속 실패")
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self.record_success()
return result
except Exception as e:
self.record_failure()
raise
다중 거래소 페일오버
class MultiExchangeClient:
def __init__(self):
self.exchanges = {
'binance': ExchangeClient('binance'),
'coinbase': ExchangeClient('coinbase'),
'kraken': ExchangeClient('kraken'),
}
self.circuit_breakers = {
name: CircuitBreaker()
for name in self.exchanges
}
async def get_price_with_failover(self, symbol):
"""가장 빠른 응답 반환 + 페일오버"""
async def try_exchange(name, client):
cb = self.circuit_breakers[name]
try:
return await cb.call(client.get_price, symbol)
except:
return None
tasks = [
try_exchange(name, client)
for name, client in self.exchanges.items()
]
results = await asyncio.gather(*tasks)
for result in results:
if result is not None:
return result
raise Exception("모든 거래소 연결 실패")
5. WebSocket 연결 끊김 (1006 Abnormal Closure)
# 해결: WebSocket 자동 재연결 로직
import asyncio
import websockets
import json
class WebSocketReconnector:
def __init__(self, url, streams, max_reconnects=10):
self.url = url
self.streams = streams
self.max_reconnects = max_reconnects
self.reconnect_delay = 1
self.ws = None
async def connect(self):
full_url = f"{self.url}/{'/'.join(self.streams)}"
self.ws = await websockets.connect(full_url)
print(f"✅ WebSocket 연결됨: {full_url}")
async def listen_with_reconnect(self):
reconnect_count = 0
while reconnect_count < self.max_reconnects:
try:
await self.connect()
while True:
message = await self.ws.recv()
data = json.loads(message)
await self.process_message(data)
except websockets.ConnectionClosed as e:
reconnect_count += 1
print(f"❌ 연결 끊김 (코드: {e.code}), "
f"재연결 시도 {reconnect_count}/{self.max_reconnects}")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, 30) # 최대 30초
except Exception as e:
print(f"⚠️ 오류 발생: {e}")
reconnect_count += 1
await asyncio.sleep(self.reconnect_delay)
raise Exception("최대 재연결 횟수 초과")
async def process_message(self, data):
"""메시지 처리 (하위 클래스에서 오버라이드)"""
print(f"📩 메시지 수신: {data}")
사용
async def main():
reconnector = WebSocketReconnector(
url="wss://stream.binance.com:9443/ws",
streams=["btcusdt@ticker", "ethusdt@ticker"],
max_reconnects=100
)
await reconnector.listen_with_reconnect()
asyncio.run(main())
6. 메모리 누수 (동시 테스트 시 리소스 고갈)
# 해결: 리소스 정리 및 연결 풀 관리
import gc
import asyncio
from contextlib import asynccontextmanager
class ManagedTestSession:
"""자동 정리되는 테스트 세션"""
def __init__(self, max_connections=100):
self.max_connections = max_connections
self.sessions = []
self.connector = None
self.session = None
async def __aenter__(self):
self.connector = aiohttp.TCPConnector(
limit=self.max_connections,
limit_per_host=50,
)
self.session = aiohttp.ClientSession(connector=self.connector)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 명시적 정리
await self.session.close()
await self.connector.close()
# 가비지 컬렉션 강제 실행
gc.collect()
# 연결 완전 해제 대기
await asyncio.sleep(0.25)
return False # 예외를 억제하지 않음
사용
async def run_memory_safe_test():
for batch in range(100):
async with ManagedTestSession(max_connections=50) as session:
tasks = [session.session.get(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 각 배치 후 메모리 확인
import psutil
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
print(f"배치 {batch}: 메모리 사용량 {memory_mb:.1f}MB")
# 배치 간 짧은 대기
await asyncio.sleep(1)
테스트 결과 분석 및 최적화
# report.py - 테스트 결과 종합 분석
import json
from statistics import mean, median, stdev
def analyze_results(results):
"""결과 데이터 분석 및 권장사항 도출"""
analysis = {
"performance_metrics": {},
"bottlenecks": [],
"recommendations": [],
}
# 기본 성능 지표
latencies = results.get("latencies", [])
if latencies:
analysis["performance_metrics"] = {
"mean_latency_ms": round(mean(latencies), 2),
"median_latency_ms": round(median(latencies), 2),
"p95_latency_ms": round(sorted(latencies)[int(len(latencies)*0.95)], 2),
"p99_latency_ms": round(sorted(latencies)[int(len(latencies)*0.99)], 2),
"max_latency_ms": max(latencies),
"std_dev": round(stdev(latencies), 2) if len(latencies) > 1 else 0,
}
# 병목 지점 식별
errors = results.get("errors", {})
rate_limit_hits = results.get("rate_limit_hits", 0)
total = results.get("success", 0) + results.get("failed", 0)
if rate_limit_hits / total > 0.1:
analysis["bottlenecks"].append({
"type": "rate_limit",
"severity": "high",
"message": f"{rate_limit_hits/total*100:.1f}% 요청이 레이트 리밋에 걸림",
})
if "TimeoutError" in errors and errors["TimeoutError"] / total > 0.05:
analysis["bottlenecks"].append({
"type": "timeout",
"severity": "medium",
"message": "서버 응답 지연 발생, 연결 풀 최적화 필요",
})
# 최적화 권장사항
avg_latency = analysis["performance_metrics"].get("mean_latency_ms", 0)
if avg_latency > 500:
analysis["recommendations"].append(
"평균 지연 시간이 높습니다. CDN 또는 에지 서버 사용을 고려하세요."
)
if rate_limit_hits / total > 0.05:
analysis["recommendations"].append(
"레이트 리밋 최적화가 필요합니다. 요청 캐싱 및 배치 처리 도입을 권장합니다."
)
return analysis
실제 사용
if __name__ == "__main__":
# 테스트 결과 로드
with open("test_results.json") as f:
results = json.load(f)
analysis = analyze_results(results)
print("\n📊 성능 분석 결과")
print("-" * 40)
for key, value in analysis["performance_metrics"].items():
print(f" {key}: {value}ms")
if analysis["bottlenecks"]:
print("\n⚠️ 병목 지점:")
for bottleneck in analysis["bottlenecks"]:
print(f" [{bottleneck['severity'].upper()}] {bottleneck['message']}")
if analysis["recommendations"]:
print("\n💡 권장 최적화:")
for rec in analysis["recommendations"]:
print(f" • {rec}")
이런 팀에 적합 / 비적합
| 적합한 팀 |
|---|