在 LangGraph 中,如何实现任务的并行执行(Parallel Branching / Fan-out),并在后续节点汇总结果(Fan-in)?
在 LangGraph 中实现任务的并行执行(Fan-out)和结果汇总(Fan-in)非常直观。根据你的需求,主要有两种实现方式:
- 静态并行(Static Parallelism):分支数量和节点是固定的(例如:同时调用“分析情感”节点和“提取实体”节点,然后汇总)。
- 动态并行(Dynamic Parallelism / Map-Reduce):分支数量根据输入数据动态决定(例如:输入包含 篇文章,并行启动 个节点处理,最后汇总)。
下面我将分别用代码示例详细讲解这两种场景的实现方法。
场景一:静态并行(固定分支 Fan-out / Fan-in)
在 LangGraph 中,如果你将同一个源节点连接到多个目标节点,LangGraph 会自动并行执行这些目标节点。 同样,如果多个节点都指向同一个下游节点,下游节点会等待它们全部执行完毕后再触发(隐式的 Barrier/Wait)。
关键点: 状态(State)设计时,并行节点写入的字段必须相互独立,或者使用 Reducer(如 operator.add),以防止结果被互相覆盖。
代码示例:
python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
# 1. 定义 State
# 注意:并行节点分别更新不同的字段(joke 和 poem),避免状态覆盖
class State(TypedDict):
topic: str
joke: str
poem: str
final_result: str
# 2. 定义节点函数
def node_start(state: State):
print("-> 启动节点:接收到主题")
return {"topic": state["topic"]}
def generate_joke(state: State):
print(" [并行] 正在生成笑话...")
joke = f"关于 {state['topic']} 的笑话:Why did the {state['topic']} cross the road?"
return {"joke": joke}
def generate_poem(state: State):
print(" [并行] 正在生成诗歌...")
poem = f"关于 {state['topic']} 的诗歌:Roses are red, {state['topic']} is blue."
return {"poem": poem}
def aggregator_node(state: State):
print("-> 汇总节点:合并结果")
# Fan-in: 在这里可以同时访问 joke 和 poem
combined = f"【笑话】\n{state['joke']}\n\n【诗歌】\n{state['poem']}"
return {"final_result": combined}
# 3. 构建图
builder = StateGraph(State)
builder.add_node("start", node_start)
builder.add_node("joke_node", generate_joke)
builder.add_node("poem_node", generate_poem)
builder.add_node("aggregator", aggregator_node)
# Fan-out: start 节点同时指向 joke_node 和 poem_node
builder.add_edge(START, "start")
builder.add_edge("start", "joke_node")
builder.add_edge("start", "poem_node")
# Fan-in: joke_node 和 poem_node 都指向 aggregator
builder.add_edge("joke_node", "aggregator")
builder.add_edge("poem_node", "aggregator")
builder.add_edge("aggregator", END)
graph = builder.compile()
# 运行测试
result = graph.invoke({"topic": "AI"})
print("\n--- 最终输出 ---")
print(result["final_result"])
场景二:动态并行(Map-Reduce / 使用 Send API)
如果你需要根据列表动态生成并行的任务(例如处理未知数量的文档),你需要使用 LangGraph 的 Send API。
关键点:
- 必须在 State 中使用带有
Reducer的字段(如Annotated[list, operator.add])来收集多个并行节点返回的结果。 - 使用
add_conditional_edges结合Send对象来动态派发任务。
代码示例:
python
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
# 1. 定义 State
class State(TypedDict):
subjects: list[str]
# 必须使用 operator.add 作为 reducer,否则并行节点的结果会互相覆盖
processed_results: Annotated[list[str], operator.add]
final_report: str
# 为动态生成的子节点定义独立的 State
class WorkerState(TypedDict):
subject: str
# 2. 定义节点函数
def map_dispatcher(state: State):
print(f"-> 分发器:收到 {len(state['subjects'])} 个任务")
# 这个节点本身不更新状态,任务派发通过 conditional_edge 完成
return {}
def process_worker(state: WorkerState):
# 处理单个 subject
subject = state["subject"]
print(f" [并行 Worker] 正在处理: {subject}")
result = f"[{subject} 的处理结果]"
# 注意:返回的 key 必须对应主 State 中的带有 reducer 的字段
return {"processed_results": [result]}
def reduce_aggregator(state: State):
print("-> 汇总节点:生成最终报告")
# Fan-in: 所有 worker 的结果都被收集在 processed_results 列表中
all_results = "\n".join(state["processed_results"])
return {"final_report": f"总报告包含以下内容:\n{all_results}"}
# 3. 动态路由逻辑
def continue_to_workers(state: State):
# 为 subjects 列表中的每一个元素,发送一个独立的状态给 "worker" 节点
return [Send("worker", {"subject": s}) for s in state["subjects"]]
# 4. 构建图
builder = StateGraph(State)
builder.add_node("dispatcher", map_dispatcher)
builder.add_node("worker", process_worker)
builder.add_node("aggregator", reduce_aggregator)
builder.add_edge(START, "dispatcher")
# Fan-out: 使用 conditional_edges 和 Send API 动态派发
builder.add_conditional_edges(
"dispatcher",
continue_to_workers,
["worker"] # 声明可能指向的节点
)
# Fan-in: 所有 worker 节点执行完毕后,流入 aggregator
builder.add_edge("worker", "aggregator")
builder.add_edge("aggregator", END)
graph = builder.compile()
# 运行测试
result = graph.invoke({"subjects": ["苹果", "香蕉", "橙子"]})
print("\n--- 最终输出 ---")
print(result["final_report"])
总结与排错指南
- 自动等待:在 LangGraph 中进行 Fan-in 时,汇聚节点(如上述的
aggregator)会自动等待其所有上游节点(如joke和poem,或所有的worker)全部执行完成(成功返回状态)后,才会开始执行。无需你手动写锁或异步等待机制。 - 状态覆盖陷阱:在并行执行时,如果多个节点返回同一个不带 Reducer 的键值(例如两个并行节点都返回
{"text": "..."}),会导致后面的覆盖前面的,引发竞态条件。解决办法:要么像场景一那样写入不同的 Key,要么像场景二那样使用Annotated[list, operator.add]。 - 异步支持:LangGraph 的并行在底层使用
asyncio。为了最大化性能,如果你的节点函数包含网络请求(如调用大模型),建议使用async def定义节点,并使用await graph.ainvoke(...)运行。即使你使用普通的同步函数def,LangGraph 也会使用多线程在后台并发执行它们。