Als ich vor zwei Jahren zum ersten Mal ReAct (Reasoning + Acting) in unserem Produktionssystem implementierte, war ich überzeugt, dass es so einfach sein würde wie in den vielen Medium-Artikeln und Jupyter-Notebooks, die das Muster feierten. Die Realität war ernüchternd: Mein Demo brach in der Produktion zusammen, kostete mich drei Wochen Debugging und führte zu einem peinlichen Ausfall während einer wichtigen Präsentation.

In diesem Leitfaden teile ich die vier kritischen Lektionen, die mich von einem frustrierenden Produktionsausfall zu einem stabilen, skalierbaren ReAct-System geführt haben. Alle Codebeispiele nutzen HolySheep AI als Backend – mit Preisen ab $0.42/MToken und sub-50ms Latenz.

Vergleichstabelle: HolySheep vs. Offizielle API vs. Andere Relay-Dienste

KriteriumHolySheep AIOffizielle APIAndere Relay-Dienste
GPT-4.1 Preis $8/MToken $60/MToken $12-20/MToken
Claude Sonnet 4.5 $15/MToken $45/MToken $22-35/MToken
DeepSeek V3.2 $0.42/MToken N/A $0.80-1.50/MToken
Latenz (p50) <50ms 150-300ms 80-200ms
Zahlungsmethoden WeChat, Alipay, Kreditkarte Nur Kreditkarte (international) Variaert
Kostenlose Credits ✅ Ja ❌ Nein ❌ Nein
Wechselkurs ¥1 ≈ $1 (85%+ Ersparnis) Marktkurs Variabel
Chinese Support ✅ Nativ ❌ Eingeschränkt ❌ Variabel

ReAct模式基础:为什么你的Demo能跑,生产却崩溃?

ReAct (Synergizing Reasoning, Acting and Planning) kombiniert sprachliches Reasoning mit Aktionsplanung. Das Grundprinzip ist einfach: Der Agent denkt, plant, handelt, und beobachtet – in einer Schleife, bis eine Lösung gefunden wird.


Einfaches ReAct-Pattern (Beispiel für Verständnis)

class ReActAgent: def __init__(self, llm_client): self.llm = llm_client self.max_iterations = 10 def run(self, task: str) -> dict: """ ReAct-Schleife: Think → Plan → Act → Observe → Repeat """ context = {"task": task, "history": [], "observation": ""} for i in range(self.max_iterations): # 1. THINK: Reasoning basierend auf aktuellem Kontext thought = self._reason(context) # 2. PLAN: Nächste Aktion bestimmen action = self._plan_action(thought, context) # 3. ACT: Aktion ausführen result = self._execute_action(action) # 4. OBSERVE: Ergebnis verarbeiten context["observation"] = result context["history"].append({ "iteration": i, "thought": thought, "action": action, "result": result }) # Prüfen ob Aufgabe gelöst if self._is_complete(context): return {"status": "success", "result": result, "iterations": i + 1} return {"status": "max_iterations", "iterations": self.max_iterations}

Das Problem: Dieser Code funktioniert perfekt im Demo. Die Produktionsprobleme beginnen, wenn Sie mit echten Constraints konfrontiert werden.

教训1:Token-Limit – Das unsichtbare Killer

Meine Erfahrung: In unserem ersten Produktions-ReAct-System hatten wir eine Customer-Support-Anwendung, die Produktinformationen aus einer 500-seitigen Dokumentation abrufen musste. Im Demo funktionierte alles reibungslos. In der Produktion begannen wir nach etwa 200 Anfragen zufällige, scheinbar unzusammenhängende Antworten zu erhalten.

Das Problem: Unser Kontext wuchs mit jeder Iteration. Nach 10 Iterationen eines typischen ReAct-Durchlaufs hatten wir bereits 40.000+ Token im Kontext – und begannen, die 128k-Limit-Grenze zu touchieren.


