AI 애플리케이션에서 사용자에게 실시간 피드백을 제공하는 것은 사용자 경험의 핵심입니다. 저는 3년 넘게 대규모 AI API 연동을 진행하며 스트리밍 응답의 성능 최적화와背壓(backpressure) 처리가 프로덕션 안정성의 핵심임을 확인했습니다. 이번 튜토리얼에서는 HolySheep AI의 글로벌 AI 게이트웨이와 FastAPI를 결합하여 효율적인 SSE 스트리밍 응답을 구현하는 방법을 깊이 있게 다룹니다.

SSE와 스트리밍 응답의 기본 원리

Server-Sent Events(SSE)는 서버에서 클라이언트로 단방향 데이터 스트림을 전송하는 HTTP 기반 프로토콜입니다. AI 응답의 경우 모델이 토큰을 생성할 때마다 실시간으로 전송하여 사용자가 전체 응답을 기다리지 않고 부분 결과를 확인할 수 있습니다.

FastAPI는 Starlette의 ASGI 기반 아키텍처를 활용하여 고성능 비동기 SSE 구현을 지원합니다. 전통적인 풀링 방식 대비 스트리밍은 TTFT(Time To First Token)를 크게 단축하며, 긴 응답에서 사용자가 중간에 이탈하더라도 불필요한 토큰 생성을 방지할 수 있습니다.

HolySheep AI API 연동 설정

HolySheep AI는 단일 API 키로 여러 AI 모델을 통합 관리할 수 있는 글로벌 게이트웨이입니다. 가입 시 무료 크레딧이 제공되며, 해외 신용카드 없이 로컬 결제가 가능합니다. 저는 여러 프로젝트에서 HolySheep AI를 활용하여 모델별 비용을 최적화하고 있습니다.

먼저 필요한 의존성을 설치합니다:

pip install fastapi uvicorn httpx sse-starlette python-dotenv aiohttp

프로젝트 구조는 다음과 같이 구성합니다:

fastapi-ai-streaming/
├── main.py
├── streaming/
│   ├── __init__.py
│   ├── generator.py
│   ├── backpressure.py
│   └── models.py
├── config.py
└── requirements.txt

비동기 생성기를 활용한 스트리밍 응답 구현

FastAPI에서 SSE 스트리밍 응답을 구현하는 핵심은 async generator입니다. async generator는 await 가능한 객체를 yield하면서 전체 응답 완료 전에도 데이터를 전송할 수 있게 합니다.

# streaming/generator.py
import asyncio
import json
import time
from typing import AsyncGenerator
from dataclasses import dataclass
from datetime import datetime

import httpx
from config import settings


@dataclass
class StreamMetrics:
    """스트리밍 성능 지표"""
    request_id: str
    model: str
    tokens_received: int
    time_to_first_token_ms: float
    total_time_ms: float
    tokens_per_second: float
    error_count: int = 0


