LangGraph的Checkpointer(检查点机制)?
在 LangGraph 中,Checkpointer(检查点机制) 是其最核心、最强大的特性之一。简单来说,Checkpointer 就是 LangGraph 的记忆与持久化系统。
默认情况下,LangGraph 的图(Graph)是无状态的(Stateless),每次运行结束状态就会丢失。而引入 Checkpointer 后,图就变成了有状态的(Stateful),它会在图执行的每一步(每一个超步 Super-step)自动保存当前的状态(State)。
以下是关于 LangGraph Checkpointer 的深度解析:
1. Checkpointer 的四大核心功能
引入 Checkpointer 后,你的应用将立刻获得以下能力:
- 🧠 持续记忆 (Memory): 针对多轮对话,它可以记住用户之前的聊天记录。不同的用户/会话可以通过不同的
thread_id隔离开来。 - ⏸️ 人类介入 (Human-in-the-Loop, HITL): 你可以设置在某个特定的节点前暂停(Interrupt)。图会把状态保存下来,等待人类审批、修改数据或补充输入后,再恢复执行。
- ⏪ 时间旅行 (Time Travel): 因为它保存了每一步的历史快照,你可以随时查询过去的某个状态,甚至“让时光倒流”,退回到之前的某个节点,修改当时的输入,然后让图重新产生不同的分支运行。
- 🛡️ 容错与断点续传 (Fault Tolerance): 如果你的工作流在运行中崩溃(比如 API 超时、服务器重启),你可以直接从崩溃前的最后一个 Checkpoint 恢复执行,而不需要从头再来。
2. 核心概念:thread_id(线程 ID)
要使用 Checkpointer,你必须在运行图(invoke 或 stream)时传入一个配置项:thread_id。
- Thread (线程) 代表一次独立的执行过程(比如一个用户的某次完整对话)。
- Checkpointer 会根据
thread_id来存取对应的状态。
config = {"configurable": {"thread_id": "user_123_session_1"}}
3. 常用的 Checkpointer 实现
LangGraph 提供了多种存储后端的 Checkpointer,以适应不同的场景:
MemorySaver: 内存级别的检查点。适用于开发、测试和简单的单机脚本。(包含在langgraph主包中)- SQLite: 适用于本地文件系统的持久化。(需要安装
langgraph-checkpoint-sqlite) - Postgres: 生产环境首选,支持高并发和分布式持久化。(需要安装
langgraph-checkpoint-postgres)
4. 代码示例:如何使用 Checkpointer
下面是一个最简单的示例,展示如何使用 MemorySaver 让一个图拥有记忆。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
# 1. 定义状态 (State)
class State(TypedDict):
# add_messages 会将新消息追加到列表中,而不是覆盖
messages: Annotated[list, add_messages]
# 2. 定义节点函数
def chatbot(state: State):
user_message = state["messages"][-1]
# 模拟 LLM 回复
return {"messages": [f"AI 说:我已经收到了你的消息 '{user_message}'"]}
# 3. 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
# 4. 实例化 Checkpointer
memory = MemorySaver()
# 5. 编译图时传入 Checkpointer !!!
graph = graph_builder.compile(checkpointer=memory)
# ==================== 测试运行 ====================
# 配置 thread_id
config = {"configurable": {"thread_id": "thread_1"}}
# 第一轮对话
print("--- 第一轮 ---")
graph.invoke({"messages": ["你好!我是张三。"]}, config=config)
# 第二轮对话:系统会记住我是张三
print("--- 第二轮 ---")
result = graph.invoke({"messages": ["你还记得我叫什么吗?"]}, config=config)
for msg in result["messages"]:
print(msg)
运行逻辑: 在第二轮对话 invoke 时,LangGraph 会先去 memory 里通过 thread_1 取出第一轮的 messages 历史,将新的输入追加进去,然后再传给 chatbot 节点。
5. 高级操作:时间旅行与状态修改
一旦有了 Checkpointer,你就可以对图的状态进行读写操作:
获取状态历史 (get_state & get_state_history)
config = {"configurable": {"thread_id": "thread_1"}}
# 获取当前最新状态
current_state = graph.get_state(config)
print(current_state.values) # 打印当前的状态字典
print(current_state.next) # 打印下一步要执行的节点
# 获取历史轨迹
history = graph.get_state_history(config)
for snapshot in history:
print(snapshot)
人类介入:修改状态 (update_state)
如果 AI 在某一步生成了错误的内容被拦截了,人类可以手动覆写(Update)状态,然后让图继续跑。
# 假设我们在某个节点中断了
# 手动更新状态,伪造或修改数据
graph.update_state(
config,
{"messages": ["这是人类修改后的消息"]} # 要更新的状态数据
)
# 恢复执行 (传入 None 即可,它会接着 Checkpoint 继续跑)
graph.invoke(None, config=config)
总结
LangGraph 的 Checkpointer 机制是将大模型应用从单次调用(Stateless RPC)转变为长时间运行的代理工作流(Long-running Agentic Workflow)的关键桥梁。不管是做聊天机器人、多步审批流,还是复杂的自愈合(Self-healing)代码生成助手,Checkpointer 都是必不可少的基础设施。