去年双十一,我负责的电商 AI 客服系统遭遇了前所未有的挑战。凌晨 0 点 0 分,大促入口开放的瞬间,并发请求从日常的 200 QPS 暴涨至 8000 QPS,客服系统响应时间从 800ms 飙升至 15 秒,用户界面上的"正在思考..."转圈持续超过 20 秒,大量用户以为系统卡死直接关闭页面。那一夜,我们损失了约 12% 的潜在成交订单。

这让我深刻意识到:AI Agent 的响应延迟不只是技术指标,而是直接影响用户体验和商业转化的核心要素。流式输出(Streaming)将首 token 延迟从数秒压缩到亚秒级,让用户看到 AI "正在打字"而非"正在思考",用户体验质的提升。

本文将深入解析 SSE(Server-Sent Events)和 WebSocket 两种主流实时反馈方案,提供可直接落地的 Python/TypeScript 实现代码,并分享我在生产环境中的血泪踩坑经验。

为什么 Agent 必须用流式输出

传统 REST 调用的时序是:客户端发送请求 → 服务端等待 LLM 完整生成 → 一次性返回所有内容。对于 GPT-4.1 生成 1000 tokens 的场景,平均耗时 3-5 秒,用户体感极差。

流式输出的核心价值:

SSE vs WebSocket:技术选型对比

