Server-Sent Events (SSE) haben sich als De-facto-Standard für Echtzeit-Streams in LLM-Anwendungen etabliert. Dieser Artikel zeigt die vollständige Implementierung einer produktionsreifen Streaming-Architektur mit automatischem Reconnection-Handling, Concurrency-Kontrolle und Performance-Optimierung. Jetzt registrieren und von unserer <50ms Latenz profitieren.
1. SSE-Protokoll und Streaming-Architektur verstehen
Server-Sent Events nutzen eine langlebige HTTP-Antwort (unter HTTP/1.1 per Chunked Transfer Encoding, unter HTTP/2 als regulärer Stream) und ermöglichen eine unidirektionale Datenverbindung vom Server zum Client. Die Besonderheit bei LLM-Streams liegt in der Verarbeitung von Delta-Updates, bei denen jeder Chunk ein Fragment der Gesamtantwort enthält.
1.1 Architekturübersicht
Die HolySheep AI API unterstützt nativ SSE-Streams und liefert sie mit folgenden Response-Headern aus:
- Content-Type: text/event-stream
- Cache-Control: no-cache
- Connection: keep-alive
- Transfer-Encoding: chunked
2. Vollständige Frontend-Implementierung
2.1 TypeScript SSE-Client mit Reconnection-Logik
// sse-stream-client.ts
/** Configuration for {@link SSStreamClient}. */
interface StreamConfig {
  /** API base URL without a trailing slash; `/chat/completions` is appended. */
  baseUrl: string;
  /** Sent as `Authorization: Bearer <apiKey>`. */
  apiKey: string;
  /** Model identifier forwarded in the request body. */
  model: string;
  /** Maximum number of retries after a retryable failure (default: 5). */
  maxRetries?: number;
  /** Base delay in ms for the exponential backoff and its jitter (default: 1000). */
  retryDelay?: number;
  /** Called for every content delta with the delta and the text accumulated so far. */
  onChunk: (delta: string, fullText: string) => void;
  /** Called for every failed attempt with the number of retries already performed. */
  onError: (error: Error, retryCount: number) => void;
  /** Called once the server closes the stream, with the complete text. */
  onComplete: (fullText: string) => void;
}

/** Mutable state of one `sendMessage` session. */
interface StreamState {
  /** Cancels the fetch, the body read and any pending retry delay of this session. */
  abortController: AbortController;
  /** Retries performed so far in this session. */
  retryCount: number;
  /** Text accumulated from the content deltas of the current attempt. */
  buffer: string;
  /** `true` while a successful response body is being read. */
  isConnected: boolean;
  /** `id` of the most recently parsed chunk (informational only). */
  lastEventId: string;
}

/**
 * Fetch-based SSE client for streaming chat completions.
 *
 * Uses `fetch` + `ReadableStream` rather than `EventSource`, because the
 * request is a POST with a JSON body and an `Authorization` header.
 * Transport failures (network errors, HTTP 408/429/5xx, streams that break
 * off) are retried with exponential backoff. A retry replays the request
 * from the start, so the accumulated text is reset first.
 * Only one session is active at a time: a new `sendMessage` (or
 * `disconnect`) aborts the previous one.
 */
class SSStreamClient {
  /** Splits on LF or CRLF, both of which the SSE format allows. */
  private static readonly lineBreak = /\r?\n/;

  private config: Required<StreamConfig>;
  private state: StreamState;
  private retryTimeoutId: ReturnType<typeof setTimeout> | null = null;

  constructor(config: StreamConfig) {
    this.config = {
      ...config,
      // `??` rather than spreading defaults first, so an explicit
      // `undefined` in `config` cannot clobber the default.
      maxRetries: config.maxRetries ?? 5,
      retryDelay: config.retryDelay ?? 1000,
    };
    this.state = this.createInitialState();
  }

  private createInitialState(): StreamState {
    return {
      abortController: new AbortController(),
      retryCount: 0,
      buffer: '',
      isConnected: false,
      lastEventId: '',
    };
  }

  /**
   * Streams a completion for `messages`, retrying transient failures.
   *
   * Resolves when the stream completes or when the session is aborted
   * (`disconnect()` or a newer `sendMessage()`). Every failure is reported
   * through `onError`. The promise rejects with the failing error when the
   * error is not retryable, or after `maxRetries` retries; in the latter
   * case `onError` receives `Error('Max retries exceeded')`.
   */
  async sendMessage(messages: Array<{ role: string; content: string }>): Promise<void> {
    this.resetState();
    // Capture the session so a later resetState() cannot redirect this
    // loop's writes into a newer session's state.
    const state = this.state;
    const { signal } = state.abortController;

    for (;;) {
      try {
        await this.openStream(messages, state);
        return;
      } catch (caught) {
        // Aborted by disconnect() or superseded by a newer sendMessage().
        if (signal.aborted) return;

        const error = caught instanceof Error ? caught : new Error(String(caught));
        state.isConnected = false;

        if (!SSStreamClient.isTransient(error)) {
          this.config.onError(error, state.retryCount);
          throw error;
        }
        if (state.retryCount >= this.config.maxRetries) {
          this.config.onError(new Error('Max retries exceeded'), state.retryCount);
          throw error;
        }
        this.config.onError(error, state.retryCount);

        const delay = this.calculateBackoff(state.retryCount);
        console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.retryCount + 1})`);
        if (!(await this.waitForRetry(delay, signal))) return;

        state.retryCount++;
        // The request is replayed from the start, so drop the partial answer.
        state.buffer = '';
        state.lastEventId = '';
      }
    }
  }

  /** Performs one request and consumes its event stream into `state`. */
  private async openStream(
    messages: Array<{ role: string; content: string }>,
    state: StreamState
  ): Promise<void> {
    let response: Response;
    try {
      response = await fetch(`${this.config.baseUrl}/chat/completions`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${this.config.apiKey}`,
          Accept: 'text/event-stream',
          'X-Request-ID': crypto.randomUUID(),
        },
        body: JSON.stringify({
          model: this.config.model,
          messages,
          stream: true,
          stream_options: { include_usage: true },
        }),
        signal: state.abortController.signal,
      });
    } catch (error) {
      // fetch() itself rejected (network-level failure): worth retrying.
      throw SSStreamClient.markTransient(error);
    }

    if (!response.ok) {
      // The error body is not used; cancel it so the connection is released.
      await response.body?.cancel().catch(() => undefined);
      const httpError = new Error(`HTTP ${response.status}: ${response.statusText}`);
      const retryable =
        response.status === 408 || response.status === 429 || response.status >= 500;
      throw retryable ? SSStreamClient.markTransient(httpError) : httpError;
    }

    state.isConnected = true;
    await this.processStream(response, state);
  }

  /**
   * Reads the body line by line and calls `onComplete` when it ends.
   * Returns quietly if the session is aborted; read failures are rethrown
   * tagged as transient.
   */
  private async processStream(response: Response, state: StreamState): Promise<void> {
    const reader = response.body?.getReader();
    if (!reader) throw new Error('Stream reader not available');

    const decoder = new TextDecoder();
    let pending = '';
    try {
      for (;;) {
        const { done, value } = await reader.read().catch((error: unknown) => {
          // The connection broke off mid-stream: worth retrying.
          throw SSStreamClient.markTransient(error);
        });
        if (done) break;

        pending += decoder.decode(value, { stream: true });
        const lines = pending.split(SSStreamClient.lineBreak);
        // The last element is an incomplete line; keep it for the next read.
        pending = lines.pop() ?? '';
        for (const line of lines) this.parseSSEEvent(line, state);
      }

      // Flush bytes still held by the streaming decoder, plus a last line
      // the server did not terminate with a newline.
      pending += decoder.decode();
      for (const line of pending.split(SSStreamClient.lineBreak)) {
        this.parseSSEEvent(line, state);
      }

      state.isConnected = false;
      this.config.onComplete(state.buffer);
    } catch (error) {
      state.isConnected = false;
      if (SSStreamClient.isAbortError(error)) return;
      // Stop the download so the connection is not left half-read.
      await reader.cancel().catch(() => undefined);
      throw error;
    } finally {
      reader.releaseLock();
    }
  }

  /**
   * Handles a single SSE line. Only `data:` fields are used; comments
   * (`: ...`), other fields and blank event separators are ignored.
   * Each `data:` line is assumed to carry one complete JSON chunk, as in
   * OpenAI-style streams; multi-line `data` events are not reassembled.
   * Exceptions thrown by `onChunk` propagate to the caller.
   */
  private parseSSEEvent(line: string, state: StreamState): void {
    if (!line.startsWith('data:')) return;
    // The space after `data:` is optional in the SSE format.
    const data = line.slice('data:'.length).trim();
    if (data === '' || data === '[DONE]') return;

    let payload: unknown;
    try {
      payload = JSON.parse(data);
    } catch {
      console.warn('Parse error:', data);
      return;
    }
    if (typeof payload !== 'object' || payload === null) return;

    const chunk = payload as {
      id?: unknown;
      choices?: Array<{ delta?: { content?: unknown } } | null>;
      usage?: unknown;
    };
    if (typeof chunk.id === 'string' && chunk.id !== '') state.lastEventId = chunk.id;

    const delta = chunk.choices?.[0]?.delta?.content;
    if (typeof delta === 'string' && delta !== '') {
      state.buffer += delta;
      this.config.onChunk(delta, state.buffer);
    }
    if (chunk.usage) {
      console.debug('Usage:', chunk.usage);
    }
  }

  /**
   * Exponential backoff capped at 30 s, plus random jitter of up to one
   * base delay (1000 ms with the default `retryDelay`). The jitter keeps
   * many clients from reconnecting in lockstep.
   */
  private calculateBackoff(retryCount: number): number {
    const base = this.config.retryDelay;
    const exponential = Math.min(base * 2 ** retryCount, 30000);
    return exponential + Math.random() * base;
  }

  /** Sleeps for `delay` ms; resolves `false` early if `signal` is aborted. */
  private waitForRetry(delay: number, signal: AbortSignal): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      if (signal.aborted) {
        resolve(false);
        return;
      }
      const onAbort = (): void => {
        clearTimeout(timer);
        resolve(false);
      };
      const timer = setTimeout(() => {
        signal.removeEventListener('abort', onAbort);
        if (this.retryTimeoutId === timer) this.retryTimeoutId = null;
        resolve(true);
      }, delay);
      this.retryTimeoutId = timer;
      signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /** Cancels a pending retry, aborts the current session and starts a fresh one. */
  private resetState(): void {
    if (this.retryTimeoutId !== null) {
      clearTimeout(this.retryTimeoutId);
      this.retryTimeoutId = null;
    }
    // Abort the previous session so it cannot keep streaming into, or
    // reconnecting over, the new one.
    this.state.abortController.abort();
    this.state = this.createInitialState();
  }

  /**
   * Cancels the active request, stream or pending retry. The pending
   * `sendMessage` promise then resolves without calling `onComplete`.
   */
  disconnect(): void {
    this.resetState();
  }

  /** Tags `error` as a transport failure that the retry loop may retry. */
  private static markTransient(error: unknown): Error {
    const err = error instanceof Error ? error : new Error(String(error));
    return Object.assign(err, { transient: true });
  }

  private static isTransient(error: Error): boolean {
    return 'transient' in error && error.transient === true;
  }

  private static isAbortError(error: unknown): boolean {
    return (
      typeof error === 'object' && error !== null && 'name' in error && error.name === 'AbortError'
    );
  }
}
// Usage Example
// NOTE: this runs in the browser, so the API key is visible to every user.
// In production, proxy requests through your own backend instead of
// embedding a real key here.
const client = new SSStreamClient({
  baseUrl: 'https://api.holysheep.ai/v1',
  apiKey: 'YOUR_HOLYSHEEP_API_KEY',
  model: 'gpt-4.1',
  onChunk: (delta, full) => {
    // Guard instead of `!`: the element may not exist (yet).
    const output = document.getElementById('output');
    if (output) output.textContent = full;
  },
  onError: (err, retries) => {
    console.error(`Error after ${retries} retries:`, err);
  },
  onComplete: (text) => {
    console.log('Stream complete:', text.length, 'chars');
  }
});

// sendMessage() returns a promise that can reject; handle it so a failure
// does not surface as an unhandled promise rejection.
client
  .sendMessage([{ role: 'user', content: 'Erkläre SSE Streaming im Detail' }])
  .catch((err: unknown) => {
    console.error('Stream failed:', err);
  });
2.2 React-Hook mit automatischer Reconnection
// useStreamingChat.ts
import { useState, useCallback, useRef, useEffect } from 'react';
/** One entry of the chat transcript held in `useStreamingChat` state. */
interface Message {
  /** Author of the message. */
  role: 'user' | 'assistant';
  /** Message text. */
  content: string;
  /** Identifier of the message. */
  id: string;
  /** Creation time; presumably epoch milliseconds — TODO confirm. */
  timestamp: number;
  /** Whether the message is finished; presumably `false` while a reply is still streaming — TODO confirm. */
  isComplete: boolean;
}
/** Shape of the React state initialised by `useStreamingChat`. */
interface StreamingState {
  /** Last error, or `null` when there is none. */
  error: Error | null;
  /** Presumably `true` while a response is being streamed — TODO confirm. */
  isStreaming: boolean;
  /** Conversation transcript. */
  messages: Message[];
  /** Presumably the retries made for the current request — TODO confirm. */
  retryCount: number;
}
/** Options accepted by `useStreamingChat`; only `apiKey` is required. */
interface UseStreamingChatOptions {
  /** API key used for the requests. */
  apiKey: string;
  /** System prompt (default: 'Du bist ein hilfreicher Assistent.'). */
  systemPrompt?: string;
  /** Model identifier (default: 'gpt-4.1'). */
  model?: string;
  /** API base URL (default: 'https://api.holysheep.ai/v1'). */
  baseUrl?: string;
  /** Retry limit (default: 5). */
  maxRetries?: number;
}
export function useStreamingChat(options: UseStreamingChatOptions) {
const {
apiKey,
baseUrl = 'https://api.holysheep.ai/v1',
model = 'gpt-4.1',
maxRetries = 5,
systemPrompt = 'Du bist ein hilfreicher Assistent.'
} = options;
const [state, setState] = useState({
messages: [],
isStreaming: false,
error: null,
retryCount: 0
});
const abortControllerRef = useRef(null);
const eventSourceRef = useRef(null);
const bufferRef = useRef
3. Performance-Optimierung und Concurrency-Control
3.1 Request-Queuing und Throttling
Bei hochfrequenten Streams ist eine intelligente Request-Queue essentiell:
// RequestQueue.ts
/**
 * Runs async jobs with bounded concurrency, highest priority first.
 *
 * Jobs with equal priority run in the order they were enqueued. At most
 * `maxConcurrent` jobs run at once and at most `maxQueueSize` jobs may
 * wait; beyond that `enqueue` rejects with `Error('Queue overflow')`.
 */
class PriorityRequestQueue {
  /** Waiting jobs, ordered by descending priority (FIFO within a priority). */
  private queue: Array<{
    priority: number;
    run: () => Promise<void>;
    reject: (error: Error) => void;
  }> = [];
  private activeRequests = 0;
  private readonly maxConcurrent: number;
  private readonly maxQueueSize: number;

  constructor(maxConcurrent = 3, maxQueueSize = 10) {
    this.maxConcurrent = maxConcurrent;
    this.maxQueueSize = maxQueueSize;
  }

  /**
   * Schedules `request` and resolves with its result once it has run.
   *
   * @param priority - Higher values run first.
   * @param request - Job to run; started only when a concurrency slot is free.
   * @returns The value `request` resolves with. Rejects with
   *   `Error('Queue overflow')` when the queue is full,
   *   `Error('Queue cleared')` when `clear()` runs before the job starts,
   *   or with the job's own error (non-`Error` values are wrapped).
   */
  enqueue<T>(priority: number, request: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      if (this.queue.length >= this.maxQueueSize) {
        reject(new Error('Queue overflow'));
        return;
      }
      const entry = {
        priority,
        reject,
        // Never rejects: the outcome is forwarded to the caller's promise,
        // so drain() can chain on it without unhandled rejections.
        run: async (): Promise<void> => {
          try {
            resolve(await request());
          } catch (error) {
            reject(error instanceof Error ? error : new Error(String(error)));
          }
        },
      };
      // Insert before the first strictly lower priority: keeps the queue
      // ordered and FIFO among equal priorities without re-sorting.
      const index = this.queue.findIndex((queued) => queued.priority < priority);
      if (index === -1) {
        this.queue.push(entry);
      } else {
        this.queue.splice(index, 0, entry);
      }
      this.drain();
    });
  }

  /** Starts waiting jobs while concurrency slots are free. */
  private drain(): void {
    while (this.activeRequests < this.maxConcurrent) {
      const entry = this.queue.shift();
      if (!entry) return;
      this.activeRequests++;
      void entry.run().finally(() => {
        this.activeRequests--;
        this.drain();
      });
    }
  }

  /** Number of jobs waiting (running jobs are not counted). */
  get size(): number {
    return this.queue.length;
  }

  /** Rejects every waiting job with `Error('Queue cleared')`; running jobs continue. */
  clear(): void {
    const waiting = this.queue;
    this.queue = [];
    for (const entry of waiting) {
      entry.reject(new Error('Queue cleared'));
    }
  }
}
// Usage
const queue = new PriorityRequestQueue(3, 10);

// NOTE: both jobs share one SSStreamClient, which holds the state of a single
// stream at a time, so they are awaited one after another here. To run
// streams concurrently through the queue, give each job its own client.
const sendHighPriority = async () => {
  await client.sendMessage(highPriorityMessage);
};
const sendNormal = async () => {
  await client.sendMessage(normalMessage);
};

// High priority request
await queue.enqueue(10, sendHighPriority);
// Normal priority
await queue.enqueue(5, sendNormal);
3.2 Memory-Management und Buffer-Optimierung
Für lange Streams ist effizientes Memory-Management kritisch:
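- Chunk-Batching: UI-Updates auf ein 16-ms-Intervall (≈ 60 fps) bündeln, statt bei jedem Chunk neu zu rendern
- Virtual Scrolling: Bei langen Ausgaben nur die sichtbaren Inhalte rendern
- String-Aufbau: Wiederholte String-Konkatenationen in heißen Pfaden vermeiden, z. B. Fragmente in einem Array sammeln und erst beim Rendern zusammenfügen
- WeakRef für Caches: Erlaubt dem Garbage Collector, zwischengespeicherte Objekte bei Speicherbedarf freizugeben – ohne Garantie, wann dies geschieht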
4. Benchmark-Daten und Kostenanalyse
Bei HolySheep AI erreichen wir im Vergleich zu führenden Anbietern signifikante Vorteile:
| Modell | Preis pro 1M Token | Latenz (P50) | Ersparnis |
|---|---|---|---|
| GPT-4.1 | $8.00 | ~120ms | Basis |
| Claude Sonnet 4.5 | $15.00 | ~95ms | <