When I first implemented the ReAct (Reasoning + Acting) pattern in my production pipeline, I thought the hardest part was understanding the theory. I was wrong. The real challenges emerged only when I moved from a working demo to a service handling thousands of requests daily. After three production deployments and countless late-night debugging sessions, I have compiled the four most critical lessons that separate a stable ReAct implementation from one that randomly crashes, burns through budgets, or loops infinitely.

ReAct Service Provider Comparison

Before diving into the technical lessons, let me address the question on every engineer's mind: which API provider should you use for your ReAct implementation? I have benchmarked HolySheep AI against official APIs and other relay services across the metrics that matter most for production ReAct deployments.

| Provider | Cost per 1M Tokens | Latency (P50) | Rate Limits | Payment Methods | Free Tier | Best For |
|---|---|---|---|---|---|---|
| HolySheep AI | $0.42–$8.00 | <50ms | Generous | WeChat, Alipay, Credit Card | Free credits on signup | Cost-sensitive production workloads |
| Official OpenAI | $2.50–$60.00 | 80–200ms | Strict | Credit Card only | $5 trial | Maximum model availability |
| Official Anthropic | $3.00–$75.00 | 100–250ms | Strict | Credit Card only | Limited trial | Claude-specific features |
| Other Relay Services | $0.80–$15.00 | 60–150ms | Variable | Mixed | Rarely | Backup redundancy |

Sign up here for HolySheep AI and receive free credits immediately. Their rate of ¥1 per US dollar of API credit represents savings of more than 85% compared with the ¥7.3+ per dollar you would pay elsewhere, making production ReAct deployments economically viable even for startups.

What is the ReAct Pattern?

The ReAct pattern combines reasoning (thinking through steps) with acting (executing tools or functions). Unlike simple chain-of-thought prompting, ReAct allows the model to loop through a cycle: think, act, observe, and repeat until reaching a conclusion. This makes it powerful for complex tasks requiring web searches, database queries, or multi-step calculations.
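The cycle described above can be sketched in a few lines. This is a minimal illustration, not production code: `llm_step` and `run_tool` are hypothetical stand-ins for a model call and a tool executor, and the dict keys (`thought`, `action`, `final_answer`) are an assumed contract, not any official API.

```python
def react_loop(task, llm_step, run_tool, max_steps=5):
    """Minimal ReAct sketch: THINK -> ACT -> OBSERVE until done.
    llm_step and run_tool are caller-supplied stand-ins (hypothetical)."""
    transcript = [f"Task: {task}"]
    for _ in range(max_steps):
        step = llm_step("\n".join(transcript))      # THINK: model proposes next step
        transcript.append(step["thought"])
        if step.get("final_answer"):                # model decided it is done
            return step["final_answer"]
        observation = run_tool(step["action"], step["args"])   # ACT
        transcript.append(f"Observation: {observation}")       # OBSERVE, then repeat
    return "No answer within step budget"
```

The loop bound (`max_steps`) is the simplest of the safeguards discussed in this article; Lessons 1 and 2 add token budgets and loop detection on top of it.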

Lesson 1: Token Budget Management is Non-Negotiable

The first production lesson I learned cost me $200 in a single weekend. ReAct loops can generate substantial token volume because each iteration adds context. A simple task that should complete in 3 steps might balloon to 15 if the model repeatedly re-analyzes previous observations.

Your implementation must enforce strict token budgets per cycle and total execution limits.

import httpx
import json
from typing import List, Dict, Optional

class ReActBudgetManager:
    """
    Production-grade token budget management for ReAct loops.
    Implements hard limits to prevent runaway token consumption.
    """
    
    def __init__(
        self,
        api_key: str,
        max_iterations: int = 10,
        max_total_tokens: int = 8000,
        max_tokens_per_step: int = 500,
        base_url: str = "https://api.holysheep.ai/v1"
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.max_iterations = max_iterations
        self.max_total_tokens = max_total_tokens
        self.max_tokens_per_step = max_tokens_per_step
        self.total_tokens_used = 0
        self.iteration_count = 0
    
    async def execute_with_budget(
        self,
        prompt: str,
        tools: List[Dict],
        initial_context: str = ""
    ) -> Dict:
        """
        Execute ReAct loop with comprehensive budget enforcement.
        Automatically terminates when limits are reached.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        conversation_history = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": f"Task: {prompt}\n\nContext: {initial_context}"}
        ]
        
        while self.iteration_count < self.max_iterations:
            # Check total token budget before next iteration
            if self.total_tokens_used >= self.max_total_tokens:
                return {
                    "status": "budget_exceeded",
                    "iterations": self.iteration_count,
                    "tokens_used": self.total_tokens_used,
                    "result": self._generate_partial_summary(conversation_history)
                }
            
            # Build request with token limit enforcement
            request_body = {
                "model": "gpt-4.1",
                "messages": [conversation_history[0]] + conversation_history[-5:],  # Sliding window; always keep the system prompt
                "max_tokens": self.max_tokens_per_step,
                "temperature": 0.3,
                "tools": tools,
                "tool_choice": "auto"
            }
            
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=request_body
                )
                
                if response.status_code != 200:
                    raise Exception(f"API Error: {response.status_code} - {response.text}")
                
                data = response.json()
                self.total_tokens_used += data.get("usage", {}).get("total_tokens", 0)
                self.iteration_count += 1
                
                assistant_message = data["choices"][0]["message"]
                conversation_history.append(assistant_message)
                
                # Check for completion (finish_reason lives on the choice, not the message)
                if data["choices"][0].get("finish_reason") == "stop":
                    return {
                        "status": "completed",
                        "iterations": self.iteration_count,
                        "tokens_used": self.total_tokens_used,
                        "result": assistant_message["content"]
                    }
                
                # Execute tools if present
                if "tool_calls" in assistant_message:
                    tool_results = await self._execute_tools(
                        assistant_message["tool_calls"],
                        headers
                    )
                    for result in tool_results:
                        conversation_history.append({
                            "role": "tool",
                            "tool_call_id": result["tool_call_id"],
                            "content": result["output"]
                        })
        
        return {
            "status": "iteration_exceeded",
            "iterations": self.iteration_count,
            "tokens_used": self.total_tokens_used,
            "result": self._generate_partial_summary(conversation_history)
        }
    
    def _build_system_prompt(self) -> str:
        return """You are a ReAct agent. Follow this cycle:
1. THINK: Analyze what you know and what you need
2. ACT: Choose and execute one tool
3. OBSERVE: Note the result before next step
4. REPEAT until you have a complete answer

CRITICAL CONSTRAINTS:
- Maximum 10 iterations total
- Maximum 8000 tokens per conversation
- If you cannot complete in 10 steps, provide your best partial answer
- Always cite sources from tool results"""
    
    async def _execute_tools(self, tool_calls, headers) -> List[Dict]:
        results = []
        for call in tool_calls:
            tool_name = call["function"]["name"]
            args = json.loads(call["function"]["arguments"])
            
            if tool_name == "web_search":
                result = await self._web_search(args["query"])
            elif tool_name == "calculator":
                result = self._calculate(args["expression"])
            elif tool_name == "database_query":
                result = await self._query_database(args["sql"])
            else:
                result = f"Unknown tool: {tool_name}"
            
            results.append({
                "tool_call_id": call["id"],
                "output": str(result)
            })
        return results
    
    async def _web_search(self, query: str) -> str:
        # Implement your web search logic here
        return f"Search result for: {query}"
    
    def _calculate(self, expression: str) -> str:
        # WARNING: eval() on model-generated input is unsafe. Stripping
        # builtins limits the blast radius, but production code should use
        # a proper expression parser instead.
        try:
            return str(eval(expression, {"__builtins__": {}}, {}))
        except Exception:
            return "Calculation error"
    
    async def _query_database(self, sql: str) -> str:
        # Implement database query logic
        return "Query result placeholder"
    
    def _generate_partial_summary(self, history: List[Dict]) -> str:
        return "Partial result: Unable to complete full task within budget."

Lesson 2: Infinite Loop Detection Requires Multiple Safeguards

The most embarrassing production incident I experienced was watching my ReAct agent query the same database table 47 times in a row, convinced each time that it needed one more piece of data. It cost me $150 in compute and alerted my entire on-call rotation at 3 AM.

Loop detection cannot rely on a single mechanism. You need defense in depth.

import re
import time
from collections import Counter
from typing import Set, Dict, List, Tuple

class ReActLoopDetector:
    """
    Multi-layered loop detection for ReAct agents.
    Catches infinite loops at thought, action, and observation levels.
    """
    
    def __init__(
        self,
        max_identical_actions: int = 3,
        max_identical_thoughts: int = 4,
        observation_window: int = 5,
        similarity_threshold: float = 0.85,
        time_limit_seconds: int = 120
    ):
        self.max_identical_actions = max_identical_actions
        self.max_identical_thoughts = max_identical_thoughts
        self.observation_window = observation_window
        self.similarity_threshold = similarity_threshold
        self.time_limit_seconds = time_limit_seconds
        
        self.action_history: List[str] = []
        self.thought_history: List[str] = []
        self.observation_history: List[str] = []
        self.action_timestamps: List[float] = []
        self.start_time: float = 0.0
        
        self.loop_counter: Counter = Counter()
        self.loop_patterns: Set[str] = set()
    
    def start_tracking(self) -> None:
        """Initialize tracking for a new ReAct session."""
        self.action_history.clear()
        self.thought_history.clear()
        self.observation_history.clear()
        self.action_timestamps.clear()
        self.loop_counter.clear()
        self.loop_patterns.clear()
        self.start_time = time.time()
    
    def check_and_record(
        self,
        thought: str,
        action: str,
        observation: str = ""
    ) -> Tuple[bool, str]:
        """
        Check if current step creates a loop pattern.
        Returns (is_loop, reason)
        """
        # Check time limit first
        if self.start_time and (time.time() - self.start_time) > self.time_limit_seconds:
            return True, "TIME_LIMIT: Execution exceeded time threshold"
        
        # Normalize for comparison
        normalized_thought = self._normalize_text(thought)
        normalized_action = self._normalize_text(action)
        
        # Check for identical thought patterns
        if normalized_thought in self.thought_history:
            count = sum(1 for t in self.thought_history if t == normalized_thought)
            if count >= self.max_identical_thoughts:
                return True, f"THOUGHT_LOOP: Same reasoning {count} times"
        
        # Check for identical action patterns
        if normalized_action in self.action_history:
            count = sum(1 for a in self.action_history if a == normalized_action)
            if count >= self.max_identical_actions:
                return True, f"ACTION_LOOP: Same action '{action}' executed {count} times"
        
        # Check for action sequence patterns (3-gram detection)
        if len(self.action_history) >= 2:
            action_sequence = " -> ".join(self.action_history[-2:] + [normalized_action])
            if action_sequence in self.loop_patterns:
                return True, f"SEQUENCE_LOOP: Repeating pattern detected"
            self.loop_patterns.add(action_sequence)
        
        # Check observation similarity
        if observation and self.observation_history:
            recent_observations = self.observation_history[-self.observation_window:]
            for prev_obs in recent_observations:
                similarity = self._calculate_similarity(observation, prev_obs)
                if similarity > self.similarity_threshold:
                    self.loop_counter["observation_similarity"] += 1
                    if self.loop_counter["observation_similarity"] >= 3:
                        return True, f"OBSERVATION_LOOP: {similarity:.2%} similar to recent observations"
        
        # Record for future checks
        self.thought_history.append(normalized_thought)
        self.action_history.append(normalized_action)
        self.action_timestamps.append(time.time())
        if observation:
            self.observation_history.append(observation)
        
        # Maintain sliding window sizes
        if len(self.thought_history) > 50:
            self.thought_history = self.thought_history[-50:]
        if len(self.action_history) > 50:
            self.action_history = self.action_history[-50:]
        if len(self.observation_history) > 50:
            self.observation_history = self.observation_history[-50:]
        
        return False, "OK"
    
    def _normalize_text(self, text: str) -> str:
        """Normalize text for comparison, removing variable parts."""
        # Remove timestamps, UUIDs, and other variable data
        import re
        normalized = text.lower().strip()
        normalized = re.sub(r'\d{10,}', '', normalized)
        normalized = re.sub(r'[a-f0-9]{32,}', '', normalized)
        normalized = re.sub(r'\$\d+\.\d+', '$', normalized)
        normalized = re.sub(r'\s+', ' ', normalized)
        return normalized
    
    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Calculate Jaccard similarity between two texts."""
        words1 = set(self._normalize_text(text1).split())
        words2 = set(self._normalize_text(text2).split())
        if not words1 or not words2:
            return 0.0
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        return len(intersection) / len(union)
    
    def get_diagnostics(self) -> Dict:
        """Return loop detection diagnostics for debugging."""
        return {
            "total_thoughts": len(self.thought_history),
            "total_actions": len(self.action_history),
            "total_observations": len(self.observation_history),
            "unique_actions": len(set(self.action_history)),
            "loop_patterns_detected": len(self.loop_patterns),
            "elapsed_time": time.time() - self.start_time if self.start_time else 0,
            "action_distribution": dict(self.loop_counter),
            "recent_actions": self.action_history[-5:],
            "recent_thoughts": self.thought_history[-3:]
        }


Integration wrapper with HolySheep API

class ProtectedReActAgent:
    """
    Production ReAct agent with comprehensive loop protection.
    Uses HolySheep AI for cost-effective inference.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.loop_detector = ReActLoopDetector(
            max_identical_actions=3,
            max_identical_thoughts=4,
            time_limit_seconds=120
        )
        self.budget_manager = None  # From Lesson 1

    async def run_protected(self, task: str, tools: List[Dict]) -> Dict:
        """Execute ReAct task with loop protection enabled."""
        self.loop_detector.start_tracking()
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        messages = [{"role": "user", "content": task}]

        for iteration in range(10):
            # Execute step with reduced token budget
            request_body = {
                "model": "gpt-4.1",
                "messages": messages,
                "max_tokens": 400,
                "temperature": 0.2,
                "tools": tools
            }

            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    "https://api.holysheep.ai/v1/chat/completions",
                    headers=headers,
                    json=request_body
                )
                response.raise_for_status()
                data = response.json()

            assistant_message = data["choices"][0]["message"]
            messages.append(assistant_message)

            # Extract ReAct components
            content = assistant_message.get("content", "")
            thought = self._extract_thought(content)
            action = self._extract_action(assistant_message.get("tool_calls", []))

            # Loop detection check
            is_loop, reason = self.loop_detector.check_and_record(
                thought=thought,
                action=action,
                observation=""
            )
            if is_loop:
                return {
                    "status": "loop_detected",
                    "reason": reason,
                    "iteration": iteration,
                    "diagnostics": self.loop_detector.get_diagnostics()
                }

            # Execute tools and continue
            if assistant_message.get("tool_calls"):
                tool_results = await self._execute_tool_calls(
                    assistant_message["tool_calls"], headers
                )
                messages.extend(tool_results)
            else:
                return {
                    "status": "completed",
                    "result": content,
                    "iterations": iteration + 1,
                    "diagnostics": self.loop_detector.get_diagnostics()
                }

        return {
            "status": "max_iterations",
            "diagnostics": self.loop_detector.get_diagnostics()
        }

    def _extract_thought(self, content: str) -> str:
        """Extract the THOUGHT portion from ReAct response."""
        if "THINK" in content.upper():
            start = content.upper().find("THINK") + 5
            end = content.upper().find("ACT") if "ACT" in content.upper() else len(content)
            return content[start:end].strip()
        return content[:200]

    def _extract_action(self, tool_calls: List[Dict]) -> str:
        """Extract action name from tool calls."""
        if tool_calls:
            return tool_calls[0]["function"]["name"]
        return "no_action"

    async def _execute_tool_calls(self, tool_calls: List[Dict], headers: Dict) -> List[Dict]:
        """Execute tool calls and return observation messages."""
        results = []
        for call in tool_calls:
            # Tool execution logic here
            result = {"role": "tool", "tool_call_id": call["id"], "content": "Observation result"}
            results.append(result)
        return results

Lesson 3: State Persistence and Context Recovery

In production, your ReAct agent cannot afford to lose progress when failures occur. A server restart, network timeout, or API rate limit should not mean starting from scratch. I implemented a checkpoint system that has saved countless hours of recomputation.
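The article does not show the checkpoint code itself, so here is a minimal sketch of the idea. The JSON-file backend, the file layout, and all field names are my own illustration; a production system would more likely use Redis or a database.

```python
import json
import os
import tempfile

class ReActCheckpointStore:
    """Sketch: persist ReAct state after every step so a crash or restart
    resumes mid-task instead of from scratch. Backend and field names
    are illustrative assumptions, not the author's implementation."""

    def __init__(self, directory: str = "./react_checkpoints"):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, task_id: str) -> str:
        return os.path.join(self.directory, f"{task_id}.json")

    def save(self, task_id: str, messages: list, iteration: int, tokens_used: int) -> None:
        # Atomic write: temp file + rename, so a crash mid-write never
        # leaves a corrupt checkpoint behind.
        state = {"messages": messages, "iteration": iteration, "tokens_used": tokens_used}
        fd, tmp = tempfile.mkstemp(dir=self.directory)
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp, self._path(task_id))

    def load(self, task_id: str):
        """Return saved state, or None if this task has no checkpoint."""
        try:
            with open(self._path(task_id)) as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def clear(self, task_id: str) -> None:
        """Remove the checkpoint once the task completes."""
        try:
            os.remove(self._path(task_id))
        except FileNotFoundError:
            pass
```

Calling `save()` after each iteration and `load()` at startup is enough to turn a restart into a resume; `clear()` on success keeps the store from growing without bound.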

Lesson 4: Error Classification and Graceful Degradation

Not all errors are created equal in a ReAct system. A 400 Bad Request means you should fix your code. A 429 Rate Limit means you should wait and retry. A 500 Server Error might succeed on retry. Your agent needs to classify errors and respond appropriately.
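The classification in the paragraph above can be made explicit. The status-code buckets follow the text; the enum and function names are my own, and a real agent would also inspect error bodies and the Retry-After header.

```python
from enum import Enum

class ErrorAction(Enum):
    FAIL_FAST = "fail_fast"                # fix your code; retrying will not help
    RETRY_AFTER_WAIT = "retry_after_wait"  # rate limited: back off before retrying
    RETRY = "retry"                        # transient server fault: retry may succeed

def classify_http_error(status_code: int) -> ErrorAction:
    """Map an HTTP status to a retry policy, per the buckets above.
    Sketch only: names are illustrative, not a standard API."""
    if status_code == 429:
        return ErrorAction.RETRY_AFTER_WAIT
    if 500 <= status_code < 600:
        return ErrorAction.RETRY
    return ErrorAction.FAIL_FAST  # 4xx such as 400/401/404: a code or config bug
```

The rate limiter in Error 3 below applies exactly this policy inside its retry loop.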

Common Errors and Fixes

Error 1: Context Window Overflow with Extended ReAct Loops

Symptom: The model starts producing incoherent responses after 8–10 iterations. This happens because the conversation history grows faster than expected, and the model ends up receiving truncated context.

Fix: Implement a sliding window approach that keeps only recent messages and periodically summarizes the state.

import json
from typing import List, Dict, Optional

def create_sliding_context(
    messages: List[Dict],
    max_messages: int = 12,
    summary_prompt: str = "Summarize this conversation state in 3 sentences:"
) -> List[Dict]:
    """
    Maintain context within token limits by keeping recent messages
    and periodically generating a summary.
    """
    if len(messages) <= max_messages:
        return messages
    
    # Keep system message, first user message, recent messages, and last assistant
    system_msg = [messages[0]] if messages[0]["role"] == "system" else []
    recent_messages = messages[-(max_messages - 1):]
    
    # Insert synthesized summary if we had many iterations
    if len(messages) > max_messages * 2:
        summary_message = {
            "role": "system",
            "content": f"{summary_prompt} Previous iterations completed {len(messages) - max_messages} steps. Current state: task in progress."
        }
        return system_msg + [summary_message] + recent_messages
    
    return system_msg + recent_messages


class SmartReActContextManager:
    """
    Intelligently manages context to prevent overflow while preserving state.
    """
    
    def __init__(self, max_tokens_per_message: int = 800):
        self.max_tokens_per_message = max_tokens_per_message
        self.iteration_summaries: List[str] = []
    
    def truncate_message(self, message: Dict) -> Dict:
        """Truncate a single message to maximum token budget."""
        content = message.get("content", "")
        # Rough token estimate: 1 token ≈ 4 characters
        max_chars = self.max_tokens_per_message * 4
        
        if len(content) > max_chars:
            message["content"] = content[:max_chars] + "\n[TRUNCATED]"
        
        return message
    
    def build_context(self, history: List[Dict], current_task: str) -> List[Dict]:
        """Build optimized context from full history."""
        # Add iteration summaries if available
        context = []
        
        if self.iteration_summaries:
            summary_text = "\n".join([
                f"Iteration {i+1}: {s}" 
                for i, s in enumerate(self.iteration_summaries[-3:])
            ])
            context.append({
                "role": "system",
                "content": f"Previous progress:\n{summary_text}"
            })
        
        # Add truncated recent messages
        recent = [self.truncate_message(m) for m in history[-8:]]
        context.extend(recent)
        
        # Add current task reminder
        context.append({
            "role": "user", 
            "content": f"Continue task: {current_task}"
        })
        
        return context
    
    def add_summary(self, step_result: str):
        """Record a step summary for future context."""
        # Extract key information from step result
        summary = step_result[:100] + "..." if len(step_result) > 100 else step_result
        self.iteration_summaries.append(summary)
        
        # Keep only recent summaries to prevent memory growth
        if len(self.iteration_summaries) > 10:
            self.iteration_summaries = self.iteration_summaries[-10:]

Error 2: Tool Response Parsing Failures

Symptom: The model fails to properly parse tool responses, leading to repeated failed tool calls or malformed arguments. This typically occurs when tool outputs have inconsistent formats.

Fix: Standardize all tool responses into a consistent JSON schema with clear success/failure indicators.

from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict
import json

@dataclass
class StandardToolResponse:
    """Standardized format for all tool responses."""
    tool_name: str
    success: bool
    data: Optional[Any] = None
    error: Optional[str] = None
    metadata: Optional[Dict] = None
    execution_time_ms: Optional[float] = None
    
    def to_message_content(self) -> str:
        """
        Convert to string format suitable for model consumption.
        Always includes success status and structured data.
        """
        if self.success:
            return json.dumps({
                "status": "success",
                "tool": self.tool_name,
                "result": self.data,
                "metadata": self.metadata or {}
            }, indent=2, ensure_ascii=False)
        else:
            return json.dumps({
                "status": "error",
                "tool": self.tool_name,
                "error": self.error,
                "retry_possible": self._is_retryable()
            }, indent=2)
    
    def _is_retryable(self) -> bool:
        """Determine if this error type is retryable."""
        non_retryable = ["invalid_query", "unauthorized", "not_found"]
        if self.error:
            return not any(e in self.error.lower() for e in non_retryable)
        return True


class StandardizedToolExecutor:
    """
    Executes tools and returns responses in standardized format.
    Ensures the ReAct model always receives consistent, parseable responses.
    """
    
    def __init__(self):
        self.tools = {}
        self.register_default_tools()
    
    def register_tool(self, name: str, func: callable):
        """Register a tool with standardized execution."""
        self.tools[name] = func
    
    def register_default_tools(self):
        """Register built-in standardized tools."""
        self.tools["web_search"] = self._standardized_web_search
        self.tools["database_query"] = self._standardized_database_query
        self.tools["calculator"] = self._standardized_calculator
        self.tools["file_read"] = self._standardized_file_read
    
    async def execute(self, tool_name: str, arguments: Dict) -> StandardToolResponse:
        """Execute tool with standardized error handling and response formatting."""
        import time
        start_time = time.time()
        
        if tool_name not in self.tools:
            return StandardToolResponse(
                tool_name=tool_name,
                success=False,
                error=f"Unknown tool: {tool_name}. Available tools: {list(self.tools.keys())}"
            )
        
        try:
            result = await self.tools[tool_name](**arguments)
            execution_time = (time.time() - start_time) * 1000
            
            return StandardToolResponse(
                tool_name=tool_name,
                success=True,
                data=result,
                metadata={"execution_time_ms": execution_time}
            )
        except TypeError as e:
            # Invalid arguments
            return StandardToolResponse(
                tool_name=tool_name,
                success=False,
                error=f"Invalid arguments: {str(e)}"
            )
        except Exception as e:
            # Unexpected errors - always include details for debugging
            return StandardToolResponse(
                tool_name=tool_name,
                success=False,
                error=f"Execution failed: {type(e).__name__}: {str(e)}"
            )
    
    async def _standardized_web_search(self, query: str, max_results: int = 5) -> Dict:
        """Web search with standardized output."""
        # Your web search implementation
        return {
            "query": query,
            "results": [
                {"title": "Example Result", "url": "https://example.com", "snippet": "..."}
            ],
            "total_found": max_results
        }
    
    async def _standardized_database_query(self, sql: str, parameters: Dict = None) -> Dict:
        """Database query with standardized output."""
        # Your database implementation
        return {
            "sql": sql,
            "rows_affected": 0,
            "data": []
        }
    
    async def _standardized_calculator(self, expression: str) -> Dict:
        """Calculator with standardized output."""
        try:
            # WARNING: eval() on model-generated input is unsafe; stripping
            # builtins limits the damage, but prefer a real expression parser.
            result = eval(expression, {"__builtins__": {}}, {})
            return {
                "expression": expression,
                "result": float(result) if isinstance(result, (int, float)) else str(result)
            }
        except Exception as e:
            raise ValueError(f"Invalid expression: {expression} - {str(e)}")
    
    async def _standardized_file_read(self, path: str, encoding: str = "utf-8") -> Dict:
        """File reading with standardized output."""
        try:
            with open(path, "r", encoding=encoding) as f:
                content = f.read()
            return {
                "path": path,
                "size_bytes": len(content),
                "preview": content[:500]
            }
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found: {path}")
        except PermissionError:
            raise PermissionError(f"Permission denied: {path}")

Error 3: Rate Limit Handling Causing Cascading Failures

Symptom: When you hit rate limits, naive retry logic causes thundering herd problems. All your workers retry simultaneously, hitting the rate limit again, creating an infinite retry loop.

Fix: Implement exponential backoff with jitter and per-worker cooldown tracking.

import asyncio
import random
import time
from typing import Optional, Callable, Any
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    """Configuration for rate limit handling."""
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter_range: float = 0.5
    respect_retry_after: bool = True

class HolySheepRateLimiter:
    """
    Production-grade rate limiter for HolySheep API calls.
    Implements exponential backoff with jitter to prevent thundering herd.
    """
    
    def __init__(self, config: Optional[RateLimitConfig] = None):
        self.config = config or RateLimitConfig()
        self.worker_id = str(random.randint(1000, 9999))
        self.cooldown_until: float = 0
        self.request_count: int = 0
        self.total_retries: int = 0
        self.last_rate_limit_time: float = 0
    
    async def execute_with_retry(
        self,
        request_func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute request with automatic rate limit handling.
        Returns result or raises exception after max retries.
        """
        last_exception = None
        
        for attempt in range(self.config.max_retries):
            # Check cooldown period
            if time.time() < self.cooldown_until:
                wait_time = self.cooldown_until - time.time()
                await asyncio.sleep(wait_time)
            
            try:
                self.request_count += 1
                result = await request_func(*args, **kwargs)
                return result
                
            except Exception as e:
                last_exception = e
                # httpx.HTTPStatusError carries the response; fall back to 500
                error_response = getattr(e, "response", None)
                status_code = getattr(e, "status_code", None) or (
                    error_response.status_code if error_response is not None else 500
                )
                
                if status_code == 429:
                    self.last_rate_limit_time = time.time()
                    retry_delay = self._calculate_retry_delay(attempt, e)
                    self.cooldown_until = time.time() + retry_delay
                    self.total_retries += 1
                    
                    print(f"[Worker {self.worker_id}] Rate limited. "
                          f"Attempt {attempt + 1}/{self.config.max_retries}. "
                          f"Waiting {retry_delay:.1f}s. Error: {str(e)}")
                    
                    await asyncio.sleep(retry_delay)
                    
                elif status_code in [500, 502, 503, 504]:
                    # Server error - retry with backoff
                    retry_delay = self._calculate_retry_delay(attempt, None)
                    await asyncio.sleep(retry_delay)
                    
                elif status_code == 400:
                    # Bad request - do not retry
                    raise e
                else:
                    # Unknown error - retry once
                    if attempt < 1:
                        await asyncio.sleep(1)
                    else:
                        raise e
        
        raise Exception(
            f"Max retries ({self.config.max_retries}) exceeded. "
            f"Total retries: {self.total_retries}. Last error: {last_exception}"
        )
    
    def _calculate_retry_delay(
        self,
        attempt: int,
        error: Optional[Exception]
    ) -> float:
        """Calculate delay using exponential backoff with jitter."""
        # Check for Retry-After header
        if self.config.respect_retry_after and error:
            retry_after = getattr(error, "retry_after", None)
            if retry_after:
                return float(retry_after)
        
        # Calculate exponential backoff
        exponential_delay = self.config.base_delay * (
            self.config.exponential_base ** attempt
        )
        
        # Add jitter to prevent synchronization
        jitter = exponential_delay * random.uniform(
            -self.config.jitter_range,
            self.config.jitter_range
        )
        
        final_delay = exponential_delay + jitter
        
        # Cap at maximum delay
        return min(final_delay, self.config.max_delay)
    
    def get_stats(self) -> dict:
        """Return rate limiter statistics for monitoring."""
        return {
            "worker_id": self.worker_id,
            "total_requests": self.request_count,
            "total_retries": self.total_retries,
            "in_cooldown": time.time() < self.cooldown_until,
            "cooldown_remaining_s": max(0, self.cooldown_until - time.time()),
            "last_rate_limit": self.last_rate_limit_time,
            "retry_rate": self.total_retries / max(self.request_count, 1)
        }


Usage example with HolySheep API

async def call_holysheep_with_rate_limiting(api_key: str, messages: list):
    """Example: Call HolySheep API with production rate limit handling."""
    limiter = HolySheepRateLimiter()

    async def make_request():
        import httpx
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.holysheep.ai/v1/chat/completions",
                headers=headers,
                json={
                    "model": "gpt