记住一句话:节点的返回值是「这一步产生了什么」,不是「State 现在长什么样」。 LangGraph 拿到返回的 partial update 后,对每个 key 查它的 reducer:有 reducer 就调用
reducer(existing_value, update_value),没有就直接existing = update(覆盖)。所以「字段怎么合并」这件事,写在 State 的类型定义里,而不是写在节点逻辑里。
默认覆盖 vs Annotated 自定义合并
先安装并看最小对照。同一个图里放两个字段:一个普通字段走覆盖,一个 Annotated 字段走 operator.add 累加。两个节点各返回一次,结果一目了然。
# LangGraph 主包;本页示例只用到核心 StateGraph 与内置 reducer
pip install -U langgraph
# 若要跑后面 add_messages + 真实消息对象的示例,再装 langchain-core
pip install -U langchain-core
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# 普通字段:默认 "覆盖" 语义,后写的值替换先写的值
last_step: str
# Annotated 字段:挂上 operator.add 这个 reducer -> list 拼接(累加)
# 签名上等价于 merged = operator.add(existing, update) 即 existing + update
log: Annotated[list[str], operator.add]
def node_a(state: State):
# 节点返回 partial update:只写自己关心的 key
return {"last_step": "a", "log": ["enter a"]}
def node_b(state: State):
return {"last_step": "b", "log": ["enter b"]}
builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_node("b", node_b)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", END)
graph = builder.compile()
result = graph.invoke({"last_step": "", "log": []})
print(result)
# {'last_step': 'b', 'log': ['enter a', 'enter b']}
# last_step 被覆盖:a 写的 'a' 被 b 写的 'b' 顶掉
# log 被累加:operator.add 把 ['enter a'] + ['enter b'] 拼成两条
add_messages:消息列表的专用 reducer
对话 / Agent 场景里,State 几乎都有一个消息列表字段。直接用 operator.add 拼接会有两个问题:无法按 id 更新已有消息(流式时同一条消息会被多次写入),也无法删除消息。LangGraph 为此内置了 add_messages,它做三件事:追加新消息、按 message.id 去重并更新、配合 RemoveMessage 删除消息。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import (
HumanMessage, AIMessage, RemoveMessage,
)
class ChatState(TypedDict):
# 这一行就是 langgraph 内置 MessagesState 的核心写法
messages: Annotated[list, add_messages]
def append_node(state: ChatState):
# 1) 追加:返回新消息,add_messages 把它接到列表末尾
return {"messages": [AIMessage(content="hi there", id="ai-1")]}
def update_node(state: ChatState):
# 2) 按 id 更新:返回一条 id 已存在的消息 -> 原地替换,而不是再追加一条
return {"messages": [AIMessage(content="hi there (edited)", id="ai-1")]}
def delete_node(state: ChatState):
# 3) 删除:返回 RemoveMessage(id=...),add_messages 会把该 id 的消息移除
first_id = state["messages"][0].id
return {"messages": [RemoveMessage(id=first_id)]}
builder = StateGraph(ChatState)
builder.add_node("append", append_node)
builder.add_node("update", update_node)
builder.add_node("delete", delete_node)
builder.add_edge(START, "append")
builder.add_edge("append", "update")
builder.add_edge("update", "delete")
builder.add_edge("delete", END)
graph = builder.compile()
# 初始放一条用户消息(带显式 id,方便观察删除)
out = graph.invoke({"messages": [HumanMessage(content="hello", id="human-1")]})
for m in out["messages"]:
print(type(m).__name__, repr(m.content), m.id)
# 过程:
# start: [Human 'hello' human-1]
# append: [Human 'hello' human-1, AI 'hi there' ai-1]
# update: [Human 'hello' human-1, AI 'hi there (edited)' ai-1] # 按 ai-1 原地更新
# delete: [AI 'hi there (edited)' ai-1] # human-1 被移除
# 最终输出只剩一条 AI 消息
自定义 reducer:签名与字典合并实战
reducer 就是一个普通函数,签名固定为
(existing, update) -> merged。existing是当前 State 里该字段的值(首次可能是None或初始值),update是节点这次返回的值,返回的merged成为字段新值。约束:必须是纯函数、不可有副作用、对相同输入可重复调用(LangGraph 在并发 / 重试 / 持久化恢复时可能多次调用)。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
def merge_dicts(existing: dict | None, update: dict | None) -> dict:
"""自定义 reducer:浅合并两个字典,update 的同名 key 覆盖 existing。
纯函数:不修改入参,返回新 dict。"""
existing = existing or {}
update = update or {}
return {**existing, **update}
class State(TypedDict):
# scores 字段走自定义的字典合并 reducer
scores: Annotated[dict, merge_dicts]
def grader_a(state: State):
return {"scores": {"relevance": 0.8}}
def grader_b(state: State):
# 不同 key -> 合并;若是相同 key 则后者覆盖
return {"scores": {"fluency": 0.9}}
builder = StateGraph(State)
builder.add_node("a", grader_a)
builder.add_node("b", grader_b)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", END)
graph = builder.compile()
print(graph.invoke({"scores": {}}))
# {'scores': {'relevance': 0.8, 'fluency': 0.9}}
# 两个节点各写一个 key,merge_dicts 把它们合到同一个 dict 里,互不覆盖
| 想要的语义 | 字段标注 | 节点返回什么 | 结果 |
|---|---|---|---|
| 后写覆盖先写(默认) | 不加 Annotated,直接 x: str | 完整新值 | 新值替换旧值 |
| 列表累加 / 中间结果收集 | Annotated[list, operator.add] | 增量列表 [item] | 旧列表 + 新列表 |
| 数值累加计数 | Annotated[int, operator.add] | 增量数字 1 | 旧值 + 新值 |
| 对话消息追加+去重+删除 | Annotated[list, add_messages] | 新消息 / 带相同 id 的消息 / RemoveMessage | 追加或按 id 更新或删除 |
| 字典逐 key 合并 | Annotated[dict, merge_dicts](自定义) | 局部字典 {k: v} | 按 key 合并 |
并发写同一字段:reducer 是唯一合法出口
当多个节点从同一个点 fan-out 并行执行、且都写同一个字段时,LangGraph 在同一个 super-step 内会收到多份对该字段的更新。如果该字段没有 reducer,LangGraph 无法决定保留谁,直接抛 InvalidUpdateError。给字段挂 reducer,并发的多份更新就会被 reducer 依次合并——这正是 reducer 设计的核心用途。
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# 有 reducer:并发分支各写一段,operator.add 负责合并
results: Annotated[list[str], operator.add]
# 没有 reducer 的 plain 字段在并发写时会触发 InvalidUpdateError,见下方注释
# winner: str
def branch_1(state: State):
return {"results": ["from-branch-1"]}
def branch_2(state: State):
return {"results": ["from-branch-2"]}
builder = StateGraph(State)
builder.add_node("b1", branch_1)
builder.add_node("b2", branch_2)
# 从 START 同时 fan-out 到 b1 和 b2 -> 同一 super-step 内并行执行
builder.add_edge(START, "b1")
builder.add_edge(START, "b2")
builder.add_edge("b1", END)
builder.add_edge("b2", END)
graph = builder.compile()
print(graph.invoke({"results": []}))
# {'results': ['from-branch-1', 'from-branch-2']}
# 两个分支并发写 results,operator.add 把两份更新合并;顺序由运行时决定
#
# 反例:若把 winner: str(无 reducer)让 b1、b2 同时写,会得到
# langgraph.errors.InvalidUpdateError:
# Can receive only one value per step. Use an Annotated key to handle multiple values.
✓推荐做法
- 并发会写到的字段,一律加 reducer(
operator.add或自定义合并) - reducer 写成纯函数:不改入参、返回新对象、对相同输入幂等
- 对话 / Agent 的消息字段直接用
add_messages或继承MessagesState - 节点只返回它真正改动的字段(partial update),把合并交给 reducer
✗不推荐
- 不要让多个并发节点写同一个无 reducer 的普通字段(必触发 InvalidUpdateError)
- 不要在 reducer 里发网络请求、写日志、改全局状态等有副作用的操作
- 不要在节点里返回「完整列表」却又用 operator.add,会导致旧值被重复拼接放大
- 不要靠返回无 id 的消息去『更新』已有消息——它会被当成新消息追加
⚠常见误区
- 用 operator.add 累加时,节点误返回
state['log'] + ['x'](完整列表)会和 reducer 再拼一次,结果翻倍 - 自定义 reducer 直接
existing.append(update)修改了入参,导致持久化恢复 / 重试时状态被污染 - 并发分支跑得很快、看似单值能过,等数据量上来才偶发 InvalidUpdateError,本质就是字段缺 reducer
把任意一个节点拆成两个并发节点同时写目标字段,图仍能 invoke 成功且数据正确合并——说明该字段的 reducer 设计到位。
消息列表越来越长且重复
- 典型表现
- 每轮对话后 messages 里出现重复的 AI 回复,列表无限增长
- 判断标准
- 确认是否在用 add_messages 且更新消息时带了相同 id
- 解决方向
- 更新已有消息时显式复用同一
id;用 operator.add 拼消息列表则换成 add_messages,让它按 id 去重更新
InvalidUpdateError: Can receive only one value per step
- 典型表现
- 图里有 fan-out 并行分支,invoke 时抛 InvalidUpdateError
- 判断标准
- 看报错指向的字段是否被多个并发节点写、且没有 Annotated reducer
- 解决方向
- 给该字段加 reducer:列表用
Annotated[list, operator.add],需要自定义合并就写 (existing, update)->merged
累加结果翻倍 / 错乱
- 典型表现
- 用 operator.add 的列表字段,元素数量是预期的两倍
- 判断标准
- 检查节点返回的是增量还是完整列表
- 解决方向
- 节点只返回增量
[item],由 reducer 负责拼接;不要返回existing + [item]
持久化恢复后 State 被污染
- 典型表现
- 加 checkpointer 后,多次运行同一 thread 数据异常累积或丢失
- 判断标准
- 审查 reducer 是否修改了入参或有副作用
- 解决方向
- 把 reducer 改成纯函数:
return {**existing, **update}/return existing + update,绝不 in-place 修改 existing