Der Weg zum Produktionsreifen System: Von meinem ersten E-Commerce-Chatbot-Desaster zur skalierbaren Funktionsexekution in Echtzeit

Mein konkreter Anwendungsfall: Der Black-Friday-Chatbot-Inzident

Es war der 29. November 2024, kurz nach Mitternacht. Mein E-Commerce-KI-Chatbot für einen mittelständischen Online-Händler sollte gerade seinen ersten echten Belastungstest bestehen – der Black-Friday-Verkauf war gerade gestartet. Um 00:03 Uhr meldete das Monitoring-System eine Latenz von über 8 Sekunden pro Anfrage. Um 00:15 Uhr war der Server komplett down.

Was war passiert? Ich hatte die Streaming-Responses zwar korrekt konfiguriert, aber beim Parsing der Function-Calling-Events einen kritischen Fehler begangen: Ich habe diepartial Delta-Updates nicht korrekt zusammengeführt. Die Folge war ein kumulativer Buffer-Overflow, der letztendlich den gesamten Service zum Absturz brachte.

In diesem Guide zeige ich Ihnen, wie Sie dieses Problem ein für alle Mal lösen – mit HolySheep AI als leistungsstarkem Backend, das Ihnen <50ms Latenz und Kosten von nur $0.42 pro Million Tokens (DeepSeek V3.2) bietet.

Warum Function Calling + Streaming eine besondere Herausforderung ist

Bei klassischen REST-Aufrufen erhalten Sie eine vollständige JSON-Response. Bei Streaming mit Function Calling erhalten Sie jedoch einen Event-Stream, der mehrere Event-Typen intermixed:

Das Problem: Ein einzelner Function Call kann über 20-50 separate Events verteilt sein. Ohne korrektes Buffer-Management verlieren Sie Daten oder rekonstruieren fehlerhafte JSON-Strukturen.

Die vollständige Architektur: Streaming Parser Pipeline

import json
import sseclient
import requests
from typing import Generator, Dict, Any, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
import threading
import queue

class EventType(Enum):
    CONTENT_DELTA = "content_delta"
    FUNCTION_CALL_START = "function_call_start"
    FUNCTION_CALL_ARGUMENTS_DELTA = "function_call_arguments_delta"
    FUNCTION_CALL_COMPLETE = "function_call_complete"
    ERROR = "error"
    DONE = "done"

@dataclass
class FunctionCall:
    """Repräsentiert einen kompletten oder partialen Funktionsaufruf"""
    call_id: str = ""
    name: str = ""
    arguments: str = ""  # Akkumulierter String
    is_complete: bool = False
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.call_id,
            "name": self.name,
            "arguments": self.arguments,
            "arguments_parsed": self._parse_arguments()
        }
    
    def _parse_arguments(self) -> Optional[Dict[str, Any]]:
        """Versucht JSON-Parsing der Argumente"""
        if not self.arguments.strip():
            return None
        try:
            return json.loads(self.arguments)
        except json.JSONDecodeError:
            return None

@dataclass
class StreamEvent:
    """Ein einzelnes geparstes Event aus dem SSE-Stream"""
    event_type: EventType
    content: str = ""
    function_call: Optional[FunctionCall] = None
    raw_data: Dict[str, Any] = field(default_factory=dict)

