Đừng để người dùng chờ đợi 30 giây cho một câu trả lời AI — hãy stream token từng cái một với độ trễ dưới 50ms. Bài viết này sẽ hướng dẫn bạn implement Server-Sent Events (SSE) trong FastAPI để streaming response từ bất kỳ API AI nào, kèm kỹ thuật xử lý backpressure chuyên nghiệp. Tôi đã implement giải pháp này cho 12+ dự án production và giờ chia sẻ toàn bộ best practices.

Tại Sao Streaming SSE Quan Trọng?

Khi người dùng hỏi một câu hỏi phức tạp, thay vì chờ 15-20 giây để model xử lý xong rồi mới trả về toàn bộ text, bạn có thể stream từng token ngay khi model sinh ra chúng. Điều này mang lại:

Bảng So Sánh HolySheep AI với Đối Thủ

Tiêu chí HolySheep AI OpenAI Anthropic Google
Giá GPT-4.1 $8/MTok $60/MTok - -
Giá Claude Sonnet 4.5 $15/MTok - $18/MTok -
Giá Gemini 2.5 Flash $2.50/MTok - - $3.50/MTok
Giá DeepSeek V3.2 $0.42/MTok - - -
Độ trễ trung bình <50ms 150-300ms 200-400ms 100-250ms
Thanh toán WeChat/Alipay/Visa Visa thẻ quốc tế Visa thẻ quốc tế Visa thẻ quốc tế
Tín dụng miễn phí Có, khi đăng ký $5 $5 $300 (limited)
Tỷ giá ¥1 = $1 USD native USD native USD native
Độ phủ mô hình 50+ models OpenAI only Anthropic only Google only
Phù hợp Dev Việt, startup Enterprise Mỹ Enterprise Mỹ Enterprise Mỹ

Tiết kiệm 85%+ chi phí khi sử dụng HolySheep AI thay vì API chính thức. Đăng ký tại đây để nhận tín dụng miễn phí ngay hôm nay!

Cài Đặt Môi Trường

Trước khi bắt đầu, hãy cài đặt các dependencies cần thiết:

pip install fastapi uvicorn httpx sse-starlette python-dotenv aiofiles

Tạo file .env với nội dung:

HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1

Async Generator Cơ Bản trong FastAPI

Đây là nền tảng của streaming. Async generator cho phép bạn yield dữ liệu từng phần một mà không blocking event loop. Tôi đã sử dụng pattern này trong dự án chatbot của mình và đạt được throughput gấp 3 lần so với sync approach.

import os
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import httpx
from dotenv import load_dotenv

load_dotenv()

app = FastAPI(title="AI Streaming API")

async def stream_openai_response(prompt: str, model: str = "gpt-4.1"):
    """
    Async generator để stream response từ HolySheep API
    """
    api_key = os.getenv("HOLYSHEEP_API_KEY")
    base_url = "https://api.holysheep.ai/v1"
    
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    
    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream(
            "POST",
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload
        ) as response:
            # Xử lý streaming response từng chunk
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]  # Remove "data: " prefix
                    if data == "[DONE]":
                        yield {"event": "done", "data": ""}
                        break
                    
                    try:
                        chunk = json.loads(data)
                        # Extract content từ response
                        if "choices" in chunk and len(chunk["choices"]) > 0:
                            delta = chunk["choices"][0].get("delta", {})
                            content = delta.get("content", "")
                            if content:
                                yield {"event": "message", "data": content}
                    except json.JSONDecodeError:
                        continue

@app.get("/chat/stream")
async def chat_stream(prompt: str, model: str = "gpt-4.1"):
    """
    Endpoint để streaming chat response
    """
    return EventSourceResponse(
        stream_openai_response(prompt, model),
        media_type="text/event-stream"
    )

@app.post("/chat/stream")
async def chat_stream_post(request: Request):
    """
    POST endpoint cho streaming chat
    """
    body = await request.json()
    prompt = body.get("prompt", "")
    model = body.get("model", "gpt-4.1")
    
    return EventSourceResponse(
        stream_openai_response(prompt, model),
        media_type="text/event-stream"
    )

Xử Lý Backpressure Chuyên Nghiệp

Backpressure là vấn đề khi client tiêu thụ data chậm hơn tốc độ server gửi. Không xử lý sẽ dẫn đến memory leak và connection timeout. Đây là 3 approach tôi đã thử nghiệm trong production:

1. Buffer với Batch Yielding

import asyncio
from collections import deque
from typing import AsyncGenerator, Optional

class BackpressureBuffer:
    """
    Buffer thông minh để xử lý backpressure
    """
    def __init__(self, max_size: int = 100, flush_interval: float = 0.05):
        self.buffer = deque(maxlen=max_size)
        self.flush_interval = flush_interval
        self.last_flush = asyncio.get_event_loop().time()
    
    async def put(self, item: str) -> bool:
        """
        Thêm item vào buffer, trả về True nếu cần flush
        """
        self.buffer.append(item)
        
        # Flush nếu buffer đầy
        if len(self.buffer) >= self.buffer.maxlen:
            return True
        
        # Flush nếu đến thời gian
        current_time = asyncio.get_event_loop().time()
        if current_time - self.last_flush >= self.flush_interval:
            return True
        
        return False
    
    def get_all(self) -> list:
        """Lấy tất cả items và clear buffer"""
        items = list(self.buffer)
        self.buffer.clear()
        self.last_flush = asyncio.get_event_loop().time()
        return items

