Server-Sent Events (SSE) have become the de facto standard for real-time streams in LLM applications. This article walks through a complete implementation of a production-ready streaming architecture with automatic reconnection handling, concurrency control, and performance optimization.

1. Understanding the SSE protocol and streaming architecture

Server-Sent Events deliver a unidirectional stream from server to client over a single long-lived HTTP response with the content type text/event-stream; on HTTP/1.1 this response is typically sent with chunked transfer encoding. What is particular to LLM streams is the handling of delta updates, where each chunk is a fragment of the complete answer.
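To make the wire format concrete, the sketch below splits a raw text/event-stream payload into events using the basic framing rules of the format: each line is a field of the form name: value, several data lines are joined with a newline, lines starting with a colon are comments, and a blank line ends an event. The names parseSseEvents and SseEvent are illustrative and not part of any API mentioned in this article. As a simplification, the retry field is ignored and the id is tracked per event only.

```typescript
// Hypothetical helper for illustration; not part of the article's client.
interface SseEvent {
  event: string;      // event name, "message" when the stream gives none
  data: string;       // all data lines of the event joined with "\n"
  id: string | null;  // "id" field seen in this event, if any
}

function parseSseEvents(raw: string): SseEvent[] {
  const events: SseEvent[] = [];
  let eventName = '';
  let dataLines: string[] = [];
  let id: string | null = null;

  // The format allows \r\n, \n, or \r as line terminators.
  for (const line of raw.split(/\r\n|\n|\r/)) {
    if (line === '') {
      // A blank line dispatches the event collected so far.
      if (dataLines.length > 0) {
        events.push({ event: eventName || 'message', data: dataLines.join('\n'), id });
      }
      eventName = '';
      dataLines = [];
      id = null;
      continue;
    }
    if (line.startsWith(':')) continue; // comment or keep-alive line

    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // one optional leading space

    if (field === 'data') dataLines.push(value);
    else if (field === 'event') eventName = value;
    else if (field === 'id') id = value;
  }
  // Anything after the last blank line is an incomplete event and is dropped.
  return events;
}
```

In a live stream the same logic has to run incrementally, which is why the client code in section 2 keeps the trailing partial line between reads.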

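1.1 Architecture overview

The HolySheep AI API natively supports SSE streams. Each event carries a JSON payload from which the client code in section 2 reads id, choices[0].delta.content, and usage; the payload [DONE] carries no JSON and is skipped.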
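Judging from the fields the client code in section 2 reads (id, choices[0].delta.content, usage, and the [DONE] sentinel), each event carries an OpenAI-style chat-completion chunk. The sketch below types only those fields and shows how a single data payload could be classified. The names StreamChunk, ParsedEvent, and extractDelta are illustrative, the shape is an inference rather than an official schema, and the real API may send additional fields.

```typescript
// Shape inferred from the fields the client code reads; an assumption, not an official schema.
interface StreamChunk {
  id?: string;
  choices?: Array<{ delta?: { content?: string } }>;
  usage?: Record<string, unknown>; // inner fields left untyped on purpose
}

type ParsedEvent =
  | { kind: 'delta'; text: string }
  | { kind: 'done' }
  | { kind: 'skip' };

// Hypothetical helper: turns one "data:" payload into a delta, an end marker, or nothing.
function extractDelta(data: string): ParsedEvent {
  const payload = data.trim();
  if (payload === '[DONE]') return { kind: 'done' };
  try {
    const chunk = JSON.parse(payload) as StreamChunk;
    const text = chunk.choices?.[0]?.delta?.content;
    return text ? { kind: 'delta', text } : { kind: 'skip' };
  } catch {
    // Malformed or non-object JSON is ignored rather than aborting the stream.
    return { kind: 'skip' };
  }
}
```

Returning a tagged union keeps the three cases explicit, so a caller can append text on delta, finish on done, and ignore everything else.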

2. Complete frontend implementation

2.1 TypeScript SSE client with reconnection logic

// sse-stream-client.ts
interface StreamConfig {
  baseUrl: string;
  apiKey: string;
  model: string;
  maxRetries?: number;   // defaults to 5
  retryDelay?: number;   // base delay in ms, defaults to 1000
  onChunk: (delta: string, fullText: string) => void;
  onError: (error: Error, retryCount: number) => void;
  onComplete: (fullText: string) => void;
}

interface StreamState {
  abortController: AbortController;
  retryCount: number;
  buffer: string;
  isConnected: boolean;
  lastEventId: string;
}

class SSStreamClient {
  // After the constructor has applied defaults, every option is present.
  private config: Required<StreamConfig>;
  private state: StreamState;
  private retryTimeoutId: ReturnType<typeof setTimeout> | null = null;

  constructor(config: StreamConfig) {
    // Defaults come first so that caller-supplied values override them.
    this.config = {
      maxRetries: 5,
      retryDelay: 1000,
      ...config
    };
    this.state = this.createInitialState();
  }

  private createInitialState(): StreamState {
    return {
      abortController: new AbortController(),
      retryCount: 0,
      buffer: '',
      isConnected: false,
      lastEventId: ''
    };
  }

  async sendMessage(messages: Array<{role: string; content: string}>): Promise<void> {
    this.resetState();

    const response = await fetch(`${this.config.baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.config.apiKey}`,
        'Accept': 'text/event-stream',
        'X-Request-ID': crypto.randomUUID()
      },
      body: JSON.stringify({
        model: this.config.model,
        messages: messages,
        stream: true,
        stream_options: { include_usage: true }
      }),
      signal: this.state.abortController.signal
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    this.state.isConnected = true;
    await this.processStream(response);
  }

  private async processStream(response: Response): Promise<void> {
    const reader = response.body?.getReader();
    if (!reader) throw new Error('Stream reader not available');

    const decoder = new TextDecoder();
    let partialLine = '';

    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          this.state.isConnected = false;
          this.config.onComplete(this.state.buffer);
          break;
        }

        const chunk = decoder.decode(value, { stream: true });
        const lines = (partialLine + chunk).split('\n');
        partialLine = lines.pop() || '';

        for (const line of lines) {
          await this.parseSSEEvent(line);
        }
      }
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        this.state.isConnected = false;
        return;
      }
      throw error;
    } finally {
      reader.releaseLock();
    }
  }

  private async parseSSEEvent(line: string): Promise<void> {
    if (!line.startsWith('data: ')) return;
    
    const data = line.slice(6).trim();
    if (data === '[DONE]') return;

    try {
      const parsed = JSON.parse(data);
      
      if (parsed.id) this.state.lastEventId = parsed.id;
      
      if (parsed.choices?.[0]?.delta?.content) {
        const delta = parsed.choices[0].delta.content;
        this.state.buffer += delta;
        this.config.onChunk(delta, this.state.buffer);
      }
      
      if (parsed.usage) {
        console.debug('Usage:', parsed.usage);
      }
    } catch (parseError) {
      console.warn('Parse error:', data);
    }
  }

  private async handleReconnection(): Promise<void> {
    if (this.state.retryCount >= this.config.maxRetries) {
      this.config.onError(
        new Error('Max retries exceeded'),
        this.state.retryCount
      );
      return;
    }

    const delay = this.calculateBackoff();
    console.log(`Reconnecting in ${delay}ms (attempt ${this.state.retryCount + 1})`);

    this.retryTimeoutId = setTimeout(async () => {
      this.state.retryCount++;
      this.state.abortController = new AbortController();

      try {
        // Placeholder: the resend itself is not implemented here. A complete
        // version would re-issue the last request at this point and mark the
        // connection as established only after it succeeds.
        this.state.isConnected = true;
      } catch (error) {
        await this.handleReconnection();
      }
    }, delay);
  }

  private calculateBackoff(): number {
    const exponentialDelay = Math.min(
      this.config.retryDelay * Math.pow(2, this.state.retryCount),
      30000
    );
    const jitter = Math.random() * 1000;
    return exponentialDelay + jitter;
  }

  private resetState(): void {
    if (this.retryTimeoutId) {
      clearTimeout(this.retryTimeoutId);
    }
    this.state = this.createInitialState();
  }

  disconnect(): void {
    this.state.abortController.abort();
    this.resetState();
  }
}

// Usage Example
const client = new SSStreamClient({
  baseUrl: 'https://api.holysheep.ai/v1',
  apiKey: 'YOUR_HOLYSHEEP_API_KEY',
  model: 'gpt-4.1',
  onChunk: (delta, full) => {
    document.getElementById('output')!.textContent = full;
  },
  onError: (err, retries) => {
    console.error(`Error after ${retries} retries:`, err);
  },
  onComplete: (text) => {
    console.log('Stream complete:', text.length, 'chars');
  }
});

client.sendMessage([
  { role: 'user', content: 'Explain SSE streaming in detail' }
]);
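The handleReconnection method above schedules a retry but leaves the actual resend as a comment. One possible way to close that gap is a generic retry wrapper around the request function, sketched below with the same backoff formula as calculateBackoff (exponential growth capped at 30 seconds). The names RetryOptions, backoffDelay, retryWithBackoff, and the injectable sleep parameter are hypothetical, and jitter is omitted to keep the schedule deterministic.

```typescript
interface RetryOptions {
  maxRetries: number;    // retries allowed after the first attempt
  baseDelayMs: number;   // delay before the first retry
  maxDelayMs?: number;   // cap on the delay, 30000 ms when omitted
  sleep?: (ms: number) => Promise<void>; // injectable, e.g. for tests
}

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
function backoffDelay(attempt: number, baseDelayMs: number, maxDelayMs = 30000): number {
  return Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs);
}

async function retryWithBackoff<T>(
  task: (attempt: number) => Promise<T>,
  options: RetryOptions
): Promise<T> {
  const sleep = options.sleep ??
    ((ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)));
  let lastError: unknown;

  for (let attempt = 0; attempt <= options.maxRetries; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      // An aborted request is a deliberate cancel, so it is not retried.
      if (error instanceof Error && error.name === 'AbortError') throw error;
      lastError = error;
      if (attempt < options.maxRetries) {
        await sleep(backoffDelay(attempt, options.baseDelayMs, options.maxDelayMs));
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}
```

With the client above, task could wrap a call to client.sendMessage(...). Note that sendMessage resets the client state on every call, so a retried stream starts again from an empty buffer rather than resuming mid-answer.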

2.2 React hook with automatic reconnection

// useStreamingChat.ts
import { useState, useCallback, useRef, useEffect } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: number;
  isComplete: boolean;
}

interface StreamingState {
  messages: Message[];
  isStreaming: boolean;
  error: Error | null;
  retryCount: number;
}

interface UseStreamingChatOptions {
  apiKey: string;
  baseUrl?: string;
  model?: string;
  maxRetries?: number;
  systemPrompt?: string;
}

export function useStreamingChat(options: UseStreamingChatOptions) {
  const {
    apiKey,
    baseUrl = 'https://api.holysheep.ai/v1',
    model = 'gpt-4.1',
    maxRetries = 5,
    systemPrompt = 'You are a helpful assistant.'
  } = options;

  const [state, setState] = useState<StreamingState>({
    messages: [],
    isStreaming: false,
    error: null,
    retryCount: 0
  });

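  const abortControllerRef = useRef<AbortController | null>(null);
  const eventSourceRef = useRef<EventSource | null>(null);
  // Accumulated text per assistant message id.
  const bufferRef = useRef<Map<string, string>>(new Map());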

  const sendMessage = useCallback(async (content: string) => {
    const messageId = crypto.randomUUID();
    
    // Add user message
    setState(prev => ({
      ...prev,
      messages: [
        ...prev.messages,
        { id: messageId, role: 'user', content, timestamp: Date.now(), isComplete: true }
      ],
      isStreaming: true,
      error: null,
      retryCount: 0
    }));

    // Add placeholder for assistant
    const assistantId = crypto.randomUUID();
    setState(prev => ({
      ...prev,
      messages: [
        ...prev.messages,
        { id: assistantId, role: 'assistant', content: '', timestamp: Date.now(), isComplete: false }
      ]
    }));

    bufferRef.current.set(assistantId, '');
    await streamResponse(content, assistantId);
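    // state.messages is listed so that this callback is recreated together with
    // streamResponse and never calls a version that holds a stale history.
  }, [apiKey, baseUrl, model, maxRetries, systemPrompt, state.messages]);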

  const streamResponse = useCallback(async (
    userMessage: string,
    assistantId: string,
    retryCount = 0
  ) => {
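    abortControllerRef.current = new AbortController();
    // Start every attempt from an empty buffer so a retry does not append to partial text.
    bufferRef.current.set(assistantId, '');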
    const conversationHistory = [
      { role: 'system', content: systemPrompt },
      ...state.messages.map(m => ({ role: m.role, content: m.content })),
      { role: 'user', content: userMessage }
    ];

    try {
      const response = await fetch(`${baseUrl}/chat/completions`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${apiKey}`,
          'Accept': 'text/event-stream'
        },
        body: JSON.stringify({
          model,
          messages: conversationHistory,
          stream: true,
          stream_options: { include_usage: true }
        }),
        signal: abortControllerRef.current.signal
      });

      if (!response.ok) {
        throw new Error(`API Error: ${response.status}`);
      }

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let partialLine = '';

      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          setState(prev => ({
            ...prev,
            isStreaming: false,
            messages: prev.messages.map(m => 
              m.id === assistantId ? { ...m, isComplete: true } : m
            )
          }));
          break;
        }

        const chunk = decoder.decode(value, { stream: true });
        const lines = (partialLine + chunk).split('\n');
        partialLine = lines.pop() || '';

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6).trim();
            if (data === '[DONE]') continue;

            try {
              const parsed = JSON.parse(data);
              const delta = parsed.choices?.[0]?.delta?.content;
              
              if (delta) {
                const currentBuffer = bufferRef.current.get(assistantId) || '';
                const newBuffer = currentBuffer + delta;
                bufferRef.current.set(assistantId, newBuffer);

                setState(prev => ({
                  ...prev,
                  messages: prev.messages.map(m =>
                    m.id === assistantId ? { ...m, content: newBuffer } : m
                  )
                }));
              }
            } catch (e) {
              // Ignore parse errors for incomplete JSON
            }
          }
        }
      }
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        setState(prev => ({ ...prev, isStreaming: false }));
        return;
      }

      console.error('Stream error:', error);

      if (retryCount < maxRetries) {
        const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
        setState(prev => ({ ...prev, retryCount: retryCount + 1 }));
        
        await new Promise(resolve => setTimeout(resolve, delay));
        return streamResponse(userMessage, assistantId, retryCount + 1);
      }

      setState(prev => ({
        ...prev,
        isStreaming: false,
        error: error as Error
      }));
    }
  }, [apiKey, baseUrl, model, maxRetries, systemPrompt, state.messages]);

  const cancel = useCallback(() => {
    abortControllerRef.current?.abort();
    eventSourceRef.current?.close();
    setState(prev => ({ ...prev, isStreaming: false }));
  }, []);

  useEffect(() => {
    return () => {
      abortControllerRef.current?.abort();
      eventSourceRef.current?.close();
    };
  }, []);

  return {
    messages: state.messages,
    isStreaming: state.isStreaming,
    error: state.error,
    retryCount: state.retryCount,
    sendMessage,
    cancel
  };
}

3. Performance optimization and concurrency control

3.1 Request queuing and throttling

For high-frequency streams, a request queue that limits concurrency and honors priorities is essential:

// RequestQueue.ts
class PriorityRequestQueue {
  private queue: Array<{
    priority: number;
    request: () => Promise<void>;
    resolve: () => void;
    reject: (error: Error) => void;
  }> = [];
  
  private activeRequests = 0;
  private readonly maxConcurrent: number;
  private readonly maxQueueSize: number;

  constructor(maxConcurrent = 3, maxQueueSize = 10) {
    this.maxConcurrent = maxConcurrent;
    this.maxQueueSize = maxQueueSize;
  }

  async enqueue(
    priority: number,
    request: () => Promise<void>
  ): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (this.queue.length >= this.maxQueueSize) {
        reject(new Error('Queue overflow'));
        return;
      }

      const entry = { priority, request, resolve, reject };
      this.queue.push(entry);
      this.queue.sort((a, b) => b.priority - a.priority);
      
      this.processNext();
    });
  }

  private async processNext(): Promise<void> {
    if (this.activeRequests >= this.maxConcurrent) return;
    
    const entry = this.queue.shift();
    if (!entry) return;

    this.activeRequests++;
    
    try {
      await entry.request();
      entry.resolve();
    } catch (error) {
      entry.reject(error as Error);
    } finally {
      this.activeRequests--;
      this.processNext();
    }
  }

  get size(): number {
    return this.queue.length;
  }

  clear(): void {
    this.queue.forEach(entry => 
      entry.reject(new Error('Queue cleared'))
    );
    this.queue = [];
  }
}

// Usage
const queue = new PriorityRequestQueue(3, 10);

// High priority request
await queue.enqueue(10, async () => {
  await client.sendMessage(highPriorityMessage);
});

// Normal priority
await queue.enqueue(5, async () => {
  await client.sendMessage(normalMessage);
});

3.2 Memory management and buffer optimization

For long streams, efficient memory management is critical.
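The article gives no listing for this step. One common approach, sketched below as an assumption rather than the author's implementation, is to batch incoming deltas and notify consumers once per batch instead of once per token, which keeps the number of UI updates and intermediate string copies bounded on long answers. The class name DeltaBatcher and the maxPendingChars threshold are hypothetical.

```typescript
// Hypothetical helper: collects deltas and flushes them in batches.
class DeltaBatcher {
  private pending: string[] = [];
  private pendingChars = 0;
  private total = '';
  private readonly onFlush: (batch: string, fullText: string) => void;
  private readonly maxPendingChars: number;

  constructor(
    onFlush: (batch: string, fullText: string) => void,
    maxPendingChars = 256
  ) {
    this.onFlush = onFlush;
    this.maxPendingChars = maxPendingChars;
  }

  // Queue one delta; flush automatically once enough text has piled up.
  append(delta: string): void {
    this.pending.push(delta);
    this.pendingChars += delta.length;
    if (this.pendingChars >= this.maxPendingChars) this.flush();
  }

  // Hand all queued deltas to the consumer as a single batch.
  flush(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending.join('');
    this.pending = [];
    this.pendingChars = 0;
    this.total += batch;
    this.onFlush(batch, this.total);
  }

  // Full text received so far, including deltas that are not flushed yet.
  get text(): string {
    return this.total + this.pending.join('');
  }
}
```

With the client from section 2.1, append would be called from onChunk and flush from onComplete, so the final partial batch is not lost.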

4. Benchmark data and cost analysis

Compared with leading providers, we achieve significant advantages at HolySheep AI:

Model               Price per 1M tokens   Latency (P50)   Savings
GPT-4.1             $8.00                 ~120 ms         Baseline
Claude Sonnet 4.5   $15.00                ~95 ms
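
To translate the per-million prices into the cost of a concrete workload, the arithmetic is tokens / 1,000,000 × price. The sketch below applies it to the two prices listed in the table. The helper name estimateCostUsd is hypothetical, and treating the listed price as a single flat rate for all tokens is an assumption, since the table does not distinguish input from output tokens.

```typescript
// Prices in USD per 1M tokens, taken from the table above.
const PRICE_PER_MILLION_USD: Record<string, number> = {
  'GPT-4.1': 8.0,
  'Claude Sonnet 4.5': 15.0,
};

// Hypothetical helper: cost of a number of tokens at a flat per-million rate.
function estimateCostUsd(tokens: number, pricePerMillionUsd: number): number {
  return (tokens / 1000000) * pricePerMillionUsd;
}

for (const [model, price] of Object.entries(PRICE_PER_MILLION_USD)) {
  console.log(`${model}: $${estimateCostUsd(250000, price).toFixed(2)} for 250,000 tokens`);
}
// GPT-4.1: $2.00 for 250,000 tokens
// Claude Sonnet 4.5: $3.75 for 250,000 tokens
```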