基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

如何用 LangGraph 实现一个基于“反思与自我修正(Reflection & Self-Correction)”的循环工作流?

知识点图片

在 LangGraph 中实现“反思与自我修正(Reflection & Self-Correction)”工作流是非常经典且强大的应用场景。这种模式通常用于提高 LLM 输出的质量,例如:写文章、编写代码或解答复杂问题。

它的核心逻辑是一个循环(Loop)

  1. 生成(Generate):LLM 根据需求生成初稿。
  2. 反思(Reflect/Critique):另一个(或同一个)LLM 扮演批评家,审查初稿并给出修改建议,同时判断是否达标。
  3. 条件路由(Conditional Edge)
    • 如果达标(或达到最大迭代次数),结束(END)。
    • 如果不达标,带着修改建议回到“生成”节点,重新修改。

下面我将带你一步步用完整的 Python 代码实现这个工作流。

1. 环境准备

你需要安装相关的包。这里我们使用 OpenAI 作为基础大模型,并利用 Pydantic 来强制 LLM 输出结构化数据(以便程序判断是否结束循环)。

bash
pip install langgraph langchain langchain-openai pydantic

设置环境变量:

bash
export OPENAI_API_KEY="your-api-key"

2. 定义状态 (State)

在 LangGraph 中,一切数据通过 State 传递。我们需要记录当前的任务、草稿、批评意见、是否通过以及迭代次数(防止死循环)。

python
from typing import TypedDict
from pydantic import BaseModel, Field

# 1. 定义 Graph 的状态
class GraphState(TypedDict):
    task: str              # 用户的初始任务
    draft: str             # 当前生成的草稿
    critique: str          # 反思节点给出的修改建议
    is_acceptable: bool    # 是否满足要求
    revision_number: int   # 当前迭代次数
    max_revisions: int     # 最大允许迭代次数

# 2. 定义反思节点输出的结构化数据格式
class ReflectionOutput(BaseModel):
    feedback: str = Field(description="对草稿的详细修改建议和批评")
    is_acceptable: bool = Field(description="草稿是否已经完美满足要求,如果完美返回 True,否则返回 False")

3. 定义节点 (Nodes)

我们需要两个节点:generate_node(生成/修改草稿)和 reflect_node(审查草稿)。

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

def generate_node(state: GraphState) -> dict:
    """生成初稿或根据建议修改草稿"""
    task = state["task"]
    critique = state.get("critique", "")
    revision_number = state.get("revision_number", 0)
    
    # 动态构建 Prompt:如果有 critique,则是修改;否则是初稿
    if not critique:
        system_prompt = "你是一个优秀的作家。请根据用户的任务要求撰写一篇文章。"
        user_prompt = f"任务:{task}"
    else:
        system_prompt = "你是一个优秀的作家。请根据审查意见(Critique)修改并完善你的草稿。"
        user_prompt = f"任务:{task}\n\n之前的草稿:\n{state['draft']}\n\n审查意见:\n{critique}\n\n请输出修改后的最新版本。"

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("user", user_prompt)
    ])
    
    chain = prompt | llm
    response = chain.invoke({})
    
    # 返回更新的状态
    return {
        "draft": response.content,
        "revision_number": revision_number + 1
    }

def reflect_node(state: GraphState) -> dict:
    """反思并评价当前草稿"""
    task = state["task"]
    draft = state["draft"]
    
    system_prompt = (
        "你是一个严格的资深编辑。请审查作者的草稿是否满足任务要求。\n"
        "如果内容单薄、有逻辑漏洞或不符合要求,请给出具体的修改建议,并将 is_acceptable 设为 False。\n"
        "如果草稿已经非常出色、完全满足要求,请将 is_acceptable 设为 True,feedback 可以写'无需修改'。"
    )
    user_prompt = f"原始任务:{task}\n\n当前草稿:\n{draft}"

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("user", user_prompt)
    ])
    
    # 使用 with_structured_output 强制 LLM 返回 JSON 格式(匹配 ReflectionOutput 类)
    evaluator_llm = llm.with_structured_output(ReflectionOutput)
    chain = prompt | evaluator_llm
    
    result: ReflectionOutput = chain.invoke({})
    
    # 返回更新的状态
    return {
        "critique": result.feedback,
        "is_acceptable": result.is_acceptable
    }