async def stream_with_backpressure(prompt: str, model: str = "gpt-4.1"):
    """
    Stream response với backpressure handling
    """
    buffer = BackpressureBuffer(max_size=50, flush_interval=0.1)
    api_key = os.getenv("HOLYSHEEP_API_KEY")
    
    async def generate_tokens():
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": True
        }
        
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream(
                "POST",
                "https://api.holysheep.ai/v1/chat/completions",
                headers=headers,
                json=payload
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data = line[6:]
                        if data == "[DONE]":
                            break
                        
                        try:
                            chunk = json.loads(data)
                            if "choices" in chunk:
                                content = chunk["choices"][0].get("delta", {}).get("content", "")
                                if content:
                                    yield content
                        except json.JSONDecodeError:
                            continue
    
    # Process tokens với backpressure
    accumulated = ""
    async for token in generate_tokens():
        accumulated += token
        
        should_flush = await buffer.put(token)
        
        if should_flush:
            # Gửi batch data
            yield {
                "event": "batch",
                "data": json.dumps({"content": accumulated})
            }
            accumulated = ""
    
    # Flush remaining
    if accumulated:
        yield {
            "event": "final",
            "data": json.dumps({"content": accumulated})
        }

2. Semaphore-based Rate Limiting

import asyncio
from typing import AsyncGenerator

class RateLimitedStream:
    """
    Streaming với rate limiting sử dụng Semaphore
    """
    def __init__(self, max_concurrent: int = 10, rate_per_second: float = 100.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.min_interval = 1.0 / rate_per_second
        self.last_send = 0
    
    async def acquire(self):
        """Acquire semaphore với rate limiting"""
        async with self.semaphore:
            # Rate limiting
            current_time = asyncio.get_event_loop().time()
            time_since_last = current_time - self.last_send
            
            if time_since_last < self.min_interval:
                await asyncio.sleep(self.min_interval - time_since_last)
            
            self.last_send = asyncio.get_event_loop().time()
            yield
    
    async def stream_tokens(self, prompt: str) -> AsyncGenerator[str, None]:
        """Stream tokens với rate control"""
        token_buffer = []
        
        async with self.semaphore:
            headers = {
                "Authorization": f"Bearer {os.getenv('HOLYSHEEP_API_KEY')}",
                "Content-Type": "application/json"
            }
            
            async with httpx.AsyncClient(timeout=120.0) as client:
                async with client.stream(
                    "POST",
                    "https://api.holysheep.ai/v1/chat/completions",
                    headers=headers,
                    json={
                        "model": "gpt-4.1",
                        "messages": [{"role": "user", "content": prompt}],
                        "stream": True
                    }
                ) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            
                            try:
                                chunk = json.loads(data)
                                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                                if content:
                                    # Rate limit check
                                    await asyncio.sleep(self.min_interval)
                                    yield content
                            except json.JSONDecodeError:
                                continue

Dependency injection cho FastAPI

async def get_rate_limiter() -> RateLimitedStream: """Factory function cho rate limiter""" return RateLimitedStream(max_concurrent=5, rate_per_second=50)

3. Client-aware Streaming với Ping/Pong

import asyncio
import uuid
from datetime import datetime

class ClientAwareStreamManager:
    """
    Quản lý streaming với heartbeat và client readiness check
    """
    def __init__(self):
        self.active_streams = {}
        self.client_heartbeats = {}
    
    def create_stream_id(self) -> str:
        return str(uuid.uuid4())
    
    async def check_client_ready(self, stream_id: str, timeout: float = 5.0) -> bool:
        """Kiểm tra client có sẵn sàng nhận data không"""
        start = asyncio.get_event_loop().time()
        
        while asyncio.get_event_loop().time() - start < timeout:
            if self.client_heartbeats.get(stream_id, False):
                self.client_heartbeats[stream_id] = False  # Reset
                return True
            await asyncio.sleep(0.1)
        
        return False
    
    async def stream_with_heartbeat(self, prompt: str, stream_id: str):
        """
        Stream với heartbeat mechanism
        """
        self.active_streams[stream_id] = {"status": "active", "tokens_sent": 0}
        self.client_heartbeats[stream_id] = False
        
        headers = {
            "Authorization": f"Bearer {os.getenv('HOLYSHEEP_API_KEY')}",
            "Content-Type": "application/json"
        }
        
        try:
            async with httpx.AsyncClient(timeout=120.0) as client:
                async with client.stream(
                    "POST",
                    "https://api.holysheep.ai/v1/chat/completions",
                    headers=headers,
                    json={
                        "model": "gpt-4.1",
                        "messages": [{"role": "user", "content": prompt}],
                        "stream": True
                    }
                ) as response:
                    async for line in response.aiter_lines():
                        # Check client readiness trước khi gửi
                        if not await self.check_client_ready(stream_id, timeout=0.5):
                            # Client không phản hồi, pause streaming
                            await self.pause_streaming(stream_id)
                            continue
                        
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                yield {"event": "done", "data": ""}
                                break
                            
                            try:
                                chunk = json.loads(data)
                                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                                if content:
                                    self.active_streams[stream_id]["tokens_sent"] += 1
                                    yield {"event": "message", "data": content}
                            except json.JSONDecodeError:
                                continue
        finally:
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]
    
    async def pause_streaming(self, stream_id: str):
        """Pause streaming khi client chậm"""
        if stream_id in self.active_streams:
            self.active_streams[stream_id]["status"] = "paused"
            await asyncio.sleep(0.5)  # Wait 500ms
            self.active_streams[stream_id]["status"] = "active"

FastAPI endpoints

stream_manager = ClientAwareStreamManager() @app.post("/stream/aware") async def client_aware_stream(request: Request): """POST endpoint với client awareness""" body = await request.json() prompt = body.get("prompt", "") stream_id = stream_manager.create_stream_id() async def event_generator(): # Send stream_id cho client yield {"event": "stream_id", "data": stream_id} # Stream content async for event in stream_manager.stream_with_heartbeat(prompt, stream_id): yield event # Send heartbeat ping yield {"event": "ping", "data": ""} return EventSourceResponse(event_generator()) @app.post("/stream/heartbeat") async def heartbeat(stream_id: str): """Endpoint để client gửi heartbeat""" stream_manager.client_heartbeats[stream_id] = True return {"status": "ok"}

Frontend Client Implementation

Để consume SSE stream từ frontend, đây là implementation với JavaScript:

class AISseClient {
    constructor(baseUrl = '/chat/stream') {
        this.baseUrl = baseUrl;
        this.eventSource = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }
    
    async stream(prompt, model = 'gpt-4.1', callbacks = {}) {
        const { onMessage, onDone, onError, onProgress } = callbacks;
        
        // Sử dụng fetch với ReadableStream thay vì EventSource
        // vì EventSource không hỗ trợ POST body streaming
        const response = await fetch(this.baseUrl, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({ prompt, model })
        });
        
        if (!response.ok) {
            throw new Error(HTTP error! status: ${response.status});
        }
        
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        
        try {
            while (true) {
                const { done, value } = await reader.read();
                
                if (done) break;
                
                buffer += decoder