记住一句话:节点的返回值是「这一步产生了什么」,不是「State 现在长什么样」。 LangGraph 拿到返回的 partial update 后,对每个 key 查它的 reducer:有 reducer 就调用 reducer(existing_value, update_value),没有就直接 existing = update(覆盖)。所以「字段怎么合并」这件事,写在 State 的类型定义里,而不是写在节点逻辑里。

默认覆盖 vs Annotated 自定义合并

先安装并看最小对照。同一个图里放两个字段:一个普通字段走覆盖,一个 Annotated 字段走 operator.add 累加。两个节点各返回一次,结果一目了然。

# LangGraph 主包;本页示例只用到核心 StateGraph 与内置 reducer
pip install -U langgraph
# 若要跑后面 add_messages + 真实消息对象的示例,再装 langchain-core
pip install -U langchain-core
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # 普通字段:默认 "覆盖" 语义,后写的值替换先写的值
    last_step: str
    # Annotated 字段:挂上 operator.add 这个 reducer -> list 拼接(累加)
    # 签名上等价于 merged = operator.add(existing, update) 即 existing + update
    log: Annotated[list[str], operator.add]


def node_a(state: State):
    # 节点返回 partial update:只写自己关心的 key
    return {"last_step": "a", "log": ["enter a"]}


def node_b(state: State):
    return {"last_step": "b", "log": ["enter b"]}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_node("b", node_b)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", END)
graph = builder.compile()

result = graph.invoke({"last_step": "", "log": []})
print(result)
# {'last_step': 'b', 'log': ['enter a', 'enter b']}
#   last_step 被覆盖:a 写的 'a' 被 b 写的 'b' 顶掉
#   log 被累加:operator.add 把 ['enter a'] + ['enter b'] 拼成两条

add_messages:消息列表的专用 reducer

对话 / Agent 场景里,State 几乎都有一个消息列表字段。直接用 operator.add 拼接会有两个问题:无法按 id 更新已有消息(流式时同一条消息会被多次写入),也无法删除消息。LangGraph 为此内置了 add_messages,它做三件事:追加新消息按 message.id 去重并更新、配合 RemoveMessage 删除消息

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import (
    HumanMessage, AIMessage, RemoveMessage,
)


class ChatState(TypedDict):
    # 这一行就是 langgraph 内置 MessagesState 的核心写法
    messages: Annotated[list, add_messages]


def append_node(state: ChatState):
    # 1) 追加:返回新消息,add_messages 把它接到列表末尾
    return {"messages": [AIMessage(content="hi there", id="ai-1")]}


def update_node(state: ChatState):
    # 2) 按 id 更新:返回一条 id 已存在的消息 -> 原地替换,而不是再追加一条
    return {"messages": [AIMessage(content="hi there (edited)", id="ai-1")]}


def delete_node(state: ChatState):
    # 3) 删除:返回 RemoveMessage(id=...),add_messages 会把该 id 的消息移除
    first_id = state["messages"][0].id
    return {"messages": [RemoveMessage(id=first_id)]}


builder = StateGraph(ChatState)
builder.add_node("append", append_node)
builder.add_node("update", update_node)
builder.add_node("delete", delete_node)
builder.add_edge(START, "append")
builder.add_edge("append", "update")
builder.add_edge("update", "delete")
builder.add_edge("delete", END)
graph = builder.compile()

# 初始放一条用户消息(带显式 id,方便观察删除)
out = graph.invoke({"messages": [HumanMessage(content="hello", id="human-1")]})
for m in out["messages"]:
    print(type(m).__name__, repr(m.content), m.id)
# 过程:
#   start:  [Human 'hello' human-1]
#   append: [Human 'hello' human-1, AI 'hi there' ai-1]
#   update: [Human 'hello' human-1, AI 'hi there (edited)' ai-1]  # 按 ai-1 原地更新
#   delete: [AI 'hi there (edited)' ai-1]                          # human-1 被移除
# 最终输出只剩一条 AI 消息

自定义 reducer:签名与字典合并实战

reducer 就是一个普通函数,签名固定为 (existing, update) -> mergedexisting 是当前 State 里该字段的值(首次可能是 None 或初始值),update 是节点这次返回的值,返回的 merged 成为字段新值。约束:必须是纯函数、不可有副作用、对相同输入可重复调用(LangGraph 在并发 / 重试 / 持久化恢复时可能多次调用)。

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


def merge_dicts(existing: dict | None, update: dict | None) -> dict:
    """自定义 reducer:浅合并两个字典,update 的同名 key 覆盖 existing。
    纯函数:不修改入参,返回新 dict。"""
    existing = existing or {}
    update = update or {}
    return {**existing, **update}


class State(TypedDict):
    # scores 字段走自定义的字典合并 reducer
    scores: Annotated[dict, merge_dicts]


def grader_a(state: State):
    return {"scores": {"relevance": 0.8}}


def grader_b(state: State):
    # 不同 key -> 合并;若是相同 key 则后者覆盖
    return {"scores": {"fluency": 0.9}}


builder = StateGraph(State)
builder.add_node("a", grader_a)
builder.add_node("b", grader_b)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", END)
graph = builder.compile()

