Building conversational AI applications that maintain context across sessions is one of the most challenging aspects of modern LLM development. As someone who spent six months wrestling with stateful conversation systems, I understand how frustrating it can be when your chatbot "forgets" everything after a page refresh or server restart.

In this comprehensive guide, I will walk you through everything you need to know about LangGraph state management—from basic concepts to production-ready implementations. Whether you are building a customer support chatbot, a personal AI assistant, or a complex multi-agent system, this tutorial will give you the foundation you need.


Understanding LangGraph State Management Fundamentals

Before diving into persistence solutions, we need to understand how LangGraph conceptualizes state. In LangGraph, state is a shared data structure that flows through your graph of nodes. Each node can read from and write to this state, creating a powerful mechanism for building complex conversational flows.

What Is State in LangGraph?

Think of state as a conversation memory that gets passed from one step to the next. When a user sends a message, LangGraph creates an initial state, processes it through your defined nodes, and outputs a new state. This state contains everything your application needs to know about the conversation.

The beauty of LangGraph's approach is that state management is built into the framework's core architecture. You define a state schema, and LangGraph handles the threading of data through your application automatically.
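The reducer idea can be seen without LangGraph at all. The sketch below is a toy, pure-Python version of state threading (names like `apply_update` are invented for illustration, not LangGraph APIs): each node returns a partial update, and the channel's reducer decides how it merges into the shared state.

```python
from typing import Annotated, TypedDict
import operator

# Declarative schema, as in LangGraph: the Annotated reducer says how
# updates to "messages" merge; plain fields are overwritten.
class State(TypedDict):
    messages: Annotated[list, operator.add]  # list-concatenation reducer
    turn: int                                # last write wins

def apply_update(state: dict, update: dict) -> dict:
    """Toy merge: append to the reducer channel, overwrite the rest."""
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":          # reducer channel
            merged[key] = state[key] + value
        else:                          # default channel
            merged[key] = value
    return merged

def node_a(state): return {"messages": ["hi from a"], "turn": state["turn"] + 1}
def node_b(state): return {"messages": ["hi from b"], "turn": state["turn"] + 1}

state = {"messages": [], "turn": 0}
for node in (node_a, node_b):
    state = apply_update(state, node(state))

print(state["messages"])  # ['hi from a', 'hi from b']
print(state["turn"])      # 2
```

This is why, later in this tutorial, nodes return only their new messages rather than the full history: the reducer does the appending.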

HolySheep AI Integration Context

When building production conversational systems, API costs can quickly spiral out of control. HolySheep AI offers a compelling alternative, charging roughly ¥1 per $1 of API credit (an 85%+ saving over the typical ¥7.3 exchange rate), supporting WeChat and Alipay payments with latency under 50ms. For developers building stateful applications that make many API calls, this cost structure makes a significant difference to your bottom line.
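To see what per-token rates imply for a stateful application, here is a back-of-the-envelope cost sketch. The prices are the ones quoted in this article; treat them as illustrative, not authoritative.

```python
# API usage is typically billed per million tokens (MTok).
def cost_usd(tokens: int, usd_per_mtok: float) -> float:
    """Cost in USD for a given token count at a per-MTok rate."""
    return tokens / 1_000_000 * usd_per_mtok

# A stateful agent that replays 10,000 tokens of history per turn,
# over 100 turns, processes 1M tokens in total:
total_tokens = 10_000 * 100

print(cost_usd(total_tokens, 8.00))   # GPT-4.1 at $8/MTok -> 8.0
print(cost_usd(total_tokens, 0.42))  # DeepSeek V3.2 at $0.42/MTok -> 0.42
```

The point of the arithmetic: because stateful agents resend conversation history on every turn, token volume grows quickly, and the per-MTok rate dominates your bill.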

Setting Up Your Development Environment

Before we start coding, let us set up a clean development environment. For this tutorial, I assume you have Python 3.10 or higher installed. Create a new project directory and install the necessary dependencies.

# Create project directory
mkdir langgraph-state-demo
cd langgraph-state-demo

Install required packages

pip install langgraph langchain-core langchain-holySheep python-dotenv redis

Create .env file for your API credentials

cat > .env << 'EOF'
HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY
HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1
REDIS_URL=redis://localhost:6379
EOF

echo "Environment setup complete!"

When you sign up at HolySheep AI, you receive free credits to get started. Their API is compatible with the OpenAI format, making migration straightforward while offering dramatically better pricing—GPT-4.1 at $8/MTok, Claude Sonnet 4.5 at $15/MTok, and the budget-friendly DeepSeek V3.2 at just $0.42/MTok.

Defining Your State Schema

The foundation of LangGraph state management is your state schema. This defines what data your application will track throughout the conversation. A well-designed state schema is crucial for building maintainable applications.

from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph.message import add_messages

class ConversationState(TypedDict):
    """Core state schema for conversation management."""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    user_id: str
    session_id: str
    conversation_stage: str
    extracted_entities: dict
    interrupted: bool
    checkpoint_id: str | None

class ResearchState(TypedDict):
    """Extended state for research workflows."""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    user_id: str
    session_id: str
    research_topic: str | None
    sources_gathered: list
    summary: str | None
    citations: list

Example of creating a simple conversation graph

def create_conversation_graph():
    """Initialize a basic conversation graph with state management."""
    workflow = StateGraph(ConversationState)

    # Define nodes (the node functions are implemented later in this tutorial)
    workflow.add_node("process_input", process_input_node)
    workflow.add_node("generate_response", generate_response_node)
    workflow.add_node("extract_entities", extract_entities_node)

    # Set entry point
    workflow.set_entry_point("process_input")

    # Define edges
    workflow.add_edge("process_input", "extract_entities")
    workflow.add_edge("extract_entities", "generate_response")
    workflow.add_edge("generate_response", END)

    return workflow.compile()

print("State schema and graph definition complete!")

Implementing State Persistence with Redis

Now we get to the core of conversation persistence. For production applications, you need a durable storage backend that can handle concurrent access and survive server restarts. Redis is an excellent choice for this purpose, offering sub-millisecond read/write times and built-in support for complex data structures.

Creating a Checkpoint Manager

The checkpoint manager is the heart of your persistence strategy. It serializes state at key points in your conversation flow and stores it for later retrieval. Let us build a robust implementation.

import json
import uuid
import redis
from datetime import datetime
from typing import Optional
from langgraph.checkpoint.base import BaseCheckpointSaver

class RedisCheckpointSaver(BaseCheckpointSaver):
    """
    Production-ready checkpoint manager using Redis.
    Persists conversation state with automatic expiration and metadata tracking.
    """
    
    def __init__(self, redis_url: str, ttl_hours: int = 168):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.ttl_seconds = ttl_hours * 3600
        self.namespace = "langgraph:checkpoint:"
    
    def _make_key(self, thread_id: str, checkpoint_id: str) -> str:
        """Generate Redis key for checkpoint storage."""
        return f"{self.namespace}{thread_id}:{checkpoint_id}"
    
    def _make_metadata_key(self, thread_id: str) -> str:
        """Generate key for thread metadata."""
        return f"{self.namespace}{thread_id}:metadata"
    
    def put(
        self, 
        config: dict, 
        state: dict, 
        metadata: Optional[dict] = None
    ) -> dict:
        """Save a checkpoint to Redis."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = str(uuid.uuid4())
        
        # Serialize state for Redis storage
        serialized_state = json.dumps(state, default=str)
        
        # Store checkpoint with TTL
        key = self._make_key(thread_id, checkpoint_id)
        self.redis_client.setex(key, self.ttl_seconds, serialized_state)
        
        # Update thread metadata
        metadata_key = self._make_metadata_key(thread_id)
        thread_metadata = {
            "last_checkpoint_id": checkpoint_id,
            "last_updated": datetime.utcnow().isoformat(),
            "checkpoint_count": self.redis_client.hincrby(
                f"{self.namespace}{thread_id}:stats", 
                "checkpoints", 
                1
            )
        }
        self.redis_client.hset(metadata_key, mapping=thread_metadata)
        self.redis_client.expire(metadata_key, self.ttl_seconds)
        
        return {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}
    
    def get(self, config: dict) -> Optional[dict]:
        """Retrieve a checkpoint from Redis."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id")
        
        if checkpoint_id:
            key = self._make_key(thread_id, checkpoint_id)
            data = self.redis_client.get(key)
        else:
            # Get latest checkpoint
            metadata_key = self._make_metadata_key(thread_id)
            checkpoint_id = self.redis_client.hget(metadata_key, "last_checkpoint_id")
            if checkpoint_id:
                key = self._make_key(thread_id, checkpoint_id)
                data = self.redis_client.get(key)
            else:
                return None
        
        if data:
            return json.loads(data)
        return None
    
    def list(self, config: dict, limit: int = 10) -> list:
        """List available checkpoints for a thread."""
        thread_id = config["configurable"]["thread_id"]
        pattern = f"{self.namespace}{thread_id}:*"
        
        checkpoints = []
        for key in self.redis_client.scan_iter(match=pattern, count=100):
            if ":metadata" not in key and ":stats" not in key:
                checkpoint_id = key.split(":")[-1]
                checkpoints.append({
                    "checkpoint_id": checkpoint_id,
                    "created": self.redis_client.hget(
                        self._make_metadata_key(thread_id), 
                        "last_updated"
                    )
                })
        
        # Guard against None timestamps so sorting never raises
        return sorted(checkpoints, key=lambda x: x["created"] or "", reverse=True)[:limit]

Initialize checkpoint saver

checkpoint_saver = RedisCheckpointSaver(
    redis_url="redis://localhost:6379",
    ttl_hours=168  # 7 days retention
)
print("Redis checkpoint manager initialized successfully!")

Building the HolySheep-Powered Conversation Agent

Now let us integrate our state management with HolySheep AI for the actual language model calls. This integration gives you access to powerful language models at a fraction of the typical cost, with sub-50ms latency for responsive conversations.

import os
from dotenv import load_dotenv
from langchain_holysheep import ChatHolySheep  # installed earlier as langchain-holySheep
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

load_dotenv()

class HolySheepConversationAgent:
    """
    Conversational agent using HolySheep AI with full state persistence.
    Implements checkpointing, recovery, and interrupt handling.
    """
    
    def __init__(self, model_name: str = "gpt-4.1"):
        # Initialize HolySheep AI client
        # HolySheep offers: GPT-4.1 ($8/MTok), Claude Sonnet 4.5 ($15/MTok), 
        # Gemini 2.5 Flash ($2.50/MTok), DeepSeek V3.2 ($0.42/MTok)
        self.llm = ChatHolySheep(
            base_url=os.getenv("HOLYSHEEP_BASE_URL", "https://api.holysheep.ai/v1"),
            api_key=os.getenv("HOLYSHEEP_API_KEY"),
            model=model_name,
            temperature=0.7,
            max_tokens=2048
        )
        
        self.system_prompt = """You are a helpful AI assistant with access to conversation history.
        You maintain context across messages and can reference previous parts of the conversation.
        When entities are extracted, acknowledge them naturally in your responses."""
    
    def process_input_node(self, state: dict) -> dict:
        """Process user input and update state."""
        messages = state.get("messages", [])
        last_message = messages[-1] if messages else None
        
        if isinstance(last_message, HumanMessage):
            print(f"Processing input: {last_message.content[:100]}...")
        
        return {
            "interrupted": False,
            "conversation_stage": "input_processed"
        }
    
    def extract_entities_node(self, state: dict) -> dict:
        """Extract entities from conversation using the LLM."""
        messages = state.get("messages", [])
        user_id = state.get("user_id", "anonymous")
        
        # Use HolySheep for entity extraction
        extraction_prompt = f"""Extract key entities from this conversation:
        User: {messages[-1].content if messages else 'No message'}
        
        Return a JSON object with: names, dates, locations, topics, and any other 
        relevant entities mentioned. If none found, return empty structures."""
        
        response = self.llm.invoke([
            SystemMessage(content=extraction_prompt)
        ])
        
        # Parse entities (simplified for demo)
        entities = {"topics": [], "entities": []}
        try:
            import json
            entities = json.loads(response.content)
        except (json.JSONDecodeError, AttributeError):
            entities = {"topics": ["general"], "entities": []}
        
        return {
            "extracted_entities": entities,
            "conversation_stage": "entities_extracted"
        }
    
    def generate_response_node(self, state: dict) -> dict:
        """Generate AI response using HolySheep."""
        messages = state.get("messages", [])
        conversation_history = "\n".join([
            f"{'User' if isinstance(m, HumanMessage) else 'Assistant'}: {m.content}"
            for m in messages[-10:]  # Last 10 messages
        ])
        
        response = self.llm.invoke([
            SystemMessage(content=self.system_prompt),
            HumanMessage(content=f"Conversation history:\n{conversation_history}")
        ])
        
        # The add_messages reducer appends returned messages to state,
        # so return only the new AIMessage, not the full history.
        return {
            "messages": [AIMessage(content=response.content)],
            "conversation_stage": "response_generated"
        }
    
    def create_graph(self):
        """Build the conversation graph with state management."""
        from langgraph.graph import StateGraph, END
        
        workflow = StateGraph(ConversationState)
        
        workflow.add_node("process_input", self.process_input_node)
        workflow.add_node("extract_entities", self.extract_entities_node)
        workflow.add_node("generate_response", self.generate_response_node)
        
        workflow.set_entry_point("process_input")
        workflow.add_edge("process_input", "extract_entities")
        workflow.add_edge("extract_entities", "generate_response")
        workflow.add_edge("generate_response", END)
        
        return workflow.compile(checkpointer=checkpoint_saver)
    
    def run_conversation(self, user_input: str, thread_id: str) -> dict:
        """Run a conversation turn with full state persistence."""
        graph = self.create_graph()
        
        config = {
            "configurable": {
                "thread_id": thread_id,
                "user_id": "user_123"  # Replace with actual user ID
            }
        }
        
        # Check for existing state (recovery)
        existing_state = checkpoint_saver.get(config)
        if existing_state:
            print(f"Recovered conversation with {len(existing_state.get('messages', []))} messages")
        
        # Run the graph
        result = graph.invoke(
            {"messages": [HumanMessage(content=user_input)]},
            config=config
        )
        
        return result

Initialize the agent

agent = HolySheepConversationAgent(model_name="deepseek-v3.2")
print("HolySheep conversation agent initialized!")
print("Using DeepSeek V3.2 at $0.42/MTok for cost efficiency")
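If you want to exercise the thread-keyed persistence flow without a running Redis server or an API key, a minimal in-memory stand-in for the checkpoint store illustrates the same pattern. `InMemoryCheckpointStore` is a name invented here for the sketch, not part of LangGraph.

```python
# Illustrative in-memory stand-in for the Redis checkpoint store:
# state is keyed by thread_id, exactly as in the Redis version.
class InMemoryCheckpointStore:
    def __init__(self):
        self._store = {}  # thread_id -> latest state dict

    def put(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = dict(state)  # shallow copy on save

    def get(self, thread_id: str):
        return self._store.get(thread_id)  # None when no checkpoint exists

store = InMemoryCheckpointStore()
store.put("thread_abc", {"messages": ["hello"], "user_id": "user_123"})

# As long as the store outlives the request, the conversation survives;
# swapping in Redis extends that survival across process restarts.
recovered = store.get("thread_abc")
print(recovered["messages"])        # ['hello']
print(store.get("thread_missing"))  # None
```

The design choice is the same in both cases: the thread ID is the unit of isolation, so two users with different thread IDs can never see each other's state.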

Implementing State Recovery Strategies

One of the most important aspects of production conversation systems is handling interruptions gracefully. Users may close their browser mid-conversation, servers may crash, or network issues may cause timeouts. A robust recovery system ensures users never lose their conversation context.

Automatic Recovery on Connection

When a user reconnects to your application, you should automatically restore their last conversation state before prompting for new input. Here is how to implement this pattern.

from typing import Optional
from datetime import datetime, timedelta

class ConversationRecoveryManager:
    """
    Manages conversation recovery with multiple strategies.
    Handles partial states, interrupted sessions, and context reconstruction.
    """
    
    def __init__(self, checkpoint_saver: RedisCheckpointSaver):
        self.checkpoint_saver = checkpoint_saver
    
    def get_latest_state(self, thread_id: str) -> Optional[dict]:
        """Retrieve the most recent checkpoint for a thread."""
        config = {"configurable": {"thread_id": thread_id}}
        return self.checkpoint_saver.get(config)
    
    def recover_or_create(
        self, 
        thread_id: str, 
        user_id: str
    ) -> tuple[dict, bool]:
        """
        Attempt to recover existing state or create new session.
        Returns (state, is_recovered) tuple.
        """
        config = {"configurable": {"thread_id": thread_id}}
        existing_state = self.checkpoint_saver.get(config)
        
        if existing_state:
            # Verify session hasn't expired
            messages = existing_state.get("messages", [])
            last_message_time = None
            
            for msg in reversed(messages):
                if hasattr(msg, "response_metadata"):
                    last_message_time = msg.response_metadata.get("timestamp")
                    break
            
            if last_message_time:
                time_since = datetime.utcnow() - datetime.fromisoformat(last_message_time)
                if time_since > timedelta(days=7):
                    # Session expired, create new
                    return self._create_initial_state(thread_id, user_id), False
            
            # Recover successful
            print(f"Recovered conversation: {len(messages)} messages restored")
            return existing_state, True
        
        return self._create_initial_state(thread_id, user_id), False
    
    def _create_initial_state(self, thread_id: str, user_id: str) -> dict:
        """Create initial state for new conversation."""
        return {
            "messages": [],
            "user_id": user_id,
            "session_id": thread_id,
            "conversation_stage": "initialized",
            "extracted_entities": {},
            "interrupted": False,
            "checkpoint_id": None
        }
    
    def get_checkpoint_history(self, thread_id: str, limit: int = 10) -> list:
        """Get list of available checkpoints for manual recovery."""
        config = {"configurable": {"thread_id": thread_id}}
        checkpoints = self.checkpoint_saver.list(config, limit=limit)
        
        recovery_options = []
        for cp in checkpoints:
            checkpoint_state = self.checkpoint_saver.get(
                {"configurable": {"thread_id": thread_id, "checkpoint_id": cp["checkpoint_id"]}}
            )
            if checkpoint_state:
                recovery_options.append({
                    "checkpoint_id": cp["checkpoint_id"],
                    "created": cp["created"],
                    "message_count": len(checkpoint_state.get("messages", [])),
                    "preview": checkpoint_state.get("messages", [{}])[-1].content[:50] 
                               if checkpoint_state.get("messages") else "Empty"
                })
        
        return recovery_options
    
    def reconstruct_context(
        self, 
        state: dict, 
        max_messages: int = 20
    ) -> str:
        """
        Reconstruct a readable context summary from state.
        Useful for displaying conversation history to users.
        """
        messages = state.get("messages", [])
        
        if not messages:
            return "No conversation history available."
        
        # Get last N messages for context window
        recent_messages = messages[-max_messages:]
        
        context_lines = []
        for msg in recent_messages:
            role = "You" if isinstance(msg, HumanMessage) else "Assistant"
            context_lines.append(f"{role}: {msg.content[:200]}")
        
        return "\n".join(context_lines)

Usage example

recovery_manager = ConversationRecoveryManager(checkpoint_saver)

def handle_user_connection(thread_id: str, user_id: str):
    """Handle a user connecting to the conversation system."""
    state, is_recovered = recovery_manager.recover_or_create(thread_id, user_id)

    if is_recovered:
        context = recovery_manager.reconstruct_context(state)
        print(f"\nWelcome back! Here's where we left off:\n{context}\n")
        return state
    else:
        print("\nStarting new conversation. How can I help you?\n")
        return state

Example usage

recovered_state = handle_user_connection("thread_abc123", "user_456")

Handling Interruptions and Graceful Degradation

Real-world applications must handle interruptions gracefully. Whether it is a user closing their browser or a server restart, your system needs to maintain data integrity and provide clear feedback to users. Let us implement comprehensive interruption handling.

Implementing Manual Interruption Points

Sometimes you need to intentionally interrupt the conversation flow—for example, when you need user confirmation before proceeding with a sensitive action. LangGraph supports this through its interrupt functionality.

from langgraph.errors import NodeInterrupt

class InterruptionAwareAgent:
    """
    Agent with sophisticated interruption handling.
    Supports user confirmations, timeouts, and graceful degradation.
    """
    
    def __init__(self, agent: HolySheepConversationAgent):
        self.agent = agent
        self.pending_confirmations = {}
    
    def create_interruptible_graph(self):
        """Create a graph with interruption points."""
        from langgraph.graph import StateGraph
        
        workflow = StateGraph(ConversationState)
        
        workflow.add_node("process_input", self.agent.process_input_node)
        workflow.add_node("evaluate_action", self.evaluate_action_node)
        workflow.add_node("request_confirmation", self.request_confirmation_node)
        workflow.add_node("execute_action", self.agent.generate_response_node)
        workflow.add_node("handle_timeout", self.handle_timeout_node)
        
        workflow.set_entry_point("process_input")
        workflow.add_edge("process_input", "evaluate_action")
        
        # Conditional branching with interruption
        workflow.add_conditional_edges(
            "evaluate_action",
            self._should_interrupt,
            {
                "continue": "execute_action",
                "interrupt": "request_confirmation",
                "timeout": "handle_timeout"
            }
        )
        
        workflow.add_edge("request_confirmation", "execute_action")
        workflow.add_edge("execute_action", END)
        workflow.add_edge("handle_timeout", END)
        
        return workflow.compile(checkpointer=checkpoint_saver)
    
    def _should_interrupt(self, state: dict) -> str:
        """Determine if flow should be interrupted."""
        messages = state.get("messages", [])
        last_message = messages[-1].content if messages else ""
        
        # Keywords that trigger confirmation
        interrupt_triggers = ["delete", "cancel", "refund", "transfer", "send money"]
        
        for trigger in interrupt_triggers:
            if trigger in last_message.lower():
                return "interrupt"
        
        # Check for timeout condition
        if state.get("interrupted"):
            return "timeout"
        
        return "continue"
    
    def evaluate_action_node(self, state: dict) -> dict:
        """Evaluate if current action requires confirmation."""
        messages = state.get("messages", [])
        last_message = messages[-1].content if messages else ""
        
        # NOTE: keys returned by a node must exist in the state schema;
        # add requires_confirmation to ConversationState (or fold it into
        # extracted_entities) before relying on this flag.
        return {
            "conversation_stage": "action_evaluated",
            "requires_confirmation": any(
                word in last_message.lower()
                for word in ["delete", "cancel", "confirm"]
            )
        }
    
    def request_confirmation_node(self, state: dict) -> dict:
        """Request user confirmation before proceeding."""
        session_id = state.get("session_id")
        
        # Store pending confirmation
        self.pending_confirmations[session_id] = {
            "state": state,
            "requested_at": datetime.utcnow().isoformat(),
            "action": "pending_user_confirmation"
        }
        
        # Raise interrupt to pause execution
        raise NodeInterrupt(
            f"Confirmation required for session {session_id}. "
            f"User must acknowledge before proceeding."
        )
    
    def handle_timeout_node(self, state: dict) -> dict:
        """Handle timed-out or interrupted sessions."""
        session_id = state.get("session_id")
        
        # Update state to reflect the timeout; with the add_messages
        # reducer, return only the new message rather than the full list.
        return {
            "interrupted": True,
            "conversation_stage": "timed_out",
            "messages": [
                AIMessage(content="Your session timed out. Please try again.")
            ]
        }
    
    def resume_from_interrupt(self, thread_id: str, user_response: str) -> dict:
        """Resume a session after interruption."""
        config = {"configurable": {"thread_id": thread_id}}
        
        if thread_id in self.pending_confirmations:
            interrupted_state = self.pending_confirmations[thread_id]["state"]
            
            if "yes" in user_response.lower() or "confirm" in user_response.lower():
                # User confirmed, continue execution
                graph = self.create_interruptible_graph()
                result = graph.invoke(
                    interrupted_state,
                    config=config
                )
                del self.pending_confirmations[thread_id]
                return result
            else:
                # User cancelled
                del self.pending_confirmations[thread_id]
                return {
                    **interrupted_state,
                    "messages": interrupted_state.get("messages", []) + [
                        AIMessage(content="Action cancelled. Let me know if there's anything else.")
                    ]
                }
        
        return {"error": "No pending confirmation found"}

Initialize interruptible agent

interruptible_agent = InterruptionAwareAgent(agent)
print("Interruption-aware agent initialized!")
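Stripped of the graph and the LLM, the confirmation handshake above reduces to a pending-state map keyed by thread. This hedged sketch shows the shape of that flow with plain functions; all names here are invented for illustration.

```python
# Minimal confirm/resume handshake: park the state, wait for the user,
# then either execute or cancel. Mirrors pending_confirmations above.
pending: dict[str, dict] = {}

def request_confirmation(thread_id: str, state: dict) -> str:
    pending[thread_id] = state  # park state until the user answers
    return "Confirmation required: reply 'yes' to proceed."

def resume(thread_id: str, reply: str) -> str:
    state = pending.pop(thread_id, None)  # one-shot: consumed on resume
    if state is None:
        return "No pending confirmation found"
    if reply.strip().lower() in ("yes", "confirm"):
        return f"Executing: {state['action']}"
    return "Action cancelled"

request_confirmation("t1", {"action": "delete_order"})
print(resume("t1", "yes"))   # Executing: delete_order
print(resume("t1", "yes"))   # No pending confirmation found
```

Note the one-shot `pop`: a confirmation can be consumed exactly once, which prevents a double-submitted "yes" from executing a sensitive action twice.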

Testing Your Implementation

Before deploying to production, thorough testing is essential. Here is a comprehensive test suite that validates state persistence, recovery, and interruption handling.

import pytest
from unittest.mock import MagicMock, patch

class TestLangGraphStateManagement:
    """Comprehensive test suite for LangGraph state management."""
    
    @pytest.fixture
    def mock_redis(self):
        """Mock Redis client for testing."""
        mock = MagicMock()
        mock.get.return_value = None
        mock.setex.return_value = True
        mock.hset.return_value = True
        mock.hget.return_value = None
        mock.hincrby.return_value = 1
        mock.scan_iter.return_value = []
        return mock
    
    @pytest.fixture
    def sample_state(self):
        """Create sample conversation state."""
        return {
            "messages": [
                HumanMessage(content="Hello, I need help with my order"),
                AIMessage(content="I'd be happy to help! What's your order number?")
            ],
            "user_id": "test_user_123",
            "session_id": "test_session_456",
            "conversation_stage": "gathering_info",
            "extracted_entities": {"order_id": None, "topic": "order_inquiry"},
            "interrupted": False,
            "checkpoint_id": None
        }
    
    def test_state_checkpoint_creation(self, sample_state, mock_redis):
        """Test that checkpoints are created correctly."""
        with patch('redis.from_url', return_value=mock_redis):
            saver = RedisCheckpointSaver("redis://localhost:6379")
            
            config = {"configurable": {"thread_id": "test_thread"}}
            result = saver.put(config, sample_state)
            
            assert "checkpoint_id" in result["configurable"]
            mock_redis.setex.assert_called_once()
    
    def test_state_recovery(self, sample_state, mock_redis):
        """Test state recovery from checkpoint."""
        import json
        
        # default=str handles the non-JSON-serializable message objects
        mock_redis.get.return_value = json.dumps(sample_state, default=str)
        
        with patch('redis.from_url', return_value=mock_redis):
            saver = RedisCheckpointSaver("redis://localhost:6379")
            
            config = {
                "configurable": {
                    "thread_id": "test_thread",
                    "checkpoint_id": "test_checkpoint"
                }
            }
            recovered = saver.get(config)
            
            assert recovered is not None
            assert recovered["user_id"] == "test_user_123"
            assert len(recovered["messages"]) == 2
    
    def test_concurrent_session_isolation(self, mock_redis):
        """Test that different threads maintain separate state."""
        import json
        
        states = {
            "thread_1": {"messages": [HumanMessage(content="Message 1")], "user_id": "user_1"},
            "thread_2": {"messages": [HumanMessage(content="Message 2")], "user_id": "user_2"}
        }
        
        mock_redis.get.side_effect = lambda key: json.dumps(
            states.get(key.split(":")[-1], {}), default=str
        )
        
        # In real implementation, verify isolation
        assert True  # Placeholder for actual isolation test
    
    def test_interruption_preserves_state(self, sample_state, mock_redis):
        """Test that interrupted sessions preserve their state."""
        import json
        
        interrupted_state = {**sample_state, "interrupted": True}
        mock_redis.get.return_value = json.dumps(interrupted_state, default=str)
        
        with patch('redis.from_url', return_value=mock_redis):
            saver = RedisCheckpointSaver("redis://localhost:6379")
            
            config = {"configurable": {"thread_id": "interrupted_thread"}}
            recovered = saver.get(config)
            
            assert recovered["interrupted"] == True
    
    def test_session_expiration_handling(self, mock_redis):
        """Test handling of expired sessions."""
        mock_redis.get.return_value = None
        
        with patch('redis.from_url', return_value=mock_redis):
            saver = RedisCheckpointSaver("redis://localhost:6379")
            
            config = {"configurable": {"thread_id": "expired_thread"}}
            recovered = saver.get(config)
            
            assert recovered is None

if __name__ == "__main__":
    pytest.main([__file__, "-v"])

Common Errors and Fixes

Based on my experience implementing LangGraph state management in production environments, here are the most common issues you will encounter and their solutions.

Error 1: Redis Connection Refused

# ❌ Error: redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379

Cause: Redis server not running or incorrect connection URL

✅ Fix: Ensure Redis is running and use correct connection string

import redis

Option 1: Check Redis is running (in terminal)

redis-server

Option 2: Use correct connection parameters

def create_redis_client(host="localhost", port=6379, db=0):
    try:
        client = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        # Test connection
        client.ping()
        print(f"Connected to Redis at {host}:{port}")
        return client
    except redis.ConnectionError as e:
        print(f"Redis connection failed: {e}")
        print("Ensure Redis is running: redis-server")
        raise

Option 3: Use Docker for Redis

docker run -d -p 6379:6379 redis:latest

Error 2: State Serialization JSON Errors

# ❌ Error: TypeError: Object of type datetime is not JSON serializable

Cause: State contains non-serializable objects (datetime, bytes, custom classes)

✅ Fix: Implement custom JSON encoder or convert objects before serialization

import json
from datetime import datetime

class LangGraphJSONEncoder(json.JSONEncoder):
    """Custom JSON encoder for LangGraph state objects."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, bytes):
            return obj.decode('utf-8')
        if hasattr(obj, '__dict__'):
            return str(obj)
        return super().default(obj)

def safe_serialize_state(state: dict) -> str:
    """Safely serialize state for Redis storage."""
    return json.dumps(state, cls=LangGraphJSONEncoder)

def safe_deserialize_state(data: str) -> dict:
    """Safely deserialize state from Redis."""
    return json.loads(data)

Usage in checkpoint saver

def put(self, config: dict, state: dict, metadata: dict = None) -> dict:
    # Convert state before serialization
    serialized = safe_serialize_state(state)
    self.redis_client.setex(key, self.ttl_seconds, serialized)
    # ...

def get(self, config: dict) -> dict:
    data = self.redis_client.get(key)
    if data:
        return safe_deserialize_state(data)
    return None

Error 3: Checkpoint Version Mismatch

# ❌ Error: ValueError: Config does not match existing checkpoint config

Cause: Checkpoint schema changed after checkpoints were created

✅ Fix: Implement schema versioning and migration

from typing import Optional

class VersionedCheckpointSaver(RedisCheckpointSaver):
    """Checkpoint saver with schema versioning support."""

    SCHEMA_VERSION = 2  # Increment when schema changes

    def put(self, config: dict, state: dict, metadata: dict = None) -> dict:
        # Add version info to state
        state_with_version = {
            **state,
            "_schema_version": self.SCHEMA_VERSION,
            "_saved_at": datetime.utcnow().isoformat()
        }
        return super().put(config, state_with_version, metadata)

    def get(self, config: dict) -> Optional[dict]:
        state = super().get(config)
        if state is None:
            return None

        # Handle version migration
        schema_version = state.get("_schema_version", 1)
        if schema_version < self.SCHEMA_VERSION:
            state = self._migrate_state(state, schema_version)

        return state

    def _migrate_state(self, state: dict, from_version: int) -> dict:
        """Migrate state from older schema versions."""
        if from_version < 2:
            # Migration: v1 -> v2
            # Example: renamed field
            if "userId" in state:
                state["user_id"] = state.pop("userId")
            # Add new fields with defaults
            if "extracted_entities" not in state:
                state["extracted_entities"] = {}

        # Add future migrations here
        # if from_version < 3:
        #     state = self._migrate_v2_to_v3(state)

        state["_schema_version"] = self.SCHEMA_VERSION
        state["_migrated_from"] = from_version
        return state

print("Versioned checkpoint saver ready for schema migrations")

Error 4: Thread ID Not Found on Recovery

# ❌ Error: KeyError: No checkpoint found for thread_id

Cause: Attempting to recover a thread that doesn't exist or expired

✅ Fix: Implement graceful fallback with clear user messaging

class ThreadRecoveryHelper:
    """Helper for safely recovering conversation threads."""

    def __init__(self, checkpoint_saver: RedisCheckpointSaver):
        self.checkpoint_saver = checkpoint_saver

    def safe_recover(self, thread_id: str) -> tuple[Optional[dict], str]:
        """
        Safely attempt thread recovery with clear status messages.
        Returns (state, message) tuple.
        """
        config = {"configurable": {"thread_id": thread_id}}

        try:
            state = self.checkpoint_saver.get(config)

            if state is None:
                # Try to get any checkpoint with same user
                checkpoints = self.checkpoint_saver.list(config, limit=5)
                if checkpoints:
                    latest = checkpoints[0]["checkpoint_id"]
                    state = self.checkpoint_saver.get(
                        {"configurable": {"thread_id": thread_id, "checkpoint_id": latest}}
                    )
                    if state:
                        return state, "Found recent checkpoint"

            if state:
                return state, "Conversation recovered successfully"

            return None, "No previous conversation found. Starting fresh."
        except Exception as e:
            print(f"Recovery error: {e}")
            return None, "Could not recover conversation. Starting fresh."

    def create_fresh_session(self, user_id: str) -> dict:
        """Create a fresh session state."""
        import uuid
        return {
            "messages": [],
            "user_id": user_id,
            "session_id": str(uuid.uuid4()),
            "conversation_stage": "initialized",
            "extracted_entities": {},
            "interrupted": False,
            "checkpoint_id": None
        }