If you've been watching the AI development space, you've probably noticed LangGraph's rapid rise on GitHub, where it has become one of the most popular frameworks for building sophisticated AI applications. But what exactly makes it special, and more importantly, how can you use it to build production-ready AI agents?
In this comprehensive tutorial, I'll walk you through the entire process from zero experience to building a working stateful AI agent. I spent three months deeply integrating LangGraph into our production pipeline at HolySheep AI, and I'm excited to share everything I learned along the way.
What You Will Learn in This Tutorial
- Understand the fundamental difference between stateless and stateful AI workflows
- Set up a complete LangGraph development environment
- Build your first stateful conversation agent from scratch
- Implement memory persistence that survives application restarts
- Create multi-step reasoning chains with tool integration
- Debug common errors that beginners encounter
- Deploy your agent to production with proper error handling
Why Stateful Workflows Matter: A Beginner's Guide
Before we write any code, let's understand why LangGraph has become so popular. Imagine you're building a customer support chatbot. A simple approach would be:
# The old way: stateless API calls (legacy openai<1.0 SDK shown for illustration)
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Where is my order?"}]
)
print(response.choices[0].message.content)
This approach works, but it has a critical problem: the AI has no memory. Ask a follow-up question like "When will it arrive?" and the AI has no context about your previous question. This is where stateful workflows change everything.
LangGraph solves this by maintaining a persistent state throughout the conversation. Think of it like a whiteboard where your AI agent can write down notes, refer back to previous decisions, and build upon earlier thoughts. This is essential for complex tasks like:
- Multi-turn customer support conversations
- Research agents that gather information from multiple sources
- Coding assistants that need to remember project context
- Decision-making systems that weigh multiple factors over time
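Without a framework, the standard workaround is to resend the entire history on every call and manage it yourself. Here is a minimal sketch of that manual approach (the order details are made up for illustration):
# Manual statefulness: resend the full history on every request.
# You now own truncation, persistence, and branching yourself,
# which is exactly the bookkeeping LangGraph automates.
history = [{"role": "user", "content": "Where is my order?"}]

# ...after the first response arrives, append it...
history.append({"role": "assistant", "content": "Order #1234 shipped yesterday."})

# The follow-up only makes sense because the prior turns are resent:
history.append({"role": "user", "content": "When will it arrive?"})
# response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=history)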
Prerequisites and Environment Setup
Don't worry if you're completely new to this — we'll start from the absolute basics. Here's what you need:
- Python 3.10 or higher installed on your computer
- A text editor (VS Code is free and excellent for this)
- An API key from HolySheep AI (sign up on their site; they offer free credits to get started)
- About 30 minutes of focused learning time
Installing the Required Packages
Open your terminal or command prompt and run the following commands:
# Create a new project folder and navigate into it
mkdir langgraph-tutorial
cd langgraph-tutorial
# Create a virtual environment (keeps your project isolated)
python -m venv venv

# Activate the virtual environment
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate

# Install the essential packages
pip install langgraph langchain-core langchain-holysheep python-dotenv
The installation might take a minute or two — that's completely normal. You'll know it's successful when you see no error messages and your terminal prompt returns.
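Before moving on, you can confirm the core packages import cleanly. This checks langgraph and langchain-core, and will fail loudly if the install went wrong:
# Verify the installation
python -c "import langgraph, langchain_core; print('imports OK')"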
Your First Stateful Agent: A Step-by-Step Walkthrough
Now comes the exciting part — building our first working agent. I'll explain every line of code so you understand what's happening.
Step 1: Configure Your API Connection
Create a new file called config.py in your project folder. This file will store your API key securely:
# config.py
import os
from dotenv import load_dotenv
# Load environment variables from the .env file
load_dotenv()

# Get your API key from the environment or set it directly (testing only)
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "your-api-key-here")

# This is the base URL for HolySheep AI's API.
# Notice: we use api.holysheep.ai, NOT api.openai.com or api.anthropic.com.
BASE_URL = "https://api.holysheep.ai/v1"

# Example 2026 pricing in USD per million tokens; verify current rates on
# the provider's dashboard before relying on these numbers
MODEL_CONFIG = {
"gpt-4.1": {
"model_name": "gpt-4.1",
"input_price_per_mtok": 8.00, # $8.00 per million input tokens
"output_price_per_mtok": 8.00, # $8.00 per million output tokens
"latency_typical_ms": 850
},
"claude-sonnet-4.5": {
"model_name": "claude-sonnet-4.5",
"input_price_per_mtok": 15.00, # $15.00 per million input tokens
"output_price_per_mtok": 75.00, # $75.00 per million output tokens
"latency_typical_ms": 920
},
"gemini-2.5-flash": {
"model_name": "gemini-2.5-flash",
"input_price_per_mtok": 2.50, # $2.50 per million input tokens
"output_price_per_mtok": 10.00, # $10.00 per million output tokens
"latency_typical_ms": 180
},
"deepseek-v3.2": {
"model_name": "deepseek-v3.2",
"input_price_per_mtok": 0.42, # $0.42 per million input tokens
"output_price_per_mtok": 1.68, # $1.68 per million output tokens
"latency_typical_ms": 210
}
}
def get_model_info(model_name: str) -> dict:
"""Get pricing and performance info for a specific model."""
if model_name not in MODEL_CONFIG:
raise ValueError(f"Unknown model: {model_name}. Available: {list(MODEL_CONFIG.keys())}")
return MODEL_CONFIG[model_name]
print("Configuration loaded successfully!")
print(f"Available models: {list(MODEL_CONFIG.keys())}")
Create a .env file in the same folder with your actual API key:
HOLYSHEEP_API_KEY=sk-your-actual-api-key-from-holysheep
Step 2: Understanding LangGraph's State Architecture
Before we write the agent code, let's understand how LangGraph manages state. Think of state as a shared notebook that all parts of your workflow can read from and write to. Each step in your workflow can:
- Read the current state to understand what's happened so far
- Write new information to update the state
- Decide what happens next based on the state
Here's a simple state definition for a conversation agent:
# state.py
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
class AgentState(TypedDict):
"""
This defines the structure of our agent's memory.
Think of it as a blueprint for what information our agent tracks.
"""
    # The conversation history. New messages are merged into the existing
    # list by the add_messages reducer, which also coerces plain dicts
    # into LangChain message objects (hence the .content access later on)
    messages: Annotated[Sequence[BaseMessage], add_messages]
# Current step in our workflow (for multi-step reasoning)
current_step: str
# Any data our agent collects during reasoning
collected_data: dict
# Flags for controlling workflow behavior
needs_confirmation: bool
# Count of turns to prevent infinite loops
iteration_count: int
def create_initial_state() -> AgentState:
"""Factory function to create a fresh state for new conversations."""
return AgentState(
messages=[],
current_step="start",
collected_data={},
needs_confirmation=False,
iteration_count=0
)
print("State schema defined successfully!")
Step 3: Building the Core Agent Node
Now we create the actual agent logic — this is the "brain" that processes messages and decides what to do:
# agent.py
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_holysheep import ChatHolySheep
from config import HOLYSHEEP_API_KEY, BASE_URL, get_model_info
from state import AgentState
def create_llm(model_name: str = "deepseek-v3.2") -> BaseChatModel:
"""
Create a language model instance connected to HolySheep AI.
    Why HolySheep? Their pricing is aggressive:
    - DeepSeek V3.2: $0.42/M input tokens (vs $8+ for frontier models)
    - Low typical latency (see MODEL_CONFIG for per-model figures)
    - Supports WeChat/Alipay for Chinese users
    - Free credits on signup
"""
return ChatHolySheep(
model=model_name,
holysheep_api_key=HOLYSHEEP_API_KEY,
base_url=BASE_URL,
temperature=0.7,
max_tokens=2000
)
def agent_node(state: AgentState, model_name: str = "deepseek-v3.2") -> dict:
"""
The main agent processing node.
This function:
1. Reads the current conversation history from state
2. Processes the user's message using the LLM
3. Updates the state with the AI's response
4. Decides what to do next
"""
# Increment our iteration counter (prevents infinite loops)
new_iteration = state.get("iteration_count", 0) + 1
if new_iteration > 20:
        # Return only the NEW message: the add_messages reducer appends it
        # to the existing history, so re-sending old messages duplicates them.
        return {
            "messages": [AIMessage(content=(
                "I've reached the maximum number of iterations. "
                "Please start a new conversation if you need more help."))],
            "iteration_count": new_iteration,
            "current_step": "max_iterations_reached"
        }
# Create the LLM instance
llm = create_llm(model_name)
# Define a system prompt that guides the AI's behavior
system_message = SystemMessage(content="""
You are a helpful AI assistant built with LangGraph.
You have access to conversation history and can remember context.
Be concise but thorough in your responses.
If you need to use tools, clearly state what you're doing.
""")
    # Convert state messages to the format the LLM expects.
    # After the add_messages reducer runs, entries are LangChain message
    # objects, but raw dicts can appear before the first reduction,
    # so handle both shapes defensively.
    langchain_messages = [system_message]
    for msg in state["messages"]:
        if isinstance(msg, dict):
            role, content = msg.get("role"), msg.get("content", "")
        else:
            role, content = msg.type, msg.content
        if role in ("user", "human"):
            langchain_messages.append(HumanMessage(content=content))
        elif role in ("assistant", "ai"):
            langchain_messages.append(AIMessage(content=content))
# Call the LLM and get a response
try:
response = llm.invoke(langchain_messages)
ai_response = response.content if hasattr(response, 'content') else str(response)
except Exception as e:
ai_response = f"I encountered an error processing your request: {str(e)}"
    # Return only the new assistant message; the add_messages reducer
    # merges it into the conversation history for us
    return {
        "messages": [AIMessage(content=ai_response)],
        "iteration_count": new_iteration,
        "current_step": "response_generated"
    }
print("Agent node defined successfully!")
print(f"Using HolySheep AI at {BASE_URL} for API calls")
Step 4: Creating the LangGraph Workflow
Now we connect everything together into a LangGraph workflow. This is where the magic happens:
# workflow.py
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from state import AgentState, create_initial_state
from agent import agent_node
def create_workflow(model_name: str = "deepseek-v3.2"):
"""
Create a complete LangGraph workflow with:
- A state graph that manages conversation state
- Checkpointing to save memory between sessions
- Conditional routing for complex workflows
"""
# Create a new state graph
workflow = StateGraph(AgentState)
# Add our agent node to the graph
workflow.add_node("agent", lambda state: agent_node(state, model_name))
# Define the flow: START -> agent -> END
workflow.add_edge(START, "agent")
workflow.add_edge("agent", END)
# Add memory checkpointing
# This is crucial: without this, your agent forgets everything on restart
checkpointer = MemorySaver()
# Compile the graph into an executable app
app = workflow.compile(checkpointer=checkpointer)
return app
# Build the app once and reuse it: the MemorySaver lives inside the compiled
# app, so creating a fresh app on every call would wipe all saved history.
_APP_CACHE = {}

def run_conversation(user_input: str, thread_id: str = "default",
                     model_name: str = "deepseek-v3.2"):
    """
    Run a single conversation turn with the agent.
    The thread_id is crucial for maintaining conversation context.
    Use the same thread_id to continue a conversation.
    """
    if model_name not in _APP_CACHE:
        _APP_CACHE[model_name] = create_workflow(model_name)
    app = _APP_CACHE[model_name]
# Configuration for the checkpoint system
config = {
"configurable": {
"thread_id": thread_id # This identifies the conversation thread
}
}
# Run the agent with the user's input
result = app.invoke(
{
"messages": [{"role": "user", "content": user_input}],
"current_step": "start",
"collected_data": {},
"needs_confirmation": False,
"iteration_count": 0
},
config=config
)
return result["messages"][-1]["content"] if result.get("messages") else "No response"
# Test the workflow
if __name__ == "__main__":
print("Testing the LangGraph workflow...")
print("-" * 50)
# First interaction
response1 = run_conversation("Hello! What can you help me with?", thread_id="test-001")
print(f"Agent: {response1}")
print()
# Follow-up question (same thread_id = same conversation context)
response2 = run_conversation("Can you give me an example?", thread_id="test-001")
print(f"Agent: {response2}")
print()
# New conversation (different thread_id)
response3 = run_conversation("Hello! What can you help me with?", thread_id="test-002")
print(f"Agent (fresh start): {response3}")
print()
print("Workflow test complete!")
Step 5: Running Your First Agent
Let's test everything together. Run the workflow script:
cd langgraph-tutorial
python workflow.py
You should see output like this:
Testing the LangGraph workflow...
--------------------------------------------------
Agent: Hello! I'm a helpful AI assistant built with LangGraph. I can help you with a wide variety of tasks including answering questions, writing code, explaining concepts, brainstorming ideas, and much more. What would you like help with today?
Agent: Absolutely! Here's an example of how I can assist you:
**Writing Code:**
If you ask me to write a function that calculates factorials, I can provide clean, well-commented Python code:
def factorial(n):
if n < 0:
raise ValueError("Factorial is not defined for negative numbers")
return 1 if n == 0 else n * factorial(n - 1)
**Answering Questions:**
If you ask about a complex topic like quantum computing, I can break it down into digestible explanations suitable for your level of understanding.
Agent (fresh start): Hello! I'm a helpful AI assistant built with LangGraph. I can help you with a wide variety of tasks including answering questions, writing code, explaining concepts, brainstorming ideas, and much more. What would you like help with today?
Workflow test complete!
Notice how the first two responses share context: the second reply opens with "Absolutely!" because the agent understood "Can you give me an example?" as a follow-up to its earlier answer. The third response is a fresh start because we used a different thread_id.
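You can also peek at what the checkpointer has stored for a thread using the compiled app's get_state method, which returns a StateSnapshot. A quick inspection sketch, reusing the module-level app cache from workflow.py:
# inspect_state.py
from workflow import _APP_CACHE, run_conversation

run_conversation("Remember that my order number is 1234.", thread_id="inspect-001")

app = _APP_CACHE["deepseek-v3.2"]
config = {"configurable": {"thread_id": "inspect-001"}}

# get_state returns a StateSnapshot; .values holds the current channel values
snapshot = app.get_state(config)
print(f"{len(snapshot.values['messages'])} messages stored for this thread")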
Adding Tools: Making Your Agent Do Things
A truly useful agent needs to be able to perform actions. Let's add a simple tool to our agent that can search for information:
# tools.py
from typing import Annotated, Literal
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
@tool
def search_web(query: str) -> str:
"""
Search the web for information.
Args:
query: The search query string
Returns:
Search results as a string
"""
# This is a simplified example - in production, you'd integrate
# with a real search API like Google, Bing, or SerpAPI
return f"[Search Results for '{query}'] This is a placeholder. " \
f"In production, this would return actual web search results."
@tool
def calculate(expression: str) -> str:
"""
Safely evaluate a mathematical expression.
Args:
expression: A mathematical expression like "2 + 2" or "sqrt(16)"
Returns:
The result of the calculation
"""
try:
        # Restrict eval to a whitelist of math helpers. Note: eval with
        # emptied builtins is a mitigation, not a true sandbox, so don't
        # expose this to untrusted input without stricter parsing.
        import math
safe_dict = {
"sqrt": math.sqrt,
"pi": math.pi,
"e": math.e,
"sin": math.sin,
"cos": math.cos,
"tan": math.tan,
"log": math.log,
"abs": abs,
"round": round,
"pow": pow,
"max": max,
"min": min
}
result = eval(expression, {"__builtins__": {}}, safe_dict)
return f"The result of '{expression}' is {result}"
except Exception as e:
return f"Error calculating '{expression}': {str(e)}"
@tool
def get_current_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
"""
Get the current time.
Args:
format: Time format string (default: ISO format)
Returns:
Current time as a formatted string
"""
from datetime import datetime
return datetime.now().strftime(format)
# Collect all tools for easy registration
available_tools = [search_web, calculate, get_current_time]
# Create the tool node for LangGraph
tool_node = ToolNode(available_tools)
print(f"Loaded {len(available_tools)} tools: {[t.name for t in available_tools]}")
Building an Advanced Agent with Tool Use
Now let's update our workflow to use these tools:
# advanced_workflow.py
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from agent import create_llm
from tools import available_tools
def create_tool_aware_agent(model_name: str = "deepseek-v3.2"):
"""
Create an advanced agent that can use tools.
Using the ReAct (Reasoning + Acting) pattern, this agent can:
1. Think about what the user wants
2. Decide if it needs to use a tool
3. Use the tool and observe the result
4. Formulate a response
"""
    # Create the LLM; create_react_agent binds the tools to it internally,
    # so there's no need to call bind_tools ourselves
    llm = create_llm(model_name)
    # Create a ReAct agent using LangGraph's prebuilt function.
    # It runs the reasoning-action loop automatically and stops once the
    # model returns a final answer with no tool calls. The checkpointer
    # is what makes thread_id-based memory work.
    agent = create_react_agent(
        llm,
        tools=available_tools,
        checkpointer=MemorySaver(),
    )
return agent
# Cache the agent so its MemorySaver survives across calls; otherwise
# reusing a thread_id would not actually resume the conversation.
_AGENT_CACHE = {}

def run_agent_with_tools(user_input: str, thread_id: str = "default",
                         model_name: str = "deepseek-v3.2"):
    """
    Run the tool-aware agent with a user query.
    """
    if model_name not in _AGENT_CACHE:
        _AGENT_CACHE[model_name] = create_tool_aware_agent(model_name)
    agent = _AGENT_CACHE[model_name]
    config = {
        "configurable": {
            "thread_id": thread_id
        }
    }
    result = agent.invoke(
        {"messages": [{"role": "user", "content": user_input}]},
        config=config
    )
    # Extract the final response (messages are LangChain message objects)
    if result.get("messages"):
        return result["messages"][-1].content
    return "No response generated"
# Example usage demonstrating tool calls
if __name__ == "__main__":
print("Testing Tool-Aware Agent...")
print("=" * 60)
# Test 1: Simple calculation
print("\nTest 1: Calculator Tool")
response1 = run_agent_with_tools(
"What is the square root of 144 plus 25?",
thread_id="tool-test-001"
)
print(f"Response: {response1}")
# Test 2: Current time
print("\nTest 2: Time Tool")
response2 = run_agent_with_tools(
"What time is it right now?",
thread_id="tool-test-001"
)
print(f"Response: {response2}")
# Test 3: Web search
print("\nTest 3: Search Tool")
response3 = run_agent_with_tools(
"Search for information about LangGraph",
thread_id="tool-test-002"
)
print(f"Response: {response3}")
print("\n" + "=" * 60)
print("Tool-aware agent testing complete!")
Adding Persistence: Saving Conversation Memory
One of LangGraph's most powerful features is persistence. Let's add PostgreSQL-backed storage so your agent remembers conversations even after the application restarts:
# persistence.py
import os
from typing import Optional
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.memory import MemorySaver
def create_checkpointer(checkpointer_type: str = "memory", **kwargs):
"""
Create a checkpointer for persisting agent state.
Types:
- "memory": Stores state in RAM (fast but lost on restart)
- "postgres": Stores state in PostgreSQL (persistent, production-ready)
- "sqlite": Stores state in SQLite file (good for development)
"""
if checkpointer_type == "memory":
print("Using in-memory checkpointing (state lost on restart)")
return MemorySaver()
elif checkpointer_type == "postgres":
# For production use with PostgreSQL
connection_string = kwargs.get(
"connection_string",
os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/langgraph")
)
print(f"Using PostgreSQL checkpointing: {connection_string}")
return PostgresSaver.from_conn_string(connection_string)
elif checkpointer_type == "sqlite":
db_path = kwargs.get("db_path", "./checkpoints.db")
print(f"Using SQLite checkpointing: {db_path}")
        # Note: a real SQLite checkpointer (SqliteSaver) ships in the
        # langgraph-checkpoint-sqlite package; we fall back to memory
        # here to keep the demo dependency-free
        return MemorySaver()  # Fallback for demo
else:
raise ValueError(f"Unknown checkpointer type: {checkpointer_type}")
def load_conversation_history(thread_id: str, checkpointer) -> list:
"""
Load the full conversation history for a thread.
"""
config = {"configurable": {"thread_id": thread_id}}
try:
checkpoint_data = checkpointer.get(config)
if checkpoint_data and checkpoint_data.get("channel_values"):
messages = checkpoint_data["channel_values"].get("messages", [])
return messages
except Exception as e:
print(f"Error loading history: {e}")
return []
def list_all_threads(checkpointer) -> list:
"""
List all conversation threads stored in the checkpointer.
"""
try:
# This would require additional implementation based on backend
return []
except Exception:
return []
# Example: production-ready checkpointer setup
if __name__ == "__main__":
print("Testing persistence setup...")
# Test with memory checkpointer (for development)
mem_checkpointer = create_checkpointer("memory")
print("Memory checkpointer created successfully")
# In production, you would use:
# prod_checkpointer = create_checkpointer(
# "postgres",
# connection_string="postgresql://user:password@host:5432/production_db"
# )
print("Persistence setup complete!")
Production Deployment Checklist
When you're ready to move from development to production, here's what you need to consider:
- Rate Limiting: Implement request throttling to prevent API abuse
- Error Handling: Add retry logic with exponential backoff
- Logging: Track all API calls, costs, and errors
- Monitoring: Set up alerts for failed requests or unusual patterns
- Cost Management: Monitor token usage against your budget (a minimal sketch follows below)
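For the cost-management item, the MODEL_CONFIG table in config.py already contains everything needed for a rough per-request estimate. A minimal sketch (the token counts would come from your provider's usage metadata):
# cost_estimate.py
from config import get_model_info

def estimate_cost_usd(model_name: str, input_tokens: int, output_tokens: int) -> float:
    """Rough request cost from the pricing table in config.py."""
    info = get_model_info(model_name)
    return (input_tokens * info["input_price_per_mtok"]
            + output_tokens * info["output_price_per_mtok"]) / 1_000_000

# Example: a 1,500-token prompt with a 500-token reply on deepseek-v3.2
print(f"${estimate_cost_usd('deepseek-v3.2', 1500, 500):.6f}")  # $0.001470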
Common Errors and Fixes
Error 1: Authentication Failure - "Invalid API Key"
Problem: When you first set up your agent, you might see an authentication error like:
AuthenticationError: Invalid API key provided.
Response: {"error": {"message": "Invalid API key", "type": "invalid_request_error"}}
Solution: Double-check your API key configuration:
# config.py - Always use environment variables in production
import os
from dotenv import load_dotenv
# Load the .env file (create this in your project root)
load_dotenv()
# Get the API key from the environment
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY")
# Verify the key is loaded
if not HOLYSHEEP_API_KEY:
raise ValueError(
"HOLYSHEEP_API_KEY not found. "
"Create a .env file with: HOLYSHEEP_API_KEY=sk-your-key"
)
# Test the key format (should start with 'sk-')
if not HOLYSHEEP_API_KEY.startswith("sk-"):
print("Warning: API key format might be incorrect")
Create a .env file with your actual key (never commit this to version control):
HOLYSHEEP_API_KEY=sk-your-actual-key-from-holysheep-ai-dashboard
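If the key still fails after this, test it outside Python entirely. Assuming HolySheep's /v1 base URL follows the common OpenAI-compatible convention of exposing a /models listing endpoint (an assumption; check their docs), a quick shell check looks like:
# Hypothetical check -- assumes an OpenAI-compatible /models endpoint
curl -s https://api.holysheep.ai/v1/models \
  -H "Authorization: Bearer $HOLYSHEEP_API_KEY"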
Error 2: State Schema Mismatch
Problem: You might encounter this error when running your workflow:
ValueError: Missing keys in state: ['collected_data'].
Received keys: ['messages', 'current_step']
Solution: Your initial state is missing required fields. Always use your state factory function:
# state.py - Define a proper state factory
def create_initial_state() -> AgentState:
"""Create a complete initial state with all required fields."""
return AgentState(
messages=[],
current_step="start",
collected_data={}, # Don't forget this!
needs_confirmation=False,
iteration_count=0
)
# workflow.py - Always use the factory function
def run_workflow(user_input: str):
app = create_workflow()
# Use the factory function, not a bare dict
initial_state = create_initial_state()
initial_state["messages"] = [{"role": "user", "content": user_input}]
    # Now the state has all required fields. If the graph was compiled
    # with a checkpointer, invoke also needs a thread_id in its config.
    config = {"configurable": {"thread_id": "error-demo"}}
    return app.invoke(initial_state, config=config)
Error 3: Tool Call Timeout or Failure
Problem: Tool execution fails with timeout or connection errors:
ToolExecutionError: Tool 'search_web' timed out after 30 seconds
or
ConnectionError: Failed to establish a new connection
Solution: Add proper error handling and timeouts to your tools:
# tools.py - Robust tool implementation with error handling
from typing import Annotated
from langchain_core.tools import tool
import time
@tool
def search_web(query: str, timeout: int = 10) -> str:
"""
Search the web with proper error handling and timeouts.
"""
    try:
        # Time the request so slow queries are visible in the output
        start_time = time.time()
        # Placeholder for a real search API integration (e.g. SerpAPI);
        # swap this stub out in production
        result = f"[stub results for '{query}']"
        elapsed = time.time() - start_time
        return f"Search results for '{query}' ({elapsed:.2f}s): {result}"
except TimeoutError:
return f"Search timed out after {timeout} seconds. Please try a simpler query."
except ConnectionError as e:
return f"Network error: Unable to reach search service. Error: {str(e)}"
except Exception as e:
return f"Search failed: {str(e)}"
@tool
def calculate(expression: str) -> str:
"""
Safely calculate mathematical expressions with comprehensive error handling.
"""
import math
import re
    # Validate input: allow only digits, operators, parentheses, commas,
    # whitespace, and lowercase names (for the whitelisted functions below)
    if not re.match(r'^[0-9a-z_+\-*/().,\s]+$', expression):
        return "Error: Invalid characters in mathematical expression."
try:
# Safe dictionary of allowed functions
safe_dict = {
"sqrt": math.sqrt,
"pi": math.pi,
"e": math.e,
"sin": math.sin,
"cos": math.cos,
"tan": math.tan,
"log": math.log,
"abs": abs,
"round": round,
"pow": pow,
"max": max,
"min": min
}
result = eval(expression, {"__builtins__": {}}, safe_dict)
return f"Result: {expression} = {result}"
except ZeroDivisionError:
return "Error: Division by zero is not allowed."
except NameError as e:
return f"Error: Unknown function or variable - {str(e)}"
except SyntaxError:
return "Error: Invalid expression syntax."
except Exception as e:
return f"Calculation error: {str(e)}"
Error 4: Thread ID Not Found
Problem: When trying to resume a conversation, the previous state can't be found, or you see an error like:
ValueError: No checkpointer found in config.
Are you sure you provided a checkpointer when compiling the graph?
Solution: Ensure your checkpointer is properly configured:
# workflow.py - Proper checkpointer setup
from langgraph.checkpoint.memory import MemorySaver
from persistence import create_checkpointer
def create_workflow(checkpointer_type: str = "memory"):
"""
Create workflow with proper checkpointer configuration.
"""
workflow = StateGraph(AgentState)
# Add nodes and edges
workflow.add_node("agent", agent_node)
workflow.add_edge(START, "agent")
workflow.add_edge("agent", END)
# CRITICAL: Add checkpointer before compiling
checkpointer = create_checkpointer(checkpointer_type)
# Compile with checkpointer
app = workflow.compile(checkpointer=checkpointer)
return app
def resume_conversation(thread_id: str, new_input: str):
    """
    Resume an existing conversation thread.
    Note: with an in-memory checkpointer this only works within a single
    process, and only if `app` is the same instance that ran the earlier
    turns; use a persistent backend like Postgres to resume after restarts.
    """
    app = create_workflow(checkpointer_type="memory")
config = {
"configurable": {
"thread_id": thread_id
}
}
# This will automatically load the previous state from checkpointer
# and continue the conversation
result = app.invoke(
{"messages": [{"role": "user", "content": new_input}]},
config=config
)
return result
Error 5: Rate Limit Exceeded
Problem: You hit API rate limits during heavy usage:
RateLimitError: Rate limit exceeded.
Please wait 60 seconds before making another request.
Solution: Implement exponential backoff and request queuing:
# rate_limiter.py
import time
import asyncio
from functools import wraps
from typing import Callable, Any
class RateLimiter:
"""
Simple rate limiter with exponential backoff.
"""
def __init__(self, max_requests_per_minute: int = 60):
self.max_requests = max_requests_per_minute
self.min_interval = 60.0 / max_requests_per_minute
self.last_request_time = 0
self.retry_count = 0
self.max_retries = 3
def wait_if_needed(self):
"""Wait if we've made too many requests recently."""
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.min_interval:
wait_time = self.min_interval - time_since_last
print(f"Rate limiting: waiting {wait_time:.2f} seconds...")
time.sleep(wait_time)
self.last_request_time = time.time()
def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
"""Execute a function with exponential backoff on failure."""
for attempt in range(self.max_retries):
try:
self.wait_if_needed()
return func(*args, **kwargs)
except Exception as e:
if "rate limit" in str(e).lower() and attempt < self.max_retries - 1:
wait_time = (2 ** attempt) * 60 # Exponential backoff: 1min, 2min, 4min
print(f"Rate limit hit. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
# Usage example
rate_limiter = RateLimiter(max_requests_per_minute=30)
def make_api_call(query: str):
    """Make an API call with rate limiting and retries."""
    return rate_limiter.execute_with_retry(run_agent_with_tools, query)