LangGraph Send API 是如何工作的?
在 LangGraph 中,Send API 的核心作用是实现 动态并行执行(Dynamic Parallelism),也就是我们常说的 Map-Reduce 模式。
简单来说,它允许你根据运行时的状态,将同一个任务拆解成多份,同时发送给同一个节点进行并行处理。
以下是 Send API 工作原理的详细解析:
1. 为什么需要 Send API?
在普通的 LangGraph 条件边(add_conditional_edges)中,路由函数通常返回下一个节点的名称(字符串)。
例如:如果条件 A 成立,返回 "node_1",图就会走向 node_1。
痛点: 如果你的图生成了一个包含 5 个元素的列表,你想让 "process_item" 节点同时处理这 5 个元素,普通条件边做不到,因为它只能把整个列表作为一个整体传给下一个节点一次。
解决方案: Send 对象。你可以返回一个 Send 对象的列表:[Send("process_item", {"item": 1}), Send("process_item", {"item": 2}), ...]
LangGraph 会拦截这些 Send 对象,并为 "process_item" 节点动态创建 5 个并行的实例去分别处理这些数据。
2. Send API 的工作流程
Send 接收两个参数:Send(node_name: str, payload: dict)
node_name: 要执行的目标节点名称。payload: 传递给该节点的状态(State)。
工作步骤如下:
- 生成数据(Map): 图中的某个节点生成了一个列表(比如 3 个需要总结的文档)。
- 触发路由: 图进入条件边(Conditional Edge),该条件边的函数遍历这 3 个文档,返回 3 个
Send对象。 - 并行执行: LangGraph 接收到这 3 个
Send对象后,会同时启动 3 个目标节点(Worker)。每个节点接收对应的payload作为它的局部状态。 - 聚合结果(Reduce): 3 个节点并行执行完毕后,它们的返回结果会被汇总,并写入到全局状态(Global State)中。
3. 核心代码示例
下面是一个最简单的 Map-Reduce 示例:生成多个主题,并并行地为每个主题生成一段摘要。
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
# 1. 定义状态 (注意:并行聚合的字段必须使用 reducer,如 operator.add)
class State(TypedDict):
topics: list[str]
# summaries 必须用 operator.add,否则并行节点的结果会互相覆盖
summaries: Annotated[list[str], operator.add]
# 2. 节点:生成一组主题
def generate_topics(state: State):
# 模拟生成了 3 个主题
return {"topics": ["苹果", "香蕉", "樱桃"]}
# 3. 路由函数:将每个主题包装成一个 Send 对象
def map_topics(state: State):
# 返回一个 Send 列表,告诉 LangGraph 启动 3 个 "generate_summary" 节点
# payload 就是 {"topic": t}
return [Send("generate_summary", {"topic": t}) for t in state["topics"]]
# 4. 工作节点:处理单个主题 (它的输入是从 Send 传过来的局部状态)
class WorkerState(TypedDict):
topic: str
def generate_summary(state: WorkerState):
topic = state["topic"]
summary = f"这是关于 {topic} 的详细摘要。"
# 注意:返回的 key 必须匹配全局 State 中的聚合字段 summaries
return {"summaries": [summary]}
# 5. 构建图
builder = StateGraph(State)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_summary", generate_summary)
builder.add_edge(START, "generate_topics")
# 使用条件边来触发 Send
builder.add_conditional_edges("generate_topics", map_topics, ["generate_summary"])
builder.add_edge("generate_summary", END)
graph = builder.compile()
# 运行图
result = graph.invoke({"topics": [], "summaries": []})
print(result["summaries"])
# 输出: ['这是关于 苹果 的详细摘要。', '这是关于 香蕉 的详细摘要。', '这是关于 樱桃 的详细摘要。']
4. 使用 Send 的关键注意事项(避坑指南)
- 必须配合 Reducer 使用:
在上面的例子中,summaries字段使用了Annotated[list[str], operator.add]。这是强制要求的。因为多个generate_summary节点是并行运行的,它们都会尝试更新summaries字段。如果没有 reducer,最后完成的节点会覆盖其他节点的结果。使用了operator.add,它们的结果才会被合并成一个大列表。 - 只能在 Conditional Edges 中使用:
Send对象不能在普通节点(Node)的返回值中使用,只能作为add_conditional_edges绑定的路由函数的返回值。 - Payload 结构:
Send的第二个参数payload可以只包含目标节点需要的字段,不必传递整个完整的全局状态,这可以极大地节省内存和传输开销。
总结
LangGraph 的 Send API 就是为了解决 “我不知道会产生多少条数据,但我需要对产生的所有数据进行并发处理” 这个动态并发难题而设计的。它将传统的静态流程图转变成了具备强大 Map-Reduce 能力的动态图。