4. 定义条件路由 (Conditional Edge)

用来决定是继续修改,还是结束流程。

python
from langgraph.graph import END

def should_continue(state: GraphState) -> str:
    """判断工作流走向"""
    # 如果达标,结束
    if state["is_acceptable"]:
        print("---审查通过,结束工作流---")
        return END
    
    # 如果达到最大迭代次数,强制结束
    if state["revision_number"] >= state["max_revisions"]:
        print(f"---达到最大迭代次数 {state['max_revisions']},强制结束---")
        return END
    
    # 否则,返回 generate 节点继续修改
    print("---审查未通过,返回重新生成---")
    return "generate"

5. 构建与编译 LangGraph 工作流

将上面的节点和边组装起来。

python
from langgraph.graph import StateGraph

# 初始化 Graph
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("generate", generate_node)
workflow.add_node("reflect", reflect_node)

# 设置入口点
workflow.set_entry_point("generate")

# 添加普通边:生成完成后,总是进入反思节点
workflow.add_edge("generate", "reflect")

# 添加条件边:反思完成后,根据 should_continue 的结果决定去向
workflow.add_conditional_edges(
    "reflect",
    should_continue,
    {
        "generate": "generate",
        END: END
    }
)

# 编译 Graph
app = workflow.compile()

6. 运行测试

让我们给它一个稍微复杂的任务,看它如何自我修正。

python
def run_workflow():
    initial_state = {
        "task": "写一段关于人工智能对未来教育影响的短文,要求:1. 至少包含正反两面观点。2. 结尾必须有一句拉丁语名言。3. 字数在150字左右。",
        "draft": "",
        "critique": "",
        "is_acceptable": False,
        "revision_number": 0,
        "max_revisions": 3 # 最多修改 3 次
    }

    # 运行图并打印中间过程
    for output in app.stream(initial_state):
        for key, value in output.items():
            print(f"\n[{key.upper()} NODE 执行完毕]")
            if key == "generate":
                print(f"当前版本 (Revision {value['revision_number']}):\n{value['draft'][:100]}...")
            elif key == "reflect":
                print(f"审查意见:\n{value['critique']}")
                print(f"是否接受: {value['is_acceptable']}")
            print("-" * 50)

    # 打印最终结果
    final_state = app.get_state(config={"configurable": {"thread_id": "1"}}).values # 获取最终状态
    print("\n\n======== 最终文章 ========\n")
    # 如果使用 app.stream,最终的状态可以从最后一次输出中拿到
    # 简单起见,这里直接打印刚刚 stream 结束后的变量
    print(value.get("draft", "无结果"))

if __name__ == "__main__":
    run_workflow()

💡 核心实现要点解析:

  1. 结构化输出 (with_structured_output): 这是反思模式的关键。如果不强制 LLM 输出布尔值 (is_acceptable),条件路由函数将很难解析长文本中的意图。
  2. 死循环保护 (max_revisions): LLM 有时会陷入“无限吹毛求疵”的状态,或者由于能力限制永远改不对。在状态中加入迭代计数器是生产环境的必做项
  3. 状态累加 vs 覆盖: 在上面的例子中,GraphState 的默认行为是覆盖(Overwrite)。每次进入 generate_node 时,draft 都会被新草稿覆盖;每次进入 reflect_nodecritique 都会被新意见覆盖。这种机制保持了上下文窗口的精简。如果需要保存历史轨迹,可以使用 operator.add 来定义 TypedDict 中的字段,将其变为列表。
00:00
00:00