基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

LangGraph Send API 是如何工作的?

知识点图片

在 LangGraph 中,Send API 的核心作用是实现 动态并行执行(Dynamic Parallelism),也就是我们常说的 Map-Reduce 模式。

简单来说,它允许你根据运行时的状态,将同一个任务拆解成多份,同时发送给同一个节点进行并行处理

以下是 Send API 工作原理的详细解析:

1. 为什么需要 Send API?

在普通的 LangGraph 条件边(add_conditional_edges)中,路由函数通常返回下一个节点的名称(字符串)。
例如:如果条件 A 成立,返回 "node_1",图就会走向 node_1

痛点: 如果你的图生成了一个包含 5 个元素的列表,你想让 "process_item" 节点同时处理这 5 个元素,普通条件边做不到,因为它只能把整个列表作为一个整体传给下一个节点一次。

解决方案: Send 对象。你可以返回一个 Send 对象的列表:
[Send("process_item", {"item": 1}), Send("process_item", {"item": 2}), ...]
LangGraph 会拦截这些 Send 对象,并为 "process_item" 节点动态创建 5 个并行的实例去分别处理这些数据。


2. Send API 的工作流程

Send 接收两个参数:Send(node_name: str, payload: dict)

  • node_name: 要执行的目标节点名称。
  • payload: 传递给该节点的状态(State)。

工作步骤如下:

  1. 生成数据(Map): 图中的某个节点生成了一个列表(比如 3 个需要总结的文档)。
  2. 触发路由: 图进入条件边(Conditional Edge),该条件边的函数遍历这 3 个文档,返回 3 个 Send 对象。
  3. 并行执行: LangGraph 接收到这 3 个 Send 对象后,会同时启动 3 个目标节点(Worker)。每个节点接收对应的 payload 作为它的局部状态。
  4. 聚合结果(Reduce): 3 个节点并行执行完毕后,它们的返回结果会被汇总,并写入到全局状态(Global State)中。

3. 核心代码示例

下面是一个最简单的 Map-Reduce 示例:生成多个主题,并并行地为每个主题生成一段摘要。

python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# 1. 定义状态 (注意:并行聚合的字段必须使用 reducer,如 operator.add)
class State(TypedDict):
    topics: list[str]
    # summaries 必须用 operator.add,否则并行节点的结果会互相覆盖
    summaries: Annotated[list[str], operator.add] 

# 2. 节点:生成一组主题
def generate_topics(state: State):
    # 模拟生成了 3 个主题
    return {"topics": ["苹果", "香蕉", "樱桃"]}

# 3. 路由函数:将每个主题包装成一个 Send 对象
def map_topics(state: State):
    # 返回一个 Send 列表,告诉 LangGraph 启动 3 个 "generate_summary" 节点
    # payload 就是 {"topic": t}
    return [Send("generate_summary", {"topic": t}) for t in state["topics"]]

# 4. 工作节点:处理单个主题 (它的输入是从 Send 传过来的局部状态)
class WorkerState(TypedDict):
    topic: str

def generate_summary(state: WorkerState):
    topic = state["topic"]
    summary = f"这是关于 {topic} 的详细摘要。"
    # 注意:返回的 key 必须匹配全局 State 中的聚合字段 summaries
    return {"summaries": [summary]} 

# 5. 构建图
builder = StateGraph(State)

builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_summary", generate_summary)

builder.add_edge(START, "generate_topics")

# 使用条件边来触发 Send
builder.add_conditional_edges("generate_topics", map_topics, ["generate_summary"])

builder.add_edge("generate_summary", END)

graph = builder.compile()

# 运行图
result = graph.invoke({"topics": [], "summaries": []})
print(result["summaries"])
# 输出: ['这是关于 苹果 的详细摘要。', '这是关于 香蕉 的详细摘要。', '这是关于 樱桃 的详细摘要。']

4. 使用 Send 的关键注意事项(避坑指南)

  1. 必须配合 Reducer 使用:
    在上面的例子中,summaries 字段使用了 Annotated[list[str], operator.add]。这是强制要求的。因为多个 generate_summary 节点是并行运行的,它们都会尝试更新 summaries 字段。如果没有 reducer,最后完成的节点会覆盖其他节点的结果。使用了 operator.add,它们的结果才会被合并成一个大列表。
  2. 只能在 Conditional Edges 中使用:
    Send 对象不能在普通节点(Node)的返回值中使用,只能作为 add_conditional_edges 绑定的路由函数的返回值。
  3. Payload 结构:
    Send 的第二个参数 payload 可以只包含目标节点需要的字段,不必传递整个完整的全局状态,这可以极大地节省内存和传输开销。

总结

LangGraph 的 Send API 就是为了解决 “我不知道会产生多少条数据,但我需要对产生的所有数据进行并发处理” 这个动态并发难题而设计的。它将传统的静态流程图转变成了具备强大 Map-Reduce 能力的动态图。

00:00
00:00