안녕하세요, HolySheep AI의 시니어 엔지니어입니다. 이번 포스트에서는 초당 1,000개 이상의 요청(QPS)을 안정적으로 처리하는 AI API 게이트웨이 아키텍처를 설계하고 구현하는 방법을 다루겠습니다. HolySheep AI에 지금 가입하면 무료 크레딧을 받아 실제 환경에서 직접 테스트해 볼 수 있습니다.

아키텍처 개요: 왜 분산 설계가 필요한가?

단일 AI API 서버는 모델 추론 지연(latency)으로 인해 병목 현상이 발생합니다. 일반적으로 서버 한 대가 감당할 수 있는 처리량에는 한계가 있으며, 아래의 서버 대수 산정은 서버 한 대가 약 50~100 QPS를 처리한다는 가정에 따른 것입니다.

따라서 1000+ QPS를 처리하려면 최소 10~20대의 백엔드 서버와 정교한 로드밸런싱이 필수적입니다.

"""
HolySheep AI 게이트웨이 아키텍처
Author: HolySheep AI Engineering Team
Target: 1000+ QPS with sub-100ms p99 latency
"""

import asyncio
import time
import hashlib
from dataclasses import dataclass, field
from typing import Optional, List, Dict
from collections import defaultdict
import logging

# Configure root logging at INFO level and create the module-level logger
# used by the classes in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class BackendServer:
    """State and statistics for one backend AI API server.

    Holds static routing configuration (id, url, weight, limits) together
    with mutable health counters and circuit-breaker bookkeeping fields.
    An ``asyncio.Lock`` is attached to every instance as ``lock``.
    """
    id: str
    url: str
    weight: int = 100  # static load-balancing weight
    max_connections: int = 100
    timeout: float = 30.0

    # Health measurements
    healthy: bool = True
    current_requests: int = 0
    total_requests: int = 0
    failed_requests: int = 0
    avg_latency_ms: float = 0.0

    # Circuit-breaker state mirrored on the server record
    failure_count: int = 0
    last_failure_time: float = 0.0
    circuit_open: bool = False
    half_open_successes: int = 0

    def __post_init__(self):
        # Created here rather than declared as a field, so it is not part of
        # the generated __init__/__repr__/__eq__.
        self.lock = asyncio.Lock()

    @property
    def success_rate(self) -> float:
        """Fraction of requests that did not fail; 1.0 before any request."""
        total = self.total_requests
        return 1.0 if total == 0 else (total - self.failed_requests) / total

    @property
    def effective_weight(self) -> int:
        """Static weight scaled by observed performance.

        Returns 0 when the server is unhealthy or its circuit is open.
        Otherwise the weight is multiplied by the squared success rate and by
        a latency factor that falls linearly with ``avg_latency_ms`` (reaching
        its floor of 0.1 at 1800 ms), then truncated to an int.
        """
        if self.circuit_open or not self.healthy:
            return 0

        success_factor = self.success_rate ** 2
        latency_penalty = self.avg_latency_ms / 2000
        latency_factor = max(0.1, 1.0 - latency_penalty)

        scaled = self.weight * success_factor * latency_factor
        return int(scaled)


