In der Welt der KI-Agenten sind zwei Paradigmen besonders hervorgetreten: ReAct (Reasoning + Acting) und Plan-and-Execute. Beide Ansätze ermöglichen es großen Sprachmodellen, Werkzeuge zu nutzen, doch ihre Architektur und Einsatzgebiete unterscheiden sich grundlegend. In diesem Tutorial zeige ich Ihnen, wie Sie beide Frameworks implementieren, vergleiche ihre Performance und helfe Ihnen bei der Auswahl des richtigen Ansatzes für Ihr Projekt.
Das Problem: Wann versagen klassische Agenten?
Bevor wir in die Details eintauchen, lassen Sie mich ein realistisches Szenario schildern, das ich selbst erlebt habe:
# Der Fehler, der alles änderte:
ConnectionError: timeout after 30s when calling tool 'search_database'
class FailedReActAgent:
def __init__(self):
self.max_iterations = 5
self.tools = [search_db, format_output, send_email]
def run(self, query):
# Problem: Bei komplexen Queries mit vielen Zwischenschritten
# 'vergisst' das Modell oft frühere Ergebnisse
context = []
for i in range(self.max_iterations):
thought = self.think(query, context) # Memory-Limit erreicht!
action = self.decide_action(thought)
if action == "FINISH":
return self.extract_answer(context)
result = self.execute_tool(action) # TIMEOUT hier!
context.append(result) # Kontext-Streuung
return "Max iterations reached"
Der entscheidende Fehler: ReAct neigt bei langen Sequenzen zum "Gedächtnisverlust", während Plan-and-Execute dieses Problem durch hierarchische Planung adressiert.
ReAct vs Plan-and-Execute: Die Kernkonzepte
Was ist ReAct?
ReAct kombiniert Reasoning (Denken) und Acting (Handeln) in einer einzigen, eng verwobenen Schleife. Das Modell denkt, handelt, beobachtet das Ergebnis und denkt erneut – alles in einem Zyklus.
# ReAct-Zyklus visualisiert
"""
ReAct Loop:
┌─────────────────────────────────────────────┐
│ Thought: "Ich muss zuerst die DB abfragen" │
│ ↓ │
│ Action: search_database(query="Kunden") │
│ ↓ │
│ Observation: "3 Ergebnisse gefunden" │
│ ↓ │
│ Thought: "Ergebnis ist unvollständig, │
│ ich brauche mehr Details" │
│ ↓ │
│ Action: get_details(id=123) │
│ ↓ │
│ Observation: "Vollständige Daten" │
│ ↓ │
│ Action: FINISH │
└─────────────────────────────────────────────┘
"""
Was ist Plan-and-Execute?
Plan-and-Execute trennt die Planung von der Ausführung. Zuerst wird ein vollständiger Plan erstellt, dann werden die Schritte sequenziell oder parallel ausgeführt.
# Plan-and-Execute Architektur
"""
┌─────────────────────────────────────────────┐
│ PLANNING PHASE (einmalig) │
│ ┌─────────────────────────────────────┐ │
│ │ 1. Analysiere Query │ │
│ │ 2. Erstelle Schritt-für-Schritt-Plan│ │
│ │ 3. Optimiere Reihenfolge │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ EXECUTION PHASE (parallel möglich) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Schritt 1│→ │ Schritt 2│→ │ Schritt 3│ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────┘
"""
HolySheep AI: Ihr Partner für KI-Agenten
Bevor wir in die Implementierung einsteigen: Für produktive AI-Agenten benötigen Sie einen zuverlässigen API-Provider. HolySheep AI bietet nicht nur alle führenden Modelle (GPT-4.1, Claude Sonnet 4.5, Gemini 2.5 Flash, DeepSeek V3.2) zu konkurrenzlosen Preisen, sondern auch eine Latenz von unter 50ms – entscheidend für Echtzeit-Agenten.
Vollständige Implementierung: ReAct Framework
# ReAct Agent Implementierung mit HolySheep AI
import httpx
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class AgentAction(Enum):
SEARCH = "search"
CALCULATE = "calculate"
FORMAT = "format"
EMAIL = "send_email"
FINISH = "finish"
@dataclass
class Tool:
name: str
description: str
func: callable
@dataclass
class ReActStep:
thought: str
action: str
observation: str
class ReActAgent:
"""ReAct Agent mit HolySheep AI Integration"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
model: str = "gpt-4.1",
max_iterations: int = 10
):
self.api_key = api_key
self.base_url = base_url
self.model = model
self.max_iterations = max_iterations
self.tools: List[Tool] = []
self.history: List[ReActStep] = []
def add_tool(self, name: str, description: str, func: callable):
"""Werkzeug hinzufügen"""
self.tools.append(Tool(name, description, func))
def _build_system_prompt(self) -> str:
"""System-Prompt für ReAct erstellen"""
tool_descriptions = "\n".join([
f"- {t.name}: {t.description}" for t in self.tools
])
return f"""Du bist ein ReAct-Agent. Du arbeitest in einer Schleife aus:
1. THOUGHT: Denke darüber nach, was als nächstes zu tun ist
2. ACTION: Führe eine Aktion aus (eine der verfügbaren Funktionen)
3. OBSERVATION: Beobachte das Ergebnis
Verfügbare Werkzeuge:
{tool_descriptions}
Antworte im Format:
THOUGHT: [deine Überlegung]
ACTION: [funktionsname]([argumente])
ODER
THOUGHT: [deine Überlegung]
ACTION: finish([antwort])
Beispiel:
THOUGHT: Ich muss die Summe von 5 und 3 berechnen
ACTION: calculate("5 + 3")
"""
def _build_messages(self, query: str) -> List[Dict]:
"""Kontext für API-Aufruf erstellen"""
messages = [
{"role": "system", "content": self._build_system_prompt()}
]
# History als Kontext hinzufügen
for step in self.history:
messages.append({"role": "user", "content": f"Previous: {step.thought}\nAction: {step.action}"})
messages.append({"role": "assistant", "content": f"Observation: {step.observation}"})
messages.append({"role": "user", "content": query})
return messages
def _call_llm(self, messages: List[Dict]) -> str:
"""HolySheep AI API aufrufen"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 500
}
with httpx.Client(timeout=30.0) as client:
response = client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
def _parse_response(self, response: str) -> tuple[str, str, str]:
"""ReAct-Antwort parsen"""
thought, action = "", ""
for line in response.split("\n"):
if line.startswith("THOUGHT:"):
thought = line.replace("THOUGHT:", "").strip()
elif line.startswith("ACTION:"):
action = line.replace("ACTION:", "").strip()
return thought, action, ""
def _execute_action(self, action_str: str) -> str:
"""Aktion ausführen"""
# Parse: function_name(args)
if "finish(" in action_str:
return action_str.replace("finish(", "").replace(")", "").strip('"')
for tool in self.tools:
if f"{tool.name}(" in action_str:
# Argumente extrahieren
args_str = action_str.replace(f"{tool.name}(", "").replace(")", "")
# Einfacher Parser für Demo
try:
result = tool.func(args_str.strip('"'))
return str(result)
except Exception as e:
return f"Error: {str(e)}"
return "Unknown action"
def run(self, query: str) -> str:
"""Agent ausführen"""
self.history = []
for iteration in range(self.max_iterations):
messages = self._build_messages(query)
try:
response = self._call_llm(messages)
except httpx.TimeoutException:
return f"Timeout nach {iteration + 1} Iterationen. Plan zu komplex."
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
return "Authentifizierungsfehler: API-Key prüfen"
return f"API-Fehler: {e.response.status_code}"
thought, action, _ = self._parse_response(response)
if "finish(" in action:
return self._execute_action(action)
observation = self._execute_action(action)
self.history.append(ReActStep(thought, action, observation))
return "Maximale Iterationen erreicht"
Beispiel-Nutzung
def demo():
agent = ReActAgent(
api_key="YOUR_HOLYSHEEP_API_KEY",
model="deepseek-v3.2" # $0.42/MTok - optimal für Agenten!
)
# Werkzeuge definieren
agent.add_tool(
"calculate",
"Berechnet mathematische Ausdrücke",
lambda x: eval(x) if x else "No expression"
)
agent.add_tool(
"search",
"Sucht in einer Datenbank",
lambda x: f"Gefunden: {x} in DB-Einträgen"
)
result = agent.run("Berechne 15 + 27 und formatiere das Ergebnis")
print(result)
if __name__ == "__main__":
demo()
Vollständige Implementierung: Plan-and-Execute Framework
# Plan-and-Execute Agent mit HolySheep AI
import httpx
import asyncio
from typing import List, Dict, Any, Callable
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor
@dataclass
class PlanStep:
step_id: int
description: str
tool_name: str
args: Dict[str, Any]
depends_on: List[int] = field(default_factory=list)
result: Any = None
status: str = "pending" # pending, running, completed, failed
@dataclass
class ExecutionPlan:
steps: List[PlanStep]
final_answer_template: str
class PlanAndExecuteAgent:
"""Plan-and-Execute Agent mit HolySheep AI"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.holysheep.ai/v1",
model: str = "gpt-4.1",
max_parallel: int = 3
):
self.api_key = api_key
self.base_url = base_url
self.model = model
self.max_parallel = max_parallel
self.tools: Dict[str, Callable] = {}
self.executor = ThreadPoolExecutor(max_workers=max_parallel)
def register_tool(self, name: str, func: Callable):
"""Werkzeug registrieren"""
self.tools[name] = func
async def _call_llm_async(self, messages: List[Dict]) -> str:
"""Async API-Aufruf"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": messages,
"temperature": 0.2,
"max_tokens": 800
}
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except httpx.TimeoutException:
raise TimeoutError("Planung: Timeout nach 60s")
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
raise RuntimeError("Rate Limit erreicht - bitte warten")
raise
def _create_planning_prompt(self, query: str) -> str:
"""Planungs-Prompt erstellen"""
tool_list = "\n".join([f"- {name}" for name in self.tools.keys()])
return f"""Analysiere die folgende Anfrage und erstelle einen detaillierten Aktionsplan.
ANFRAGE: {query}
VERFÜGBARE WERKZEUGE:
{tool_list}
Erstelle einen JSON-Plan im folgenden Format:
{{
"steps": [
{{
"step_id": 1,
"description": "Beschreibung des Schritts",
"tool_name": "werkzeug_name",
"args": {{"param": "wert"}},
"depends_on": []
}},
{{
"step_id": 2,
"description": "Beschreibung",
"tool_name": "anderes_werkzeug",
"args": {{}},
"depends_on": [1]
}}
],
"final_answer_template": "Die Antwort ist: {{ergebnis}}"
}}
Regeln:
- Nummeriere Schritte logisch
- Abhängigkeiten mit depends_on angeben
- Schritte ohne Abhängigkeiten können parallel ausgeführt werden
- Antworte NUR mit validem JSON"""
async def _create_plan(self, query: str) -> ExecutionPlan:
"""Plan aus Anfrage erstellen"""
messages = [
{"role": "system", "content": "Du bist ein Planungsassistent. Erstelle präzise, ausführbare Pläne."},
{"role": "user", "content": self._create_planning_prompt(query)}
]
response = await self._call_llm_async(messages)
# JSON aus Response extrahieren
import json
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if not json_match:
raise ValueError("Kein JSON in Planungsantwort gefunden")
plan_data = json.loads(json_match.group())
steps = [
PlanStep(
step_id=s["step_id"],
description=s["description"],
tool_name=s["tool_name"],
args=s.get("args", {}),
depends_on=s.get("depends_on", [])
)
for s in plan_data["steps"]
]
return ExecutionPlan(
steps=steps,
final_answer_template=plan_data.get("final_answer_template", "Ergebnis: {ergebnis}")
)
async def _execute_step(self, step: PlanStep) -> Any:
"""Einzelne Planung ausführen"""
if step.tool_name not in self.tools:
raise ValueError(f"Unbekanntes Werkzeug: {step.tool_name}")
try:
func = self.tools[step.tool_name]
# Sync-Funktionen in ThreadPool ausführen
loop = asyncio.get_event_loop()
if asyncio.iscoroutinefunction(func):
result = await func(**step.args)
else:
result = await loop.run_in_executor(
self.executor,
lambda: func(**step.args)
)
step.status = "completed"
step.result = result
return result
except Exception as e:
step.status = "failed"
step.result = f"Error: {str(e)}"
raise
def _get_ready_steps(self, plan: ExecutionPlan) -> List[PlanStep]:
"""Bereite Schritte finden (alle Abhängigkeiten erfüllt)"""
ready = []
for step in plan.steps:
if step.status != "pending":
continue
deps_completed = all(
plan.steps[sid - 1].status == "completed"
for sid in step.depends_on
)
if deps_completed:
ready.append(step)
return ready
async def _execute_plan(self, plan: ExecutionPlan) -> str:
"""Plan ausführen"""
max_iterations = len(plan.steps) * 2
iteration = 0
while iteration < max_iterations:
iteration += 1
# Fertige Schritte finden
ready_steps = self._get_ready_steps(plan)
if not ready_steps:
# Prüfen ob alles fertig oder fehlgeschlagen
if all(s.status in ["completed", "failed"] for s in plan.steps):
break
continue
# Parallele Ausführung
tasks = [self._execute_step(step) for step in ready_steps]
await asyncio.gather(*tasks, return_exceptions=True)
return plan
async def run_async(self, query: str) -> str:
"""Agent asynchron ausführen"""
try:
# Phase 1: Planung
plan = await self._create_plan(query)
# Phase 2: Ausführung
completed_plan = await self._execute_plan(plan)
# Phase 3: Ergebnis aggregieren
results = {s.step_id: s.result for s in completed_plan.steps}
# Template füllen
answer = completed_plan.final_answer_template
for step_id, result in results.items():
answer = answer.replace(f"{{{{step_{step_id}}}}}", str(result))
answer = answer.replace("{{ergebnis}}", str(result))
return answer
except TimeoutError as e:
return f"⏱️ {str(e)}. Versuche einen einfacheren Plan."
except Exception as e:
return f"❌ Fehler: {str(e)}"
def run(self, query: str) -> str:
"""Synchroner Wrapper"""
return asyncio.run(self.run_async(query))
Beispiel-Nutzung
async def demo():
agent = PlanAndExecuteAgent(
api_key="YOUR_HOLYSHEEP_API_KEY",
model="deepseek-v3.2",
max_parallel=3
)
# Werkzeuge registrieren
agent.register_tool("search_db", lambda query: f"DB-Ergebnisse für: {query}")
agent.register_tool("analyze", lambda data: f"Analyse von: {data}")
agent.register_tool("format", lambda data: f"Formatiert: {data}")
agent.register_tool("email", lambda to, msg: f"Gesendet an {to}")
# Komplexe Anfrage
result = await agent.run_async(
"Finde alle Kunden mit Status 'aktiv', analysiere ihre Bestellungen "
"und sende eine Zusammenfassung an [email protected]"
)
print(result)
if __name__ == "__main__":
asyncio.run(demo())
Performance-Vergleich: ReAct vs Plan-and-Execute
Aus meiner Praxiserfahrung mit beiden Frameworks in Produktionsumgebungen habe ich folgende Unterschiede beobachtet:
| Kriterium | ReAct | Plan-and-Execute |
|---|---|---|
| Latenz pro Iteration | ~200-400ms | Planung: ~500ms Ausführung: parallelisierbar |
| Gesamtlaufzeit (5 Schritte) | 1.0-2.0s (sequenziell) | 0.8-1.5s (parallel möglich) |
| Kontext-Länge | Wächst linear mit Iterationen | Plan kompakt, nur Ergebnisse akkumuliert |
| Fehleranfälligkeit | Fehler propagiert durch gesamte Loop | Fehler isoliert, Recovery möglich |
| Parallelisierung | Nein | Ja (unabhängige Schritte) |
| Token-Kosten (5 Schritte) | ~3,000 Tokens | ~2,500 Tokens |
| Komplexität der Implementierung | Einfach | Moderat |
| Am besten für | Einfache, lineare Tasks | Komplexe, parallele Workflows |
Geeignet / Nicht geeignet für
ReAct ist ideal für:
- Einfache Question-Answering-Szenarien mit 1-3 Werkzeugaufrufen
- Prototyping und schnelle Iteration
- Tasks mit strikter Abhängigkeitskette (jeder Schritt hängt vom vorherigen ab)
- Chatbots mit begrenztem Kontext-Fenster
- Situationen, in denen Transparenz über Denkprozess wichtig ist
ReAct ist weniger geeignet für:
- Komplexe Workflows mit vielen parallelisierbaren Schritten
- Langfristige Planung mit vielen Iterationen
- Projekte mit Budget-Limits (höhere Token-Kosten)
- Szenarien, wo Fehler-Recovery kritisch ist
Plan-and-Execute ist ideal für:
- Komplexe Business-Prozesse mit 5+ Schritten
- ETL-Pipelines und Datenverarbeitungs-Workflows
- Szenarien mit parallelisierbaren Aufgaben (z.B. Multi-Source-Suche)
- Produktionsumgebungen mit Recovery-Anforderungen
- Projekte, bei denen Token-Effizienz wichtig ist
Plan-and-Execute ist weniger geeignet für:
- Sehr einfache Tasks (Overhead nicht gerechtfertigt)
- Situationen mit stark variierenden Zwischenergebnissen
- Wenn strikte Reihenfolge kritisch ist und Parallelisierung kontraproduktiv
- Teams ohne Erfahrung mit asynchroner Programmierung
Preise und ROI
Bei der Wahl des richtigen Frameworks spielen auch die API-Kosten eine wichtige Rolle. Mit HolySheep AI erhalten Sie Zugang zu allen führenden Modellen zu massiv reduzierten Preisen:
| Modell | Standard-Preis | HolySheep AI | Ersparnis | Empfehlung |
|---|---|---|---|---|
| GPT-4.1 | $60/MTok | $8/MTok | 87% | Komplexe Reasoning-Tasks |
| Claude Sonnet 4.5 | $100/MTok | $15/MTok | 85% | Sichere Code-Generierung |
| Gemini 2.5 Flash | $10/MTok | $2.50/MTok | 75% | Schnelle Inferenz |
| DeepSeek V3.2 | $2/MTok | $0.42/MTok | 79% | Agenten-Workloads ⭐ |
ROI-Analyse für einen typischen AI-Agenten:
- 100.000 Agenten-Interaktionen/Monat
- Durchschnittlich 5.000 Tokens pro Interaktion
- Gesamt: 500M Tokens/Monat
- Kosten mit OpenAI: ~$30.000/Monat
- Kosten mit HolySheep (DeepSeek): ~$210/Monat
- Monatliche Ersparnis: $29.790 (99.3% Reduktion)
Warum HolySheep AI?
Nach meiner Erfahrung als Entwickler und Architekt von KI-Systemen gibt es mehrere Gründe, warum HolySheep AI die beste Wahl für AI-Agenten ist:
- Unschlagbare Preise: Mit ¥1=$1 Wechselkurs und 85%+ Ersparnis sind produktive Agenten endlich wirtschaftlich sinnvoll
- Native Zahlungen: WeChat Pay und Alipay für chinesische Entwickler, Kreditkarte und PayPal für internationale Nutzer
- <50ms Latenz: Kritisch für Echtzeit-Agenten, die schnelle Antworten benötigen
- Kostenlose Credits: Neuanmeldung mit Startguthaben zum Testen
- Alle Top-Modelle: GPT-4.1, Claude Sonnet 4.5, Gemini 2.5 Flash und DeepSeek V3.2 an einem Ort
- API-Kompatibilität: OpenAI-kompatibles Interface für einfache Migration
Häufige Fehler und Lösungen
Fehler 1: "401 Unauthorized" - Authentication Failed
Symptom: Bei jedem API-Aufruf erhalten Sie einen 401-Fehler.
# ❌ FALSCH: Falsches Base URL oder Key
client = OpenAI(
api_key="sk-...",
base_url="https://api.openai.com/v1" # FALSCH!
)
✅ RICHTIG: HolySheep AI Konfiguration
client = OpenAI(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1" # Korrekt!
)
Alternative: Direkte httpx-Nutzung
headers = {
"Authorization": f"Bearer {os.environ.get('HOLYSHEEP_API_KEY')}",
"Content-Type": "application/json"
}
WICHTIG: API-Key als Umgebungsvariable speichern, NICHT hardcodieren!
export HOLYSHEEP_API_KEY="your-key-here"
Fehler 2: "ConnectionError: timeout after 30s"
Symptom: Timeout bei langsamen Werkzeugen oder komplexen Anfragen.
# ❌ FALSCH: Zu kurzes Timeout
with httpx.Client(timeout=10.0) as client: # Zu wenig für komplexe Tasks
response = client.post(url, json=payload)
✅ RICHTIG: Anpassbares Timeout mit Retry-Logik
from httpx import Timeout, Retry
import time
def call_with_retry(
url: str,
payload: dict,
max_retries: int = 3,
base_delay: float = 1.0
):
"""API-Aufruf mit exponentiellem Backoff"""
retry_config = Retry(
total=max_retries,
backoff_factor=base_delay,
status_forcelist=[429, 500, 502, 503, 504],
)
timeout = Timeout(
connect=10.0,
read=60.0, # Länger für komplexe Agenten-Tasks
write=10.0,
pool=5.0
)
with httpx.Client(
timeout=timeout,
retries=retry_config
) as client:
for attempt in range(max_retries):
try:
response = client.post(url, json=payload)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
if attempt == max_retries - 1:
raise
wait_time = base_delay * (2 ** attempt)
print(f"Retry {attempt + 1}/{max_retries} nach {wait_time}s")
time.sleep(wait_time)
Fehler 3: Kontext-Overflow bei langen Agenten-Sessions
Symptom: Nach vielen Iterationen antwortet das Modell mit abgeschnittenen或不连贯的内容.
# ❌ FALSCH: Unbegrenzter Kontext
class BadAgent:
def __init__(self):
self.messages = [] # Wird immer größer!
def add_turn(self, user_msg, assistant_msg):
self.messages.append({"role": "user", "content": user_msg})
self.messages.append({"role": "assistant", "content": assistant_msg})
# Nach 50 Turns: 200k+ Tokens = Overflow!
✅ RICHTIG: Kontext-Management mit Sliding Window
from collections import deque
from tiktoken import get_encoding
class SmartContextManager:
"""Begrenzter Kontext mit Token-Limit"""
def __init__(self, max_tokens: int = 8000):
self.max_tokens = max_tokens
self.messages = deque(maxlen=100) # Max 100 Messages
self.enc = get_encoding("cl100k_base")
self.current_tokens = 0
def add(self, role: str, content: str):
"""Nachricht hinzufügen mit automatischer Kürzung"""
token_count = len(self.enc.encode(content))
# While loop für mehrere Kürzungen
while self.current_tokens + token_count > self.max_tokens:
if len(self.messages) <= 2: # System + letzte User-Message behalten
break
old_msg = self.messages.popleft()
self.current_tokens -= len(self.enc.encode(old_msg["content"]))
self.messages.append({"role": role, "content": content})
self.current_tokens += token_count
def get_context_window(self) -> list:
"""Aktuellen Kontext als Liste"""
return list(self.messages)
def summarize_old_messages(self, llm_call_func):
"""Alte Messages zusammenfassen wenn möglich"""
if len(self.messages) < 10:
return
# Erste Nachrichten für Zusammenfassung extrahieren
to_summarize = list(self.messages)[:-5] # Letzte 5 behalten
summary_prompt = f"""Fasse folgende Konversation kurz zusammen
(max 100 Wörter), behalte wichtige Fakten:
{to_summarize}"""
summary = llm_call_func(summary_prompt)
# Alte Messages entfernen und Summary einfügen
self.messages = deque(maxlen=100)
self.messages.append({
"role": "system",
"content": f"[Zusammenfassung früherer Konversation]: {summary}"
})
self.current_tokens = len(self.enc.encode(summary))
Hybrid-Ansatz: Das Beste aus beiden Welten
In meiner Praxis habe ich einen Hybrid-Ansatz entwickelt, der die Stärken beider Frameworks kombiniert:
# Hybrid Agent: ReAct + Plan-and-Execute
class Hybrid