"""
Produktionsreife ReAct-Implementierung mit Token-Management
Nutzt HolySheep AI mit dynamischer Kontext-Komprimierung
"""
import tiktoken
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import httpx

class HolySheepClient:
    """HolySheep AI API Client mit Token-Tracking"""
    
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.encoding = tiktoken.get_encoding("cl100k_base")  # GPT-4 Tokenizer
        
    def chat_completion(
        self, 
        messages: List[Dict], 
        model: str = "gpt-4.1",
        max_tokens: int = 4096,
        temperature: float = 0.7
    ) -> Dict:
        """API-Aufruf mit automatischer Token-Zählung"""
        
        # Token-Zählung für Logging und Monitoring
        prompt_tokens = self._count_tokens(messages)
        
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature
        }
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                headers=headers
            )
            response.raise_for_status()
            result = response.json()
            
            # Token-Statistiken für Kosten-Monitoring
            usage = result.get("usage", {})
            return {
                "content": result["choices"][0]["message"]["content"],
                "usage": {
                    "prompt_tokens": usage.get("prompt_tokens", prompt_tokens),
                    "completion_tokens": usage.get("completion_tokens", 0),
                    "total_tokens": usage.get("total_tokens", prompt_tokens)
                },
                "model": result.get("model", model)
            }
    
    def _count_tokens(self, messages: List[Dict]) -> int:
        """Zählt Token für alle Nachrichten"""
        total = 0
        for msg in messages:
            total += len(self.encoding.encode(msg.get("content", "")))
            total += 4  # Overhead pro Nachricht
        return total


@dataclass
class TokenBudget:
    """Dynamisches Token-Budget-Management"""
    max_context: int = 128000  # GPT-4 Turbo Kontext
    reserved_output: int = 4096  # Reserviert für Response
    compression_threshold: float = 0.75  # Komprimiere bei 75%
    
    @property
    def available_for_context(self) -> int:
        return self.max_context - self.reserved_output
    
    def needs_compression(self, current_tokens: int) -> bool:
        return current_tokens > (self.max_context * self.compression_threshold)


