基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

如何将 LangGraph 的运行状态持久化到外部数据库(如 PostgreSQL/Sqlite/Redis)中?

知识点图片

在 LangGraph 中,状态的持久化是通过 Checkpointer(检查点保存器) 来实现的。当你将 Checkpointer 传递给 graph.compile(checkpointer=...) 时,LangGraph 会自动在图的每个节点执行完毕后保存状态。

LangGraph 官方提供了对 SQLitePostgreSQL 的原生支持包,而对于 Redis 等其他数据库,你可以通过继承基类来实现自定义持久化,或者使用社区提供的包。

以下是如何将状态持久化到这三种数据库的具体指南:


1. 核心概念:thread_id

无论使用哪种数据库,持久化状态的核心在于 config 参数中的 thread_id。LangGraph 通过 thread_id 来区分不同用户或不同会话的状态。

python
# 调用图时的配置示例
config = {"configurable": {"thread_id": "session_001"}}
graph.invoke(input_data, config=config)

2. 持久化到 SQLite (适合本地开发/轻量级应用)

官方提供了 langgraph-checkpoint-sqlite 包。

安装:

bash
pip install langgraph-checkpoint-sqlite

代码实现:

python
import sqlite3
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver

# 1. 定义你的图 (假设你已经定义好了 builder)
# builder = StateGraph(State)
# ... 添加节点和边 ...

# 2. 连接 SQLite 数据库
conn = sqlite3.connect("langgraph_state.db", check_same_thread=False)

# 3. 初始化 Checkpointer
memory = SqliteSaver(conn)

# 注意:SqliteSaver 会自动在数据库中创建 checkpoints 等表
# 4. 编译图并传入 checkpointer
graph = builder.compile(checkpointer=memory)

# 5. 运行图
config = {"configurable": {"thread_id": "user_123"}}
result = graph.invoke({"messages": ["Hello!"]}, config=config)

# 此时,状态已经永久保存在 langgraph_state.db 中

3. 持久化到 PostgreSQL (适合生产环境)

官方提供了 langgraph-checkpoint-postgres 包,它底层使用了高效的 psycopg 连接池。支持同步和异步(AsyncPostgresSaver)。

安装:

bash
pip install langgraph-checkpoint-postgres psycopg psycopg-pool

代码实现 (同步版本):

python
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool

DB_URI = "postgresql://username:password@localhost:5432/my_database"

# 1. 创建数据库连接池
pool = ConnectionPool(conninfo=DB_URI)

# 2. 初始化 Checkpointer
saver = PostgresSaver(pool)

# 3. 初始化数据库表 (首次运行或建表时调用)
saver.setup() 

# 4. 编译图
graph = builder.compile(checkpointer=saver)

# 5. 运行图
config = {"configurable": {"thread_id": "prod_session_999"}}
result = graph.invoke({"messages": ["Hello Postgres!"]}, config=config)

注:如果你使用异步图(graph.ainvoke),请使用 AsyncPostgresSaverAsyncConnectionPool


4. 持久化到 Redis (高并发/缓存场景)

目前 LangGraph 官方主推的关系型/文档型数据库扩展(Sqlite, Postgres, MongoDB),但因为 Redis 是纯 K-V 存储,你可以通过继承 BaseCheckpointSaver 轻松实现,或者使用社区方案。

如果你需要自己实现一个简单的 Redis Saver,思路如下:

安装依赖:

bash
pip install redis langgraph

自定义 RedisSaver 代码示例:

python
import pickle
import redis
from typing import Optional, RunnableConfig
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointMetadata

class RedisSaver(BaseCheckpointSaver):
    def __init__(self, redis_client: redis.Redis):
        super().__init__()
        self.redis = redis_client

    def put(self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata) -> RunnableConfig:
        thread_id = config["configurable"]["thread_id"]
        # 使用 pickle 序列化状态
        data = pickle.dumps(checkpoint)
        # 将状态保存到 Redis,key 为 thread_id
        self.redis.set(f"checkpoint:{thread_id}", data)
        return config

    def get_tuple(self, config: RunnableConfig) -> Optional[tuple]:
        thread_id = config["configurable"]["thread_id"]
        data = self.redis.get(f"checkpoint:{thread_id}")
        if data:
            checkpoint = pickle.loads(data)
            # 返回格式: (config, checkpoint, metadata)
            return config, checkpoint, {}
        return None

# 使用方法:
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_saver = RedisSaver(redis_client)

graph = builder.compile(checkpointer=redis_saver)

(注意:上面是一个极简的 Redis 实现。如果需要完整的“时间旅行”或版本控制功能,需要将 thread_ts(时间戳)也作为 Redis Hash 的 key 保存起来。)


总结与进阶建议

  1. 官方推荐:生产环境首选 PostgreSQL (langgraph-checkpoint-postgres),因为它原生支持完整的 LangGraph 特性(包括并发处理、状态回退/时间旅行)。
  2. 状态恢复:只要你传入了相同的 thread_id,图在 invoke 时不会从头开始,而是自动读取数据库中该 thread_id 最后的状态继续往下执行。这非常适合开发 打断机制 (Human-in-the-loop)长时对话机器人
  3. 清理策略:如果使用数据库持久化,随着会话增多,数据表会越来越大。建议在生产环境中配置定期清理旧 thread_id 数据的定时任务。
00:00
00:00