生产事故的第一性原理:LangGraph 的每一次节点执行,都是「读取当前 State → 返回部分更新 → 由 reducer 合并回 State」。所以状态错误的根因永远不在节点逻辑里,而在 reducer 的合并语义thread_id 的隔离边界 上。把这两件事想清楚,80% 的生产坑会在写代码前就消失。

一、状态正确性:reducer 选错的两种死法

节点返回 {"key": value} 后,LangGraph 不是直接赋值,而是调用该 key 的 reducer 把返回值合并进旧状态。没有显式声明 reducer 时,默认行为是「覆盖」(last-write-wins)。这导致两类典型 bug:该累加的字段被覆盖丢数据;该覆盖的字段错用累加导致无限增长。

# pip install -U langgraph langchain-core
# 演示 reducer 选错的两种典型错误及正确写法

from typing import Annotated, TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages


# ---------- 反例:该累加的字段没声明 reducer,被覆盖 ----------
class BadState(TypedDict):
    # 默认 reducer = 覆盖。两个节点各返回一条 log,第二条会冲掉第一条
    logs: list[str]


# ---------- 正例:用 Annotated 显式声明合并语义 ----------
class GoodState(TypedDict):
    # operator.add 对 list 表现为拼接 -> 累加语义,不丢数据
    logs: Annotated[list[str], add]
    # add_messages:按 message id 去重 + 追加,是消息历史的标准 reducer
    messages: Annotated[list[AnyMessage], add_messages]
    # 计数器这种「最新值即真相」的字段,保持默认覆盖语义即可(不加 Annotated)
    step: int


def node_a(state: GoodState):
    # 注意:节点只返回「增量」,而不是完整的新 list
    return {"logs": ["a 执行"], "step": 1}


def node_b(state: GoodState):
    return {"logs": ["b 执行"], "step": 2}


graph = (
    StateGraph(GoodState)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge("a", "b")
    .add_edge("b", END)
    .compile()
)

result = graph.invoke({"logs": [], "messages": [], "step": 0})
print(result["logs"])  # ['a 执行', 'b 执行'] —— 累加成功,未被覆盖
print(result["step"])  # 2 —— 覆盖语义,保留最新值
字段语义推荐 reducer典型字段选错的后果
最新值即真相默认(不加 Annotated)current_step, route, status若错用 add:状态膨胀、计数错乱
对话历史add_messagesmessages若用默认覆盖:每轮只剩最后一条,上下文丢失
纯追加列表operator.addlogs, tool_outputs若用默认覆盖:并发/多节点写入互相覆盖
集合去重合并自定义 reducer 函数visited_urls, seen_ids若用 add:重复元素堆积,去重失效
口诀覆盖默认、历史 add_messages、追加 operator.add、去重写函数

二、控制流安全:循环图与条件路由

带环的图(如 ReAct 循环、自我反思 loop)一旦缺少终止条件或终止条件失效,就会无限循环。LangGraph 用 recursion_limit 作为最后的安全闸门:超过限制抛 GraphRecursionError,默认值是 25。生产中既要显式设置它,更要保证 conditional edge 一定能走到 END。

# 循环图 + recursion_limit + 条件路由的正确写法
from typing import Annotated, TypedDict, Literal
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.errors import GraphRecursionError


class State(TypedDict):
    count: int
    history: Annotated[list[int], add]


def work(state: State):
    n = state["count"] + 1
    return {"count": n, "history": [n]}


# 条件边的路由函数:返回值必须出现在 path_map 里,否则 KeyError
def route(state: State) -> Literal["work", "done"]:
    # 终止条件:必须保证某个分支最终为真,否则永远循环
    return "done" if state["count"] >= 5 else "work"


graph = (
    StateGraph(State)
    .add_node("work", work)
    .add_edge(START, "work")
    # path_map 把路由函数的返回值映射到目标节点/END
    # 返回 'work' 形成环;返回 'done' 跳到 END
    .add_conditional_edges("work", route, {"work": "work", "done": END})
    .compile()
)