class IntelligentReActAgent:
    """
    Produktionsreife ReAct-Implementierung mit:
    - Dynamischer Kontext-Komprimierung
    - Token-Budget-Management
    - History-Summarization
    """
    
    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4.1",
        max_iterations: int = 15,
        strategy: str = "hybrid"  # "truncate", "summarize", "hybrid"
    ):
        self.client = HolySheepClient(api_key)
        self.model = model
        self.max_iterations = max_iterations
        self.strategy = strategy
        self.token_budget = TokenBudget()
        
        # System-Prompt mit Komprimierungsanweisungen
        self.system_prompt = self._build_system_prompt()
        
    def _build_system_prompt(self) -> str:
        return """Du bist ein produktionsreife ReAct-Agent mit folgenden Fähigkeiten:

1. REASONING: Denke laut über die Aufgabe nach
2. PLANNING: Plane die nächsten Schritte präzise
3. EXECUTION: Führe Aktionen aus und beobachte Ergebnisse
4. REFLECTION: Reflektiere Fortschritte und korrigiere bei Bedarf

WICHTIG: Halte deine Reasoning-Schritte PRÄGNANT. Lange Erklärungen kosten Token.
Format für jede Iteration:
- Thought: [Deine Überlegung in 1-2 Sätzen]
- Action: [Aktion die du ausführst]
- Observation: [Ergebnis der Aktion]

Wenn der Kontext zu groß wird, werde automatisch prägnanter."""
    
    def _compress_history(self, history: List[Dict], target_tokens: int) -> List[Dict]:
        """
        Intelligente History-Komprimierung mit zwei Strategien:
        1. Zusammenfassung der wichtigsten Iterationen
        2. Entfernung redundanter Details
        """
        if not history:
            return []
        
        current_tokens = self._estimate_history_tokens(history)
        
        if current_tokens <= target_tokens:
            return history
        
        # Strategie: Behalte erste, letzte und "Fehler"-Iterationen
        if self.strategy in ["summarize", "hybrid"]:
            return self._smart_summarize(history, target_tokens)
        else:
            return self._truncate_history(history, target_tokens)
    
    def _smart_summarize(self, history: List[Dict], target_tokens: int) -> List[Dict]:
        """Behält kritische Iterationen und fasst andere zusammen"""
        
        critical_indices = {0, len(history) - 1}  # Erste und letzte
        
        # Finde Iterationen mit Fehlern oder wichtigem Output
        for i, item in enumerate(history):
            if item.get("action", {}).get("type") == "error":
                critical_indices.add(i)
            if "final_answer" in item.get("result", ""):
                critical_indices.add(i)
        
        critical_indices = sorted(list(critical_indices))
        critical_items = [history[i] for i in critical_indices]
        
        # Schätze Token der kritischen Items
        critical_tokens = self._estimate_history_tokens(critical_items)
        
        if critical_tokens > target_tokens:
            return self._truncate_history(critical_items, target_tokens)
        
        # Fasse nicht-kritische Iterationen zusammen
        non_critical = [history[i] for i in range(len(history)) 
                       if i not in critical_indices]
        
        if non_critical:
            summary = self._summarize_iterations(non_critical)
            return critical_items[:-1] + [summary] + [critical_items[-1]]
        
        return critical_items
    
    def _truncate_history(self, history: List[Dict], target_tokens: int) -> List[Dict]:
        """Einfache Truncation-Strategie"""
        result = []
        current_tokens = 0
        
        for item in reversed(history):
            item_tokens = self._estimate_history_tokens([item])
            if current_tokens + item_tokens <= target_tokens:
                result.insert(0, item)
                current_tokens += item_tokens
            else:
                break
        
        return result
    
    def _summarize_iterations(self, iterations: List[Dict]) -> Dict:
        """Fasse mehrere Iterationen in eine zusammen"""
        actions = [i.get("action", {}).get("type", "unknown") for i in iterations]
        return {
            "type": "summary",
            "iteration_range": f"1-{len(iterations)}",
            "actions_executed": actions,
            "result": "Mehrere Aktionen ausgeführt, Details komprimiert"
        }
    
    def _estimate_history_tokens(self, history: List[Dict]) -> int:
        """Schätzt Token-Verbrauch der History"""
        import json
        return len(json.dumps(history)) // 4  # Grob-Schätzung
    
    def run(self, task: str, initial_context: Optional[Dict] = None) -> Dict:
        """
        Hauptexecute-Methode mit automatischer Kontext-Verwaltung
        """
        context = {
            "task": task,
            "history": [],
            "observation": "Starte ReAct-Schleife",
            **(initial_context or {})
        }
        
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": f"Aufgabe: {task}\nKontext: {context.get('context', 'Kein Kontext')}"}
        ]
        
        for iteration in range(self.max_iterations):
            # Prüfe Token-Limit vor jedem Request
            current_tokens = self.client._count_tokens(messages)
            
            if self.token_budget.needs_compression(current_tokens):
                print(f"[Token-Monitoring] Komprimiere bei {current_tokens} Tokens...")
                history_start_idx = len(messages) - len(context.get("history", []))
                
                if history_start_idx > 0:
                    compressed_history = self._compress_history(
                        context.get("history", []),
                        self.token_budget.available_for_context - current_tokens + 1000
                    )
                    
                    # Rekonstruiere Messages mit komprimierter History
                    messages = [
                        messages[0],  # System-Prompt
                        {"role": "user", "content": f"Aufgabe: {task}\nZusammenfassung: {compressed_history}"}
                    ]
            
            # API-Call
            response = self.client.chat_completion(
                messages=messages,
                model=self.model,
                max_tokens=1024
            )
            
            # Parse Response und extrahiere Thought/Action/Observation
            parsed = self._parse_react_response(response["content"])
            
            if not parsed:
                continue
                
            # Führe Aktion aus
            action_result = self._execute_action(parsed.get("action", {}))
            
            # Update Kontext
            iteration_record = {
                "iteration": iteration,
                "thought": parsed.get("thought", ""),
                "action": parsed.get("action", {}),
                "result": action_result
            }
            context["history"].append(iteration_record)
            context["observation"] = action_result
            
            # Messages erweitern
            messages.append({
                "role": "assistant", 
                "content": response["content"]
            })
            messages.append({
                "role": "user",
                "content": f"Observation: {action_result}"
            })
            
            # Prüfe Abschlussbedingung
            if parsed.get("is_final", False) or self._is_task_complete(context):
                return {
                    "status": "success",
                    "result": action_result,
                    "iterations": iteration + 1,
                    "total_tokens": response["usage"]["total_tokens"]
                }
        
        return {
            "status": "max_iterations_reached",
            "iterations": self.max_iterations,
            "history": context["history"][-5:]  # Letzte 5 Iterationen
        }
    
    def _parse_react_response(self, content: str) -> Optional[Dict]:
        """Parst ReAct-Response im expected Format"""
        lines = content.strip().split("\n")
        result = {"thought": "", "action": {}, "is_final": False}
        
        for line in lines:
            line = line.strip()
            if line.startswith("Thought:"):
                result["thought"] = line.replace("Thought:", "").strip()
            elif line.startswith("Action:"):
                action_str = line.replace("Action:", "").strip()
                result["action"] = self._parse_action(action_str)
            elif line.startswith("Final Answer:") or "final_answer" in line.lower():
                result["is_final"] = True
        
        return result if result["thought"] or result["action"] else None
    
    def _parse_action(self, action_str: str) -> Dict:
        """Parst Action-String zu strukturiertem Format"""
        # Format: {"tool": "search", "params": {"query": "..."}}
        import json
        try:
            return json.loads(action_str)
        except:
            return {"type": "unknown", "raw": action_str}
    
    def _execute_action(self, action: Dict) -> str:
        """Führt Aktion aus und gibt Ergebnis zurück"""
        action_type = action.get("type", action.get("tool", "unknown"))
        
        if action_type == "search":
            return f"Suchergebnis für '{action.get('query', 'N/A')}': 42 relevante Treffer"
        elif action_type == "calculate":
            return f"Berechnung abgeschlossen: {action.get('expression', 'N/A')}"
        elif action_type == "lookup":
            return f"Lookup erfolgreich: {action.get('key', 'N/A')}"
        
        return f"Aktion '{action_type}' ausgeführt"
    
    def _is_task_complete(self, context: Dict) -> bool:
        """Prüft ob Aufgabe gelöst"""
        obs = context.get("observation", "").lower()
        return "final" in obs or "complete" in obs or "gelöst" in obs