class CircuitBreaker:
    """Circuit breaker that stops traffic to a failing backend.

    States:
        CLOSED:    calls are allowed; failures are counted.
        OPEN:      calls are rejected until ``recovery_timeout`` seconds have
                   passed since the last recorded failure.
        HALF_OPEN: probe calls are allowed; one success closes the circuit,
                   ``half_open_max_calls`` failed probes re-open it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3
    ):
        """
        Args:
            failure_threshold: Failures (since the last success) that trip a
                CLOSED circuit to OPEN.
            recovery_timeout: Seconds to stay OPEN before allowing probes.
            half_open_max_calls: Failed probes tolerated in HALF_OPEN before
                the circuit re-opens.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_calls = 0  # failed probes in the current HALF_OPEN phase

    def record_success(self):
        """Record a successful call: reset counters and close the circuit."""
        self.failure_count = 0
        self.state = "CLOSED"
        self.half_open_calls = 0

    def record_failure(self):
        """Record a failed call and update the state machine."""
        self.failure_count += 1
        # Every failure restarts the recovery timer.
        self.last_failure_time = time.time()

        if self.state == "HALF_OPEN":
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                # The backend is still failing: go back to OPEN. Closing the
                # circuit here would send full traffic to a broken backend.
                self.state = "OPEN"
                self.half_open_calls = 0
                logger.warning("Circuit breaker re-OPENED after failed recovery probes")

        elif self.state == "CLOSED":
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures")

    def can_attempt(self) -> bool:
        """Return True if a call may be sent now.

        Side effect: an OPEN circuit whose recovery timeout has elapsed is
        moved to HALF_OPEN by this call.
        """
        if self.state == "CLOSED":
            return True

        if self.state == "OPEN":
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "HALF_OPEN"
                # Start each probing phase with a fresh failure budget.
                self.half_open_calls = 0
                logger.info("Circuit breaker transitioning to HALF_OPEN")
                return True
            return False

        if self.state == "HALF_OPEN":
            return self.half_open_calls < self.half_open_max_calls

        return False