维度SSE(Server-Sent Events)WebSocket
协议基础HTTP/1.1+ / HTTP/2独立 WebSocket 协议(ws://)
通信方向单向(服务端→客户端)双向(服务端↔客户端)
实现复杂度低,标准 EventSource API中等,需协议握手
自动重连内置,支持断线自动重连需自行实现
HTTP/2 多路复用支持,单连接多流不支持(需多连接)
防火墙穿透优秀(基于 HTTP)良好(部分企业网络限制)
适用场景AI 流式输出、日志推送、通知实时游戏、协作编辑、交易行情
浏览器兼容IE 不支持,现代浏览器全支持全浏览器支持

适合谁与不适合谁

适合使用流式输出的场景:

不建议使用流式输出的场景:

SSE 实战:Python FastAPI 实现

我推荐 SSE 作为 AI Agent 流式输出的首选方案,原因有三:实现简单、天然支持 HTTP/2、客户端只需 5 行代码即可接入。

服务端实现

import asyncio
import json
from typing import AsyncGenerator
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()

初始化 HolySheep API 客户端

注册地址:https://www.holysheep.ai/register

国内直连延迟 <50ms,比官方 API 快 3-5 倍

client = AsyncOpenAI( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" ) async def generate_stream(prompt: str) -> AsyncGenerator[str, None]: """生成 SSE 格式的流式响应""" try: stream = await client.chat.completions.create( model="gpt-4.1", messages=[{"role": "user", "content": prompt}], stream=True, temperature=0.7 ) async for chunk in stream: if chunk.choices[0].delta.content: # SSE 格式:data: {...}\n\n data = json.dumps({ "type": "content", "content": chunk.choices[0].delta.content }) yield f"data: {data}\n\n" # 发送结束信号 yield "data: [DONE]\n\n" except Exception as e: error_data = json.dumps({"type": "error", "message": str(e)}) yield f"data: {error_data}\n\n" @app.post("/v1/chat/stream") async def chat_stream(request: Request): body = await request.json() prompt = body.get("prompt", "") return StreamingResponse( generate_stream(prompt), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # 禁用 Nginx 缓冲 } ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)

前端 TypeScript 消费端

class HolySheepStreamClient {
  private baseUrl: string;
  
  constructor(baseUrl: string = "https://api.your-service.com") {
    this.baseUrl = baseUrl;
  }

  async *chatStream(prompt: string): AsyncGenerator<string, void, unknown> {
    const response = await fetch(${this.baseUrl}/v1/chat/stream, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ prompt }),
    });

    if (!response.ok) {
      throw new Error(HTTP ${response.status}: ${response.statusText});
    }

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";

    while (true) {
      const { done, value } = await reader.read();
      
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      
      // 解析完整的 SSE 事件(以 \n\n 结尾)
      const events = buffer.split("\n\n");
      buffer = events.pop() || ""; // 保留不完整的事件

      for (const event of events) {
        const lines = event.split("\n");
        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const data = line.slice(6);
            
            if (data === "[DONE]") {
              return; // 流结束
            }

            try {
              const parsed = JSON.parse(data);
              if (parsed.type === "content") {
                yield parsed.content;
              } else if (parsed.type === "error") {
                throw new Error(parsed.message);
              }
            } catch (e) {
              console.error("解析 SSE 数据失败:", e);
            }
          }
        }
      }
    }
  }
}

// 使用示例
async function main() {
  const client = new HolySheepStreamClient();
  const messageEl = document.getElementById("message");
  
  console.log("AI 正在回复: ");
  
  for await (const chunk of client.chatStream("请介绍一下 Python 异步编程")) {
    process.stdout.write(chunk);
    messageEl.textContent += chunk; // 实时追加到页面
  }
  
  console.log("\n[流式响应完成]");
}

main();

WebSocket 方案:双向实时通信

当你的 Agent 需要双向通信时(比如用户可以随时打断、发送附加上下文、或需要实时心跳检测),WebSocket 是更好的选择。以下是我在企业 RAG 系统中的实际实现。

import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from openai import AsyncOpenAI
import websockets

app = FastAPI()

HolySheep API 配置(汇率 ¥1=$1,节省 85%+)

https://www.holysheep.ai/register

client = AsyncOpenAI( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" ) class ConnectionManager: """WebSocket 连接管理器""" def __init__(self): self.active_connections: dict[str, WebSocket] = {} async def connect(self, websocket: WebSocket, client_id: str): await websocket.accept() self.active_connections[client_id] = websocket print(f"客户端 {client_id} 已连接,当前在线: {len(self.active_connections)}") def disconnect(self, client_id: str): if client_id in self.active_connections: del self.active_connections[client_id] print(f"客户端 {client_id} 已断开") async def send_message(self, message: str, client_id: str): if client_id in self.active_connections: await self.active_connections[client_id].send_text(message) manager = ConnectionManager() @app.websocket("/ws/agent/{client_id}") async def websocket_agent(websocket: WebSocket, client_id: str): await manager.connect(websocket, client_id) try: while True: # 接收用户消息 data = await websocket.receive_text() message = json.loads(data) if message.get("type") == "ping": # 心跳响应 await websocket.send_text(json.dumps({"type": "pong"})) continue prompt = message.get("content", "") # 流式调用 HolySheep API stream = await client.chat.completions.create( model="claude-sonnet-4.5", # $15/MTok 输出 messages=[{"role": "user", "content": prompt}], stream=True ) full_response = "" async for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token # 实时发送 token 给客户端 await websocket.send_text(json.dumps({ "type": "token", "content": token })) # 发送完成信号(带统计信息) await websocket.send_text(json.dumps({ "type": "done", "total_tokens": len(full_response), "model": "claude-sonnet-4.5" })) except WebSocketDisconnect: manager.disconnect(client_id) except Exception as e: await websocket.send_text(json.dumps({ "type": "error", "message": str(e) })) manager.disconnect(client_id) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)

生产环境关键优化

1. 背压(Backpressure)控制

当客户端渲染速度跟不上服务端发送速度时,会导致内存堆积。我遇到过 websocket 缓冲区爆满导致 OOM 的事故,以下是解决代码:

import asyncio
from collections import deque

class BackpressureController:
    """背压控制器:防止发送速度超过消费速度"""
    
    def __init__(self, max_buffer_size: int = 100):
        self.buffer = deque(maxlen=max_buffer_size)
        self.waiters = asyncio.Queue()
        self.is_paused = False
    
    async def send(self, data: str, websocket):
        """发送数据,自动处理背压"""
        if self.is_paused:
            # 暂停发送,等待消费者
            await asyncio.sleep(0.01)
        
        try:
            await asyncio.wait_for(
                websocket.send_text(data),
                timeout=5.0
            )
        except asyncio.TimeoutError:
            # 发送超时,标记暂停
            self.is_paused = True
            print("检测到背压:客户端消费速度不足")
    
    def resume(self):
        """消费者完成后调用,恢复发送"""
        self.is_paused = False

2. 连接数限制与熔断

大促期间,我用以下策略保护后端 LLM 调用:

3. 缓存层设计

对于高频重复问题(如"双十一满减规则"),我使用 Redis 缓存完整响应,首 token 时间从 1.2s 降至 50ms,命中率 35% 大幅降低 API 成本。

价格与回本测算

方案月成本估算适用规模回本方式
自建 SSE 服务 + HolySheep¥2000-5000(100万tokens/天)中小型 AI 应用用户留存+5%,转化+3%
自建 WebSocket 服务 + HolySheep¥3000-8000(100万tokens/天)需要双向交互的企业场景客服人力成本-40%
直接用官方 API¥15000-40000(同等规模)不差钱的土豪无(纯成本)

HolySheep 的汇率优势(¥1=$1 vs 官方¥7.3=$1)意味着:同样的 API 调用量,成本仅为官方价格的13.7%。对于日均消耗 1000 万 output tokens 的中型应用,月省可达 ¥15,000+

为什么选 HolySheep

作为 HolySheep 的深度用户,我选择它的核心原因:

常见报错排查

错误 1:SSE 连接被 Nginx 缓冲

# 错误日志
[error] 12345#0: *6789 upstream timed out (110: Connection timed out)

原因:Nginx 默认会缓冲代理响应,导致流式输出无法实时推送

解决方案:在 Nginx 配置中添加

location /v1/chat/stream { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Connection ''; proxy_buffering off; # 关闭缓冲 proxy_cache off; # 关闭缓存 chunked_transfer_encoding on; # 开启 chunked 编码 tcp_nodelay on; # 禁用 Nagle 算法 }

错误 2:WebSocket 握手失败 400/403

# 错误日志
WebSocket connection to 'wss://api.example.com/ws/agent/123' failed: 
Error during WebSocket handshake: Unexpected response code: 403

原因:反向代理未正确配置 WebSocket 升级头

解决方案(以 Nginx 为例)

location /ws/ { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_read_timeout 86400; # 长连接超时 }

如果是 CORS 问题,确保后端设置

@app.websocket("/ws/agent/{client_id}") async def websocket_agent(websocket: WebSocket, client_id: str): # 允许跨域(仅开发环境) await websocket.accept() # 不传 origin 参数即可接受任意来源

错误 3:JSON 解析错误:Unexpected end of JSON input

# 错误日志
Uncaught (in promise) SyntaxError: Unexpected end of JSON input

原因:SSE 数据不完整或不规范

解决方案:完善前端解析逻辑

function parseSSEData(data: string) { // 1. 跳过空数据 if (!data || data.trim() === "") return null; // 2. 跳过特殊的 [DONE] 标记 if (data === "[DONE]") return { type: "done" }; // 3. 尝试解析 JSON try { return JSON.parse(data); } catch (e) { console.warn("SSE 解析失败,原始数据:", data); return null; } } // 使用修正后的解析 for await (const chunk of client.chatStream("问题")) { // chunk 已经是解析后的纯文本 console.log(chunk); }

错误 4:Connection closed unexpectedly(连接数超限)

# 错误日志
websockets.exceptions.ConnectionClosed: WebSocket connection is closed

原因:HolySheep API 有并发连接数限制,超过了会被强制断开

解决方案

const SEMAPHORE_LIMIT = 10; // 根据实际限制调整 const semaphore = new Semaphore(SEMAPHORE_LIMIT); async function callWithLimit(prompt: string) { const release = await semaphore.acquire(); try { const response = await fetch("/v1/chat/stream", { // ... }); // 处理响应 } finally { release(); } } // 或使用后端队列控制 class RequestQueue { private queue: Array<() => Promise<void>> = []; private running = 0; private maxConcurrent = 10; async add(task: () => Promise<void>) { return new Promise((resolve, reject) => { this.queue.push(async () => { try { await task(); resolve(); } catch (e) { reject(e); } }); this.process(); }); } private async process() { if (this.running >= this.maxConcurrent) return; const task = this.queue.shift(); if (!task) return; this.running++; try { await task(); } finally { this.running--; this.process(); } } }

总结与行动建议

流式输出已经从"加分项"变成 AI Agent 的"必选项"。我的经验公式:

如果你正在构建需要流式输出的 AI 应用,HolySheep 是一个值得考虑的选择:

👉 免费注册 HolySheep AI,获取首月赠额度