class StreamingAIGenerator:
    """
    HolySheep AI API와 연동하는 비동기 스트리밍 생성기
   背壓 처리 및 재시도 로직 포함
    """
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        timeout: float = 120.0,
        max_retries: int = 3
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.max_retries = max_retries
        self._client: httpx.AsyncClient | None = None
    
    async def _get_client(self) -> httpx.AsyncClient:
        """지연 초기화 패턴으로 연결 재사용"""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(self.timeout),
                limits=httpx.Limits(
                    max_keepalive_connections=20,
                    max_connections=100,
                    keepalive_expiry=30.0
                )
            )
        return self._client
    
    async def close(self):
        """클라이언트 리소스 정리"""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
            self._client = None
    
    async def stream_chat_completion(
        self,
        model: str,
        messages: list[dict],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        request_id: str | None = None
    ) -> AsyncGenerator[str, None]:
        """
        HolySheep AI API에서 SSE 스트리밍 응답 수신
        
        Args:
            model: HolySheep AI에서 지원되는 모델명
            messages: 대화 컨텍스트
            temperature: 응답 창의성 조절
            max_tokens: 최대 생성 토큰 수
            request_id: 요청 추적용 ID
            
        Yields:
            SSE 포맷의 데이터 청크
        """
        request_id = request_id or f"req_{int(time.time() * 1000)}"
        start_time = time.perf_counter()
        first_token_time: float | None = None
        tokens_received = 0
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "text/event-stream"
        }
        
        payload = {
            "model": model,
            "messages": messages,
            "stream": True,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        client = await self._get_client()
        retry_count = 0
        
        while retry_count < self.max_retries:
            try:
                async with client.stream(
                    "POST",
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    
                    if response.status_code == 429:
                        # Rate limit 처리: 지수 백오프
                        retry_after = int(response.headers.get("Retry-After", 5))
                        wait_time = min(60, (2 ** retry_count) * retry_after)
                        await asyncio.sleep(wait_time)
                        retry_count += 1
                        continue
                    
                    response.raise_for_status()
                    
                    async for line in response.aiter_lines():
                        if not line or not line.startswith("data: "):
                            continue
                        
                        data = line[6:]  # "data: " prefix 제거
                        
                        if data == "[DONE]":
                            break
                        
                        try:
                            chunk = json.loads(data)
                            
                            # 첫 토큰 수신 시간 기록
                            if first_token_time is None and chunk.get("choices"):
                                delta = chunk["choices"][0].get("delta", {})
                                if delta.get("content"):
                                    first_token_time = time.perf_counter()
                            
                            # 토큰 카운트
                            delta = chunk.get("choices", [{}])[0].get("delta", {})
                            if delta.get("content"):
                                tokens_received += 1
                            
                            # SSE 포맷으로 변환하여 yield
                            yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                            
                        except json.JSONDecodeError:
                            continue
                
                # 성공적으로 완료
                total_time = (time.perf_counter() - start_time) * 1000
                metrics = StreamMetrics(
                    request_id=request_id,
                    model=model,
                    tokens_received=tokens_received,
                    time_to_first_token_ms=(
                        (first_token_time - start_time) * 1000 
                        if first_token_time else 0
                    ),
                    total_time_ms=total_time,
                    tokens_per_second=(
                        (tokens_received / (total_time / 1000))
                        if total_time > 0 else 0
                    )
                )
                
                # 메트릭스 이벤트 전송
                yield f"data: {json.dumps({'type': 'metrics', 'data': vars(metrics)}, ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"
                return
                
            except httpx.HTTPStatusError as e:
                if e.response.status_code >= 500 and retry_count < self.max_retries:
                    retry_count += 1
                    await asyncio.sleep(2 ** retry_count)
                    continue
                raise
            except Exception as e:
                yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
                return
        
        # 최대 재시도 횟수 초과
        yield f"data: {json.dumps({'type': 'error', 'error': 'Max retries exceeded'})}\n\n"
        yield "data: [DONE]\n\n"


async def create_streaming_generator() -> StreamingAIGenerator:
    """의존성 주입을 위한 팩토리 함수"""
    return StreamingAIGenerator(
        api_key=settings.HOLYSHEEP_API_KEY,
        base_url="https://api.holysheep.ai/v1",
        timeout=120.0
    )

背壓 처리 메커니즘

背壓(backpressure)은 데이터 소비자가 생산자보다 느릴 때 발생하는 문제입니다. AI 스트리밍에서 이는 클라이언트 연결 지연, 네트워크 병목, 또는 클라이언트 처리 과부하로 인해 발생할 수 있습니다.背壓을 처리하지 않으면 서버 메모리가 고갈되거나 클라이언트 연결이 끊어질 수 있습니다.

버퍼 기반 流控 구현

# streaming/backpressure.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from typing import AsyncGenerator, Optional
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class BackpressureStrategy(Enum):
    """背壓 처리 전략"""
    DROP_OLDEST = "drop_oldest"      # 가장 오래된 데이터 버리기
    DROP_NEWEST = "drop_newest"      # 최신 데이터 버리기
    BLOCK = "block"                  # 생산자 차단
    BUFFER_FULL_ERROR = "error"     # 버퍼 가득 시 오류 발생


@dataclass
class BackpressureConfig:
    """背壓 설정"""
    max_buffer_size: int = 100           # 최대 버퍼 크기
    max_wait_time_ms: float = 5000.0     # 최대 대기 시간
    strategy: BackpressureStrategy = BackpressureStrategy.DROP_OLDEST
    enable_metrics: bool = True


@dataclass
class BackpressureMetrics:
    """背壓 모니터링 지표"""
    total_items: int = 0
    dropped_items: int = 0
    wait_events: int = 0
    max_queue_depth: int = 0
    avg_processing_time_ms: float = 0.0
    last_updated: float = field(default_factory=time.time)


class BackpressureHandler:
    """
   背壓을 관리하는 버퍼링 핸들러
    
    클라이언트 처리 속도가 서버 전송 속도보다 느릴 때
    적절한 流控를 적용하여 시스템 안정성 유지
    """
    
    def __init__(self, config: BackpressureConfig | None = None):
        self.config = config or BackpressureConfig()
        self._buffer: deque[tuple[float, str]] = deque(maxlen=self.config.max_buffer_size)
        self._metrics = BackpressureMetrics()
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Condition(self._lock)
        self._closed = False
        self._total_processing_time = 0.0
    
    @property
    def metrics(self) -> BackpressureMetrics:
        return self._metrics
    
    async def put(self, item: str, timeout: float | None = None) -> bool:
        """
        버퍼에 아이템 추가
        
       背壓 전략에 따라 버퍼가 가득 찼을 때 행동 결정
        
        Returns:
            True: 아이템 추가 성공
            False: 버퍼 가득으로 버려짐 (DROP_* 전략)
        """
        if self._closed:
            return False
        
        async with self._lock:
            # 버퍼 공간 확인
            if len(self._buffer) >= self.config.max_buffer_size:
                return await self._handle_buffer_full()
            
            # 버퍼에 추가
            timestamp = time.perf_counter()
            self._buffer.append((timestamp, item))
            self._metrics.total_items += 1
            self._metrics.max_queue_depth = max(
                self._metrics.max_queue_depth,
                len(self._buffer)
            )
            
            # 소비자 깨우기
            self._not_empty.notify()
            
            return True
    
    async def _handle_buffer_full(self) -> bool:
        """버퍼 가득 찼을 때의 처리"""
        match self.config.strategy:
            case BackpressureStrategy.DROP_OLDEST:
                # 가장 오래된 아이템 제거 후 새 아이템 추가
                self._buffer.popleft()
                self._buffer.append((time.perf_counter(), self._buffer[-1][1]))
                self._metrics.dropped_items += 1
                logger.warning("Buffer full: dropping oldest item")
                return True
                
            case BackpressureStrategy.DROP_NEWEST:
                # 최신 아이템 버리기
                self._metrics.dropped_items += 1
                logger.debug("Buffer full: dropping newest item")
                return False
                
            case BackpressureStrategy.BLOCK:
                # 소비자가 공간을 만들 때까지 대기
                self._metrics.wait_events += 1
                try:
                    await asyncio.wait_for(
                        self._not_empty.wait_for(lambda: len(self._buffer) < self.config.max_buffer_size),
                        timeout=timeout or (self.config.max_wait_time_ms / 1000)
                    )
                    return True
                except asyncio.TimeoutError:
                    logger.error("Backpressure timeout: max wait time exceeded")
                    return False
                    
            case BackpressureStrategy.BUFFER_FULL_ERROR:
                raise BufferError("Stream buffer full, cannot accept more items")
        
        return False
    
    async def get(self, timeout: float | None = None) -> Optional[str]:
        """버퍼에서 아이템 가져오기"""
        async with self._lock:
            if not self._buffer:
                if self._closed:
                    return None
                
                self._metrics.wait_events += 1
                try:
                    await asyncio.wait_for(
                        self._not_empty.wait_for(lambda: bool(self._buffer) or self._closed),
                        timeout=timeout
                    )
                except asyncio.TimeoutError:
                    return None
                
                if self._closed and not self._buffer:
                    return None
            
            timestamp, item = self._buffer.popleft()
            self._not_empty.notify()
            
            # 처리 시간 기록
            processing_time = (time.perf_counter() - timestamp) * 1000
            self._total_processing_time += processing_time
            self._metrics.avg_processing_time_ms = (
                self._total_processing_time / max(1, self._metrics.total_items)
            )
            
            return item
    
    async def close(self):
        """핸들러 종료 및 리소스 정리"""
        async with self._lock:
            self._closed = True
            self._not_empty.notify_all()
            self._buffer.clear()
    
    def reset_metrics(self):
        """지표 초기화"""
        self._metrics = BackpressureMetrics()


class BackpressureAwareStream:
    """
   背壓 인지를 지원하는 스트리밍 래퍼
    
    HolySheep AI API 응답을 소비자에게 전달하면서
    자동으로背壓 처리
    """
    
    def __init__(
        self,
        generator: AsyncGenerator[str, None],
        config: BackpressureConfig | None = None
    ):
        self._generator = generator
        self._handler = BackpressureHandler(config)
        self._task: asyncio.Task | None = None
        self._exception: Exception | None = None
    
    @property
    def handler(self) -> BackpressureHandler:
        return self._handler
    
    @property
    def metrics(self) -> BackpressureMetrics:
        return self._handler.metrics
    
    async def _producer(self):
        """생성자로부터 아이템을 버퍼로 이동"""
        try:
            async for item in self._generator:
                success = await self._handler.put(item)
                if not success and self._handler.config.strategy != BackpressureStrategy.DROP_NEWEST:
                    logger.warning("Item dropped due to backpressure")
        except Exception as e:
            self._exception = e
            logger.error(f"Producer error: {e}")
        finally:
            await self._handler.close()
    
    async def start(self):
        """프로듀서 태스크 시작"""
        self._task = asyncio.create_task(self._producer())
    
    async def stop(self):
        """프로듀서 태스크 중지"""
        await self._handler.close()
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
    
    async def stream(self) -> AsyncGenerator[str, None]:
        """소비자를 위한 AsyncGenerator"""
        await self.start()
        
        try:
            while True:
                item = await self._handler.get(timeout=30.0)
                if item is None:
                    break
                yield item
        finally:
            await self.stop()
    
    def get_exception(self) -> Exception | None:
        return self._exception

FastAPI 엔드포인트 구현

# main.py
from contextlib import asynccontextmanager
from typing import Annotated

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn

from streaming.generator import StreamingAIGenerator, create_streaming_generator
from streaming.backpressure import (
    BackpressureConfig,
    BackpressureStrategy,
    BackpressureAwareStream
)
from config import settings


전역 생성기 인스턴스

streaming_generator: StreamingAIGenerator | None = None @asynccontextmanager async def lifespan(app: FastAPI): """애플리케이션 생명주기 관리""" global streaming_generator streaming_generator = await create_streaming_generator() yield if streaming_generator: await streaming_generator.close() app = FastAPI( title="HolySheep AI Streaming API", description="비동기 생성기와背壓 처리 기반 AI 스트리밍 서비스", version="1.0.0", lifespan=lifespan )

CORS 설정

app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) class ChatRequest(BaseModel): """채팅 요청 스키마""" model: str = Field(default="gpt-4.1", description="HolySheep AI 모델명") messages: list[dict[str, str]] = Field( ..., description="대화 메시지 목록", min_length=1 ) temperature: float = Field( default=0.7, ge=0.0, le=2.0, description="응답 다양성" ) max_tokens: int = Field( default=2048, ge=1, le=8192, description="최대 토큰 수" ) stream: bool = Field(default=True, description="스트리밍 모드") backpressure_strategy: str = Field( default="drop_oldest", description="背壓 처리 전략" ) class ChatResponse(BaseModel): """비스트리밍 응답 스키마""" content: str model: str usage: dict async def get_generator() -> StreamingAIGenerator: """의존성 주입을 통한 생성기 획득""" if streaming_generator is None: raise HTTPException(status_code=503, detail="Service not initialized") return streaming_generator @app.post("/v1/chat/completions") async def chat_completions( request: ChatRequest, generator: Annotated[StreamingAIGenerator, Depends(get_generator)] ): """ HolySheep AI 스트리밍 채팅 완료 엔드포인트 스트리밍 모드: SSE를 통해 실시간 토큰 전송 비스트리밍 모드: 전체 응답 완료 후 반환 """ if not request.stream: # 비스트리밍 모드: 전체 응답 수집 full_content = [] async for chunk in generator.stream_chat_completion( model=request.model, messages=request.messages, temperature=request.temperature, max_tokens=request.max_tokens ): if chunk.startswith("data: [DONE]"): break # non-stream 응답 형식으로 변환 pass return ChatResponse( content="".join(full_content), model=request.model, usage={} ) #背壓 설정 구성 strategy_map = { "drop_oldest": BackpressureStrategy.DROP_OLDEST, "drop_newest": BackpressureStrategy.DROP_NEWEST, "block": BackpressureStrategy.BLOCK, "error": BackpressureStrategy.BUFFER_FULL_ERROR } backpressure_config = BackpressureConfig( max_buffer_size=100, max_wait_time_ms=5000.0, strategy=strategy_map.get(request.backpressure_strategy, BackpressureStrategy.DROP_OLDEST) ) # 스트리밍 응답 생성 stream_generator = generator.stream_chat_completion( model=request.model, messages=request.messages, temperature=request.temperature, max_tokens=request.max_tokens ) #背壓AwareStream으로 래핑 aware_stream = BackpressureAwareStream( generator=stream_generator, config=backpressure_config ) return StreamingResponse( aware_stream.stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Nginx 버퍼링 비활성화 } ) @app.get("/health") async def health_check(): """헬스 체크 엔드포인트""" return {"status": "healthy", "service": "holysheep-ai-streaming"} @app.get("/metrics/backpressure")