如何用 LangGraph 实现一个基于“反思与自我修正(Reflection & Self-Correction)”的循环工作流?
在 LangGraph 中实现“反思与自我修正(Reflection & Self-Correction)”工作流是非常经典且强大的应用场景。这种模式通常用于提高 LLM 输出的质量,例如:写文章、编写代码或解答复杂问题。
它的核心逻辑是一个循环(Loop):
- 生成(Generate):LLM 根据需求生成初稿。
- 反思(Reflect/Critique):另一个(或同一个)LLM 扮演批评家,审查初稿并给出修改建议,同时判断是否达标。
- 条件路由(Conditional Edge):
- 如果达标(或达到最大迭代次数),结束(END)。
- 如果不达标,带着修改建议回到“生成”节点,重新修改。
下面我将带你一步步用完整的 Python 代码实现这个工作流。
1. 环境准备
你需要安装相关的包。这里我们使用 OpenAI 作为基础大模型,并利用 Pydantic 来强制 LLM 输出结构化数据(以便程序判断是否结束循环)。
bash
pip install langgraph langchain langchain-openai pydantic
设置环境变量:
bash
export OPENAI_API_KEY="your-api-key"
2. 定义状态 (State)
在 LangGraph 中,一切数据通过 State 传递。我们需要记录当前的任务、草稿、批评意见、是否通过以及迭代次数(防止死循环)。
python
from typing import TypedDict
from pydantic import BaseModel, Field
# 1. 定义 Graph 的状态
class GraphState(TypedDict):
task: str # 用户的初始任务
draft: str # 当前生成的草稿
critique: str # 反思节点给出的修改建议
is_acceptable: bool # 是否满足要求
revision_number: int # 当前迭代次数
max_revisions: int # 最大允许迭代次数
# 2. 定义反思节点输出的结构化数据格式
class ReflectionOutput(BaseModel):
feedback: str = Field(description="对草稿的详细修改建议和批评")
is_acceptable: bool = Field(description="草稿是否已经完美满足要求,如果完美返回 True,否则返回 False")
3. 定义节点 (Nodes)
我们需要两个节点:generate_node(生成/修改草稿)和 reflect_node(审查草稿)。
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
def generate_node(state: GraphState) -> dict:
"""生成初稿或根据建议修改草稿"""
task = state["task"]
critique = state.get("critique", "")
revision_number = state.get("revision_number", 0)
# 动态构建 Prompt:如果有 critique,则是修改;否则是初稿
if not critique:
system_prompt = "你是一个优秀的作家。请根据用户的任务要求撰写一篇文章。"
user_prompt = f"任务:{task}"
else:
system_prompt = "你是一个优秀的作家。请根据审查意见(Critique)修改并完善你的草稿。"
user_prompt = f"任务:{task}\n\n之前的草稿:\n{state['draft']}\n\n审查意见:\n{critique}\n\n请输出修改后的最新版本。"
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("user", user_prompt)
])
chain = prompt | llm
response = chain.invoke({})
# 返回更新的状态
return {
"draft": response.content,
"revision_number": revision_number + 1
}
def reflect_node(state: GraphState) -> dict:
"""反思并评价当前草稿"""
task = state["task"]
draft = state["draft"]
system_prompt = (
"你是一个严格的资深编辑。请审查作者的草稿是否满足任务要求。\n"
"如果内容单薄、有逻辑漏洞或不符合要求,请给出具体的修改建议,并将 is_acceptable 设为 False。\n"
"如果草稿已经非常出色、完全满足要求,请将 is_acceptable 设为 True,feedback 可以写'无需修改'。"
)
user_prompt = f"原始任务:{task}\n\n当前草稿:\n{draft}"
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("user", user_prompt)
])
# 使用 with_structured_output 强制 LLM 返回 JSON 格式(匹配 ReflectionOutput 类)
evaluator_llm = llm.with_structured_output(ReflectionOutput)
chain = prompt | evaluator_llm
result: ReflectionOutput = chain.invoke({})
# 返回更新的状态
return {
"critique": result.feedback,
"is_acceptable": result.is_acceptable
}
4. 定义条件路由 (Conditional Edge)
用来决定是继续修改,还是结束流程。
python
from langgraph.graph import END
def should_continue(state: GraphState) -> str:
"""判断工作流走向"""
# 如果达标,结束
if state["is_acceptable"]:
print("---审查通过,结束工作流---")
return END
# 如果达到最大迭代次数,强制结束
if state["revision_number"] >= state["max_revisions"]:
print(f"---达到最大迭代次数 {state['max_revisions']},强制结束---")
return END
# 否则,返回 generate 节点继续修改
print("---审查未通过,返回重新生成---")
return "generate"
5. 构建与编译 LangGraph 工作流
将上面的节点和边组装起来。
python
from langgraph.graph import StateGraph
# 初始化 Graph
workflow = StateGraph(GraphState)
# 添加节点
workflow.add_node("generate", generate_node)
workflow.add_node("reflect", reflect_node)
# 设置入口点
workflow.set_entry_point("generate")
# 添加普通边:生成完成后,总是进入反思节点
workflow.add_edge("generate", "reflect")
# 添加条件边:反思完成后,根据 should_continue 的结果决定去向
workflow.add_conditional_edges(
"reflect",
should_continue,
{
"generate": "generate",
END: END
}
)
# 编译 Graph
app = workflow.compile()
6. 运行测试
让我们给它一个稍微复杂的任务,看它如何自我修正。
python
def run_workflow():
initial_state = {
"task": "写一段关于人工智能对未来教育影响的短文,要求:1. 至少包含正反两面观点。2. 结尾必须有一句拉丁语名言。3. 字数在150字左右。",
"draft": "",
"critique": "",
"is_acceptable": False,
"revision_number": 0,
"max_revisions": 3 # 最多修改 3 次
}
# 运行图并打印中间过程
for output in app.stream(initial_state):
for key, value in output.items():
print(f"\n[{key.upper()} NODE 执行完毕]")
if key == "generate":
print(f"当前版本 (Revision {value['revision_number']}):\n{value['draft'][:100]}...")
elif key == "reflect":
print(f"审查意见:\n{value['critique']}")
print(f"是否接受: {value['is_acceptable']}")
print("-" * 50)
# 打印最终结果
final_state = app.get_state(config={"configurable": {"thread_id": "1"}}).values # 获取最终状态
print("\n\n======== 最终文章 ========\n")
# 如果使用 app.stream,最终的状态可以从最后一次输出中拿到
# 简单起见,这里直接打印刚刚 stream 结束后的变量
print(value.get("draft", "无结果"))
if __name__ == "__main__":
run_workflow()
💡 核心实现要点解析:
- 结构化输出 (
with_structured_output): 这是反思模式的关键。如果不强制 LLM 输出布尔值 (is_acceptable),条件路由函数将很难解析长文本中的意图。 - 死循环保护 (
max_revisions): LLM 有时会陷入“无限吹毛求疵”的状态,或者由于能力限制永远改不对。在状态中加入迭代计数器是生产环境的必做项。 - 状态累加 vs 覆盖: 在上面的例子中,
GraphState的默认行为是覆盖(Overwrite)。每次进入generate_node时,draft都会被新草稿覆盖;每次进入reflect_node,critique都会被新意见覆盖。这种机制保持了上下文窗口的精简。如果需要保存历史轨迹,可以使用operator.add来定义TypedDict中的字段,将其变为列表。
右滑查看面试常问