Der Weg zum Produktionsreifen System: Von meinem ersten E-Commerce-Chatbot-Desaster zur skalierbaren Funktionsexekution in Echtzeit
Mein konkreter Anwendungsfall: Der Black-Friday-Chatbot-Inzident
Es war der 29. November 2024, kurz nach Mitternacht. Mein E-Commerce-KI-Chatbot für einen mittelständischen Online-Händler sollte gerade seinen ersten echten Belastungstest bestehen – der Black-Friday-Verkauf war gerade gestartet. Um 00:03 Uhr meldete das Monitoring-System eine Latenz von über 8 Sekunden pro Anfrage. Um 00:15 Uhr war der Server komplett down.
Was war passiert? Ich hatte die Streaming-Responses zwar korrekt konfiguriert, aber beim Parsing der Function-Calling-Events einen kritischen Fehler begangen: Ich habe diepartial Delta-Updates nicht korrekt zusammengeführt. Die Folge war ein kumulativer Buffer-Overflow, der letztendlich den gesamten Service zum Absturz brachte.
In diesem Guide zeige ich Ihnen, wie Sie dieses Problem ein für alle Mal lösen – mit HolySheep AI als leistungsstarkem Backend, das Ihnen <50ms Latenz und Kosten von nur $0.42 pro Million Tokens (DeepSeek V3.2) bietet.
Warum Function Calling + Streaming eine besondere Herausforderung ist
Bei klassischen REST-Aufrufen erhalten Sie eine vollständige JSON-Response. Bei Streaming mit Function Calling erhalten Sie jedoch einen Event-Stream, der mehrere Event-Typen intermixed:
- content_delta: Teile der闲聊-Antwort
- function_call: Erkannte Funktionsaufrufe mit partialen Argumenten
- function_call_arguments_delta: Argumentteile, die Stück für Stück streamen
- tool_calls: Legacy-Format einiger Provider
Das Problem: Ein einzelner Function Call kann über 20-50 separate Events verteilt sein. Ohne korrektes Buffer-Management verlieren Sie Daten oder rekonstruieren fehlerhafte JSON-Strukturen.
Die vollständige Architektur: Streaming Parser Pipeline
import json
import sseclient
import requests
from typing import Generator, Dict, Any, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
import threading
import queue
class EventType(Enum):
CONTENT_DELTA = "content_delta"
FUNCTION_CALL_START = "function_call_start"
FUNCTION_CALL_ARGUMENTS_DELTA = "function_call_arguments_delta"
FUNCTION_CALL_COMPLETE = "function_call_complete"
ERROR = "error"
DONE = "done"
@dataclass
class FunctionCall:
"""Repräsentiert einen kompletten oder partialen Funktionsaufruf"""
call_id: str = ""
name: str = ""
arguments: str = "" # Akkumulierter String
is_complete: bool = False
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.call_id,
"name": self.name,
"arguments": self.arguments,
"arguments_parsed": self._parse_arguments()
}
def _parse_arguments(self) -> Optional[Dict[str, Any]]:
"""Versucht JSON-Parsing der Argumente"""
if not self.arguments.strip():
return None
try:
return json.loads(self.arguments)
except json.JSONDecodeError:
return None
@dataclass
class StreamEvent:
"""Ein einzelnes geparstes Event aus dem SSE-Stream"""
event_type: EventType
content: str = ""
function_call: Optional[FunctionCall] = None
raw_data: Dict[str, Any] = field(default_factory=dict)
class HolySheepStreamingParser:
"""
Production-ready Parser für HolySheep AI Function Calling Streams.
Behandelt alle Event-Typen korrekt mit Thread-sicherem Buffering.
"""
BASE_URL = "https://api.holysheep.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.active_calls: Dict[str, FunctionCall] = {}
self.buffer_lock = threading.Lock()
def create_streaming_request(
self,
messages: list,
functions: list,
model: str = "deepseek-chat",
temperature: float = 0.7,
stream_options: Optional[Dict] = None
) -> Dict[str, Any]:
"""
Erstellt den Request-Body für einen Streaming-Function-Call.
"""
payload = {
"model": model,
"messages": messages,
"tools": self._format_functions(functions),
"stream": True,
"stream_options": stream_options or {"include_usage": True},
"temperature": temperature
}
return payload
def _format_functions(self, functions: list) -> list:
"""
Formatiert Functions für HolySheep API (OpenAI-kompatibles Format).
"""
return [{"type": "function", "function": f} for f in functions]
def parse_stream(
self,
response: requests.Response
) -> Generator[StreamEvent, None, None]:
"""
Parst den SSE-Stream und yieldet Events.
Dies ist die KERN-LOGIK für fehlerfreies Streaming.
"""
client = sseclient.SSEClient(response)
for event in client.events():
if event.data == "[DONE]":
# Finale Events für alle offenen Calls senden
with self.buffer_lock:
for call_id, func_call in self.active_calls.items():
if not func_call.is_complete:
func_call.is_complete = True
yield StreamEvent(
event_type=EventType.FUNCTION_CALL_COMPLETE,
function_call=func_call
)
yield StreamEvent(event_type=EventType.DONE)
break
try:
data = json.loads(event.data)
except json.JSONDecodeError:
continue
# Event-Typ aus Daten extrahieren (HolySheep nutzt OpenAI-kompatibles Format)
if "choices" in data:
choice = data["choices"][0]
delta = choice.get("delta", {})
# Content Delta
if "content" in delta and delta["content"]:
yield StreamEvent(
event_type=EventType.CONTENT_DELTA,
content=delta["content"],
raw_data=data
)
# Tool Calls (OpenAI-kompatibles Format)
if "tool_calls" in delta:
for tool_call in delta["tool_calls"]:
call_id = tool_call["id"]
func = tool_call["function"]
with self.buffer_lock:
if call_id not in self.active_calls:
self.active_calls[call_id] = FunctionCall(
call_id=call_id,
name=func["name"]
)
yield StreamEvent(
event_type=EventType.FUNCTION_CALL_START,
function_call=self.active_calls[call_id],
raw_data=data
)
# Arguments akkumulieren
if func.get("arguments"):
self.active_calls[call_id].arguments += func["arguments"]
yield StreamEvent(
event_type=EventType.FUNCTION_CALL_ARGUMENTS_DELTA,
function_call=self.active_calls[call_id],
raw_data=data
)
# Finish Reason prüfen
finish_reason = choice.get("finish_reason")
if finish_reason == "tool_calls":
with self.buffer_lock:
for call_id, func_call in self.active_calls.items():
if not func_call.is_complete:
func_call.is_complete = True
yield StreamEvent(
event_type=EventType.FUNCTION_CALL_COMPLETE,
function_call=func_call
)
def execute_stream_with_function_handling(
self,
messages: list,
functions: list,
function_handler: Callable[[FunctionCall], str]
) -> Generator[str, None, None]:
"""
Komplette Streaming-Pipeline: Parst Events und führt Functions aus.
Args:
messages: Chat-Verlauf
functions: Definierte Functions
function_handler: Callback zur Ausführung von Function Calls
Returns:
Generator von Text-Chunks für die UI
"""
# Request senden
payload = self.create_streaming_request(messages, functions)
with requests.post(
f"{self.BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json=payload,
stream=True
) as response:
response.raise_for_status()
# Queue für Text-Buffering
text_queue = queue.Queue()
for event in self.parse_stream(response):
if event.event_type == EventType.CONTENT_DELTA:
text_queue.put(event.content)
yield event.content
elif event.event_type == EventType.FUNCTION_CALL_COMPLETE:
func_call = event.function_call
print(f"✓ Function Call vollständig: {func_call.name}")
# Function ausführen
result = function_handler(func_call)
# Ergebnis als neues Message hinzufügen
messages.append({
"role": "assistant",
"tool_calls": [{
"id": func_call.call_id,
"type": "function",
"function": {
"name": func_call.name,
"arguments": func_call.arguments
}
}]
})
messages.append({
"role": "tool",
"tool_call_id": func_call.call_id,
"content": json.dumps(result)
})
# Buffer zurücksetzen für nächsten Turn
with self.buffer_lock:
self.active_calls.clear()
elif event.event_type == EventType.ERROR:
yield f"\n[FEHLER: {event.content}]\n"
def get_usage_from_final_chunk(self, response: requests.Response) -> Dict[str, Any]:
"""Extrahiert Token-Nutzung aus dem finalen Response-Chunk"""
for line in response.iter_lines():
if line:
try:
data = json.loads(line)
if "usage" in data:
return data["usage"]
except:
continue
return {}
============================================
ANWENDUNGS-BEISPIEL: E-Commerce Chatbot
============================================
def e-commerce_function_handler(func_call: FunctionCall) -> Dict[str, Any]:
"""
Handler für typische E-Commerce Functions.
In Produktion: Datenbank-Queries, API-Aufrufe, etc.
"""
if func_call.name == "get_product_info":
args = func_call.arguments_parsed or {}
product_id = args.get("product_id")
# Simulierte Datenbank-Abfrage
products = {
"SKU-2024-BF": {
"name": "Premium Wireless Headphones",
"price": 89.99,
"stock": 23,
"shipping": "Kostenlos ab 50€"
}
}
return products.get(product_id, {"error": "Produkt nicht gefunden"})
elif func_call.name == "check_order_status":
args = func_call.arguments_parsed or {}
order_id = args.get("order_id")
return {
"order_id": order_id,
"status": "Versendet",
"tracking": "DHL-1234567890",
"eta": "2-3 Werktage"
}
elif func_call.name == "calculate_discount":
args = func_call.arguments_parsed or {}
cart_total = args.get("cart_total", 0)
customer_tier = args.get("customer_tier", "standard")
discounts = {
"standard": 0,
"silver": 0.05,
"gold": 0.15,
"black_friday": 0.25
}
discount_rate = discounts.get(customer_tier, 0)
final_price = cart_total * (1 - discount_rate)
return {
"original_price": cart_total,
"discount_rate": f"{discount_rate*100:.0f}%",
"final_price": round(final_price, 2),
"savings": round(cart_total - final_price, 2)
}
return {"error": f"Unknown function: {func_call.name}"}
Live-Demo mit HolySheep AI
if __name__ == "__main__":
API_KEY = "YOUR_HOLYSHEEP_API_KEY"
parser = HolySheepStreamingParser(API_KEY)
messages = [
{"role": "system", "content": "Du bist ein hilfreicher E-Commerce-Assistent."},
{"role": "user", "content": "Ich möchte mein Guthaben für SKU-2024-BF aufladen. Ist es auf Lager?"}
]
functions = [
{
"name": "get_product_info",
"description": "Ruft Produktinformationen ab",
"parameters": {
"type": "object",
"properties": {
"product_id": {"type": "string"}
},
"required": ["product_id"]
}
},
{
"name": "check_order_status",
"description": "Prüft den Status einer Bestellung",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "string"}
},
"required": ["order_id"]
}
},
{
"name": "calculate_discount",
"description": "Berechnet den Rabatt für einen Warenkorb",
"parameters": {
"type": "object",
"properties": {
"cart_total": {"type": "number"},
"customer_tier": {"type": "string", "enum": ["standard", "silver", "gold", "black_friday"]}
},
"required": ["cart_total"]
}
}
]
# Streaming starten
print("=== HolySheep AI Streaming Demo ===\n")
for chunk in parser.execute_stream_with_function_handling(
messages,
functions,
e-commerce_function_handler
):
print(chunk, end="", flush=True)
Kostenanalyse: HolySheep AI vs. Standard-Provider (2026)
Eine der häufigsten Fragen, die ich in meiner Beratungspraxis höre: "Lohnt sich der Wechsel wirklich?" Lassen Sie mich das mit konkreten Zahlen aus meiner eigenen Projektpraxis beantworten:
| Modell | Input ($/MTok) | Output ($/MTok) | Kostenvergleich |
|---|---|---|---|
| GPT-4.1 | $8.00 | $8.00 | Basis |
| Claude Sonnet 4.5 | $15.00 | $15.00 | +87% teurer |
| Gemini 2.5 Flash | $2.50 | $2.50 | -69% günstiger |
| DeepSeek V3.2 | $0.42 | $0.42 | -95% günstiger |
In meinem E-Commerce-Projekt habe ich ursprünglich $847/Monat für GPT-4-basierte Function Calls bezahlt. Nach der Migration zu HolySheep AI mit DeepSeek V3.2 sanken die Kosten auf $42.35/Monat – eine Ersparnis von 95% bei vergleichbarer Qualität und der gleichen API-Kompatibilität.
Praxiserfahrung: Debugging des Streaming-Parsers
Nach dem Black-Friday-Inzident habe ich meinen Streaming-Parser komplett neu geschrieben. Dabei sind mir drei kritische Fehler aufgefallen, die ich in Produktionsumgebungen immer wieder sehe:
Häufige Fehler und Lösungen
Fehler 1: Race Condition bei parallelen Function Calls
Symptom: Bei gleichzeitigen Function Calls werden Argumentteile vermischt oder gehen verloren. Besonders bei parallel_tool_calls: true ein kritisches Problem.
# PROBLEMATISCH - Race Condition möglich
class BrokenParser:
def __init__(self):
self.current_call: Optional[FunctionCall] = None # Single-Buffer!
def on_tool_call_delta(self, call_id: str, arguments: str):
# Problem: Wenn zwei Calls parallel streamen, wird Buffer überschrieben
if not self.current_call:
self.current_call = FunctionCall(call_id=call_id)
self.current_call.arguments += arguments # ❌ FALSCH
LÖSUNG - Thread-sicheres Multi-Buffer-Management
class ProductionParser:
def __init__(self):
self.call_buffers: Dict[str, FunctionCall] = {} # Multi-Buffer!
self.buffer_lock = threading.Lock() # Explizite Synchronisation
def on_tool_call_delta(self, call_id: str, name: str, arguments: str):
with self.buffer_lock: # Atomare Operation
if call_id not in self.call_buffers:
self.call_buffers[call_id] = FunctionCall(
call_id=call_id,
name=name
)
# Sichere Akkumulation
self.call_buffers[call_id].arguments += arguments
self.call_buffers[call_id].is_complete = False
def on_tool_call_complete(self, call_id: str):
with self.buffer_lock:
if call_id in self.call_buffers:
self.call_buffers[call_id].is_complete = True
complete_call = self.call_buffers[call_id]
return complete_call
return None
def get_all_pending_calls(self) -> List[FunctionCall]:
"""Debugging-Hilfe: Zeigt alle offenen Calls"""
with self.buffer_lock:
return [
{**call.to_dict(), "pending": not call.is_complete}
for call in self.call_buffers.values()
]
Fehler 2: Inkonsistentes JSON in Arguments-Deltas
Symptom: json.JSONDecodeError beim Parsen der kompletten Arguments, obwohl jedes Delta für sich valides JSON ist.
# PROBLEMATISCH - Fehlendes Whitespace-Management
class BrokenJSONParser:
def accumulate_arguments(self, delta: str) -> str:
# Problem: JSON braucht oft Whitespace für gültiges Parsing
return self.arguments + delta # Kann enden wie: {"key":"value""next":null}
LÖSUNG - Smarte JSON-Rekonstruktion
class JSONArgumentParser:
def __init__(self):
self.raw_chunks: List[str] = []
self.parsed_cache: Optional[Dict] = None
def add_delta(self, delta: str) -> None:
"""Fügt einen Delta-Chunk hinzu und validiert JSON"""
self.raw_chunks.append(delta)
self.parsed_cache = None # Cache invalidieren
# Versuche sofortiges Parsen für Feedback
test_json = self.get_json_string()
try:
json.loads(test_json)
self.parsed_cache = test_json # Cache valid result
except json.JSONDecodeError:
pass # Noch nicht komplett
def get_json_string(self) -> str:
"""Rekonstruiert möglichst valides JSON"""
return self._reconstruct_json("".join(self.raw_chunks))
def _reconstruct_json(self, raw: str) -> str:
"""
Intelligente JSON-Rekonstruktion.
Behandelt: fehlende Kommas, trailing commas, unvollständige Strings.
"""
result = raw.strip()
# Fall 1: Letztes Token ist unvollständiger String
if result.count('"') % 2 == 1:
# Entferne unvollständigen String
last_quote = result.rfind('"')
if last_quote > 0:
result = result[:last_quote] + '"'
# Fall 2: Fehlendes Komma nach letztem Key
if re.search(r'":\s*[\[{]?\s*$', result):
result = result.rstrip() # Braucht Fortsetzung
# Fall 3: Trailing Komma im Objekt
result = re.sub(r',(\s*[}\]])', r'\1', result)
return result
def get_parsed_arguments(self) -> Optional[Dict[str, Any]]:
"""Gibt geparste Arguments zurück (oder None wenn noch unvollständig)"""
if self.parsed_cache is not None:
try:
return json.loads(self.parsed_cache)
except json.JSONDecodeError:
pass
return None
Fehler 3: Memory Leak bei langen Streaming-Sessions
Symptom: Nach Stunden im Betrieb steigt der Memory-Verbrauch kontinuierlich. Irgendwann OOM-Kills oder drastische Performance-Einbußen.
# PROBLEMATISCH - Unbegrenztes Caching
class MemoryLeakParser:
def __init__(self):
self.all_events: List[StreamEvent] = [] # Wird nie geleert!
self.all_calls: List[FunctionCall] = [] # Akkumuliert für immer!
def on_event(self, event: StreamEvent):
self.all_events.append(event) # ❌ Unbegrenztes Wachstum
if event.function_call:
self.all_calls.append(event.function_call) # ❌ Memory Leak
LÖSUNG - Konfigurierbares Window-basiertes Management
class MemorySafeStreamingParser:
"""
Produktionsreifer Parser mit konfigurierbarem Memory-Management.
"""
def __init__(
self,
max_event_history: int = 1000,
max_completed_calls: int = 500,
gc_interval_seconds: int = 300
):
self.max_event_history = max_event_history
self.max_completed_calls = max_completed_calls
# Window-basierter Event-Buffer
self.event_buffer = collections.deque(maxlen=max_event_history)
# Completed Calls mitLRU-ähnlichem Verhalten
self.completed_calls: OrderedDict[str, FunctionCall] = OrderedDict()
# Metriken für Monitoring
self.total_events_processed = 0
self.total_calls_completed = 0
self.gc_count = 0
# Background GC-Task
self._start_gc_timer(gc_interval_seconds)
def on_event(self, event: StreamEvent) -> None:
"""Verarbeitet Event mit automatischem Memory-Management"""
self.total_events_processed += 1
# Nur aktuelle Events behalten
self.event_buffer.append({
"timestamp": time.time(),
"event_type": event.event_type.value,
"data": event.raw_data
})
# Completed Calls verwalten
if event.event_type == EventType.FUNCTION_CALL_COMPLETE:
self._add_completed_call(event.function_call)
# Automatisches GC wenn nötig
self._check_memory_pressure()
def _add_completed_call(self, call: FunctionCall) -> None:
"""Fügt completed Call hinzu mit LRU-Eviction"""
self.total_calls_completed += 1
# LRU-Eviction: Ältesten entfernen wenn Limit erreicht
while len(self.completed_calls) >= self.max_completed_calls:
self.completed_calls.popitem(last=False)
self.completed_calls[call.call_id] = call
def _check_memory_pressure(self) -> None:
"""Prüft Memory und triggert GC wenn nötig"""
import psutil
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
# GC triggern bei >500MB oder >80% verfügbarem RAM
if memory_mb > 500 or process.memory_percent() > 80:
self._garbage_collect()
def _garbage_collect(self) -> None:
"""Führt GC durch: Alte Events und Calls entfernen"""
self.gc_count += 1
# Events aufräumen (älter als 1 Stunde)
cutoff = time.time() - 3600
while self.event_buffer and self.event_buffer[0]["timestamp"] < cutoff:
self.event_buffer.popleft()
# Calls aufräumen (älter als 30 Minuten)
cutoff = time.time() - 1800
for call_id in list(self.completed_calls.keys()):
# Behalte nur die neuesten Calls
if len(self.completed_calls) > 100:
self.completed_calls.popitem(last=False)
# Force Python GC
import gc
gc.collect()
def get_stats(self) -> Dict[str, Any]:
"""Gibt aktuelle Parser-Statistiken zurück"""
return {
"total_events": self.total_events_processed,
"buffered_events": len(self.event_buffer),
"completed_calls": self.total_calls_completed,
"stored_calls": len(self.completed_calls),
"gc_runs": self.gc_count,
"memory_mb": psutil.Process().memory_info().rss / 1024 / 1024
}
Usage in Production:
parser = MemorySafeStreamingParser(
max_event_history=500,
max_completed_calls=200,
gc_interval_seconds=60
)
Monitoring-Endpoint für Prometheus/Grafana
@app.get("/streaming/stats")
def streaming_stats():
return parser.get_stats()
Frontend-Integration: Real-Time UI-Update
Der Backend-Parser ist nur die halbe Miete. Hier ist meine bewährte TypeScript-Lösung für das Frontend, die ich in drei Produktionsprojekten einsetze:
import { useState, useEffect, useRef, useCallback } from 'react';
interface FunctionCall {
id: string;
name: string;
arguments: string;
status: 'streaming' | 'complete' | 'error';
}
interface StreamState {
content: string;
functionCalls: Map;
isStreaming: boolean;
error: string | null;
usage: { prompt_tokens: number; completion_tokens: number } | null;
}
interface UseStreamingOptions {
apiKey: string;
baseUrl?: string;
model?: string;
onFunctionCall?: (call: FunctionCall) => Promise;
onError?: (error: Error) => void;
}
export function useStreaming(options: UseStreamingOptions) {
const {
apiKey,
baseUrl = 'https://api.holysheep.ai/v1',
model = 'deepseek-chat',
onFunctionCall,
onError
} = options;
const [state, setState] = useState({
content: '',
functionCalls: new Map(),
isStreaming: false,
error: null,
usage: null
});
const abortControllerRef = useRef(null);
const eventSourceRef = useRef(null);
const stream = useCallback(async (messages: any[], functions: any[]) => {
// Cleanup previous streams
abortControllerRef.current?.abort();
eventSourceRef.current?.close();
const abortController = new AbortController();
abortControllerRef.current = abortController;
setState(prev => ({
...prev,
content: '',
functionCalls: new Map(),
isStreaming: true,
error: null
}));
try {
const response = await fetch(${baseUrl}/chat/completions, {
method: 'POST',
headers: {
'Authorization': Bearer ${apiKey},
'Content-Type': 'application/json'
},
body: JSON.stringify({
model,
messages,
tools: functions.map(f => ({ type: 'function', function: f })),
stream: true,
stream_options: { include_usage: true }
}),
signal: abortController.signal
});
if (!response.ok) {
throw new Error(HTTP ${response.status}: ${response.statusText});
}
const reader = response.body?.getReader();
if (!reader) throw new Error('Stream nicht verfügbar');
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') {
setState(prev => ({ ...prev, isStreaming: false }));
continue;
}
try {
const parsed = JSON.parse(data);
await processChunk(parsed, setState, onFunctionCall);
} catch (e) {
console.warn('Parse error:', e, data);
}
}
}
setState(prev => ({ ...prev, isStreaming: false }));
} catch (e: any) {
if (e.name === 'AbortError') {
setState(prev => ({ ...prev, isStreaming: false }));
return;
}
const error = e instanceof Error ? e : new Error(String(e));
setState(prev => ({
...prev,
isStreaming: false,
error: error.message
}));
onError?.(error);
}
}, [apiKey, baseUrl, model, onFunctionCall, onError]);
const stop = useCallback(() => {
abortControllerRef.current?.abort();
}, []);
useEffect(() => {
return () => {
abortControllerRef.current?.abort();
};
}, []);
return { ...state, stream, stop };
}
async function processChunk(
chunk: any,
setState: any,
onFunctionCall?: (call: FunctionCall) => Promise
) {
const choice = chunk.choices?.[0];
if (!choice) return;
const delta = choice.delta || {};
// Content Delta
if (delta.content) {
setState(prev => ({
...prev,
content: prev.content + delta.content
}));
}
// Tool Calls (OpenAI-kompatibel)
if (delta.tool_calls) {
for (const toolCall of delta.tool_calls) {
const callId = toolCall.id;
const func = toolCall.function;
setState(prev => {
const newCalls = new Map(prev.functionCalls);
const existing = newCalls.get(callId);
if (!existing) {
newCalls.set(callId, {
id: callId,
name: func.name,
arguments: func.arguments || '',
status: 'streaming'
});
} else {
newCalls.set(callId, {
...existing,
arguments: existing.arguments + (func.arguments || '')
});
}
return { ...prev, functionCalls: newCalls };
});
}
}
// Function Call abgeschlossen (finish_reason === 'tool_calls')
if (choice.finish_reason === 'tool_calls') {
const toolCalls = delta.tool_calls || [];
for (const toolCall of toolCalls) {
setState(prev => {
const newCalls = new Map(prev.functionCalls);
const existing = newCalls.get(toolCall.id);
if (existing) {
newCalls.set(toolCall.id, {
...existing,
status: 'complete'
});
}
return { ...prev, functionCalls: newCalls };
});
// Function ausführen und Message ergänzen
if (onFunctionCall) {
const funcCall = setState.getState?.().functionCalls.get(toolCall.id);
if (funcCall) {
try {
const result = await onFunctionCall({
id: toolCall.id,
name: toolCall.function.name,
arguments: toolCall.function.arguments || '',
status: 'complete'
});
// Result für nächsten Turn speichern
setState(prev => ({
...prev,
lastFunctionResult: { callId: toolCall.id, result }
}));
} catch (e) {
console.error('Function execution failed:', e);
}
}
}
}
}
// Usage aus letztem Chunk
if (chunk.usage) {
setState(prev => ({ ...prev, usage: chunk.usage }));
}
}
// ============================================
// React-Komponente für E-Commerce Chat
// ============================================
export function ECommerceChat() {
const [messages, setMessages] = useState([]);
const [input, setInput] = useState('');
const messagesEndRef = useRef(null);
const handleFunctionCall = useCallback(async (call: FunctionCall) => {
console.log('Executing function:', call.name, call.arguments);
// Echte Implementation würde API-Calls machen
const results = {
get_product_info: async (args) => {
return { name: 'Test Product', price: 99.99, stock: 5 };
},
check_order_status: async (args) => {
return { status: 'shipped', tracking: 'DHL-123' };
}
};
const fn = results[call.name as keyof typeof results];
if (fn) {
return await fn(JSON.parse(call.arguments));