@dataclass
class LoadBalancerMetrics:
    """Aggregate request counters and latency percentiles for the balancer.

    Counters cover every recorded request. Percentiles are computed over a
    sliding window holding the most recent latency samples.
    """
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_latency_ms: float = 0.0
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    # Most recent latency samples, in arrival order.
    request_times: List[float] = field(default_factory=list)

    # Class-level constants (unannotated, so not dataclass fields).
    _WINDOW_SIZE = 1000  # maximum samples kept for percentile calculation
    _MIN_SAMPLES = 100   # percentiles are refreshed only above this count

    def record_request(self, latency_ms: float, success: bool):
        """Record one finished request.

        Args:
            latency_ms: Wall-clock latency of the request in milliseconds.
            success: Whether the request succeeded.
        """
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1

        self.total_latency_ms += latency_ms
        self.request_times.append(latency_ms)

        # Drop the OLDEST samples. Trimming must happen on the arrival-ordered
        # list; trimming the sorted copy would keep only the slowest samples
        # and bias every percentile upward.
        overflow = len(self.request_times) - self._WINDOW_SIZE
        if overflow > 0:
            del self.request_times[:overflow]

        # Simplified percentile calculation over the current window.
        sample_count = len(self.request_times)
        if sample_count > self._MIN_SAMPLES:
            ordered = sorted(self.request_times)
            self.p50_latency_ms = ordered[sample_count // 2]
            self.p95_latency_ms = ordered[int(sample_count * 0.95)]
            self.p99_latency_ms = ordered[int(sample_count * 0.99)]

    @property
    def success_rate(self) -> float:
        """Fraction of successful requests; 0.0 before any request."""
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests

    @property
    def avg_latency_ms(self) -> float:
        """Mean latency over all recorded requests; 0.0 before any request."""
        if self.total_requests == 0:
            return 0.0
        return self.total_latency_ms / self.total_requests

가중치 기반 Least-Connection 로드밸런서

AI API 특성상 모델 응답 시간이 가변적이기 때문에, 단순 Round-Robin보다 동적 가중치 기반 Least-Connection 방식이 적합합니다. 이 방식은 각 서버의 현재 처리 중인 요청 수를 동적 가중치(성공률과 평균 지연 시간을 반영)로 나눈 값이 가장 작은 서버를 선택합니다.

import aiohttp
import random
from typing import Optional, Callable
import json

class HolySheepAILoadBalancer:
    """Weighted least-connection load balancer for the HolySheep AI gateway.

    Every backend in the pool is paired with its own CircuitBreaker. A request
    is routed to the backend with the lowest
    ``current_requests / effective_weight`` ratio; when it fails, the request
    is retried on a backend that has not been tried yet.

    Call ``start()`` before sending requests and ``stop()`` when finished.
    """

    def __init__(self, api_key: str):
        """Build the backend pool, circuit breakers, metrics and request queue.

        Args:
            api_key: Bearer token sent in the Authorization header.
        """
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"

        # Backend pool. Every entry targets the same chat-completions URL;
        # the entries differ only by id and static weight.
        self.servers: List[BackendServer] = [
            BackendServer(id="gpt4-1", url=f"{self.base_url}/chat/completions", weight=100),
            BackendServer(id="claude-sonnet", url=f"{self.base_url}/chat/completions", weight=100),
            BackendServer(id="gemini-flash", url=f"{self.base_url}/chat/completions", weight=150),
            BackendServer(id="deepseek-v3", url=f"{self.base_url}/chat/completions", weight=200),
        ]

        # One circuit breaker per backend, keyed by server id.
        self.circuit_breakers: Dict[str, CircuitBreaker] = {
            server.id: CircuitBreaker(failure_threshold=5, recovery_timeout=30)
            for server in self.servers
        }

        # Aggregate metrics across all backends.
        self.metrics = LoadBalancerMetrics()

        # Bounded queue of awaitables drained by ``_queue_worker``
        # (overflow handling).
        self.request_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)
        self.queue_worker_task: Optional[asyncio.Task] = None

        # Shared HTTP session (connection pool); created in ``start()``.
        self.session: Optional[aiohttp.ClientSession] = None

    async def start(self):
        """Create the pooled HTTP session and start the queue worker."""
        connector = aiohttp.TCPConnector(
            limit=1000,  # total connection limit
            limit_per_host=200,  # per-host connection limit
            ttl_dns_cache=300,
            enable_cleanup_closed=True
        )

        timeout = aiohttp.ClientTimeout(total=60, connect=10)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )

        self.queue_worker_task = asyncio.create_task(self._queue_worker())

        logger.info(f"Load balancer started with {len(self.servers)} servers")

    async def stop(self):
        """Stop the queue worker and close the HTTP session."""
        if self.queue_worker_task:
            self.queue_worker_task.cancel()
            # Wait for the worker to actually finish; return_exceptions keeps
            # the worker's own CancelledError from propagating to the caller.
            await asyncio.gather(self.queue_worker_task, return_exceptions=True)
            self.queue_worker_task = None

        if self.session:
            await self.session.close()
            self.session = None

        logger.info("Load balancer stopped")

    async def _queue_worker(self):
        """Drain ``request_queue``, awaiting each queued awaitable in turn.

        A ``None`` item is a shutdown sentinel. Cancellation also ends the
        worker. ``task_done()`` is called once for every item taken from the
        queue, including items whose awaitable raised.
        """
        while True:
            try:
                future = await self.request_queue.get()
            except asyncio.CancelledError:
                break

            if future is None:
                self.request_queue.task_done()
                break

            try:
                # Warn when many requests are backed up in the queue.
                queue_size = self.request_queue.qsize()
                if queue_size > 100:
                    logger.warning(f"Request queue backlog: {queue_size}")

                await future
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Queue worker error: {e}")
            finally:
                self.request_queue.task_done()

    def _select_server(self, exclude: Optional[set] = None) -> Optional[BackendServer]:
        """Pick the backend with the lowest load-to-weight ratio.

        Args:
            exclude: Optional set of server ids to skip (for example backends
                already tried for the current request).

        Returns:
            The selected server, or ``None`` when no backend is eligible.
            A backend is eligible when it is healthy, its circuit breaker
            allows an attempt, and its effective weight is positive.
        """
        skipped = exclude or ()
        candidates = []

        for server in self.servers:
            if server.id in skipped or not server.healthy:
                continue

            if not self.circuit_breakers[server.id].can_attempt():
                continue

            # Read the dynamic weight once; it is both the filter and the divisor.
            weight = server.effective_weight
            if weight <= 0:
                continue

            candidates.append((server.current_requests / weight, server))

        if not candidates:
            logger.error("No available servers!")
            return None

        # min() keeps the first of equally scored backends (pool order).
        return min(candidates, key=lambda item: item[0])[1]

    async def _post_json(self, server: BackendServer, payload: dict) -> dict:
        """POST ``payload`` to ``server`` and return the decoded JSON body.

        Raises:
            aiohttp.ClientResponseError: when the response status is not 200;
                the response text is carried in ``message``.
        """
        async with self.session.post(
            server.url,
            json=payload,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                raise aiohttp.ClientResponseError(
                    response.request_info,
                    response.history,
                    status=response.status,
                    message=error_text
                )
            return await response.json()

    async def _execute_request(
        self,
        server: BackendServer,
        payload: dict,
        future: asyncio.Future
    ):
        """Send one request to ``server`` and report the outcome via ``future``.

        The request is bounded by ``server.timeout``. Request failures
        (including timeouts) are not raised; they are set on ``future`` and
        recorded on the server, its circuit breaker and the metrics.

        Args:
            server: Backend to call.
            payload: JSON body to send.
            future: Receives the decoded response, or the exception.
        """
        cb = self.circuit_breakers[server.id]

        # Count the request as in flight BEFORE sending it, so the
        # least-connection score sees pending requests, and count it in
        # total_requests exactly once.
        async with server.lock:
            server.current_requests += 1
            server.total_requests += 1

        start_time = time.time()
        try:
            result = await asyncio.wait_for(
                self._post_json(server, payload),
                timeout=server.timeout
            )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000

            async with server.lock:
                server.failed_requests += 1
                server.last_failure_time = time.time()

            cb.record_failure()
            self.metrics.record_request(latency_ms, success=False)

            if not future.done():
                future.set_exception(e)
        else:
            latency_ms = (time.time() - start_time) * 1000

            async with server.lock:
                # Exponentially weighted moving average of the latency.
                server.avg_latency_ms = (
                    server.avg_latency_ms * 0.9 + latency_ms * 0.1
                )

            cb.record_success()
            self.metrics.record_request(latency_ms, success=True)

            if not future.done():
                future.set_result(result)
        finally:
            # No await here, so the in-flight counter is restored even when
            # this task is being cancelled.
            server.current_requests -= 1

    async def chat_completions(
        self,
        model: str,
        messages: List[dict],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> dict:
        """Send a chat-completion request through the load balancer.

        Each backend is tried at most once, in least-loaded order, until one
        succeeds.

        Args:
            model: Model name placed in the request body.
            messages: Chat messages placed in the request body.
            temperature: Sampling temperature.
            max_tokens: Maximum tokens to generate.
            **kwargs: Extra fields merged into the request body.

        Returns:
            dict: The decoded JSON response of the first successful backend.

        Raises:
            RuntimeError: if ``start()`` has not been called, if no backend is
                eligible, or if every attempted backend failed. The last
                backend error, if any, is chained as ``__cause__``.
        """
        if self.session is None:
            # Fail fast instead of recording failures against healthy backends.
            raise RuntimeError("Load balancer is not started; call start() first")

        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            **kwargs
        }

        max_retries = len(self.servers)  # one attempt per backend
        attempted_servers = set()
        last_error: Optional[BaseException] = None
        loop = asyncio.get_running_loop()

        for attempt in range(max_retries):
            # Excluding tried backends makes every attempt hit a new server.
            server = self._select_server(exclude=attempted_servers)

            if server is None:
                raise RuntimeError("No available servers to handle request") from last_error

            attempted_servers.add(server.id)

            future: asyncio.Future = loop.create_future()
            await self._execute_request(server, payload, future)

            try:
                # The future is already resolved; awaiting it returns the
                # result or re-raises the recorded failure.
                return await future

            except asyncio.TimeoutError as e:
                logger.warning(f"Timeout on {server.id}, attempt {attempt + 1}")
                last_error = e

            except Exception as e:
                logger.warning(f"Error on {server.id}: {e}, attempt {attempt + 1}")
                last_error = e

        raise RuntimeError(f"All servers failed after {max_retries} attempts") from last_error

    def get_health_report(self) -> dict:
        """Return aggregate metrics plus a per-backend status summary."""
        return {
            "total_requests": self.metrics.total_requests,
            "success_rate": f"{self.metrics.success_rate * 100:.2f}%",
            "avg_latency_ms": f"{self.metrics.avg_latency_ms:.2f}",
            "p50_latency_ms": f"{self.metrics.p50_latency_ms:.2f}",
            "p95_latency_ms": f"{self.metrics.p95_latency_ms:.2f}",
            "p99_latency_ms": f"{self.metrics.p99_latency_ms:.2f}",
            "servers": [
                {
                    "id": s.id,
                    "healthy": s.healthy,
                    "circuit_state": self.circuit_breakers[s.id].state,
                    "current_requests": s.current_requests,
                    "avg_latency_ms": f"{s.avg_latency_ms:.2f}",
                    "success_rate": f"{s.success_rate * 100:.2f}%"
                }
                for s in self.servers
            ]
        }

동시 요청 처리 및 Rate Limiting

1000+ QPS를 달성하려면 동시 연결 관리와 Rate Limiting이 필수적입니다. HolySheep AI의 모델별 Rate Limit을 초과하지 않으면서 최대 처리량을 확보하는 전략을 구현하겠습니다.

import asyncio
from typing import Dict, Tuple
from datetime import datetime, timedelta
import threading

class TokenBucketRateLimiter:
    """Token-bucket rate limiter.

    The bucket starts full, refills continuously at ``rate`` tokens per
    second up to ``capacity``, and lets callers burst while tokens remain.
    """

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens replenished per second (for an RPM limit: RPM / 60).
            capacity: Maximum number of tokens the bucket can hold.
        """
        self.lock = asyncio.Lock()
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()

    async def acquire(self, tokens: int = 1) -> float:
        """Take ``tokens`` from the bucket.

        Returns:
            Seconds the caller should wait before proceeding; 0.0 when the
            tokens were available immediately. This method does not sleep.
        """
        async with self.lock:
            current = time.time()

            # Refill for the time elapsed since the previous update.
            since_last = current - self.last_update
            self.tokens = min(self.capacity, self.tokens + since_last * self.rate)
            self.last_update = current

            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0

            # Not enough tokens: compute how long the shortfall takes to
            # refill, empty the bucket, and push the refill clock forward by
            # that delay so later callers account for this reservation.
            shortfall = tokens - self.tokens
            delay = shortfall / self.rate
            self.tokens = 0
            self.last_update = current + delay
            return delay


class HolySheepAPIClient:
    """Rate-limited HolySheep AI API client.

    - Per-model token-bucket rate limiting, with an account-level limiter
      used for models that have no entry in ``MODEL_LIMITS``
    - Pooled HTTP connections with keep-alive
    - Automatic retries with exponential back-off

    Use as an async context manager so the HTTP session is opened and closed::

        async with HolySheepAPIClient(api_key) as client:
            await client.chat_completions(model, messages)
    """

    # Example rate limits as (tokens per second, bucket capacity); check the
    # dashboard for the real limits of your account.
    MODEL_LIMITS: Dict[str, Tuple[float, int]] = {
        "gpt-4.1": (500 / 60, 500),      # 500 RPM, bucket of 500
        "claude-sonnet-4-20250514": (500 / 60, 500),
        "gemini-2.5-flash": (1000 / 60, 1000),
        "deepseek-v3": (2000 / 60, 2000),
    }

    def __init__(self, api_key: str):
        """
        Args:
            api_key: Bearer token sent in the Authorization header.
        """
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"

        # One rate limiter per known model.
        self.rate_limiters: Dict[str, TokenBucketRateLimiter] = {}
        for model, (rate, capacity) in self.MODEL_LIMITS.items():
            self.rate_limiters[model] = TokenBucketRateLimiter(rate, capacity)

        # Account-level limiter; used as the fallback for unlisted models.
        self.global_rate_limiter = TokenBucketRateLimiter(
            rate=10000 / 60,  # 10,000 RPM
            capacity=10000
        )

        # HTTP session (connection pool); created in ``__aenter__``.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the pooled HTTP session and return the client."""
        connector = aiohttp.TCPConnector(
            limit=500,
            limit_per_host=100,
            ttl_dns_cache=300,
            keepalive_timeout=30
        )

        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=120)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    @staticmethod
    def _parse_retry_after(value, default: float = 1.0) -> float:
        """Convert a Retry-After header value to a delay in seconds.

        Returns ``default`` when the header is missing or is not a finite,
        non-negative number (for example when it is an HTTP date).
        """
        if value is None:
            return default
        try:
            seconds = float(value)
        except (TypeError, ValueError):
            return default
        # NaN fails both comparisons, so it falls back to the default too.
        if not (0 <= seconds < float("inf")):
            return default
        return seconds

    async def chat_completions(
        self,
        model: str,
        messages: List[dict],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        retries: int = 3
    ) -> dict:
        """Send a chat-completion request with rate limiting and retries.

        A rate-limit token is acquired once per call (not per retry). HTTP 429
        and 500 responses and ``aiohttp.ClientError`` failures are retried
        with exponential back-off; 429 honours a numeric Retry-After header.

        Args:
            model: Model name; selects the rate limiter.
            messages: Chat messages for the request body.
            temperature: Sampling temperature.
            max_tokens: Maximum tokens to generate.
            retries: Maximum number of HTTP attempts.

        Returns:
            dict: The decoded JSON response.

        Raises:
            RuntimeError: if the session is not open, or if the last attempt
                still returned 429/500.
            aiohttp.ClientError: if the last attempt failed with a client or
                HTTP error.
        """
        if self.session is None:
            raise RuntimeError(
                "HolySheepAPIClient session is not open; "
                "use 'async with HolySheepAPIClient(...)'"
            )

        # Wait for a rate-limit token.
        rate_limiter = self.rate_limiters.get(model, self.global_rate_limiter)
        wait_time = await rate_limiter.acquire()

        if wait_time > 0:
            logger.info(f"Rate limited for {model}, waiting {wait_time:.3f}s")
            await asyncio.sleep(wait_time)

        url = f"{self.base_url}/chat/completions"
        body = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        for attempt in range(retries):
            is_last_attempt = attempt == retries - 1
            delay = 2 ** attempt  # default exponential back-off

            try:
                async with self.session.post(url, json=body, headers=headers) as response:
                    if response.status == 429:
                        # Rate limit exceeded: scale the server hint exponentially.
                        retry_after = self._parse_retry_after(
                            response.headers.get("Retry-After")
                        )
                        delay = retry_after * (2 ** attempt)
                        if not is_last_attempt:
                            logger.warning(f"Rate limit hit, retrying in {delay}s")
                    elif response.status != 500:
                        response.raise_for_status()
                        return await response.json()
                    # 429 and 500 fall through to the back-off below.

            except aiohttp.ClientError:
                if is_last_attempt:
                    raise

            # Back off only when another attempt follows, and only after the
            # response has been released so the connection is not held idle.
            if not is_last_attempt:
                await asyncio.sleep(delay)

        raise RuntimeError("Max retries exceeded")


===== 벤치마