Kịch Bản Lỗi Thực Tế: "ConnectionError: Timeout khi đang nhận stream"
Tuần trước, một đồng nghiệp của mình gặp lỗi nghiêm trọng khi triển khai chatbot AI cho khách hàng doanh nghiệp. Anh ấy sử dụng OpenAI API nhưng gặp phải vấn đề:
Error: ConnectionError: Timeout after 30 seconds
at fetch (node:internal/deps/undici/undici:12345:19)
at SSEParser.parse (app.js:245:67)
Chunk received: {"id":"chatcmpl-abc123","object":"chat.completion.chunk"
{"error":{"type":"invalid_request_error","code":"401","message":"Incorrect API key provided"}}
Stream bị ngắt giữa chừng, user nhìn thấy văn bản dở dang và không có cách nào tự động khôi phục. Kết quả? Khách hàng không hài lòng và team phải làm thêm 2 ngày để fix.
Bài viết này sẽ hướng dẫn bạn xây dựng hệ thống SSE streaming hoàn chỉnh với HolySheep AI — nơi cung cấp đầy đủ các model AI hàng đầu với chi phí tiết kiệm đến 85% so với các nền tảng khác, hỗ trợ thanh toán qua WeChat và Alipay, độ trễ trung bình dưới 50ms.
SSE (Server-Sent Events) Là Gì?
SSE là công nghệ cho phép server gửi dữ liệu đến client thông qua kết nối HTTP đơn lẻ theo thời gian thực. Khác với WebSocket, SSE chỉ hỗ trợ truyền một chiều (server → client) nhưng đơn giản và nhẹ hơn nhiều.
Khi sử dụng API chat completions streaming của HolySheep AI, mỗi token được sinh ra sẽ được gửi dưới dạng một event riêng biệt:
# Response format từ HolySheep AI
event: message
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Xin"},"finish_reason":null}]}
event: message
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" chào"},"finish_reason":null}]}
event: done
data: [DONE]
Triển Khai Frontend với JavaScript Thuần
Dưới đây là implementation hoàn chỉnh sử dụng Fetch API với EventSource polyfill cho khả năng tương thích cao:
/**
* SSE Streaming Client cho HolySheep AI
* Hỗ trợ断线重连 tự động và xử lý lỗi toàn diện
*/
class HolySheepStreamClient {
constructor(apiKey, options = {}) {
this.baseUrl = 'https://api.holysheep.ai/v1';
this.apiKey = apiKey;
this.model = options.model || 'gpt-4';
this.maxRetries = options.maxRetries || 3;
this.retryDelay = options.retryDelay || 1000;
this.timeout = options.timeout || 60000;
this.controller = null;
this.retryCount = 0;
this.isConnected = false;
this.onChunk = options.onChunk || (() => {});
this.onError = options.onError || (() => {});
this.onComplete = options.onComplete || (() => {});
this.onRetry = options.onRetry || (() => {});
}
async sendMessage(messages, conversationHistory = []) {
this.controller = new AbortController();
this.isConnected = true;
this.retryCount = 0;
// Tích hợp conversation history nếu có
const fullMessages = [...conversationHistory, ...messages];
try {
await this.streamRequest(fullMessages);
} catch (error) {
this.handleError(error);
}
}
async streamRequest(messages, retryAttempt = 0) {
const response = await fetch(${this.baseUrl}/chat/completions, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${this.apiKey},
},
body: JSON.stringify({
model: this.model,
messages: messages,
stream: true,
stream_options: { include_usage: true }
}),
signal: this.controller.signal,
});
if (!response.ok) {
const errorData = await response.json().catch(() => ({}));
throw new Error(HTTP ${response.status}: ${errorData.error?.message || response.statusText});
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullContent = '';
while (this.isConnected) {
try {
const { done, value } = await reader.read();
if (done) {
this.onComplete({ fullContent, timestamp: Date.now() });
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
this.onComplete({ fullContent, timestamp: Date.now() });
return;
}
try {
const parsed = JSON.parse(data);
// Xử lý chunk delta
if (parsed.choices?.[0]?.delta?.content) {
const content = parsed.choices[0].delta.content;
fullContent += content;
this.onChunk({
content,
fullContent,
usage: parsed.usage
});
}
} catch (parseError) {
console.warn('Parse error:', parseError);
}
}
}
} catch (readError) {
if (readError.name === 'AbortError') {
console.log('Request aborted');
return;
}
throw readError;
}
}
}
handleError(error) {
this.isConnected = false;
// Kiểm tra các lỗi có thể retry
const retryableErrors = [
'timeout', 'ECONNRESET', 'ECONNREFUSED',
'network', 'fetch', 'TypeError'
];
const isRetryable = retryableErrors.some(
e => error.message?.toLowerCase().includes(e.toLowerCase())
) || error.name === 'AbortError';
// Tự động retry với exponential backoff
if (isRetryable && this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = this.retryDelay * Math.pow(2, this.retryCount - 1);
this.onRetry({
attempt: this.retryCount,
maxRetries: this.maxRetries,
delay,
error: error.message
});
setTimeout(() => {
this.sendMessage([], this.conversationHistory);
}, delay);
} else {
this.onError({
error: error.message,
retryable: isRetryable,
attempts: this.retryCount
});
}
}
stop() {
this.isConnected = false;
if (this.controller) {
this.controller.abort();
}
}
// Cập nhật conversation history cho context
addToHistory(role, content) {
if (!this.conversationHistory) this.conversationHistory = [];
this.conversationHistory.push({ role, content });
}
}
React Hook cho SSE Streaming
Với ứng dụng React hiện đại, đây là custom hook tích hợp đầy đủ tính năng:
import { useState, useCallback, useRef, useEffect } from 'react';
const API_BASE_URL = 'https://api.holysheep.ai/v1';
export function useSSEStream(apiKey) {
const [messages, setMessages] = useState([]);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState(null);
const [retryInfo, setRetryInfo] = useState(null);
const abortControllerRef = useRef(null);
const conversationHistoryRef = useRef([]);
const processStream = useCallback(async (inputMessage) => {
setIsLoading(true);
setError(null);
setRetryInfo(null);
// Khởi tạo AbortController cho request
abortControllerRef.current = new AbortController();
const userMessage = { role: 'user', content: inputMessage };
const updatedHistory = [...conversationHistoryRef.current, userMessage];
// Thêm message placeholder cho assistant
const assistantMessageId = Date.now();
setMessages(prev => [...prev, {
id: assistantMessageId,
role: 'assistant',
content: '',
isStreaming: true
}]);
try {
const response = await fetch(${API_BASE_URL}/chat/completions, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': Bearer ${apiKey},
},
body: JSON.stringify({
model: 'gpt-4',
messages: updatedHistory,
stream: true,
stream_options: { include_usage: true }
}),
signal: abortControllerRef.current.signal,
});
if (!response.ok) {
const errorData = await response.json().catch(() => ({}));
throw new Error(HTTP ${response.status}: ${errorData.error?.message});
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullContent = '';
let retryCount = 0;
const MAX_RETRIES = 3;
const processWithRetry = async () => {
while (true) {
try {
const { done, value } = await reader.read();
if (done) {
// Cập nhật conversation history
conversationHistoryRef.current = [
...updatedHistory,
{ role: 'assistant', content: fullContent }
];
setMessages(prev => prev.map(msg =>
msg.id === assistantMessageId
? { ...msg, content: fullContent, isStreaming: false }
: msg
));
setIsLoading(false);
return;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
conversationHistoryRef.current = [
...updatedHistory,
{ role: 'assistant', content: fullContent }
];
setMessages(prev => prev.map(msg =>
msg.id === assistantMessageId
? { ...msg, content: fullContent, isStreaming: false }
: msg
));
setIsLoading(false);
return;
}
try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
fullContent += content;
setMessages(prev => prev.map(msg =>
msg.id === assistantMessageId
? { ...msg, content: fullContent }
: msg
));
}
} catch (e) {
// Ignore parse errors for partial data
}
}
}
retryCount = 0; // Reset retry count on successful read
} catch (err) {
if (err.name === 'AbortError') {
return;
}
retryCount++;
if (retryCount <= MAX_RETRIES) {
setRetryInfo({
attempt: retryCount,
maxRetries: MAX_RETRIES,
delay: 1000 * Math.pow(2, retryCount - 1)
});
await new Promise(r => setTimeout(r, 1000 * Math.pow(2, retryCount - 1)));
continue;
}
throw err;
}
}
};
await processWithRetry();
} catch (err) {
if (err.name !== 'AbortError') {
setError(err.message);
setMessages(prev => prev.filter(msg => msg.id !== assistantMessageId));
}
setIsLoading(false);
}
}, [apiKey]);
const stop = useCallback(() => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
setIsLoading(false);
}, []);
const clearHistory = useCallback(() => {
conversationHistoryRef.current = [];
setMessages([]);
}, []);
// Cleanup khi component unmount
useEffect(() => {
return () => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
};
}, []);
return {
messages,
isLoading,
error,
retryInfo,
sendMessage: processStream,
stop,
clearHistory
};
}
// === Cách sử dụng trong component ===
/*
import { useSSEStream } from './useSSEStream';
function ChatComponent() {
const { messages, isLoading, error, retryInfo, sendMessage, stop, clearHistory }
= useSSEStream('YOUR_HOLYSHEEP_API_KEY');
return (
<div>
<div className="messages">
{messages.map(msg => (
<div key={msg.id} className={msg.role}>
{msg.content}
{msg.isStreaming && <span className="cursor">|lt;/span>}
</div>
))}
</div>
{retryInfo && (
<div className="retry-notice">
Đang thử lại ({retryInfo.attempt}/{retryInfo.maxRetries})...
sau {retryInfo.delay / 1000}s
</div>
)}
{error && <div className="error">{error}</div>}
<input
onKeyPress={(e) => {
if (e.key === 'Enter') sendMessage(e.target.value);
}}
disabled={isLoading}
/>
</div>
);
}
*/
Xử Lý Các Trường Hợp Đặc Biệt
1. Kiểm Tra Trạng Thái Kết Nối
Luôn theo dõi trạng thái network để có phản hồi tốt với người dùng:
// Theo dõi network status
window.addEventListener('online', () => {
console.log('Network connected - có thể retry');
showNotification('Kết nối đã khôi phục', 'success');
});
window.addEventListener('offline', () => {
console.log('Network disconnected');
showNotification('Mất kết nối mạng', 'warning');
});
// Kiểm tra connection quality
navigator.connection?.addEventListener('change', () => {
const { effectiveType, downlink } = navigator.connection;
console.log(Connection: ${effectiveType}, ${downlink}Mbps);
if (effectiveType === '2g' || downlink < 0.5) {
showNotification('Kết nối chậm, có thể xảy ra lag', 'warning');
}
});
2. Xử Lý Token Expired
Khi nhận được lỗi 401, cần refresh token và retry:
// Interceptor cho việc refresh token
async function
Tài nguyên liên quan
Bài viết liên quan