如果一个图中有两个 Node 被并行执行,且它们都试图更新同一个 State 字段,LangGraph 底层是如何处理这种状态合并的?
在 LangGraph 中,当两个节点(Node)被并行执行,且它们都试图更新同一个 State 字段时,底层处理这种状态合并的核心机制完全依赖于你如何定义该字段的 Reducer(归约器)。
LangGraph 底层基于 Pregel 模型(一种图计算模型),它的状态更新不是“实时就地修改”的,而是基于超步(Superstep)的概念。
以下是 LangGraph 底层处理并行更新的具体机制和不同场景下的表现:
1. 核心机制:超步(Superstep)与更新收集
当并行节点执行时,底层发生了什么?
- 隔离执行:并行节点 A 和节点 B 会基于当前超步开始时的同一个静态 State 快照进行计算。它们不会在执行期间互相干扰。
- 收集更新:节点执行完毕后,它们不会立刻修改全局 State,而是将它们想要更新的内容(一个字典)返回给 LangGraph 引擎。
- 统一合并:当该超步中所有并行节点都执行完毕后,LangGraph 会收集所有的状态更新包,并根据 State Schema 中定义的 Reducer 统一进行状态合并。
2. 场景一:未定义 Reducer(默认的覆盖行为)
如果你在定义 State 时使用了普通的类型注解(没有使用 Annotated 和 reducer),例如:
python
class State(TypedDict):
my_string: str # 没有 reducer,默认行为是覆盖
- 底层处理:如果在同一个超步中,并行节点 A 返回
{"my_string": "value_A"},节点 B 返回{"my_string": "value_B"}。 - 结果:LangGraph 会直接抛出异常 (
InvalidUpdateError)。 - 原因:LangGraph 引擎在合并阶段发现,同一个没有 Reducer 的通道(Channel/Field)收到了两个相互冲突的更新。为了防止竞态条件(Race Condition)和不可预测的结果(到底谁覆盖谁?),LangGraph 设计为严格报错,拒绝静默覆盖。
报错信息类似如下:langgraph.errors.InvalidUpdateError: At key 'my_string': Can receive only one value per step.
3. 场景二:定义了 Reducer(如追加、合并)
如果你希望并行节点能够同时更新同一个字段,必须使用 typing.Annotated 并提供一个 Reducer 函数。最常见的是列表追加或字典合并。
例如,使用内置的 operator.add 来合并列表:
python
import operator
from typing import Annotated, TypedDict
class State(TypedDict):
# 使用 operator.add 作为 reducer
my_list: Annotated[list, operator.add]
- 底层处理:并行节点 A 返回
{"my_list": ["A"]},节点 B 返回{"my_list": ["B"]}。 - 结果:LangGraph 引擎在超步结束时,发现
my_list字段有 Reducer。它会将当前状态和所有的更新依次传入 Reducer。 - 合并过程:
new_state = operator.add(operator.add(current_state, ["A"]), ["B"])。最终my_list会变成["A", "B"](假设初始为空)。 - 顺序问题:在并行执行时,LangGraph 传递给 Reducer 的更新顺序是无法严格保证的(通常取决于哪个底层的异步任务先完成或引擎内部的收集顺序)。因此,当你在并行节点中更新同一个字段时,你的 Reducer 逻辑最好满足交换律(即 A+B 和 B+A 结果一致,或者你不在乎列表的具体顺序)。
4. 场景三:自定义复杂的 Reducer
你也可以编写自定义的合并逻辑来处理复杂的并行冲突。
python
def merge_dicts(dict1: dict, dict2: dict) -> dict:
# 自定义合并逻辑:如果键冲突,怎么处理?
merged = dict1.copy()
for k, v in dict2.items():
if k in merged:
merged[k] = merged[k] + v # 例如:值相加
else:
merged[k] = v
return merged
class State(TypedDict):
data: Annotated[dict, merge_dicts]
在这个场景中,如果并行节点返回了不同的字典更新,LangGraph 就会调用你写的 merge_dicts 函数来处理两个节点并发写入的冲突。
总结
LangGraph 底层处理并行状态更新的哲学是:防患于未然,交由开发者定义。
- 节点不直接写 State,而是返回更新意图 (Updates)。
- 引擎在并行结束时(Superstep 末尾)统一拦截这些更新。
- 如果发生针对同一字段的并发更新:
- 没有声明合并策略(Reducer) 👉 抛出异常,防止数据被随机覆盖。
- 声明了合并策略(Reducer) 👉 调用策略函数,将两个节点的更新依次合并进去。