AIアプリケーション開発の現場において、セッション跨ぎの会話コンテキスト管理は避けて通れない課題です。本稿では、LangGraphを活用した状態管理の設計パターンから、HolySheep AIとの統合による本番環境适用的実装まで、の実運用経験を交えながら詳細に解説します。
なぜLangGraphの状態管理が重要なのか
私自身、ECサイトのAIカスタマーサービスボットを開発した際、顧客がページを離脱して戻ってきた際に「最初からやり直し」になる的痛苦を経験しました。LangGraphのPersistent State Managementは、この問題を解決する強力な手段です。
主要ユースケース
- EC AIカスタマーサービスの急増:カート放棄防止のため、セッション中の商品閲覧履歴と照合したレコメンデーション
- 企業RAGシステムの立ち上げ:長いドキュメント解析における、中間過程の保存と再開
- 個人開発者のプロジェクト:Discord/Slackボットでのマルチターンダーケティング
LangGraph StateGraphの基本アーキテクチャ
LangGraphにおける状態管理は、StateGraphクラスとcheckpointerメカニズム为核心とします。以下に私のプロジェクトで実際に動作確認した実装を示します。
import os
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from operator import add
HolySheep API設定
BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = os.environ.get("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
class ConversationState(TypedDict):
"""会話の状態を管理するスキーマ"""
messages: list[dict]
user_id: str
session_id: str
context: dict
history_summary: str
def create_conversation_graph():
"""会話グラフの構築"""
# メモリチェックポインター(本番ではRedis等を使用)
checkpointer = MemorySaver()
def should_continue(state: ConversationState) -> str:
"""継続判定ロジック"""
if len(state["messages"]) >= 10:
return "summarize"
return END
def chat_node(state: ConversationState) -> ConversationState:
"""AI応答生成ノード"""
import httpx
last_message = state["messages"][-1]["content"]
response = httpx.post(
f"{BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "gpt-4.1",
"messages": [
{"role": "system", "content": "あなたは有帮助なAIアシスタントです。"},
*state["messages"]
],
"temperature": 0.7
},
timeout=30.0
)
ai_response = response.json()["choices"][0]["message"]
return {
**state,
"messages": state["messages"] + [ai_response]
}
def summarize_node(state: ConversationState) -> ConversationState:
"""コンテキスト要約ノード(トークン節約)"""
import httpx
recent_messages = state["messages"][-10:]
response = httpx.post(
f"{BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "gpt-4.1",
"messages": [
{"role": "system", "content": "この会話の要点を3文で要約してください。"},
*[{"role": m["role"], "content": m["content"]} for m in recent_messages]
]
},
timeout=30.0
)
summary = response.json()["choices"][0]["message"]["content"]
return {
**state,
"history_summary": summary,
"messages": [{"role": "system", "content": f"[要約] {summary}"}]
}
# グラフ構築
workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_node("summarize", summarize_node)
workflow.set_entry_point("chat")
workflow.add_conditional_edges("chat", should_continue)
workflow.add_edge("summarize", END)
return workflow.compile(checkpointer=checkpointer)
使用例
graph = create_conversation_graph()
config = {"configurable": {"thread_id": "user_123_session_abc"}}
最初のメッセージ
result = graph.invoke(
{"messages": [{"role": "user", "content": "おすすめ商品を教えて"}],
"user_id": "user_123", "session_id": "session_abc", "context": {}, "history_summary": ""},
config=config
)
Redisベースの永続化チェックポインター実装
本番環境では、メモリの代わりにRedisを使用することで、サーバー再起動後も状態が維持されます。私の本番環境ではHolySheep APIの<50msレイテンシと組み合わせ、応答速度を犠牲にせずに永続化を実現しています。
import os
import json
import redis
from typing import TypedDict
from langgraph.checkpoint.base import BaseCheckpointSaver
class RedisCheckpointSaver(BaseCheckpointSaver):
"""Redis対応チェックポインター"""
def __init__(self, redis_url: str = None):
self.redis_client = redis.from_url(
redis_url or os.environ.get("REDIS_URL", "redis://localhost:6379")
)
self.ttl = 86400 * 7 # 7日間保持
def put(self, config: dict, checkpoint: dict, metadata: dict = None):
"""チェックポイント保存"""
thread_id = config["configurable"]["thread_id"]
key = f"langgraph:checkpoint:{thread_id}"
data = {
"checkpoint": checkpoint,
"metadata": metadata or {},
"saved_at": str(time.time())
}
self.redis_client.setex(key, self.ttl, json.dumps(data))
# 会話履歴も保持(分析用)
history_key = f"langgraph:history:{thread_id}"
self.redis_client.lpush(history_key, json.dumps(data))
self.redis_client.expire(history_key, self.ttl)
def get(self, config: dict):
"""チェックポイント取得"""
thread_id = config["configurable"]["thread_id"]
key = f"langgraph:checkpoint:{thread_id}"
data = self.redis_client.get(key)
if data:
return json.loads(data)
return None
def list(self, config: dict = None, limit: int = 10):
"""スレッド一覧取得"""
pattern = "langgraph:checkpoint:*"
keys = self.redis_client.keys(pattern)[:limit]
return [
{"configurable": {"thread_id": k.decode().split(":")[-1]}}
for k in keys
]
def restore_conversation(thread_id: str, redis_saver: RedisCheckpointSaver) -> dict:
"""会話復元API"""
config = {"configurable": {"thread_id": thread_id}}
checkpoint = redis_saver.get(config)
if checkpoint:
return {
"status": "restored",
"thread_id": thread_id,
"checkpoint": checkpoint["checkpoint"],
"saved