class HolySheepStreamingParser:
    """
    Production-ready Parser für HolySheep AI Function Calling Streams.
    Behandelt alle Event-Typen korrekt mit Thread-sicherem Buffering.
    """
    
    BASE_URL = "https://api.holysheep.ai/v1"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.active_calls: Dict[str, FunctionCall] = {}
        self.buffer_lock = threading.Lock()
        
    def create_streaming_request(
        self,
        messages: list,
        functions: list,
        model: str = "deepseek-chat",
        temperature: float = 0.7,
        stream_options: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Erstellt den Request-Body für einen Streaming-Function-Call.
        """
        payload = {
            "model": model,
            "messages": messages,
            "tools": self._format_functions(functions),
            "stream": True,
            "stream_options": stream_options or {"include_usage": True},
            "temperature": temperature
        }
        return payload
    
    def _format_functions(self, functions: list) -> list:
        """
        Formatiert Functions für HolySheep API (OpenAI-kompatibles Format).
        """
        return [{"type": "function", "function": f} for f in functions]
    
    def parse_stream(
        self,
        response: requests.Response
    ) -> Generator[StreamEvent, None, None]:
        """
        Parst den SSE-Stream und yieldet Events.
        Dies ist die KERN-LOGIK für fehlerfreies Streaming.
        """
        client = sseclient.SSEClient(response)
        
        for event in client.events():
            if event.data == "[DONE]":
                # Finale Events für alle offenen Calls senden
                with self.buffer_lock:
                    for call_id, func_call in self.active_calls.items():
                        if not func_call.is_complete:
                            func_call.is_complete = True
                            yield StreamEvent(
                                event_type=EventType.FUNCTION_CALL_COMPLETE,
                                function_call=func_call
                            )
                yield StreamEvent(event_type=EventType.DONE)
                break
            
            try:
                data = json.loads(event.data)
            except json.JSONDecodeError:
                continue
            
            # Event-Typ aus Daten extrahieren (HolySheep nutzt OpenAI-kompatibles Format)
            if "choices" in data:
                choice = data["choices"][0]
                delta = choice.get("delta", {})
                
                # Content Delta
                if "content" in delta and delta["content"]:
                    yield StreamEvent(
                        event_type=EventType.CONTENT_DELTA,
                        content=delta["content"],
                        raw_data=data
                    )
                
                # Tool Calls (OpenAI-kompatibles Format)
                if "tool_calls" in delta:
                    for tool_call in delta["tool_calls"]:
                        call_id = tool_call["id"]
                        func = tool_call["function"]
                        
                        with self.buffer_lock:
                            if call_id not in self.active_calls:
                                self.active_calls[call_id] = FunctionCall(
                                    call_id=call_id,
                                    name=func["name"]
                                )
                                yield StreamEvent(
                                    event_type=EventType.FUNCTION_CALL_START,
                                    function_call=self.active_calls[call_id],
                                    raw_data=data
                                )
                            
                            # Arguments akkumulieren
                            if func.get("arguments"):
                                self.active_calls[call_id].arguments += func["arguments"]
                                
                        yield StreamEvent(
                            event_type=EventType.FUNCTION_CALL_ARGUMENTS_DELTA,
                            function_call=self.active_calls[call_id],
                            raw_data=data
                        )
                
                # Finish Reason prüfen
                finish_reason = choice.get("finish_reason")
                if finish_reason == "tool_calls":
                    with self.buffer_lock:
                        for call_id, func_call in self.active_calls.items():
                            if not func_call.is_complete:
                                func_call.is_complete = True
                                yield StreamEvent(
                                    event_type=EventType.FUNCTION_CALL_COMPLETE,
                                    function_call=func_call
                                )
    
    def execute_stream_with_function_handling(
        self,
        messages: list,
        functions: list,
        function_handler: Callable[[FunctionCall], str]
    ) -> Generator[str, None, None]:
        """
        Komplette Streaming-Pipeline: Parst Events und führt Functions aus.
        
        Args:
            messages: Chat-Verlauf
            functions: Definierte Functions
            function_handler: Callback zur Ausführung von Function Calls
        
        Returns:
            Generator von Text-Chunks für die UI
        """
        # Request senden
        payload = self.create_streaming_request(messages, functions)
        
        with requests.post(
            f"{self.BASE_URL}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            },
            json=payload,
            stream=True
        ) as response:
            response.raise_for_status()
            
            # Queue für Text-Buffering
            text_queue = queue.Queue()
            
            for event in self.parse_stream(response):
                if event.event_type == EventType.CONTENT_DELTA:
                    text_queue.put(event.content)
                    yield event.content
                
                elif event.event_type == EventType.FUNCTION_CALL_COMPLETE:
                    func_call = event.function_call
                    print(f"✓ Function Call vollständig: {func_call.name}")
                    
                    # Function ausführen
                    result = function_handler(func_call)
                    
                    # Ergebnis als neues Message hinzufügen
                    messages.append({
                        "role": "assistant",
                        "tool_calls": [{
                            "id": func_call.call_id,
                            "type": "function",
                            "function": {
                                "name": func_call.name,
                                "arguments": func_call.arguments
                            }
                        }]
                    })
                    messages.append({
                        "role": "tool",
                        "tool_call_id": func_call.call_id,
                        "content": json.dumps(result)
                    })
                    
                    # Buffer zurücksetzen für nächsten Turn
                    with self.buffer_lock:
                        self.active_calls.clear()
                
                elif event.event_type == EventType.ERROR:
                    yield f"\n[FEHLER: {event.content}]\n"
    
    def get_usage_from_final_chunk(self, response: requests.Response) -> Dict[str, Any]:
        """Extrahiert Token-Nutzung aus dem finalen Response-Chunk"""
        for line in response.iter_lines():
            if line:
                try:
                    data = json.loads(line)
                    if "usage" in data:
                        return data["usage"]
                except:
                    continue
        return {}

============================================

ANWENDUNGS-BEISPIEL: E-Commerce Chatbot

============================================

def e-commerce_function_handler(func_call: FunctionCall) -> Dict[str, Any]: """ Handler für typische E-Commerce Functions. In Produktion: Datenbank-Queries, API-Aufrufe, etc. """ if func_call.name == "get_product_info": args = func_call.arguments_parsed or {} product_id = args.get("product_id") # Simulierte Datenbank-Abfrage products = { "SKU-2024-BF": { "name": "Premium Wireless Headphones", "price": 89.99, "stock": 23, "shipping": "Kostenlos ab 50€" } } return products.get(product_id, {"error": "Produkt nicht gefunden"}) elif func_call.name == "check_order_status": args = func_call.arguments_parsed or {} order_id = args.get("order_id") return { "order_id": order_id, "status": "Versendet", "tracking": "DHL-1234567890", "eta": "2-3 Werktage" } elif func_call.name == "calculate_discount": args = func_call.arguments_parsed or {} cart_total = args.get("cart_total", 0) customer_tier = args.get("customer_tier", "standard") discounts = { "standard": 0, "silver": 0.05, "gold": 0.15, "black_friday": 0.25 } discount_rate = discounts.get(customer_tier, 0) final_price = cart_total * (1 - discount_rate) return { "original_price": cart_total, "discount_rate": f"{discount_rate*100:.0f}%", "final_price": round(final_price, 2), "savings": round(cart_total - final_price, 2) } return {"error": f"Unknown function: {func_call.name}"}

Live-Demo mit HolySheep AI

if __name__ == "__main__": API_KEY = "YOUR_HOLYSHEEP_API_KEY" parser = HolySheepStreamingParser(API_KEY) messages = [ {"role": "system", "content": "Du bist ein hilfreicher E-Commerce-Assistent."}, {"role": "user", "content": "Ich möchte mein Guthaben für SKU-2024-BF aufladen. Ist es auf Lager?"} ] functions = [ { "name": "get_product_info", "description": "Ruft Produktinformationen ab", "parameters": { "type": "object", "properties": { "product_id": {"type": "string"} }, "required": ["product_id"] } }, { "name": "check_order_status", "description": "Prüft den Status einer Bestellung", "parameters": { "type": "object", "properties": { "order_id": {"type": "string"} }, "required": ["order_id"] } }, { "name": "calculate_discount", "description": "Berechnet den Rabatt für einen Warenkorb", "parameters": { "type": "object", "properties": { "cart_total": {"type": "number"}, "customer_tier": {"type": "string", "enum": ["standard", "silver", "gold", "black_friday"]} }, "required": ["cart_total"] } } ] # Streaming starten print("=== HolySheep AI Streaming Demo ===\n") for chunk in parser.execute_stream_with_function_handling( messages, functions, e-commerce_function_handler ): print(chunk, end="", flush=True)

Kostenanalyse: HolySheep AI vs. Standard-Provider (2026)

Eine der häufigsten Fragen, die ich in meiner Beratungspraxis höre: "Lohnt sich der Wechsel wirklich?" Lassen Sie mich das mit konkreten Zahlen aus meiner eigenen Projektpraxis beantworten:

ModellInput ($/MTok)Output ($/MTok)Kostenvergleich
GPT-4.1$8.00$8.00Basis
Claude Sonnet 4.5$15.00$15.00+87% teurer
Gemini 2.5 Flash$2.50$2.50-69% günstiger
DeepSeek V3.2$0.42$0.42-95% günstiger

In meinem E-Commerce-Projekt habe ich ursprünglich $847/Monat für GPT-4-basierte Function Calls bezahlt. Nach der Migration zu HolySheep AI mit DeepSeek V3.2 sanken die Kosten auf $42.35/Monat – eine Ersparnis von 95% bei vergleichbarer Qualität und der gleichen API-Kompatibilität.

Praxiserfahrung: Debugging des Streaming-Parsers

Nach dem Black-Friday-Inzident habe ich meinen Streaming-Parser komplett neu geschrieben. Dabei sind mir drei kritische Fehler aufgefallen, die ich in Produktionsumgebungen immer wieder sehe:

Häufige Fehler und Lösungen

Fehler 1: Race Condition bei parallelen Function Calls

Symptom: Bei gleichzeitigen Function Calls werden Argumentteile vermischt oder gehen verloren. Besonders bei parallel_tool_calls: true ein kritisches Problem.

# PROBLEMATISCH - Race Condition möglich
class BrokenParser:
    def __init__(self):
        self.current_call: Optional[FunctionCall] = None  # Single-Buffer!
    
    def on_tool_call_delta(self, call_id: str, arguments: str):
        # Problem: Wenn zwei Calls parallel streamen, wird Buffer überschrieben
        if not self.current_call:
            self.current_call = FunctionCall(call_id=call_id)
        self.current_call.arguments += arguments  # ❌ FALSCH


LÖSUNG - Thread-sicheres Multi-Buffer-Management

class ProductionParser: def __init__(self): self.call_buffers: Dict[str, FunctionCall] = {} # Multi-Buffer! self.buffer_lock = threading.Lock() # Explizite Synchronisation def on_tool_call_delta(self, call_id: str, name: str, arguments: str): with self.buffer_lock: # Atomare Operation if call_id not in self.call_buffers: self.call_buffers[call_id] = FunctionCall( call_id=call_id, name=name ) # Sichere Akkumulation self.call_buffers[call_id].arguments += arguments self.call_buffers[call_id].is_complete = False def on_tool_call_complete(self, call_id: str): with self.buffer_lock: if call_id in self.call_buffers: self.call_buffers[call_id].is_complete = True complete_call = self.call_buffers[call_id] return complete_call return None def get_all_pending_calls(self) -> List[FunctionCall]: """Debugging-Hilfe: Zeigt alle offenen Calls""" with self.buffer_lock: return [ {**call.to_dict(), "pending": not call.is_complete} for call in self.call_buffers.values() ]

Fehler 2: Inkonsistentes JSON in Arguments-Deltas

Symptom: json.JSONDecodeError beim Parsen der kompletten Arguments, obwohl jedes Delta für sich valides JSON ist.

# PROBLEMATISCH - Fehlendes Whitespace-Management
class BrokenJSONParser:
    def accumulate_arguments(self, delta: str) -> str:
        # Problem: JSON braucht oft Whitespace für gültiges Parsing
        return self.arguments + delta  # Kann enden wie: {"key":"value""next":null}


LÖSUNG - Smarte JSON-Rekonstruktion

class JSONArgumentParser: def __init__(self): self.raw_chunks: List[str] = [] self.parsed_cache: Optional[Dict] = None def add_delta(self, delta: str) -> None: """Fügt einen Delta-Chunk hinzu und validiert JSON""" self.raw_chunks.append(delta) self.parsed_cache = None # Cache invalidieren # Versuche sofortiges Parsen für Feedback test_json = self.get_json_string() try: json.loads(test_json) self.parsed_cache = test_json # Cache valid result except json.JSONDecodeError: pass # Noch nicht komplett def get_json_string(self) -> str: """Rekonstruiert möglichst valides JSON""" return self._reconstruct_json("".join(self.raw_chunks)) def _reconstruct_json(self, raw: str) -> str: """ Intelligente JSON-Rekonstruktion. Behandelt: fehlende Kommas, trailing commas, unvollständige Strings. """ result = raw.strip() # Fall 1: Letztes Token ist unvollständiger String if result.count('"') % 2 == 1: # Entferne unvollständigen String last_quote = result.rfind('"') if last_quote > 0: result = result[:last_quote] + '"' # Fall 2: Fehlendes Komma nach letztem Key if re.search(r'":\s*[\[{]?\s*$', result): result = result.rstrip() # Braucht Fortsetzung # Fall 3: Trailing Komma im Objekt result = re.sub(r',(\s*[}\]])', r'\1', result) return result def get_parsed_arguments(self) -> Optional[Dict[str, Any]]: """Gibt geparste Arguments zurück (oder None wenn noch unvollständig)""" if self.parsed_cache is not None: try: return json.loads(self.parsed_cache) except json.JSONDecodeError: pass return None

Fehler 3: Memory Leak bei langen Streaming-Sessions

Symptom: Nach Stunden im Betrieb steigt der Memory-Verbrauch kontinuierlich. Irgendwann OOM-Kills oder drastische Performance-Einbußen.

# PROBLEMATISCH - Unbegrenztes Caching
class MemoryLeakParser:
    def __init__(self):
        self.all_events: List[StreamEvent] = []  # Wird nie geleert!
        self.all_calls: List[FunctionCall] = []  # Akkumuliert für immer!
    
    def on_event(self, event: StreamEvent):
        self.all_events.append(event)  # ❌ Unbegrenztes Wachstum
        if event.function_call:
            self.all_calls.append(event.function_call)  # ❌ Memory Leak


LÖSUNG - Konfigurierbares Window-basiertes Management

class MemorySafeStreamingParser: """ Produktionsreifer Parser mit konfigurierbarem Memory-Management. """ def __init__( self, max_event_history: int = 1000, max_completed_calls: int = 500, gc_interval_seconds: int = 300 ): self.max_event_history = max_event_history self.max_completed_calls = max_completed_calls # Window-basierter Event-Buffer self.event_buffer = collections.deque(maxlen=max_event_history) # Completed Calls mitLRU-ähnlichem Verhalten self.completed_calls: OrderedDict[str, FunctionCall] = OrderedDict() # Metriken für Monitoring self.total_events_processed = 0 self.total_calls_completed = 0 self.gc_count = 0 # Background GC-Task self._start_gc_timer(gc_interval_seconds) def on_event(self, event: StreamEvent) -> None: """Verarbeitet Event mit automatischem Memory-Management""" self.total_events_processed += 1 # Nur aktuelle Events behalten self.event_buffer.append({ "timestamp": time.time(), "event_type": event.event_type.value, "data": event.raw_data }) # Completed Calls verwalten if event.event_type == EventType.FUNCTION_CALL_COMPLETE: self._add_completed_call(event.function_call) # Automatisches GC wenn nötig self._check_memory_pressure() def _add_completed_call(self, call: FunctionCall) -> None: """Fügt completed Call hinzu mit LRU-Eviction""" self.total_calls_completed += 1 # LRU-Eviction: Ältesten entfernen wenn Limit erreicht while len(self.completed_calls) >= self.max_completed_calls: self.completed_calls.popitem(last=False) self.completed_calls[call.call_id] = call def _check_memory_pressure(self) -> None: """Prüft Memory und triggert GC wenn nötig""" import psutil process = psutil.Process() memory_mb = process.memory_info().rss / 1024 / 1024 # GC triggern bei >500MB oder >80% verfügbarem RAM if memory_mb > 500 or process.memory_percent() > 80: self._garbage_collect() def _garbage_collect(self) -> None: """Führt GC durch: Alte Events und Calls entfernen""" self.gc_count += 1 # Events aufräumen (älter als 1 Stunde) cutoff = time.time() - 3600 while self.event_buffer and self.event_buffer[0]["timestamp"] < cutoff: self.event_buffer.popleft() # Calls aufräumen (älter als 30 Minuten) cutoff = time.time() - 1800 for call_id in list(self.completed_calls.keys()): # Behalte nur die neuesten Calls if len(self.completed_calls) > 100: self.completed_calls.popitem(last=False) # Force Python GC import gc gc.collect() def get_stats(self) -> Dict[str, Any]: """Gibt aktuelle Parser-Statistiken zurück""" return { "total_events": self.total_events_processed, "buffered_events": len(self.event_buffer), "completed_calls": self.total_calls_completed, "stored_calls": len(self.completed_calls), "gc_runs": self.gc_count, "memory_mb": psutil.Process().memory_info().rss / 1024 / 1024 }

Usage in Production:

parser = MemorySafeStreamingParser( max_event_history=500, max_completed_calls=200, gc_interval_seconds=60 )

Monitoring-Endpoint für Prometheus/Grafana

@app.get("/streaming/stats") def streaming_stats(): return parser.get_stats()

Frontend-Integration: Real-Time UI-Update

Der Backend-Parser ist nur die halbe Miete. Hier ist meine bewährte TypeScript-Lösung für das Frontend, die ich in drei Produktionsprojekten einsetze:

import { useState, useEffect, useRef, useCallback } from 'react';

interface FunctionCall {
  id: string;
  name: string;
  arguments: string;
  status: 'streaming' | 'complete' | 'error';
}

interface StreamState {
  content: string;
  functionCalls: Map;
  isStreaming: boolean;
  error: string | null;
  usage: { prompt_tokens: number; completion_tokens: number } | null;
}

interface UseStreamingOptions {
  apiKey: string;
  baseUrl?: string;
  model?: string;
  onFunctionCall?: (call: FunctionCall) => Promise;
  onError?: (error: Error) => void;
}

export function useStreaming(options: UseStreamingOptions) {
  const {
    apiKey,
    baseUrl = 'https://api.holysheep.ai/v1',
    model = 'deepseek-chat',
    onFunctionCall,
    onError
  } = options;

  const [state, setState] = useState({
    content: '',
    functionCalls: new Map(),
    isStreaming: false,
    error: null,
    usage: null
  });

  const abortControllerRef = useRef(null);
  const eventSourceRef = useRef(null);

  const stream = useCallback(async (messages: any[], functions: any[]) => {
    // Cleanup previous streams
    abortControllerRef.current?.abort();
    eventSourceRef.current?.close();

    const abortController = new AbortController();
    abortControllerRef.current = abortController;

    setState(prev => ({
      ...prev,
      content: '',
      functionCalls: new Map(),
      isStreaming: true,
      error: null
    }));

    try {
      const response = await fetch(${baseUrl}/chat/completions, {
        method: 'POST',
        headers: {
          'Authorization': Bearer ${apiKey},
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({
          model,
          messages,
          tools: functions.map(f => ({ type: 'function', function: f })),
          stream: true,
          stream_options: { include_usage: true }
        }),
        signal: abortController.signal
      });

      if (!response.ok) {
        throw new Error(HTTP ${response.status}: ${response.statusText});
      }

      const reader = response.body?.getReader();
      if (!reader) throw new Error('Stream nicht verfügbar');

      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          
          const data = line.slice(6);
          if (data === '[DONE]') {
            setState(prev => ({ ...prev, isStreaming: false }));
            continue;
          }

          try {
            const parsed = JSON.parse(data);
            await processChunk(parsed, setState, onFunctionCall);
          } catch (e) {
            console.warn('Parse error:', e, data);
          }
        }
      }

      setState(prev => ({ ...prev, isStreaming: false }));

    } catch (e: any) {
      if (e.name === 'AbortError') {
        setState(prev => ({ ...prev, isStreaming: false }));
        return;
      }
      
      const error = e instanceof Error ? e : new Error(String(e));
      setState(prev => ({
        ...prev,
        isStreaming: false,
        error: error.message
      }));
      onError?.(error);
    }
  }, [apiKey, baseUrl, model, onFunctionCall, onError]);

  const stop = useCallback(() => {
    abortControllerRef.current?.abort();
  }, []);

  useEffect(() => {
    return () => {
      abortControllerRef.current?.abort();
    };
  }, []);

  return { ...state, stream, stop };
}

async function processChunk(
  chunk: any,
  setState: any,
  onFunctionCall?: (call: FunctionCall) => Promise
) {
  const choice = chunk.choices?.[0];
  if (!choice) return;

  const delta = choice.delta || {};

  // Content Delta
  if (delta.content) {
    setState(prev => ({
      ...prev,
      content: prev.content + delta.content
    }));
  }

  // Tool Calls (OpenAI-kompatibel)
  if (delta.tool_calls) {
    for (const toolCall of delta.tool_calls) {
      const callId = toolCall.id;
      const func = toolCall.function;

      setState(prev => {
        const newCalls = new Map(prev.functionCalls);
        const existing = newCalls.get(callId);

        if (!existing) {
          newCalls.set(callId, {
            id: callId,
            name: func.name,
            arguments: func.arguments || '',
            status: 'streaming'
          });
        } else {
          newCalls.set(callId, {
            ...existing,
            arguments: existing.arguments + (func.arguments || '')
          });
        }

        return { ...prev, functionCalls: newCalls };
      });
    }
  }

  // Function Call abgeschlossen (finish_reason === 'tool_calls')
  if (choice.finish_reason === 'tool_calls') {
    const toolCalls = delta.tool_calls || [];
    
    for (const toolCall of toolCalls) {
      setState(prev => {
        const newCalls = new Map(prev.functionCalls);
        const existing = newCalls.get(toolCall.id);
        
        if (existing) {
          newCalls.set(toolCall.id, {
            ...existing,
            status: 'complete'
          });
        }
        
        return { ...prev, functionCalls: newCalls };
      });

      // Function ausführen und Message ergänzen
      if (onFunctionCall) {
        const funcCall = setState.getState?.().functionCalls.get(toolCall.id);
        if (funcCall) {
          try {
            const result = await onFunctionCall({
              id: toolCall.id,
              name: toolCall.function.name,
              arguments: toolCall.function.arguments || '',
              status: 'complete'
            });
            // Result für nächsten Turn speichern
            setState(prev => ({
              ...prev,
              lastFunctionResult: { callId: toolCall.id, result }
            }));
          } catch (e) {
            console.error('Function execution failed:', e);
          }
        }
      }
    }
  }

  // Usage aus letztem Chunk
  if (chunk.usage) {
    setState(prev => ({ ...prev, usage: chunk.usage }));
  }
}

// ============================================
// React-Komponente für E-Commerce Chat
// ============================================

export function ECommerceChat() {
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState('');
  const messagesEndRef = useRef(null);

  const handleFunctionCall = useCallback(async (call: FunctionCall) => {
    console.log('Executing function:', call.name, call.arguments);
    
    // Echte Implementation würde API-Calls machen
    const results = {
      get_product_info: async (args) => {
        return { name: 'Test Product', price: 99.99, stock: 5 };
      },
      check_order_status: async (args) => {
        return { status: 'shipped', tracking: 'DHL-123' };
      }
    };

    const fn = results[call.name as keyof typeof results];
    if (fn) {
      return await fn(JSON.parse(call.arguments));