print(graph.invoke({"scores": {}}))
# {'scores': {'relevance': 0.8, 'fluency': 0.9}}
# 两个节点各写一个 key,merge_dicts 把它们合到同一个 dict 里,互不覆盖
想要的语义字段标注节点返回什么结果
后写覆盖先写(默认)不加 Annotated,直接 x: str完整新值新值替换旧值
列表累加 / 中间结果收集Annotated[list, operator.add]增量列表 [item]旧列表 + 新列表
数值累加计数Annotated[int, operator.add]增量数字 1旧值 + 新值
对话消息追加+去重+删除Annotated[list, add_messages]新消息 / 带相同 id 的消息 / RemoveMessage追加或按 id 更新或删除
字典逐 key 合并Annotated[dict, merge_dicts](自定义)局部字典 {k: v}按 key 合并
口诀覆盖看默认,累加用 operator.add,消息用 add_messages,复杂合并自己写 (existing, update)->merged

并发写同一字段:reducer 是唯一合法出口

当多个节点从同一个点 fan-out 并行执行、且都写同一个字段时,LangGraph 在同一个 super-step 内会收到多份对该字段的更新。如果该字段没有 reducer,LangGraph 无法决定保留谁,直接抛 InvalidUpdateError。给字段挂 reducer,并发的多份更新就会被 reducer 依次合并——这正是 reducer 设计的核心用途。

import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # 有 reducer:并发分支各写一段,operator.add 负责合并
    results: Annotated[list[str], operator.add]
    # 没有 reducer 的 plain 字段在并发写时会触发 InvalidUpdateError,见下方注释
    # winner: str


def branch_1(state: State):
    return {"results": ["from-branch-1"]}


def branch_2(state: State):
    return {"results": ["from-branch-2"]}


builder = StateGraph(State)
builder.add_node("b1", branch_1)
builder.add_node("b2", branch_2)
# 从 START 同时 fan-out 到 b1 和 b2 -> 同一 super-step 内并行执行
builder.add_edge(START, "b1")
builder.add_edge(START, "b2")
builder.add_edge("b1", END)
builder.add_edge("b2", END)
graph = builder.compile()

print(graph.invoke({"results": []}))
# {'results': ['from-branch-1', 'from-branch-2']}
# 两个分支并发写 results,operator.add 把两份更新合并;顺序由运行时决定
#
# 反例:若把 winner: str(无 reducer)让 b1、b2 同时写,会得到
#   langgraph.errors.InvalidUpdateError:
#   Can receive only one value per step. Use an Annotated key to handle multiple values.
推荐做法
  • 并发会写到的字段,一律加 reducer(operator.add 或自定义合并)
  • reducer 写成纯函数:不改入参、返回新对象、对相同输入幂等
  • 对话 / Agent 的消息字段直接用 add_messages 或继承 MessagesState
  • 节点只返回它真正改动的字段(partial update),把合并交给 reducer
不推荐
  • 不要让多个并发节点写同一个无 reducer 的普通字段(必触发 InvalidUpdateError)
  • 不要在 reducer 里发网络请求、写日志、改全局状态等有副作用的操作
  • 不要在节点里返回「完整列表」却又用 operator.add,会导致旧值被重复拼接放大
  • 不要靠返回无 id 的消息去『更新』已有消息——它会被当成新消息追加
常见误区
  • 用 operator.add 累加时,节点误返回 state['log'] + ['x'](完整列表)会和 reducer 再拼一次,结果翻倍
  • 自定义 reducer 直接 existing.append(update) 修改了入参,导致持久化恢复 / 重试时状态被污染
  • 并发分支跑得很快、看似单值能过,等数据量上来才偶发 InvalidUpdateError,本质就是字段缺 reducer

把任意一个节点拆成两个并发节点同时写目标字段,图仍能 invoke 成功且数据正确合并——说明该字段的 reducer 设计到位。

消息列表越来越长且重复

典型表现
每轮对话后 messages 里出现重复的 AI 回复,列表无限增长
判断标准
确认是否在用 add_messages 且更新消息时带了相同 id
解决方向
更新已有消息时显式复用同一 id;用 operator.add 拼消息列表则换成 add_messages,让它按 id 去重更新

InvalidUpdateError: Can receive only one value per step

典型表现
图里有 fan-out 并行分支,invoke 时抛 InvalidUpdateError
判断标准
看报错指向的字段是否被多个并发节点写、且没有 Annotated reducer
解决方向
给该字段加 reducer:列表用 Annotated[list, operator.add],需要自定义合并就写 (existing, update)->merged

累加结果翻倍 / 错乱

典型表现
用 operator.add 的列表字段,元素数量是预期的两倍
判断标准
检查节点返回的是增量还是完整列表
解决方向
节点只返回增量 [item],由 reducer 负责拼接;不要返回 existing + [item]

持久化恢复后 State 被污染

典型表现
加 checkpointer 后,多次运行同一 thread 数据异常累积或丢失
判断标准
审查 reducer 是否修改了入参或有副作用
解决方向
把 reducer 改成纯函数:return {**existing, **update} / return existing + update,绝不 in-place 修改 existing