基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

在 LangGraph 中,如何实现任务的并行执行(Parallel Branching / Fan-out),并在后续节点汇总结果(Fan-in)?

知识点图片

在 LangGraph 中实现任务的并行执行(Fan-out)和结果汇总(Fan-in)非常直观。根据你的需求,主要有两种实现方式:

  1. 静态并行(Static Parallelism):分支数量和节点是固定的(例如:同时调用“分析情感”节点和“提取实体”节点,然后汇总)。
  2. 动态并行(Dynamic Parallelism / Map-Reduce):分支数量根据输入数据动态决定(例如:输入包含 NN 篇文章,并行启动 NN 个节点处理,最后汇总)。

下面我将分别用代码示例详细讲解这两种场景的实现方法。


场景一:静态并行(固定分支 Fan-out / Fan-in)

在 LangGraph 中,如果你将同一个源节点连接到多个目标节点,LangGraph 会自动并行执行这些目标节点。 同样,如果多个节点都指向同一个下游节点,下游节点会等待它们全部执行完毕后再触发(隐式的 Barrier/Wait)。

关键点: 状态(State)设计时,并行节点写入的字段必须相互独立,或者使用 Reducer(如 operator.add),以防止结果被互相覆盖。

代码示例:

python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# 1. 定义 State
# 注意:并行节点分别更新不同的字段(joke 和 poem),避免状态覆盖
class State(TypedDict):
    topic: str
    joke: str
    poem: str
    final_result: str

# 2. 定义节点函数
def node_start(state: State):
    print("-> 启动节点:接收到主题")
    return {"topic": state["topic"]}

def generate_joke(state: State):
    print("   [并行] 正在生成笑话...")
    joke = f"关于 {state['topic']} 的笑话:Why did the {state['topic']} cross the road?"
    return {"joke": joke}

def generate_poem(state: State):
    print("   [并行] 正在生成诗歌...")
    poem = f"关于 {state['topic']} 的诗歌:Roses are red, {state['topic']} is blue."
    return {"poem": poem}

def aggregator_node(state: State):
    print("-> 汇总节点:合并结果")
    # Fan-in: 在这里可以同时访问 joke 和 poem
    combined = f"【笑话】\n{state['joke']}\n\n【诗歌】\n{state['poem']}"
    return {"final_result": combined}

# 3. 构建图
builder = StateGraph(State)

builder.add_node("start", node_start)
builder.add_node("joke_node", generate_joke)
builder.add_node("poem_node", generate_poem)
builder.add_node("aggregator", aggregator_node)

# Fan-out: start 节点同时指向 joke_node 和 poem_node
builder.add_edge(START, "start")
builder.add_edge("start", "joke_node")
builder.add_edge("start", "poem_node")

# Fan-in: joke_node 和 poem_node 都指向 aggregator
builder.add_edge("joke_node", "aggregator")
builder.add_edge("poem_node", "aggregator")
builder.add_edge("aggregator", END)

graph = builder.compile()

# 运行测试
result = graph.invoke({"topic": "AI"})
print("\n--- 最终输出 ---")
print(result["final_result"])

场景二:动态并行(Map-Reduce / 使用 Send API)

如果你需要根据列表动态生成并行的任务(例如处理未知数量的文档),你需要使用 LangGraph 的 Send API

关键点:

  1. 必须在 State 中使用带有 Reducer 的字段(如 Annotated[list, operator.add])来收集多个并行节点返回的结果。
  2. 使用 add_conditional_edges 结合 Send 对象来动态派发任务。

代码示例:

python
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send

# 1. 定义 State
class State(TypedDict):
    subjects: list[str]
    # 必须使用 operator.add 作为 reducer,否则并行节点的结果会互相覆盖
    processed_results: Annotated[list[str], operator.add]
    final_report: str

# 为动态生成的子节点定义独立的 State
class WorkerState(TypedDict):
    subject: str

# 2. 定义节点函数
def map_dispatcher(state: State):
    print(f"-> 分发器:收到 {len(state['subjects'])} 个任务")
    # 这个节点本身不更新状态,任务派发通过 conditional_edge 完成
    return {}

def process_worker(state: WorkerState):
    # 处理单个 subject
    subject = state["subject"]
    print(f"   [并行 Worker] 正在处理: {subject}")
    result = f"[{subject} 的处理结果]"
    # 注意:返回的 key 必须对应主 State 中的带有 reducer 的字段
    return {"processed_results": [result]} 

def reduce_aggregator(state: State):
    print("-> 汇总节点:生成最终报告")
    # Fan-in: 所有 worker 的结果都被收集在 processed_results 列表中
    all_results = "\n".join(state["processed_results"])
    return {"final_report": f"总报告包含以下内容:\n{all_results}"}

# 3. 动态路由逻辑
def continue_to_workers(state: State):
    # 为 subjects 列表中的每一个元素,发送一个独立的状态给 "worker" 节点
    return [Send("worker", {"subject": s}) for s in state["subjects"]]

# 4. 构建图
builder = StateGraph(State)

builder.add_node("dispatcher", map_dispatcher)
builder.add_node("worker", process_worker)
builder.add_node("aggregator", reduce_aggregator)

builder.add_edge(START, "dispatcher")

# Fan-out: 使用 conditional_edges 和 Send API 动态派发
builder.add_conditional_edges(
    "dispatcher",
    continue_to_workers,
    ["worker"] # 声明可能指向的节点
)

# Fan-in: 所有 worker 节点执行完毕后,流入 aggregator
builder.add_edge("worker", "aggregator")
builder.add_edge("aggregator", END)

graph = builder.compile()

# 运行测试
result = graph.invoke({"subjects": ["苹果", "香蕉", "橙子"]})
print("\n--- 最终输出 ---")
print(result["final_report"])

总结与排错指南

  1. 自动等待:在 LangGraph 中进行 Fan-in 时,汇聚节点(如上述的 aggregator)会自动等待其所有上游节点(如 jokepoem,或所有的 worker)全部执行完成(成功返回状态)后,才会开始执行。无需你手动写锁或异步等待机制。
  2. 状态覆盖陷阱:在并行执行时,如果多个节点返回同一个不带 Reducer 的键值(例如两个并行节点都返回 {"text": "..."}),会导致后面的覆盖前面的,引发竞态条件。解决办法:要么像场景一那样写入不同的 Key,要么像场景二那样使用 Annotated[list, operator.add]
  3. 异步支持:LangGraph 的并行在底层使用 asyncio。为了最大化性能,如果你的节点函数包含网络请求(如调用大模型),建议使用 async def 定义节点,并使用 await graph.ainvoke(...) 运行。即使你使用普通的同步函数 def,LangGraph 也会使用多线程在后台并发执行它们。
00:00
00:00