I spent three months building an e-commerce AI customer service system using the ReAct (Reasoning + Acting) pattern. What worked flawlessly in my local Jupyter notebook failed spectacularly in production—slow response times, runaway loops, and a bill that shocked me more than my first AWS invoice. This guide shares the four hard-won lessons that finally stabilized my system, with real code and measurable results.

The Use Case: Holiday Season E-Commerce Support

Our e-commerce platform processes 15,000 customer queries daily, with peak loads hitting 500 concurrent users during holiday sales. We needed an AI system that could answer product questions, check order status, process returns, and provide shipping information, while staying fast and affordable at that scale.

The ReAct pattern seemed ideal—it combines LLM reasoning with tool execution, enabling the system to "think step-by-step" while actually performing actions. Our initial demo was impressive. The production reality was humbling.
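Before the lessons, here is the core loop in miniature. This is a toy sketch, not our production code: the parser assumes the single-line "tool | JSON" action syntax used later in this post, and the tool names are illustrative.

```python
import json

def parse_action(step: str):
    """Pull 'Action: tool_name | {json args}' out of one LLM step."""
    _, rest = step.split("Action:", 1)
    name, args = rest.split("|", 1)
    return name.strip(), json.loads(args.strip())

def react_loop(query: str, llm, tools: dict, max_turns: int = 8) -> str:
    """Bare-bones ReAct: ask the model, run the tool it picks, feed the
    observation back, and repeat until it emits a final answer."""
    transcript = f"Question: {query}\n"
    for _ in range(max_turns):
        step = llm(transcript)  # returns Thought/Action text or a Final Answer
        transcript += step + "\n"
        if "Final Answer:" in step:
            return step.split("Final Answer:", 1)[1].strip()
        tool_name, args = parse_action(step)
        transcript += f"Observation: {tools[tool_name](**args)}\n"
    return "Escalating to a human agent."  # Turn budget exhausted
```

Every lesson below is about what happens when this innocent-looking loop meets real traffic.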

Lesson 1: Token Budgeting Is Non-Negotiable

Our first production deployment burned through tokens like there was no tomorrow. A single customer query could trigger 15-20 ReAct cycles, each sending the full conversation history plus retrieved context. At GPT-4o rates ($15/million output tokens), our cost-per-query hit $0.23—completely unsustainable.
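To see why the bill explodes, run the arithmetic. In this back-of-envelope sketch only the $15/MTok output rate comes from the figure above; the input rate and token counts are my assumptions. The point is that resending the full history every cycle makes per-query cost grow quadratically with cycle count:

```python
# Back-of-envelope model of per-query cost growth.
OUTPUT_RATE = 15.00 / 1_000_000   # $/output token (from the text)
INPUT_RATE = 5.00 / 1_000_000     # $/input token (assumed)
BASE_PROMPT = 1_500               # system prompt + retrieved context (assumed)
PER_CYCLE_OUTPUT = 300            # thought/action tokens per cycle (assumed)

def query_cost(cycles: int) -> float:
    """Cost of one query when every cycle resends the growing history."""
    cost, history = 0.0, 0
    for _ in range(cycles):
        cost += (BASE_PROMPT + history) * INPUT_RATE   # resent input
        cost += PER_CYCLE_OUTPUT * OUTPUT_RATE         # new output
        history += PER_CYCLE_OUTPUT                    # history grows each cycle
    return cost

for n in (5, 10, 20):
    print(f"{n:>2} cycles: ${query_cost(n):.3f}")
```

Doubling the cycle count more than doubles the cost, which is why capping cycles alone was not enough.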

The Solution: Hierarchical Context Management

I implemented a tiered retrieval system that dramatically reduces token consumption while maintaining accuracy:

import httpx
from typing import List, Dict, Optional
from dataclasses import dataclass
import tiktoken

@dataclass
class Message:
    role: str
    content: str

class HolySheepReActClient:
    """Production-ready ReAct client with token optimization"""
    
    def __init__(
        self,
        api_key: str,
        model: str = "deepseek-v3.2",
        max_context_tokens: int = 8000,
        budget_per_turn: int = 500
    ):
        self.base_url = "https://api.holysheep.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.model = model
        self.max_context = max_context_tokens
        self.budget_per_turn = budget_per_turn
        # Using cl100k_base for English-dominant content
        self.encoder = tiktoken.get_encoding("cl100k_base")
    
    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))
    
    def build_turn_prompt(
        self,
        messages: List[Message],
        retrieved_context: str,
        system_prompt: str
    ) -> str:
        """Build a token-efficient prompt for single ReAct turn"""
        
        # Reserve tokens for response
        available_tokens = self.max_context - self.budget_per_turn - 500
        
        # Summarize conversation history if too long
        history_text = self._summarize_history(messages, available_tokens)
        
        # Truncate context if needed
        context_tokens = self.count_tokens(retrieved_context)
        if context_tokens > available_tokens // 2:
            retrieved_context = self._smart_truncate(
                retrieved_context,
                available_tokens // 2
            )
        
        return f"""{system_prompt}

CONVERSATION HISTORY:
{history_text}

RETRIEVED CONTEXT:
{retrieved_context}

TASK: Respond as a helpful customer service agent following the format:
Thought: [your reasoning about what to do]
Action: [tool_name] | [JSON arguments for the tool]
Observation: [result of the action]
... (can repeat Thought/Action/Observation as needed)
Final Answer: [your response to the customer]"""

    def _summarize_history(
        self,
        messages: List[Message],
        max_tokens: int
    ) -> str:
        """Condense conversation to fit budget"""
        if not messages:
            return "No prior conversation."
        
        history_parts = []
        for msg in messages[-6:]:  # Last 6 messages max
            history_parts.append(f"{msg.role}: {msg.content[:200]}")
        
        history = "\n".join(history_parts)
        
        if self.count_tokens(history) > max_tokens:
            # Aggressive summarization (guard: messages is non-empty here)
            last = messages[-1].content[:100]
            return f"[{len(messages)} prior messages, last message: {last}...]"
        
        return history
    
    def _smart_truncate(self, text: str, max_tokens: int) -> str:
        """Preserve beginning and semantic importance"""
        tokens = self.encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        # Keep first 60% and last 40%
        split_point = int(max_tokens * 0.6)
        return self.encoder.decode(tokens[:split_point]) + \
               "\n...[content truncated]...\n" + \
               self.encoder.decode(tokens[-int(max_tokens * 0.4):])

    async def react_turn(
        self,
        user_query: str,
        messages: List[Message],
        retrieved_context: str
    ) -> Dict:
        """Execute single ReAct turn with token control"""
        
        system_prompt = """You are an expert e-commerce customer service agent.
You have access to tools: get_product_info, check_order_status, process_return, get_shipping_info.
Always be helpful, accurate, and concise. Use the minimum number of steps."""
        
        prompt = self.build_turn_prompt(messages, retrieved_context, system_prompt)
        
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json={
                    "model": self.model,
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.3,
                    "max_tokens": self.budget_per_turn
                }
            )
            response.raise_for_status()
            return response.json()
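The prompt above asks the model for a Thought/Action/Final Answer format, so you also need to parse it on the way back. Here is a minimal parser sketch; it is my own helper, not part of the client above, and it assumes the single-line "tool_name | JSON" action syntax with flat (non-nested) JSON arguments:

```python
import json
import re

def parse_react_output(text: str) -> dict:
    """Parse one LLM turn in the Thought/Action/Final Answer format."""
    parsed = {"thought": None, "action": None, "args": None, "final_answer": None}

    thought = re.search(r"Thought:\s*(.+)", text)
    if thought:
        parsed["thought"] = thought.group(1).strip()

    final = re.search(r"Final Answer:\s*(.+)", text, re.DOTALL)
    if final:
        parsed["final_answer"] = final.group(1).strip()
        return parsed  # A final answer ends the turn; no action to parse

    action = re.search(r"Action:\s*(\w+)\s*\|\s*(\{.*?\})", text)
    if action:
        parsed["action"] = action.group(1)
        try:
            parsed["args"] = json.loads(action.group(2))
        except json.JSONDecodeError:
            parsed["args"] = None  # Malformed JSON; caller should re-prompt
    return parsed
```

In production you would want stricter handling of malformed output (re-prompting with the error), but this shows the shape of the problem.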

Cost comparison

PRICING = {
    "gpt-4.1": 8.00,             # $8.00/MTok
    "claude-sonnet-4.5": 15.00,  # $15.00/MTok
    "gemini-2.5-flash": 2.50,    # $2.50/MTok
    "deepseek-v3.2": 0.42        # $0.42/MTok
}

print("Cost per 1000 queries at 500 output tokens each:")
for model, price in PRICING.items():
    cost = (500 / 1_000_000) * price * 1000
    print(f"  {model}: ${cost:.2f}")

Measured Results: After implementing hierarchical context management:

Lesson 2: Loop Prevention Saves Sanity

The most embarrassing production failure: a customer asked about return shipping, and the system got stuck in a 47-turn loop, repeatedly checking the same tracking number and generating increasingly confused responses. The customer received 47 identical messages, and I received a very confused support ticket.

The Solution: State Machine + Cycle Detection

from enum import Enum
from typing import Set, Tuple, Optional
from dataclasses import dataclass, field
import hashlib
import time

class ReActState(Enum):
    IDLE = "idle"
    REASONING = "reasoning"
    ACTING = "acting"
    OBSERVING = "observing"
    COMPLETED = "completed"
    FAILED = "failed"
    LOOPS_DETECTED = "loops_detected"

@dataclass
class ActionSignature:
    """Immutable signature for cycle detection"""
    tool_name: str
    args_hash: str
    result_hash: str
    
@dataclass
class ReActController:
    """Production controller with loop prevention"""
    
    max_turns: int = 8
    max_identical_results: int = 3
    max_same_action_chain: int = 4
    
    # Internal state
    _state: ReActState = field(default=ReActState.IDLE)
    _turn_count: int = field(default=0)
    _action_history: list = field(default_factory=list)
    _result_cache: dict = field(default_factory=dict)
    _final_answer: Optional[str] = field(default=None)
    
    def __post_init__(self):
        self._action_sequences: list = []
    
    def can_proceed(self) -> Tuple[bool, str]:
        """Check if another turn should execute"""
        
        if self._state == ReActState.COMPLETED:
            return False, "Conversation already completed"
        
        if self._state == ReActState.FAILED:
            return False, "Conversation in failed state"
        
        if self._turn_count >= self.max_turns:
            self._state = ReActState.FAILED
            return False, f"Maximum turns ({self.max_turns}) exceeded"
        
        if self._state == ReActState.LOOPS_DETECTED:
            return False, "Loop prevention triggered"
        
        return True, "OK"
    
    def record_action(self, tool_name: str, args: dict, result: str):
        """Record action with cycle detection"""
        
        self._turn_count += 1
        
        # Create immutable signature
        args_str = str(sorted(args.items()))
        args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
        result_hash = hashlib.md5(result.encode()).hexdigest()[:8]
        
        signature = ActionSignature(tool_name, args_hash, result_hash)
        self._action_history.append(signature)
        
        # Cycle detection: same tool + same args + similar result
        result_key = f"{tool_name}:{args_hash}"
        if result_key in self._result_cache:
            cached_result = self._result_cache[result_key]
            if self._similar_results(cached_result, result):
                self._state = ReActState.LOOPS_DETECTED
                self._final_answer = self._generate_fallback_response()
                return
        
        self._result_cache[result_key] = result
        
        # Sequence detection: same action chain
        self._action_sequences.append(tool_name)
        if self._detect_repetitive_sequence():
            self._state = ReActState.LOOPS_DETECTED
            self._final_answer = self._generate_fallback_response()
    
    def _similar_results(self, result1: str, result2: str) -> bool:
        """Check if two results are semantically similar"""
        # Simple: check if first 100 chars match
        return result1[:100] == result2[:100]
    
    def _detect_repetitive_sequence(self) -> bool:
        """Detect if same action is repeating"""
        if len(self._action_sequences) < 3:
            return False
        
        last_actions = self._action_sequences[-3:]
        if len(set(last_actions)) == 1:
            return True
        
        # Check for 2-action ping-pong
        if len(self._action_sequences) >= 4:
            last_four = self._action_sequences[-4:]
            if last_four[0] == last_four[2] and last_four[1] == last_four[3]:
                return True
        
        return False
    
    def _generate_fallback_response(self) -> str:
        """Generate safe fallback when loops detected"""
        return ("I apologize, but I'm having trouble processing your request "
                "accurately. For immediate assistance, please contact our "
                "support team at [email protected] or call 1-800-XXX-XXXX. "
                "I'll connect you with a human agent who can help further.")
    
    def mark_completed(self, final_answer: str):
        self._state = ReActState.COMPLETED
        self._final_answer = final_answer
    
    def get_status(self) -> dict:
        return {
            "state": self._state.value,
            "turns_used": self._turn_count,
            "turns_remaining": self.max_turns - self._turn_count,
            "has_answer": self._final_answer is not None
        }
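One weak spot worth flagging: _similar_results above only compares the first 100 characters, so two results that differ early (say, a timestamp at the front of the response) never count as similar. If you want something fuzzier without pulling in embeddings, a difflib-based check is a reasonable drop-in sketch; the 0.9 threshold is a guess you would tune on real traffic:

```python
import difflib

def similar_results(result1: str, result2: str, threshold: float = 0.9) -> bool:
    """Fuzzy alternative to a prefix check: compare the first 500
    characters with difflib's similarity ratio (0.0 to 1.0)."""
    ratio = difflib.SequenceMatcher(None, result1[:500], result2[:500]).ratio()
    return ratio >= threshold
```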

Usage example

controller = ReActController(max_turns=8)

Simulate problematic loop

test_args = {"order_id": "12345"}

for i in range(5):
    can_proceed, reason = controller.can_proceed()
    print(f"Turn {i+1}: {reason}")
    if not can_proceed:
        break
    # Simulate same action returning same result
    controller.record_action(
        "check_order_status",
        test_args,
        '{"status": "shipped", "tracking": "1Z999AA..."}'
    )

print(f"\nFinal state: {controller.get_status()}")
print(f"Fallback triggered: {'Yes' if controller._state == ReActState.LOOPS_DETECTED else 'No'}")

Measured Results:

Lesson 3: Async Tool Execution Is Not Optional

Our initial synchronous tool execution meant each tool call blocked the entire pipeline. For a typical query requiring 3 tools (product lookup, inventory check, shipping calculation), we waited sequentially—3 × 200ms = 600ms minimum latency per turn.
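If you want to convince yourself the math holds before touching production code, this tiny self-contained benchmark (fake tools that just sleep, no real APIs) shows sequential awaits costing roughly the sum of the latencies while asyncio.gather costs roughly the max:

```python
import asyncio
import time

async def fake_tool(latency: float) -> str:
    # Stand-in for an I/O-bound tool call
    await asyncio.sleep(latency)
    return "ok"

async def main():
    # Sequential: each await blocks the next, so total ~= sum of latencies
    start = time.perf_counter()
    for _ in range(3):
        await fake_tool(0.2)
    seq = time.perf_counter() - start

    # Parallel: gather runs the coroutines concurrently, total ~= max latency
    start = time.perf_counter()
    await asyncio.gather(*(fake_tool(0.2) for _ in range(3)))
    par = time.perf_counter() - start

    print(f"sequential: {seq*1000:.0f}ms, parallel: {par*1000:.0f}ms")
    return seq, par

seq, par = asyncio.run(main())
```

Real tools have dependencies between them, which is what the executor below handles.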

The Solution: Parallel Tool Execution with Dependency Analysis

import asyncio
import json
import time
from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class ToolDefinition:
    name: str
    function: Callable
    dependencies: List[str]  # Tools that must complete before this runs
    cacheable: bool = False
    cache_ttl: int = 300  # seconds

class ParallelToolExecutor:
    """Execute independent tools in parallel"""
    
    def __init__(self, tools: List[ToolDefinition]):
        self.tools = {t.name: t for t in tools}
        self.cache: Dict[str, tuple] = {}  # cache key -> (result, timestamp)
    
    def _cache_key(self, tool_name: str, args: dict) -> str:
        """Key cache entries by tool name AND arguments, so calls with
        different arguments never collide on the same cached result"""
        return f"{tool_name}:{json.dumps(args, sort_keys=True)}"
    
    def _get_cached(self, tool_name: str, args: dict) -> Optional[Any]:
        """Check cache validity"""
        key = self._cache_key(tool_name, args)
        if key not in self.cache:
            return None
        
        result, timestamp = self.cache[key]
        ttl = self.tools[tool_name].cache_ttl
        
        if time.time() - timestamp > ttl:
            del self.cache[key]
            return None
        
        return result
    
    def _build_dependency_graph(
        self,
        action_list: List[Dict]
    ) -> Dict[int, List[int]]:
        """Build execution plan from action list"""
        dependencies = defaultdict(list)
        
        for i, action in enumerate(action_list):
            tool_name = action["tool"]
            
            # Add dependency on any earlier action this tool declares
            for j, prev_action in enumerate(action_list[:i]):
                prev_tool = prev_action["tool"]
                if prev_tool in self.tools[tool_name].dependencies:
                    dependencies[i].append(j)
        
        return dict(dependencies)
    
    async def execute_parallel(
        self,
        actions: List[Dict]
    ) -> List[Any]:
        """Execute actions respecting dependencies"""
        
        dependency_graph = self._build_dependency_graph(actions)
        tasks: Dict[int, asyncio.Task] = {}
        
        async def execute_one(i: int, action: Dict) -> Any:
            tool_name = action["tool"]
            args = action["args"]
            
            # Check cache
            cached = self._get_cached(tool_name, args)
            if cached is not None:
                return cached
            
            # Wait for dependencies; awaiting the memoized task means
            # each dependency executes exactly once
            for dep_idx in dependency_graph.get(i, []):
                await tasks[dep_idx]
            
            # Execute tool
            tool = self.tools[tool_name]
            result = await tool.function(**args)
            
            # Cache if applicable
            if tool.cacheable:
                self.cache[self._cache_key(tool_name, args)] = (result, time.time())
            
            return result
        
        # One task per action, created in index order so every dependency
        # (which always points to an earlier index) exists before anything
        # awaits it; independent tasks run concurrently
        for i, action in enumerate(actions):
            tasks[i] = asyncio.create_task(execute_one(i, action))
        
        completed = await asyncio.gather(*tasks.values(), return_exceptions=True)
        
        results: List[Any] = []
        for result in completed:
            if isinstance(result, Exception):
                results.append({"error": str(result)})
            else:
                results.append(result)
        
        return results

Example tool implementations

async def get_product_info(product_id: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.holysheep.ai/v1/products/{product_id}",
            headers={"Authorization": f"Bearer {YOUR_HOLYSHEEP_API_KEY}"}
        )
        return response.json()

async def check_inventory(product_id: str, warehouse: str = "main") -> dict:
    await asyncio.sleep(0.05)  # Simulate API latency
    return {"available": True, "quantity": 150}

async def calculate_shipping(
    product_id: str,
    destination: str,
    weight: float
) -> dict:
    await asyncio.sleep(0.08)  # Simulate API latency
    return {"cost": 12.50, "days": 3, "carrier": "FedEx"}

Setup executor

executor = ParallelToolExecutor([
    ToolDefinition("get_product_info", get_product_info, [], cacheable=True),
    ToolDefinition("check_inventory", check_inventory, ["get_product_info"]),
    ToolDefinition("calculate_shipping", calculate_shipping, ["get_product_info"]),
])

Benchmark

async def benchmark():
    actions = [
        {"tool": "get_product_info", "args": {"product_id": "SKU-12345"}},
        {"tool": "check_inventory", "args": {"product_id": "SKU-12345"}},
        {"tool": "calculate_shipping", "args": {
            "product_id": "SKU-12345",
            "destination": "CA",
            "weight": 2.5
        }},
    ]
    
    start = time.time()
    results = await executor.execute_parallel(actions)
    elapsed = time.time() - start
    
    print(f"Parallel execution: {elapsed*1000:.0f}ms")
    print(f"Synchronous would be: ~330ms")
    print(f"Speedup: {330/(elapsed*1000):.1f}x")

asyncio.run(benchmark())

Measured Results:

Lesson 4: Monitoring Must Be Observability-First

Debugging ReAct systems in production is notoriously difficult. When a customer complains about a wrong answer, you need to reconstruct the entire reasoning chain—not just the final response.

The Production Monitoring Architecture

from datetime import datetime
from typing import List, Optional, Dict, Any
import json
import logging
from dataclasses import dataclass, field, asdict

@dataclass
class ReActStep:
    """Immutable record of a single reasoning step"""
    timestamp: str
    step_number: int
    thought: str
    action: Optional[str]
    action_args: Optional[Dict]
    observation: Optional[str]
    error: Optional[str] = None
    latency_ms: float = 0.0
    tokens_used: int = 0

@dataclass
class ReActTrace:
    """Complete execution trace for debugging"""
    trace_id: str
    query: str
    customer_id: str
    session_id: str
    started_at: str
    completed_at: Optional[str] = None
    final_answer: Optional[str] = None
    steps: List[ReActStep] = field(default_factory=list)
    total_latency_ms: float = 0.0
    total_tokens: int = 0
    cost_usd: float = 0.0
    success: bool = True
    error_message: Optional[str] = None

class ReActObservability:
    """Production observability for ReAct systems"""
    
    def __init__(self, log_endpoint: Optional[str] = None):
        self.logger = logging.getLogger("react_observability")
        self.log_endpoint = log_endpoint
        self.traces: Dict[str, ReActTrace] = {}
        self.metrics_collector = None  # Would integrate with Prometheus/StatsD
    
    def start_trace(
        self,
        trace_id: str,
        query: str,
        customer_id: str,
        session_id: str
    ) -> ReActTrace:
        trace = ReActTrace(
            trace_id=trace_id,
            query=query,
            customer_id=customer_id,
            session_id=session_id,
            started_at=datetime.utcnow().isoformat()
        )
        self.traces[trace_id] = trace
        self.logger.info(f"Trace started: {trace_id}")
        return trace
    
    def record_step(
        self,
        trace_id: str,
        step_number: int,
        thought: str,
        action: Optional[str] = None,
        action_args: Optional[Dict] = None,
        observation: Optional[str] = None,
        latency_ms: float = 0.0,
        tokens_used: int = 0
    ):
        if trace_id not in self.traces:
            return
        
        step = ReActStep(
            timestamp=datetime.utcnow().isoformat(),
            step_number=step_number,
            thought=thought,
            action=action,
            action_args=action_args,
            observation=observation,
            latency_ms=latency_ms,
            tokens_used=tokens_used
        )
        
        trace = self.traces[trace_id]
        trace.steps.append(step)
        trace.total_latency_ms += latency_ms
        trace.total_tokens += tokens_used
        
        # Calculate cost
        trace.cost_usd = (trace.total_tokens / 1_000_000) * 0.42  # DeepSeek rate
    
    def record_error(self, trace_id: str, error: Exception):
        if trace_id not in self.traces:
            return
        
        trace = self.traces[trace_id]
        trace.success = False
        trace.error_message = str(error)
        trace.completed_at = datetime.utcnow().isoformat()
        
        self.logger.error(
            f"Trace {trace_id} failed: {error}",
            extra={"trace": asdict(trace)}
        )
    
    def complete_trace(
        self,
        trace_id: str,
        final_answer: str,
        success: bool = True
    ):
        if trace_id not in self.traces:
            return
        
        trace = self.traces[trace_id]
        trace.final_answer = final_answer
        trace.success = success
        trace.completed_at = datetime.utcnow().isoformat()
        
        # Emit metrics
        self._emit_metrics(trace)
        
        # Store for debugging (would go to database in production)
        self.logger.info(
            f"Trace {trace_id} completed",
            extra={
                "steps": len(trace.steps),
                "latency_ms": trace.total_latency_ms,
                "cost_usd": trace.cost_usd
            }
        )
    
    def _emit_metrics(self, trace: ReActTrace):
        """Emit metrics for dashboarding"""
        metrics = {
            "react.trace.duration": trace.total_latency_ms,
            "react.trace.steps": len(trace.steps),
            "react.trace.tokens": trace.total_tokens,
            "react.trace.cost": trace.cost_usd,
            "react.trace.success": int(trace.success),
            # Percentile latencies (p50/p95) are aggregated by the metrics
            # backend from per-trace durations, not emitted per trace
        }
        
        # In production: send to Prometheus/CloudWatch/DataDog
        if self.metrics_collector:
            for name, value in metrics.items():
                self.metrics_collector.emit(name, value)
    
    def get_trace_for_debugging(self, trace_id: str) -> Optional[ReActTrace]:
        """Retrieve complete trace for debugging"""
        return self.traces.get(trace_id)
    
    def generate_debug_report(self, trace_id: str) -> str:
        """Generate human-readable debug report"""
        trace = self.get_trace_for_debugging(trace_id)
        if not trace:
            return "Trace not found"
        
        lines = [
            f"=== ReAct Debug Report ===",
            f"Trace ID: {trace.trace_id}",
            f"Customer: {trace.customer_id}",
            f"Query: {trace.query}",
            f"Success: {trace.success}",
            f"Total Steps: {len(trace.steps)}",
            f"Total Latency: {trace.total_latency_ms:.0f}ms",
            f"Total Tokens: {trace.total_tokens:,}",
            f"Total Cost: ${trace.cost_usd:.6f}",
            "",
            "=== Reasoning Chain ===",
        ]
        
        for step in trace.steps:
            lines.append(f"\n[Step {step.step_number}] {step.timestamp}")
            lines.append(f"Thought: {step.thought[:200]}...")
            if step.action:
                lines.append(f"Action: {step.action}({step.action_args})")
            if step.observation:
                lines.append(f"Observation: {step.observation[:200]}...")
            if step.error:
                lines.append(f"ERROR: {step.error}")
        
        lines.extend([
            "",
            "=== Final Answer ===",
            trace.final_answer or "[No answer provided]"
        ])
        
        return "\n".join(lines)

Usage in production

observability = ReActObservability()

async def handle_customer_query(query: str, customer_id: str, session_id: str):
    trace_id = f"{session_id}-{int(time.time() * 1000)}"
    
    # Start trace
    observability.start_trace(trace_id, query, customer_id, session_id)
    
    try:
        # Execute ReAct loop with instrumentation
        # (assumes react_turn returns a parsed step object, not raw JSON)
        for step_num in range(8):
            step_start = time.time()
            
            # Get LLM response
            response = await client.react_turn(query, messages, context)
            
            observability.record_step(
                trace_id=trace_id,
                step_number=step_num,
                thought=response.thought,
                action=response.action,
                action_args=response.action_args,
                observation=response.observation,
                latency_ms=(time.time() - step_start) * 1000,
                tokens_used=response.usage.total_tokens
            )
            
            if response.is_final:
                observability.complete_trace(trace_id, response.answer)
                return response.answer
    except Exception as e:
        observability.record_error(trace_id, e)
        raise

Example: Generate debug report for failed query

report = observability.generate_debug_report("sess-123-1703123456789")
print(report)

Measured Results:

Common Errors and Fixes

Error 1: Maximum Token Limit Exceeded

PROBLEM: API returns a 400 error with "maximum context length exceeded"

CAUSE: Conversation history grows without bound

BROKEN CODE:

async def broken_approach(messages, user_input):
    messages.append({"role": "user", "content": user_input})
    response = await client.chat.completions.create(
        model="deepseek-v3.2",
        messages=messages  # Eventually exceeds context window
    )
    messages.append(response.choices[0].message)
    return response

FIXED CODE:

class TokenBudgetManager:
    def __init__(self, max_tokens=8000):
        self.max_tokens = max_tokens
        self.encoder = tiktoken.get_encoding("cl100k_base")
    
    def manage_messages(self, messages: List[Dict]) -> List[Dict]:
        """Automatically truncate while preserving recent context"""
        total_tokens = sum(
            len(self.encoder.encode(m["content"])) for m in messages
        )
        if total_tokens <= self.max_tokens:
            return messages
        
        # Keep system prompt + recent messages
        system_prompt = messages[0] if messages and messages[0]["role"] == "system" else None
        kept_messages = [m for m in messages if m["role"] != "system"]
        kept_messages = kept_messages[-6:]  # Last 6 turns
        
        result = []
        if system_prompt:
            result.append(system_prompt)
        result.extend(kept_messages)
        return result

manager = TokenBudgetManager(max_tokens=8000)
managed_messages = manager.manage_messages(messages)

Error 2: Tool Not Found / Invalid Tool Call

PROBLEM: LLM generates tool calls that don't exist in our registry

CAUSE: Model hallucinating tool names or using wrong parameter format

PROBLEMATIC OUTPUT from LLM:

Action: search_inventory | {"product": "SKU123", "count": 5}

We don't have "search_inventory"; the registered tool is "check_inventory".

FIXED APPROACH:

from rapidfuzz import process  # pip install rapidfuzz

class ToolValidationLayer:
    def __init__(self, available_tools: Dict[str, dict]):
        self.available = available_tools
    
    def validate_and_fix(self, action: str, args: dict) -> Tuple[bool, str, dict]:
        # Exact match - great!
        if action in self.available:
            return self._validate_args(action, args)
        
        # Fuzzy match - try to fix; extractOne returns (choice, score, index)
        match = process.extractOne(action, list(self.available.keys()))
        if match and match[1] > 85:
            corrected_name = match[0]
            print(f"Corrected tool: {action} -> {corrected_name}")
            return self._validate_args(corrected_name, args)
        
        # No match - return error
        return False, f"Unknown tool: {action}. Available: {list(self.available.keys())}", {}
    
    def _validate_args(self, tool_name: str, args: dict) -> Tuple[bool, str, dict]:
        required = self.available[tool_name].get("required", [])
        optional = self.available[tool_name].get("optional", [])
        all_params = required + optional
        
        # Check required parameters
        missing = [p for p in required if p not in args]
        if missing:
            return False, f"Missing required parameters: {missing}", {}
        
        # Filter to valid parameters only
        cleaned_args = {k: v for k, v in args.items() if k in all_params}
        return True, "OK", cleaned_args

validator = ToolValidationLayer({
    "check_inventory": {"required": ["product_id"], "optional": ["warehouse"]},
    "get_product_info": {"required": ["product_id"], "optional": []},
    "calculate_shipping": {"required": ["product_id", "destination"], "optional": ["weight"]}
})

is_valid, message, cleaned_args = validator.validate_and_fix(
    "check_inventory",
    {"product_id": "SKU123", "unknown_param": "value"}
)
print(f"Valid: {is_valid}, Message: {message}")

Error 3: Rate Limiting / 429 Errors

PROBLEM: Production traffic triggers rate limits

CAUSE: No request queuing or retry logic

BROKEN CODE:

async def broken_api_call():
    response = await client.post(url, json=data)
    return response.json()  # Crashes on 429

FIXED CODE with exponential backoff:

import asyncio
from random import uniform

class RateLimitedClient:
    def __init__(self, base_client, max_retries=5):
        self.client = base_client
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(50)  # Max concurrent requests
    
    async def post_with_retry(self, url: str, json: dict) -> dict:
        async with self.semaphore:  # Limit concurrency
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.post(url, json=json)
                    
                    if response.status_code == 200:
                        return response.json()
                    elif response.status_code == 429:
                        # Rate limited - exponential backoff
                        retry_after = int(response.headers.get("Retry-After", 1))
                        wait_time = retry_after * (2 ** attempt) + uniform(0, 1)
                        print(f"Rate limited. Waiting {wait_time:.1f}s...")
                        await asyncio.sleep(wait_time)
                    elif response.status_code >= 500:
                        # Server error - retry
                        wait_time = 2 ** attempt + uniform(0, 1)
                        await asyncio.sleep(wait_time)
                    else:
                        response.raise_for_status()
                except httpx.TimeoutException:
                    if attempt == self.max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)
                except httpx.ConnectError:
                    if attempt == self.max_retries - 1:
                        raise
                    await asyncio.sleep(1)
            
            raise Exception(f"Failed after {self.max_retries} retries")

rate_limited_client = RateLimitedClient(client)

Now use it in ReAct loop

async def react_with_retry(messages, context):
    response = await rate_limited_client.post_with_retry(
        f"https://api.holysheep.ai/v1/chat/completions",
        json={"model": "deepseek-v3.2", "messages": messages}
    )
    return response

Final Results: Production-Ready ReAct System

After implementing all four lessons, our e-commerce customer service system achieved: