Als ich vor zwei Jahren zum ersten Mal ReAct (Reasoning + Acting) in unserem Produktionssystem implementierte, war ich überzeugt, dass es so einfach sein würde wie in den vielen Medium-Artikeln und Jupyter-Notebooks, die das Muster feierten. Die Realität war ernüchternd: Mein Demo brach in der Produktion zusammen, kostete mich drei Wochen Debugging und führte zu einem peinlichen Ausfall während einer wichtigen Präsentation.
In diesem Leitfaden teile ich die vier kritischen Lektionen, die mich von einem frustrierenden Produktionsausfall zu einem stabilen, skalierbaren ReAct-System geführt haben. Alle Codebeispiele nutzen HolySheep AI als Backend – mit Preisen ab $0.42/MToken und sub-50ms Latenz.
Vergleichstabelle: HolySheep vs. Offizielle API vs. Andere Relay-Dienste
| Kriterium | HolySheep AI | Offizielle API | Andere Relay-Dienste |
|---|---|---|---|
| GPT-4.1 Preis | $8/MToken | $60/MToken | $12-20/MToken |
| Claude Sonnet 4.5 | $15/MToken | $45/MToken | $22-35/MToken |
| DeepSeek V3.2 | $0.42/MToken | N/A | $0.80-1.50/MToken |
| Latenz (p50) | <50ms | 150-300ms | 80-200ms |
| Zahlungsmethoden | WeChat, Alipay, Kreditkarte | Nur Kreditkarte (international) | Variaert |
| Kostenlose Credits | ✅ Ja | ❌ Nein | ❌ Nein |
| Wechselkurs | ¥1 ≈ $1 (85%+ Ersparnis) | Marktkurs | Variabel |
| Chinese Support | ✅ Nativ | ❌ Eingeschränkt | ❌ Variabel |
ReAct模式基础:为什么你的Demo能跑,生产却崩溃?
ReAct (Synergizing Reasoning, Acting and Planning) kombiniert sprachliches Reasoning mit Aktionsplanung. Das Grundprinzip ist einfach: Der Agent denkt, plant, handelt, und beobachtet – in einer Schleife, bis eine Lösung gefunden wird.
Einfaches ReAct-Pattern (Beispiel für Verständnis)
class ReActAgent:
def __init__(self, llm_client):
self.llm = llm_client
self.max_iterations = 10
def run(self, task: str) -> dict:
"""
ReAct-Schleife: Think → Plan → Act → Observe → Repeat
"""
context = {"task": task, "history": [], "observation": ""}
for i in range(self.max_iterations):
# 1. THINK: Reasoning basierend auf aktuellem Kontext
thought = self._reason(context)
# 2. PLAN: Nächste Aktion bestimmen
action = self._plan_action(thought, context)
# 3. ACT: Aktion ausführen
result = self._execute_action(action)
# 4. OBSERVE: Ergebnis verarbeiten
context["observation"] = result
context["history"].append({
"iteration": i,
"thought": thought,
"action": action,
"result": result
})
# Prüfen ob Aufgabe gelöst
if self._is_complete(context):
return {"status": "success", "result": result, "iterations": i + 1}
return {"status": "max_iterations", "iterations": self.max_iterations}
Das Problem: Dieser Code funktioniert perfekt im Demo. Die Produktionsprobleme beginnen, wenn Sie mit echten Constraints konfrontiert werden.
教训1:Token-Limit – Das unsichtbare Killer
Meine Erfahrung: In unserem ersten Produktions-ReAct-System hatten wir eine Customer-Support-Anwendung, die Produktinformationen aus einer 500-seitigen Dokumentation abrufen musste. Im Demo funktionierte alles reibungslos. In der Produktion begannen wir nach etwa 200 Anfragen zufällige, scheinbar unzusammenhängende Antworten zu erhalten.
Das Problem: Unser Kontext wuchs mit jeder Iteration. Nach 10 Iterationen eines typischen ReAct-Durchlaufs hatten wir bereits 40.000+ Token im Kontext – und begannen, die 128k-Limit-Grenze zu touchieren.
"""
Produktionsreife ReAct-Implementierung mit Token-Management
Nutzt HolySheep AI mit dynamischer Kontext-Komprimierung
"""
import tiktoken
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import httpx
class HolySheepClient:
"""HolySheep AI API Client mit Token-Tracking"""
def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.encoding = tiktoken.get_encoding("cl100k_base") # GPT-4 Tokenizer
def chat_completion(
self,
messages: List[Dict],
model: str = "gpt-4.1",
max_tokens: int = 4096,
temperature: float = 0.7
) -> Dict:
"""API-Aufruf mit automatischer Token-Zählung"""
# Token-Zählung für Logging und Monitoring
prompt_tokens = self._count_tokens(messages)
payload = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
with httpx.Client(timeout=60.0) as client:
response = client.post(
f"{self.base_url}/chat/completions",
json=payload,
headers=headers
)
response.raise_for_status()
result = response.json()
# Token-Statistiken für Kosten-Monitoring
usage = result.get("usage", {})
return {
"content": result["choices"][0]["message"]["content"],
"usage": {
"prompt_tokens": usage.get("prompt_tokens", prompt_tokens),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", prompt_tokens)
},
"model": result.get("model", model)
}
def _count_tokens(self, messages: List[Dict]) -> int:
"""Zählt Token für alle Nachrichten"""
total = 0
for msg in messages:
total += len(self.encoding.encode(msg.get("content", "")))
total += 4 # Overhead pro Nachricht
return total
@dataclass
class TokenBudget:
"""Dynamisches Token-Budget-Management"""
max_context: int = 128000 # GPT-4 Turbo Kontext
reserved_output: int = 4096 # Reserviert für Response
compression_threshold: float = 0.75 # Komprimiere bei 75%
@property
def available_for_context(self) -> int:
return self.max_context - self.reserved_output
def needs_compression(self, current_tokens: int) -> bool:
return current_tokens > (self.max_context * self.compression_threshold)
class IntelligentReActAgent:
"""
Produktionsreife ReAct-Implementierung mit:
- Dynamischer Kontext-Komprimierung
- Token-Budget-Management
- History-Summarization
"""
def __init__(
self,
api_key: str,
model: str = "gpt-4.1",
max_iterations: int = 15,
strategy: str = "hybrid" # "truncate", "summarize", "hybrid"
):
self.client = HolySheepClient(api_key)
self.model = model
self.max_iterations = max_iterations
self.strategy = strategy
self.token_budget = TokenBudget()
# System-Prompt mit Komprimierungsanweisungen
self.system_prompt = self._build_system_prompt()
def _build_system_prompt(self) -> str:
return """Du bist ein produktionsreife ReAct-Agent mit folgenden Fähigkeiten:
1. REASONING: Denke laut über die Aufgabe nach
2. PLANNING: Plane die nächsten Schritte präzise
3. EXECUTION: Führe Aktionen aus und beobachte Ergebnisse
4. REFLECTION: Reflektiere Fortschritte und korrigiere bei Bedarf
WICHTIG: Halte deine Reasoning-Schritte PRÄGNANT. Lange Erklärungen kosten Token.
Format für jede Iteration:
- Thought: [Deine Überlegung in 1-2 Sätzen]
- Action: [Aktion die du ausführst]
- Observation: [Ergebnis der Aktion]
Wenn der Kontext zu groß wird, werde automatisch prägnanter."""
def _compress_history(self, history: List[Dict], target_tokens: int) -> List[Dict]:
"""
Intelligente History-Komprimierung mit zwei Strategien:
1. Zusammenfassung der wichtigsten Iterationen
2. Entfernung redundanter Details
"""
if not history:
return []
current_tokens = self._estimate_history_tokens(history)
if current_tokens <= target_tokens:
return history
# Strategie: Behalte erste, letzte und "Fehler"-Iterationen
if self.strategy in ["summarize", "hybrid"]:
return self._smart_summarize(history, target_tokens)
else:
return self._truncate_history(history, target_tokens)
def _smart_summarize(self, history: List[Dict], target_tokens: int) -> List[Dict]:
"""Behält kritische Iterationen und fasst andere zusammen"""
critical_indices = {0, len(history) - 1} # Erste und letzte
# Finde Iterationen mit Fehlern oder wichtigem Output
for i, item in enumerate(history):
if item.get("action", {}).get("type") == "error":
critical_indices.add(i)
if "final_answer" in item.get("result", ""):
critical_indices.add(i)
critical_indices = sorted(list(critical_indices))
critical_items = [history[i] for i in critical_indices]
# Schätze Token der kritischen Items
critical_tokens = self._estimate_history_tokens(critical_items)
if critical_tokens > target_tokens:
return self._truncate_history(critical_items, target_tokens)
# Fasse nicht-kritische Iterationen zusammen
non_critical = [history[i] for i in range(len(history))
if i not in critical_indices]
if non_critical:
summary = self._summarize_iterations(non_critical)
return critical_items[:-1] + [summary] + [critical_items[-1]]
return critical_items
def _truncate_history(self, history: List[Dict], target_tokens: int) -> List[Dict]:
"""Einfache Truncation-Strategie"""
result = []
current_tokens = 0
for item in reversed(history):
item_tokens = self._estimate_history_tokens([item])
if current_tokens + item_tokens <= target_tokens:
result.insert(0, item)
current_tokens += item_tokens
else:
break
return result
def _summarize_iterations(self, iterations: List[Dict]) -> Dict:
"""Fasse mehrere Iterationen in eine zusammen"""
actions = [i.get("action", {}).get("type", "unknown") for i in iterations]
return {
"type": "summary",
"iteration_range": f"1-{len(iterations)}",
"actions_executed": actions,
"result": "Mehrere Aktionen ausgeführt, Details komprimiert"
}
def _estimate_history_tokens(self, history: List[Dict]) -> int:
"""Schätzt Token-Verbrauch der History"""
import json
return len(json.dumps(history)) // 4 # Grob-Schätzung
def run(self, task: str, initial_context: Optional[Dict] = None) -> Dict:
"""
Hauptexecute-Methode mit automatischer Kontext-Verwaltung
"""
context = {
"task": task,
"history": [],
"observation": "Starte ReAct-Schleife",
**(initial_context or {})
}
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": f"Aufgabe: {task}\nKontext: {context.get('context', 'Kein Kontext')}"}
]
for iteration in range(self.max_iterations):
# Prüfe Token-Limit vor jedem Request
current_tokens = self.client._count_tokens(messages)
if self.token_budget.needs_compression(current_tokens):
print(f"[Token-Monitoring] Komprimiere bei {current_tokens} Tokens...")
history_start_idx = len(messages) - len(context.get("history", []))
if history_start_idx > 0:
compressed_history = self._compress_history(
context.get("history", []),
self.token_budget.available_for_context - current_tokens + 1000
)
# Rekonstruiere Messages mit komprimierter History
messages = [
messages[0], # System-Prompt
{"role": "user", "content": f"Aufgabe: {task}\nZusammenfassung: {compressed_history}"}
]
# API-Call
response = self.client.chat_completion(
messages=messages,
model=self.model,
max_tokens=1024
)
# Parse Response und extrahiere Thought/Action/Observation
parsed = self._parse_react_response(response["content"])
if not parsed:
continue
# Führe Aktion aus
action_result = self._execute_action(parsed.get("action", {}))
# Update Kontext
iteration_record = {
"iteration": iteration,
"thought": parsed.get("thought", ""),
"action": parsed.get("action", {}),
"result": action_result
}
context["history"].append(iteration_record)
context["observation"] = action_result
# Messages erweitern
messages.append({
"role": "assistant",
"content": response["content"]
})
messages.append({
"role": "user",
"content": f"Observation: {action_result}"
})
# Prüfe Abschlussbedingung
if parsed.get("is_final", False) or self._is_task_complete(context):
return {
"status": "success",
"result": action_result,
"iterations": iteration + 1,
"total_tokens": response["usage"]["total_tokens"]
}
return {
"status": "max_iterations_reached",
"iterations": self.max_iterations,
"history": context["history"][-5:] # Letzte 5 Iterationen
}
def _parse_react_response(self, content: str) -> Optional[Dict]:
"""Parst ReAct-Response im expected Format"""
lines = content.strip().split("\n")
result = {"thought": "", "action": {}, "is_final": False}
for line in lines:
line = line.strip()
if line.startswith("Thought:"):
result["thought"] = line.replace("Thought:", "").strip()
elif line.startswith("Action:"):
action_str = line.replace("Action:", "").strip()
result["action"] = self._parse_action(action_str)
elif line.startswith("Final Answer:") or "final_answer" in line.lower():
result["is_final"] = True
return result if result["thought"] or result["action"] else None
def _parse_action(self, action_str: str) -> Dict:
"""Parst Action-String zu strukturiertem Format"""
# Format: {"tool": "search", "params": {"query": "..."}}
import json
try:
return json.loads(action_str)
except:
return {"type": "unknown", "raw": action_str}
def _execute_action(self, action: Dict) -> str:
"""Führt Aktion aus und gibt Ergebnis zurück"""
action_type = action.get("type", action.get("tool", "unknown"))
if action_type == "search":
return f"Suchergebnis für '{action.get('query', 'N/A')}': 42 relevante Treffer"
elif action_type == "calculate":
return f"Berechnung abgeschlossen: {action.get('expression', 'N/A')}"
elif action_type == "lookup":
return f"Lookup erfolgreich: {action.get('key', 'N/A')}"
return f"Aktion '{action_type}' ausgeführt"
def _is_task_complete(self, context: Dict) -> bool:
"""Prüft ob Aufgabe gelöst"""
obs = context.get("observation", "").lower()
return "final" in obs or "complete" in obs or "gelöst" in obs
============== VERWENDUNGSBEISPIEL ==============
if __name__ == "__main__":
# Initialize mit HolySheep API Key
agent = IntelligentReActAgent(
api_key="YOUR_HOLYSHEEP_API_KEY", # Ersetzen Sie mit echtem Key
model="gpt-4.1",
max_iterations=15
)
# Beispiel-Task
result = agent.run(
task="Finde alle Produkte im Preisbereich 50-100€ mit mindestens 4 Sternen Bewertung",
initial_context={
"context": "Du arbeitest mit einer E-Commerce-Datenbank mit 10.000+ Produkten"
}
)
print(f"Status: {result['status']}")
print(f"Iterationen: {result.get('iterations', 'N/A')}")
if result['status'] == 'success':
print(f"Token-Verbrauch: {result.get('total_tokens', 'N/A')}")
教训2:Rate-Limiting – Der stille Produktivitätskiller
Meine Erfahrung: Unser ReAct-System für automatisiertes Testing führte 500+ Testfälle pro Stunde aus. Am Anfang nutzten wir die offizielle API und stießen ständig gegen Rate-Limits. Nach dem Wechsel zu HolySheep AI mit ihrer sub-50ms Latenz und generösen Rate-Limits konnten wir denselben Workload in 20 Minuten statt 3 Stunden durchführen.
"""
Robuster ReAct-Client mit:
- Automatischem Rate-Limit-Handling
- Exponential-Backoff-Retry
- Request-Queueing mit Priorisierung
- HolySheep AI Integration
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from collections import deque
from datetime import datetime, timedelta
import httpx
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class RateLimitConfig:
"""Rate-Limit-Konfiguration für verschiedene API-Provider"""
requests_per_minute: int = 60
requests_per_second: float = 10.0
tokens_per_minute: int = 150_000
max_concurrent: int = 5
# Retry-Parameter
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
class RateLimitTracker:
"""Trackt Request-Rate und Token-Verbrauch"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.request_times: deque = deque(maxlen=100)
self.token_counts: deque = deque(maxlen=100)
self.tokens_by_minute: Dict[str, int] = {}
self._lock = asyncio.Lock()
async def acquire(self, estimated_tokens: int = 0) -> bool:
"""
Acquire permission to make a request.
Returns True if request is allowed, False if rate-limited.
"""
async with self._lock:
now = time.time()
current_minute = datetime.now().strftime("%Y-%m-%d %H:%M")
# Cleanup old entries
self._cleanup_old_entries(now)
# Check requests per second
recent_requests = [t for t in self.request_times
if now - t < 1.0]
if len(recent_requests) >= self.config.requests_per_second:
logger.warning("Rate limit (per second) reached")
return False
# Check requests per minute
minute_ago = now - 60
recent_by_minute = [t for t in self.request_times if t > minute_ago]
if len(recent_by_minute) >= self.config.requests_per_minute:
logger.warning("Rate limit (per minute) reached")
return False
# Check token limit
current_tokens = self.tokens_by_minute.get(current_minute, 0)
if current_tokens + estimated_tokens > self.config.tokens_per_minute:
logger.warning(f"Token limit reached: {current_tokens + estimated_tokens}")
return False
# All checks passed - record this request
self.request_times.append(now)
self.tokens_by_minute[current_minute] = current_tokens + estimated_tokens
return True
def _cleanup_old_entries(self, now: float):
"""Entfernt alte Entries"""
# Remove request times older than 1 minute
while self.request_times and self.request_times[0] < now - 60:
self.request_times.popleft()
# Remove token entries older than 5 minutes
current_minute = datetime.now().strftime("%Y-%m-%d %H:%M")
keys_to_remove = [k for k in self.tokens_by_minute
if k < current_minute]
for k in keys_to_remove:
del self.tokens_by_minute[k]
async def wait_if_needed(self):
"""Wartet wenn Rate-Limit fast erreicht"""
async with self._lock:
if not self.request_times:
return
now = time.time()
second_ago = now - 1.0
# Find oldest request in last second
oldest_in_window = None
for t in reversed(self.request_times):
if t > second_ago:
oldest_in_window = t
else:
break
if oldest_in_window:
wait_time = 1.0 - (now - oldest_in_window)
if wait_time > 0:
logger.info(f"Rate limiting: waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
@dataclass
class QueuedRequest:
"""Request in der Warteschlange"""
id: str
messages: List[Dict]
model: str
priority: int = 5 # 1-10, höher = dringender
created_at: float = field(default_factory=time.time)
estimated_tokens: int = 0
callback: Optional[Callable] = None
def __lt__(self, other):
# Höhere Priorität zuerst, dann früheres created_at
if self.priority != other.priority:
return self.priority > other.priority
return self.created_at < other.created_at
class AsyncReActClient:
"""
Asynchroner ReAct-Client mit:
- Request-Queueing mit Priorisierung
- Automatischem Rate-Limit-Handling
- Connection Pooling
- Batch-Processing
"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
rate_config: Optional[RateLimitConfig] = None,
max_queue_size: int = 1000
):
self.api_key = api_key
self.base_url = base_url
self.rate_config = rate_config or RateLimitConfig()
self.rate_tracker = RateLimitTracker(self.rate_config)
self.max_queue_size = max_queue_size
# HTTP Client mit Connection Pooling
self._client: Optional[httpx.AsyncClient] = None
# Request Queue
self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_queue_size)
self._request_counter = 0
self._results: Dict[str, Any] = {}
# Stats
self._stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"retried_requests": 0
}
async def __aenter__(self):
self._client = httpx.AsyncClient(
timeout=120.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._client:
await self._client.aclose()
async def chat_completion_async(
self,
messages: List[Dict],
model: str = "gpt-4.1",
priority: int = 5,
estimated_tokens: int = 2000,
retry_count: int = 0
) -> Dict:
"""
Asynchroner API-Call mit automatischem Retry und Rate-Limit-Handling
"""
self._stats["total_requests"] += 1
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"max_tokens": 4096,
"temperature": 0.7
}
try:
# Rate-Limit Check
await self.rate_tracker.wait_if_needed()
# API Call
response = await self._client.post(
f"{self.base_url}/chat/completions",
json=payload,
headers=headers
)
if response.status_code == 429:
# Rate Limited - Retry mit Backoff
logger.warning(f"Rate limited, retry {retry_count + 1}")
self._stats["retried_requests"] += 1
return await self._retry_with_backoff(
messages, model, priority, estimated_tokens, retry_count
)
response.raise_for_status()
self._stats["successful_requests"] += 1
result = response.json()
return {
"content": result["choices"][0]["message"]["content"],
"usage": result.get("usage", {}),
"model": result.get("model", model),
"success": True
}
except httpx.HTTPStatusError as e:
if e.response.status_code == 429 or e.response.status_code >= 500:
self._stats["retried_requests"] += 1
return await self._retry_with_backoff(
messages, model, priority, estimated_tokens, retry_count
)
self._stats["failed_requests"] += 1
logger.error(f"HTTP Error: {e.response.status_code} - {e}")
return {"error": str(e), "success": False, "status_code": e.response.status_code}
except Exception as e:
self._stats["failed_requests"] += 1
logger.error(f"Unexpected error: {e}")
return {"error": str(e), "success": False}
async def _retry_with_backoff(
self,
messages: List[Dict],
model: str,
priority: int,
estimated_tokens: int,
retry_count: int
) -> Dict:
"""Exponential Backoff Retry"""
if retry_count >= self.rate_config.max_retries:
return {
"error": f"Max retries ({self.rate_config.max_retries}) exceeded",
"success": False
}
# Calculate delay with jitter
delay = min(
self.rate_config.base_delay * (self.rate_config.exponential_base ** retry_count),
self.rate_config.max_delay
)
jitter = delay * 0.1 * (time.time() % 1) # 10% jitter
logger.info(f"Retry {retry_count + 1}: waiting {delay + jitter:.2f}s")
await asyncio.sleep(delay + jitter)
return await self.chat_completion_async(
messages, model, priority, estimated_tokens, retry_count + 1
)
async def batch_process(
self,
requests: List[Dict],
concurrency: int = 3,
progress_callback: Optional[Callable] = None
) -> List[Dict]:
"""
Verarbeitet mehrere Requests parallel mit Concurrency-Control
"""
semaphore = asyncio.Semaphore(concurrency)
results = []
async def process_with_semaphore(req_id: str, messages: List[Dict], model: str):
async with semaphore:
result = await self.chat_completion_async(
messages=messages,
model=model,
priority=req.get("priority", 5)
)
result["request_id"] = req_id
return result
tasks = [
process_with_semaphore(
req_id=f"req_{i}",
messages=req["messages"],
model=req.get("model", "gpt-4.1")
)
for i, req in enumerate(requests)
]
for i, coro in enumerate(asyncio.as_completed(tasks)):
result = await coro
results.append(result)
if progress_callback:
progress_callback(i + 1, len(requests), result)
return sorted(results, key=lambda x: x.get("request_id", ""))
def get_stats(self) -> Dict:
"""Gibt Statistiken zurück"""
return {
**self._stats,
"success_rate": (
self._stats["successful_requests"] / max(1, self._stats["total_requests"])
) * 100
}
class ReActBatchProcessor:
"""
Spezialisierter Batch-Processor für ReAct-Workflows
Nutzt Queue und Priorisierung für optimale Throughput
"""
def __init__(self, client: AsyncReActClient):
self.client = client
self.workflow_queue: List[Dict] = []
async def process_workflow_batch(
self,
workflows: List[Dict],
max_iterations_per_workflow: int = 10,
early_stopping: bool = True
) -> List[Dict]:
"""
Verarbeitet mehrere ReAct-Workflows effizient
Args:
workflows: Liste von Workflow-Dicts mit 'task' und optional 'context'
max_iterations_per_workflow: Max Iterationen pro Workflow
early_stopping: Stoppe wenn Workflow konvergiert
"""
results = []
for i, workflow in enumerate(workflows):
logger.info(f"Processing workflow {i + 1}/{len(workflows)}")
result = await self._process_single_workflow(
task=workflow["task"],
context=workflow.get("context", {}),
max_iterations=max_iterations_per_workflow,
early_stopping=early_stopping
)
results.append(result)
# Progress Logging
tokens_used = result.get("total_tokens", 0)
logger.info(
f"Completed: {workflow['task'][:50]}... | "
f"Iterationen: {result.get('iterations', 0)} | "
f"Tokens: {tokens_used:,}"
)
return results
async def _process_single_workflow(
self,
task: str,
context: Dict,
max_iterations: int,
early_stopping: bool
) -> Dict:
"""Verarbeitet einen einzelnen Workflow"""
system_prompt = """Du bist ein effizienter ReAct-Agent.
Antworte im Format:
Thought: [kurze Überlegung]
Action: [Aktion als JSON]
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Aufgabe: {task}\nKontext: {context}"}
]
history = []
total_tokens = 0
for iteration in range(max_iterations):
response = await self.client.chat_completion_async(
messages=messages,
model="gpt-4.1",
priority=7
)
if not response.get("success"):
return {
"status": "error",
"error": response.get("error"),
"iterations": iteration
}
total_tokens += response["usage"].get("total_tokens", 0)
content = response["content"]
# Parse Response
parsed = self._parse_response(content)
history.append({"iteration": iteration, "response": parsed})
# Update Messages
messages.append({"role": "assistant", "content": content})
messages.append({"role": "user", "content": f"Observation: Task verarbeitet"})
# Early Stopping
if early_stopping and parsed.get("is_final"):
return {
"status": "success",
"result": parsed.get("result"),
"iterations": iteration + 1,
"total_tokens": total_tokens
}
return {
"status": "max_iterations",
"iterations": max_iter