# 正常:5 步内收敛
print(graph.invoke({"count": 0, "history": []})["history"])  # [1, 2, 3, 4, 5]

# 显式设置 recursion_limit;若终止条件写错导致跑飞,这里兜底报错而非吃满内存
try:
    graph.invoke(
        {"count": -1000, "history": []},
        config={"recursion_limit": 10},  # 关键:生产必设
    )
except GraphRecursionError as e:
    print("触发递归上限,已安全中止:", e)
推荐做法
  • 路由函数返回值用 Literal[...] 标注,让类型检查器提前发现拼写错位
  • 任何带环的图都显式传 config={"recursion_limit": N},并按业务最大步数留余量
  • 为路由函数兜底:无法匹配任何业务分支时显式 return 一个走向 END 的安全分支
  • 把 LLM 的路由决策做白名单校验(如 choice if choice in ALLOWED else "fallback"
不推荐
  • 依赖默认 recursion_limit=25 隐式兜底,出事时只看到一行 GraphRecursionError
  • 把 LLM 自由文本直接作为 conditional edge 的返回值
  • 在循环节点里写「可能永远为假」的终止条件(如浮点数精确相等)
常见误区
  • recursion_limit 计的是「超步数(super-step)」而非节点调用次数,并行分支会更快逼近上限
  • GraphRecursionError 不会自动回滚 checkpoint,恢复时要从最后一个有效 checkpoint 重放

在故意构造的死循环 / 错误路由输入下,图能在有限步内以明确异常终止,且不污染已持久化的会话状态。

三、会话隔离与持久化恢复

会话串台:忘了传 thread_id

典型表现
多个用户的对话历史混在一起,A 看到 B 的上下文;或同一用户多轮对话彼此污染。
判断标准
每次 invoke/stream 的 config 里都带唯一 thread_id;不同用户/会话 thread_id 必须不同。
解决方向
配 checkpointer 后,所有调用都传 config={"configurable": {"thread_id": <会话唯一 id>}};thread_id 由业务(用户 id + 会话 id)生成,绝不复用。

无法 resume:interrupt 了却没持久化

典型表现
调用 interrupt 暂停后,再次调用想恢复却从头重跑,或报找不到 checkpoint。
判断标准
compile 时传入了 checkpointer,且 resume 用的是同一个 thread_id。
解决方向
interrupt 依赖 checkpointer 落盘当前状态。compile(checkpointer=saver) 必须配置;恢复时用 Command(resume=...) 配同一 thread_id 续跑。

MemorySaver 上了生产

典型表现
进程重启后所有会话丢失;多副本部署时各实例状态不一致。
判断标准
生产持久化用 PostgresSaver / SqliteSaver,而非内存版 InMemorySaver。
解决方向
把 InMemorySaver 换成 PostgresSaver,首次运行调用 saver.setup() 建表,连接走连接池。
# 生产持久化:PostgresSaver + 连接池 + thread_id 会话隔离 + interrupt 恢复
# pip install -U langgraph langgraph-checkpoint-postgres psycopg[binary,pool]

from typing import TypedDict
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command

DB_URI = "postgresql://user:pass@localhost:5432/langgraph?sslmode=disable"

# 连接池:autocommit + 复用连接,避免高并发下连接耗尽
pool = ConnectionPool(
    conninfo=DB_URI,
    max_size=20,                       # 按并发与 DB max_connections 配比
    kwargs={"autocommit": True, "prepare_threshold": 0},
)


class State(TypedDict):
    topic: str
    approved: bool


def draft(state: State):
    return {"topic": f"draft about {state['topic']}"}


def review(state: State):
    # interrupt 暂停执行,把当前状态落盘到 checkpointer,等待人工输入
    decision = interrupt({"question": "approve this draft?", "draft": state["topic"]})
    return {"approved": decision == "yes"}


saver = PostgresSaver(pool)
saver.setup()  # 首次运行建表,幂等

graph = (
    StateGraph(State)
    .add_node("draft", draft)
    .add_node("review", review)
    .add_edge(START, "draft")
    .add_edge("draft", "review")
    .add_edge("review", END)
    .compile(checkpointer=saver)  # 关键:interrupt / resume 依赖它
)

# thread_id 是会话隔离边界,不同用户必须不同
cfg = {"configurable": {"thread_id": "user-42-session-1"}}

# 第一次调用:跑到 review 的 interrupt 暂停
out = graph.invoke({"topic": "langgraph", "approved": False}, config=cfg)
print(out.get("__interrupt__"))  # 暂停点信息

# 之后(可跨进程、跨请求)用同一 thread_id 恢复,状态从 Postgres 读回
final = graph.invoke(Command(resume="yes"), config=cfg)
print(final["approved"])  # True —— 成功 resume

pool.close()

四、部署路径与可观测性

把图本身写对之后,剩下的是「怎么暴露成稳定 API」。LangGraph 官方路径是用 langgraph.json 声明图入口,再用 langgraph build / langgraph up 容器化,由 LangGraph Server 提供带持久化、流式、HITL 的 REST/SSE 接口;同时接 LangSmith 做 trace 回放与可观测性。

// langgraph.json —— LangGraph Platform / Server 的部署清单
// 放在项目根目录,langgraph build / up 读取它定位图入口
{
  "dependencies": ["."],
  "graphs": {
    "agent": "./src/agent/graph.py:graph"
  },
  "env": ".env",
  "python_version": "3.11"
}
# pip install -U "langgraph-cli[inmem]"

# 1) 本地热重载开发(内存 checkpointer,调试用)
langgraph dev

# 2) 生产容器化:build 镜像 -> up 起服务(自带 Postgres + Redis)
langgraph build -t my-langgraph-app:latest
langgraph up               # 本地 docker compose 起完整栈

# 3) 服务起来后即是标准 LangGraph Server,按 thread 调用:
#    POST /threads                       创建会话线程
#    POST /threads/{thread_id}/runs/stream 流式运行(SSE)

# 4) 可观测性:接 LangSmith,所有 run 自动 trace、可回放
export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=ls-xxxx
export LANGSMITH_PROJECT=langgraph-prod
流式模式 stream_mode推送内容生产用途注意点
values每步后的完整 State 快照需要前端拿到全量状态时State 大时带宽/序列化开销高
updates每个节点返回的增量更新审计「哪个节点改了什么」需前端自行累积合并
messagesLLM token 级流式 + 元数据聊天 UI 打字机效果只对 LLM 节点有 token 流
custom节点内 get_stream_writer 写出的自定义事件推送进度条 / 中间产物需在节点里显式写入
口诀看全量用 values、看变更用 updates、做打字机用 messages、推进度用 custom
推荐做法
  • 用 langgraph.json 固定图入口与 python_version,让 build 可复现
  • 生产 checkpointer 用 Postgres + 连接池,setup() 在迁移阶段单独执行
  • 流式接口为 SSE/长连接设置合理超时与心跳,前端处理断线重连
  • 全链路开启 LangSmith trace,用回放定位「为什么走了这个分支」
不推荐
  • 把 langgraph dev / InMemorySaver 直接当生产服务
  • 在容器里硬编码 DB 密码,应走 .env / secret 注入
  • 对超长 values 流不做体积控制,导致背压打满网关
常见误区
  • stream 模式选 values 且 State 很大时,每步全量推送会成为带宽瓶颈,优先 updates/messages
  • 多副本部署务必共用同一 Postgres,否则 thread_id 在不同副本上找不到 checkpoint

服务可水平扩缩容、进程重启后会话不丢、任意一次 run 都能在 LangSmith 里完整回放并定位到分支决策。

瓶颈是验证,不是生成。能被度量的状态错误才能被修复,能在 recursion_limit 处停下的循环胜过吃满内存的循环。

— 本 Wiki 生产化原则