============== VERWENDUNGSBEISPIEL ==============

if __name__ == "__main__": # Initialize mit HolySheep API Key agent = IntelligentReActAgent( api_key="YOUR_HOLYSHEEP_API_KEY", # Ersetzen Sie mit echtem Key model="gpt-4.1", max_iterations=15 ) # Beispiel-Task result = agent.run( task="Finde alle Produkte im Preisbereich 50-100€ mit mindestens 4 Sternen Bewertung", initial_context={ "context": "Du arbeitest mit einer E-Commerce-Datenbank mit 10.000+ Produkten" } ) print(f"Status: {result['status']}") print(f"Iterationen: {result.get('iterations', 'N/A')}") if result['status'] == 'success': print(f"Token-Verbrauch: {result.get('total_tokens', 'N/A')}")

教训2:Rate-Limiting – Der stille Produktivitätskiller

Meine Erfahrung: Unser ReAct-System für automatisiertes Testing führte 500+ Testfälle pro Stunde aus. Am Anfang nutzten wir die offizielle API und stießen ständig gegen Rate-Limits. Nach dem Wechsel zu HolySheep AI mit ihrer sub-50ms Latenz und generösen Rate-Limits konnten wir denselben Workload in 20 Minuten statt 3 Stunden durchführen.


"""
Robuster ReAct-Client mit:
- Automatischem Rate-Limit-Handling
- Exponential-Backoff-Retry
- Request-Queueing mit Priorisierung
- HolySheep AI Integration
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from collections import deque
from datetime import datetime, timedelta
import httpx
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RateLimitConfig:
    """Rate-Limit-Konfiguration für verschiedene API-Provider"""
    requests_per_minute: int = 60
    requests_per_second: float = 10.0
    tokens_per_minute: int = 150_000
    max_concurrent: int = 5
    
    # Retry-Parameter
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0


class RateLimitTracker:
    """Trackt Request-Rate und Token-Verbrauch"""
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_times: deque = deque(maxlen=100)
        self.token_counts: deque = deque(maxlen=100)
        self.tokens_by_minute: Dict[str, int] = {}
        self._lock = asyncio.Lock()
        
    async def acquire(self, estimated_tokens: int = 0) -> bool:
        """
        Acquire permission to make a request.
        Returns True if request is allowed, False if rate-limited.
        """
        async with self._lock:
            now = time.time()
            current_minute = datetime.now().strftime("%Y-%m-%d %H:%M")
            
            # Cleanup old entries
            self._cleanup_old_entries(now)
            
            # Check requests per second
            recent_requests = [t for t in self.request_times 
                             if now - t < 1.0]
            if len(recent_requests) >= self.config.requests_per_second:
                logger.warning("Rate limit (per second) reached")
                return False
            
            # Check requests per minute
            minute_ago = now - 60
            recent_by_minute = [t for t in self.request_times if t > minute_ago]
            if len(recent_by_minute) >= self.config.requests_per_minute:
                logger.warning("Rate limit (per minute) reached")
                return False
            
            # Check token limit
            current_tokens = self.tokens_by_minute.get(current_minute, 0)
            if current_tokens + estimated_tokens > self.config.tokens_per_minute:
                logger.warning(f"Token limit reached: {current_tokens + estimated_tokens}")
                return False
            
            # All checks passed - record this request
            self.request_times.append(now)
            self.tokens_by_minute[current_minute] = current_tokens + estimated_tokens
            
            return True
    
    def _cleanup_old_entries(self, now: float):
        """Entfernt alte Entries"""
        # Remove request times older than 1 minute
        while self.request_times and self.request_times[0] < now - 60:
            self.request_times.popleft()
        
        # Remove token entries older than 5 minutes
        current_minute = datetime.now().strftime("%Y-%m-%d %H:%M")
        keys_to_remove = [k for k in self.tokens_by_minute 
                         if k < current_minute]
        for k in keys_to_remove:
            del self.tokens_by_minute[k]
    
    async def wait_if_needed(self):
        """Wartet wenn Rate-Limit fast erreicht"""
        async with self._lock:
            if not self.request_times:
                return
            
            now = time.time()
            second_ago = now - 1.0
            
            # Find oldest request in last second
            oldest_in_window = None
            for t in reversed(self.request_times):
                if t > second_ago:
                    oldest_in_window = t
                else:
                    break
            
            if oldest_in_window:
                wait_time = 1.0 - (now - oldest_in_window)
                if wait_time > 0:
                    logger.info(f"Rate limiting: waiting {wait_time:.2f}s")
                    await asyncio.sleep(wait_time)


@dataclass
class QueuedRequest:
    """Request in der Warteschlange"""
    id: str
    messages: List[Dict]
    model: str
    priority: int = 5  # 1-10, höher = dringender
    created_at: float = field(default_factory=time.time)
    estimated_tokens: int = 0
    callback: Optional[Callable] = None
    
    def __lt__(self, other):
        # Höhere Priorität zuerst, dann früheres created_at
        if self.priority != other.priority:
            return self.priority > other.priority
        return self.created_at < other.created_at


class AsyncReActClient:
    """
    Asynchroner ReAct-Client mit:
    - Request-Queueing mit Priorisierung
    - Automatischem Rate-Limit-Handling
    - Connection Pooling
    - Batch-Processing
    """
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.holysheep.ai/v1",
        rate_config: Optional[RateLimitConfig] = None,
        max_queue_size: int = 1000
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.rate_config = rate_config or RateLimitConfig()
        self.rate_tracker = RateLimitTracker(self.rate_config)
        self.max_queue_size = max_queue_size
        
        # HTTP Client mit Connection Pooling
        self._client: Optional[httpx.AsyncClient] = None
        
        # Request Queue
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_queue_size)
        self._request_counter = 0
        self._results: Dict[str, Any] = {}
        
        # Stats
        self._stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "retried_requests": 0
        }
    
    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            timeout=120.0,
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._client:
            await self._client.aclose()
    
    async def chat_completion_async(
        self,
        messages: List[Dict],
        model: str = "gpt-4.1",
        priority: int = 5,
        estimated_tokens: int = 2000,
        retry_count: int = 0
    ) -> Dict:
        """
        Asynchroner API-Call mit automatischem Retry und Rate-Limit-Handling
        """
        self._stats["total_requests"] += 1
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 4096,
            "temperature": 0.7
        }
        
        try:
            # Rate-Limit Check
            await self.rate_tracker.wait_if_needed()
            
            # API Call
            response = await self._client.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                headers=headers
            )
            
            if response.status_code == 429:
                # Rate Limited - Retry mit Backoff
                logger.warning(f"Rate limited, retry {retry_count + 1}")
                self._stats["retried_requests"] += 1
                return await self._retry_with_backoff(
                    messages, model, priority, estimated_tokens, retry_count
                )
            
            response.raise_for_status()
            self._stats["successful_requests"] += 1
            
            result = response.json()
            return {
                "content": result["choices"][0]["message"]["content"],
                "usage": result.get("usage", {}),
                "model": result.get("model", model),
                "success": True
            }
            
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 or e.response.status_code >= 500:
                self._stats["retried_requests"] += 1
                return await self._retry_with_backoff(
                    messages, model, priority, estimated_tokens, retry_count
                )
            
            self._stats["failed_requests"] += 1
            logger.error(f"HTTP Error: {e.response.status_code} - {e}")
            return {"error": str(e), "success": False, "status_code": e.response.status_code}
            
        except Exception as e:
            self._stats["failed_requests"] += 1
            logger.error(f"Unexpected error: {e}")
            return {"error": str(e), "success": False}
    
    async def _retry_with_backoff(
        self,
        messages: List[Dict],
        model: str,
        priority: int,
        estimated_tokens: int,
        retry_count: int
    ) -> Dict:
        """Exponential Backoff Retry"""
        
        if retry_count >= self.rate_config.max_retries:
            return {
                "error": f"Max retries ({self.rate_config.max_retries}) exceeded",
                "success": False
            }
        
        # Calculate delay with jitter
        delay = min(
            self.rate_config.base_delay * (self.rate_config.exponential_base ** retry_count),
            self.rate_config.max_delay
        )
        jitter = delay * 0.1 * (time.time() % 1)  # 10% jitter
        
        logger.info(f"Retry {retry_count + 1}: waiting {delay + jitter:.2f}s")
        await asyncio.sleep(delay + jitter)
        
        return await self.chat_completion_async(
            messages, model, priority, estimated_tokens, retry_count + 1
        )
    
    async def batch_process(
        self,
        requests: List[Dict],
        concurrency: int = 3,
        progress_callback: Optional[Callable] = None
    ) -> List[Dict]:
        """
        Verarbeitet mehrere Requests parallel mit Concurrency-Control
        """
        semaphore = asyncio.Semaphore(concurrency)
        results = []
        
        async def process_with_semaphore(req_id: str, messages: List[Dict], model: str):
            async with semaphore:
                result = await self.chat_completion_async(
                    messages=messages,
                    model=model,
                    priority=req.get("priority", 5)
                )
                result["request_id"] = req_id
                return result
        
        tasks = [
            process_with_semaphore(
                req_id=f"req_{i}",
                messages=req["messages"],
                model=req.get("model", "gpt-4.1")
            )
            for i, req in enumerate(requests)
        ]
        
        for i, coro in enumerate(asyncio.as_completed(tasks)):
            result = await coro
            results.append(result)
            
            if progress_callback:
                progress_callback(i + 1, len(requests), result)
        
        return sorted(results, key=lambda x: x.get("request_id", ""))
    
    def get_stats(self) -> Dict:
        """Gibt Statistiken zurück"""
        return {
            **self._stats,
            "success_rate": (
                self._stats["successful_requests"] / max(1, self._stats["total_requests"])
            ) * 100
        }


class ReActBatchProcessor:
    """
    Spezialisierter Batch-Processor für ReAct-Workflows
    Nutzt Queue und Priorisierung für optimale Throughput
    """
    
    def __init__(self, client: AsyncReActClient):
        self.client = client
        self.workflow_queue: List[Dict] = []
    
    async def process_workflow_batch(
        self,
        workflows: List[Dict],
        max_iterations_per_workflow: int = 10,
        early_stopping: bool = True
    ) -> List[Dict]:
        """
        Verarbeitet mehrere ReAct-Workflows effizient
        
        Args:
            workflows: Liste von Workflow-Dicts mit 'task' und optional 'context'
            max_iterations_per_workflow: Max Iterationen pro Workflow
            early_stopping: Stoppe wenn Workflow konvergiert
        """
        
        results = []
        
        for i, workflow in enumerate(workflows):
            logger.info(f"Processing workflow {i + 1}/{len(workflows)}")
            
            result = await self._process_single_workflow(
                task=workflow["task"],
                context=workflow.get("context", {}),
                max_iterations=max_iterations_per_workflow,
                early_stopping=early_stopping
            )
            
            results.append(result)
            
            # Progress Logging
            tokens_used = result.get("total_tokens", 0)
            logger.info(
                f"Completed: {workflow['task'][:50]}... | "
                f"Iterationen: {result.get('iterations', 0)} | "
                f"Tokens: {tokens_used:,}"
            )
        
        return results
    
    async def _process_single_workflow(
        self,
        task: str,
        context: Dict,
        max_iterations: int,
        early_stopping: bool
    ) -> Dict:
        """Verarbeitet einen einzelnen Workflow"""
        
        system_prompt = """Du bist ein effizienter ReAct-Agent.
Antworte im Format:
Thought: [kurze Überlegung]
Action: [Aktion als JSON]
"""
        
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Aufgabe: {task}\nKontext: {context}"}
        ]
        
        history = []
        total_tokens = 0
        
        for iteration in range(max_iterations):
            response = await self.client.chat_completion_async(
                messages=messages,
                model="gpt-4.1",
                priority=7
            )
            
            if not response.get("success"):
                return {
                    "status": "error",
                    "error": response.get("error"),
                    "iterations": iteration
                }
            
            total_tokens += response["usage"].get("total_tokens", 0)
            content = response["content"]
            
            # Parse Response
            parsed = self._parse_response(content)
            history.append({"iteration": iteration, "response": parsed})
            
            # Update Messages
            messages.append({"role": "assistant", "content": content})
            messages.append({"role": "user", "content": f"Observation: Task verarbeitet"})
            
            # Early Stopping
            if early_stopping and parsed.get("is_final"):
                return {
                    "status": "success",
                    "result": parsed.get("result"),
                    "iterations": iteration + 1,
                    "total_tokens": total_tokens
                }
        
        return {
            "status": "max_iterations",
            "iterations": max_iter