生产事故的第一性原理:LangGraph 的每一次节点执行,都是「读取当前 State → 返回部分更新 → 由 reducer 合并回 State」。所以状态错误的根因永远不在节点逻辑里,而在 reducer 的合并语义 与 thread_id 的隔离边界 上。把这两件事想清楚,80% 的生产坑会在写代码前就消失。
一、状态正确性:reducer 选错的两种死法
节点返回 {"key": value} 后,LangGraph 不是直接赋值,而是调用该 key 的 reducer 把返回值合并进旧状态。没有显式声明 reducer 时,默认行为是「覆盖」(last-write-wins)。这导致两类典型 bug:该累加的字段被覆盖丢数据;该覆盖的字段错用累加导致无限增长。
# pip install -U langgraph langchain-core
# 演示 reducer 选错的两种典型错误及正确写法
from typing import Annotated, TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
# ---------- 反例:该累加的字段没声明 reducer,被覆盖 ----------
class BadState(TypedDict):
# 默认 reducer = 覆盖。两个节点各返回一条 log,第二条会冲掉第一条
logs: list[str]
# ---------- 正例:用 Annotated 显式声明合并语义 ----------
class GoodState(TypedDict):
# operator.add 对 list 表现为拼接 -> 累加语义,不丢数据
logs: Annotated[list[str], add]
# add_messages:按 message id 去重 + 追加,是消息历史的标准 reducer
messages: Annotated[list[AnyMessage], add_messages]
# 计数器这种「最新值即真相」的字段,保持默认覆盖语义即可(不加 Annotated)
step: int
def node_a(state: GoodState):
# 注意:节点只返回「增量」,而不是完整的新 list
return {"logs": ["a 执行"], "step": 1}
def node_b(state: GoodState):
return {"logs": ["b 执行"], "step": 2}
graph = (
StateGraph(GoodState)
.add_node("a", node_a)
.add_node("b", node_b)
.add_edge(START, "a")
.add_edge("a", "b")
.add_edge("b", END)
.compile()
)
result = graph.invoke({"logs": [], "messages": [], "step": 0})
print(result["logs"]) # ['a 执行', 'b 执行'] —— 累加成功,未被覆盖
print(result["step"]) # 2 —— 覆盖语义,保留最新值
| 字段语义 | 推荐 reducer | 典型字段 | 选错的后果 |
|---|---|---|---|
| 最新值即真相 | 默认(不加 Annotated) | current_step, route, status | 若错用 add:状态膨胀、计数错乱 |
| 对话历史 | add_messages | messages | 若用默认覆盖:每轮只剩最后一条,上下文丢失 |
| 纯追加列表 | operator.add | logs, tool_outputs | 若用默认覆盖:并发/多节点写入互相覆盖 |
| 集合去重合并 | 自定义 reducer 函数 | visited_urls, seen_ids | 若用 add:重复元素堆积,去重失效 |
二、控制流安全:循环图与条件路由
带环的图(如 ReAct 循环、自我反思 loop)一旦缺少终止条件或终止条件失效,就会无限循环。LangGraph 用 recursion_limit 作为最后的安全闸门:超过限制抛 GraphRecursionError,默认值是 25。生产中既要显式设置它,更要保证 conditional edge 一定能走到 END。
# 循环图 + recursion_limit + 条件路由的正确写法
from typing import Annotated, TypedDict, Literal
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.errors import GraphRecursionError
class State(TypedDict):
count: int
history: Annotated[list[int], add]
def work(state: State):
n = state["count"] + 1
return {"count": n, "history": [n]}
# 条件边的路由函数:返回值必须出现在 path_map 里,否则 KeyError
def route(state: State) -> Literal["work", "done"]:
# 终止条件:必须保证某个分支最终为真,否则永远循环
return "done" if state["count"] >= 5 else "work"
graph = (
StateGraph(State)
.add_node("work", work)
.add_edge(START, "work")
# path_map 把路由函数的返回值映射到目标节点/END
# 返回 'work' 形成环;返回 'done' 跳到 END
.add_conditional_edges("work", route, {"work": "work", "done": END})
.compile()
)
# 正常:5 步内收敛
print(graph.invoke({"count": 0, "history": []})["history"]) # [1, 2, 3, 4, 5]
# 显式设置 recursion_limit;若终止条件写错导致跑飞,这里兜底报错而非吃满内存
try:
graph.invoke(
{"count": -1000, "history": []},
config={"recursion_limit": 10}, # 关键:生产必设
)
except GraphRecursionError as e:
print("触发递归上限,已安全中止:", e)
✓推荐做法
- 路由函数返回值用
Literal[...]标注,让类型检查器提前发现拼写错位 - 任何带环的图都显式传
config={"recursion_limit": N},并按业务最大步数留余量 - 为路由函数兜底:无法匹配任何业务分支时显式 return 一个走向 END 的安全分支
- 把 LLM 的路由决策做白名单校验(如
choice if choice in ALLOWED else "fallback")
✗不推荐
- 依赖默认 recursion_limit=25 隐式兜底,出事时只看到一行 GraphRecursionError
- 把 LLM 自由文本直接作为 conditional edge 的返回值
- 在循环节点里写「可能永远为假」的终止条件(如浮点数精确相等)
⚠常见误区
- recursion_limit 计的是「超步数(super-step)」而非节点调用次数,并行分支会更快逼近上限
- GraphRecursionError 不会自动回滚 checkpoint,恢复时要从最后一个有效 checkpoint 重放
在故意构造的死循环 / 错误路由输入下,图能在有限步内以明确异常终止,且不污染已持久化的会话状态。
三、会话隔离与持久化恢复
会话串台:忘了传 thread_id
- 典型表现
- 多个用户的对话历史混在一起,A 看到 B 的上下文;或同一用户多轮对话彼此污染。
- 判断标准
- 每次 invoke/stream 的 config 里都带唯一 thread_id;不同用户/会话 thread_id 必须不同。
- 解决方向
- 配 checkpointer 后,所有调用都传 config={"configurable": {"thread_id": <会话唯一 id>}};thread_id 由业务(用户 id + 会话 id)生成,绝不复用。
无法 resume:interrupt 了却没持久化
- 典型表现
- 调用 interrupt 暂停后,再次调用想恢复却从头重跑,或报找不到 checkpoint。
- 判断标准
- compile 时传入了 checkpointer,且 resume 用的是同一个 thread_id。
- 解决方向
- interrupt 依赖 checkpointer 落盘当前状态。compile(checkpointer=saver) 必须配置;恢复时用 Command(resume=...) 配同一 thread_id 续跑。
MemorySaver 上了生产
- 典型表现
- 进程重启后所有会话丢失;多副本部署时各实例状态不一致。
- 判断标准
- 生产持久化用 PostgresSaver / SqliteSaver,而非内存版 InMemorySaver。
- 解决方向
- 把 InMemorySaver 换成 PostgresSaver,首次运行调用 saver.setup() 建表,连接走连接池。
# 生产持久化:PostgresSaver + 连接池 + thread_id 会话隔离 + interrupt 恢复
# pip install -U langgraph langgraph-checkpoint-postgres psycopg[binary,pool]
from typing import TypedDict
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
DB_URI = "postgresql://user:pass@localhost:5432/langgraph?sslmode=disable"
# 连接池:autocommit + 复用连接,避免高并发下连接耗尽
pool = ConnectionPool(
conninfo=DB_URI,
max_size=20, # 按并发与 DB max_connections 配比
kwargs={"autocommit": True, "prepare_threshold": 0},
)
class State(TypedDict):
topic: str
approved: bool
def draft(state: State):
return {"topic": f"draft about {state['topic']}"}
def review(state: State):
# interrupt 暂停执行,把当前状态落盘到 checkpointer,等待人工输入
decision = interrupt({"question": "approve this draft?", "draft": state["topic"]})
return {"approved": decision == "yes"}
saver = PostgresSaver(pool)
saver.setup() # 首次运行建表,幂等
graph = (
StateGraph(State)
.add_node("draft", draft)
.add_node("review", review)
.add_edge(START, "draft")
.add_edge("draft", "review")
.add_edge("review", END)
.compile(checkpointer=saver) # 关键:interrupt / resume 依赖它
)
# thread_id 是会话隔离边界,不同用户必须不同
cfg = {"configurable": {"thread_id": "user-42-session-1"}}
# 第一次调用:跑到 review 的 interrupt 暂停
out = graph.invoke({"topic": "langgraph", "approved": False}, config=cfg)
print(out.get("__interrupt__")) # 暂停点信息
# 之后(可跨进程、跨请求)用同一 thread_id 恢复,状态从 Postgres 读回
final = graph.invoke(Command(resume="yes"), config=cfg)
print(final["approved"]) # True —— 成功 resume
pool.close()
四、部署路径与可观测性
把图本身写对之后,剩下的是「怎么暴露成稳定 API」。LangGraph 官方路径是用 langgraph.json 声明图入口,再用 langgraph build / langgraph up 容器化,由 LangGraph Server 提供带持久化、流式、HITL 的 REST/SSE 接口;同时接 LangSmith 做 trace 回放与可观测性。
// langgraph.json —— LangGraph Platform / Server 的部署清单
// 放在项目根目录,langgraph build / up 读取它定位图入口
{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent/graph.py:graph"
},
"env": ".env",
"python_version": "3.11"
}
# pip install -U "langgraph-cli[inmem]"
# 1) 本地热重载开发(内存 checkpointer,调试用)
langgraph dev
# 2) 生产容器化:build 镜像 -> up 起服务(自带 Postgres + Redis)
langgraph build -t my-langgraph-app:latest
langgraph up # 本地 docker compose 起完整栈
# 3) 服务起来后即是标准 LangGraph Server,按 thread 调用:
# POST /threads 创建会话线程
# POST /threads/{thread_id}/runs/stream 流式运行(SSE)
# 4) 可观测性:接 LangSmith,所有 run 自动 trace、可回放
export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=ls-xxxx
export LANGSMITH_PROJECT=langgraph-prod
| 流式模式 stream_mode | 推送内容 | 生产用途 | 注意点 |
|---|---|---|---|
| values | 每步后的完整 State 快照 | 需要前端拿到全量状态时 | State 大时带宽/序列化开销高 |
| updates | 每个节点返回的增量更新 | 审计「哪个节点改了什么」 | 需前端自行累积合并 |
| messages | LLM token 级流式 + 元数据 | 聊天 UI 打字机效果 | 只对 LLM 节点有 token 流 |
| custom | 节点内 get_stream_writer 写出的自定义事件 | 推送进度条 / 中间产物 | 需在节点里显式写入 |
✓推荐做法
- 用 langgraph.json 固定图入口与 python_version,让 build 可复现
- 生产 checkpointer 用 Postgres + 连接池,setup() 在迁移阶段单独执行
- 流式接口为 SSE/长连接设置合理超时与心跳,前端处理断线重连
- 全链路开启 LangSmith trace,用回放定位「为什么走了这个分支」
✗不推荐
- 把 langgraph dev / InMemorySaver 直接当生产服务
- 在容器里硬编码 DB 密码,应走 .env / secret 注入
- 对超长 values 流不做体积控制,导致背压打满网关
⚠常见误区
- stream 模式选 values 且 State 很大时,每步全量推送会成为带宽瓶颈,优先 updates/messages
- 多副本部署务必共用同一 Postgres,否则 thread_id 在不同副本上找不到 checkpoint
服务可水平扩缩容、进程重启后会话不丢、任意一次 run 都能在 LangSmith 里完整回放并定位到分支决策。
瓶颈是验证,不是生成。能被度量的状态错误才能被修复,能在 recursion_limit 处停下的循环胜过吃满内存的循环。
— 本 Wiki 生产化原则