核心心智模型:==thread = 一段会话,checkpoint = 这段会话里的一个时间点==。一次 invoke 通常推进多个超步,每个超步都写一个 checkpoint;它们按时间顺序串在同一个 thread_id 下。读最新点用 get_state(config),倒着翻全部历史点用 get_state_history(config),把某个历史点的 checkpoint_id 塞回 config 再跑,就回到了过去。

1. 没有 checkpointer = 失忆:先看反例

先确认问题真实存在。下面这个图不传 checkpointer,两次 invoke 之间状态完全不共享——这正是绝大多数 "机器人记不住上一句话" 的根因。

# 安装:核心包 + 各类 saver
pip install -U langgraph
# Sqlite / Postgres saver 是独立子包:
pip install -U langgraph-checkpoint-sqlite
pip install -U langgraph-checkpoint-postgres
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage


class State(TypedDict):
    # add_messages reducer:新消息追加而非覆盖(见 CHAPTER 05)
    messages: Annotated[list, add_messages]


def chatbot(state: State) -> dict:
    # 这里用规则回复代替真实 LLM,便于离线运行
    last = state["messages"][-1].content
    reply = f"我收到了 {len(state['messages'])} 条消息,最新一条是:{last}"
    return {"messages": [AIMessage(content=reply)]}


builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# 关键:不传 checkpointer
graph = builder.compile()

r1 = graph.invoke({"messages": [HumanMessage("我叫小明")]})
r2 = graph.invoke({"messages": [HumanMessage("我叫什么?")]})

print(len(r1["messages"]))  # 2
print(len(r2["messages"]))  # 2  <-- 不是 4!第二次调用完全不知道第一次
# 结论:没有 checkpointer,状态用完即弃,无法跨 invoke 累积

2. 加上 InMemorySaver:多轮记忆立刻接续

只改两处:compile(checkpointer=InMemorySaver()),以及每次 invoke 带上 config。同一个 thread_id 下,第二轮就能看到第一轮的全部消息。

from langgraph.checkpoint.memory import InMemorySaver

# 复用上面的 builder,唯一变化是传入 checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# thread_id 是会话隔离的唯一 key,必须放在 configurable 下
config = {"configurable": {"thread_id": "user-alice"}}

graph.invoke({"messages": [HumanMessage("我叫小明")]}, config)
r2 = graph.invoke({"messages": [HumanMessage("我叫什么?")]}, config)

print(len(r2["messages"]))  # 4  <-- 记忆接续:H, A, H, A 全在
for m in r2["messages"]:
    print(type(m).__name__, "|", m.content)

# 换一个 thread_id,记忆完全隔离
other = {"configurable": {"thread_id": "user-bob"}}
r_bob = graph.invoke({"messages": [HumanMessage("你好")]}, other)
print(len(r_bob["messages"]))  # 2  <-- bob 看不到 alice 的任何消息

3. 生产持久化:SqliteSaver 与 PostgresSaver

InMemorySaver 把 checkpoint 存进进程内存,进程一退记忆全丢——只能用于开发和测试。生产环境用 SqliteSaver(单机 / 嵌入式)或 PostgresSaver(多实例共享)。两者 API 与 InMemorySaver 完全一致,区别只在构造方式,且首次使用必须调用 setup() 建表

from langgraph.checkpoint.sqlite import SqliteSaver

# 推荐用 context manager 管理连接;落盘到 checkpoints.sqlite 文件
with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "user-alice"}}

    graph.invoke({"messages": [HumanMessage("记住:我的密码提示是蓝色")]}, config)
    # 即使进程重启,下次用同一文件 + 同一 thread_id 仍能读回
    snap = graph.get_state(config)
    print(len(snap.values["messages"]))  # 2,已落盘到磁盘

# 异步版本:from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/langgraph?sslmode=disable"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # 关键:Postgres / Sqlite 首次使用必须 setup() 创建 checkpoint 相关表
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "user-alice"}}
    graph.invoke({"messages": [HumanMessage("生产环境的我也能被记住")]}, config)
    print(graph.get_state(config).values["messages"][-1].content)

# 异步版本:from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
# 多实例部署共享同一个 Postgres,即可实现跨进程 / 跨机器的会话续接
Checkpointer存储位置适用场景注意
InMemorySaver进程内存开发、测试、单测进程退出即丢,禁止用于生产
SqliteSaver本地文件 / :memory:单机服务、嵌入式、原型首次 setup();并发写有限
PostgresSaverPostgres 数据库多实例、横向扩展、生产首次 setup();推荐连接池
口诀内存调试、Sqlite 单机、Postgres 上量

4. 读取状态:get_state 与 get_state_history

get_state(config) 返回当前 thread 最新的一个 StateSnapshot.values 是 state 内容,.next 是下一步将执行的节点(为空表示已结束),.config 里带着这一刻的 checkpoint_idget_state_history(config)倒序返回这个 thread 的所有历史 checkpoint,是 time-travel 的入口。

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "demo"}}

graph.invoke({"messages": [HumanMessage("第一轮")]}, config)
graph.invoke({"messages": [HumanMessage("第二轮")]}, config)

# 4.1 当前快照
snap = graph.get_state(config)
print("当前消息数:", len(snap.values["messages"]))     # 4
print("下一步要跑的节点:", snap.next)                    # () 表示已到 END
print("当前 checkpoint_id:", snap.config["configurable"]["checkpoint_id"])

# 4.2 历史回看(倒序:最新的在前)
print("\n--- 历史 checkpoint ---")
for st in graph.get_state_history(config):
    cid = st.config["configurable"]["checkpoint_id"]
    print(f"ckpt={cid[:12]}... | msgs={len(st.values.get('messages', []))} | next={st.next}")
# 你会看到多条记录:每个超步、每轮 invoke 都留下了痕迹

5. Time-travel:指定 checkpoint_id 回放与分支

把某个历史 checkpoint 的完整 config(含 checkpoint_id)传回给 invoke,图就会从那个时间点之后继续执行,而不是从最新点。这就是 time-travel:你可以回到任意历史节点,改掉输入,让它分叉出一条新的执行分支——原分支不受影响。

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "tt"}}

graph.invoke({"messages": [HumanMessage("A")]}, config)
graph.invoke({"messages": [HumanMessage("B")]}, config)
graph.invoke({"messages": [HumanMessage("C")]}, config)  # 现在共 6 条消息

# 1) 找到一个较早的历史 checkpoint(比如只跑过 A、B 那一刻)
history = list(graph.get_state_history(config))
# history 倒序:挑一个 messages 较少的早期点
target = next(st for st in history if len(st.values.get("messages", [])) == 4)
print("回到的 checkpoint_id:", target.config["configurable"]["checkpoint_id"])

# 2) 把该历史 config 当作起点 invoke —— 从那一刻分叉
#    注意:传的是 target.config(带 checkpoint_id),不是顶层 config
branch = graph.invoke({"messages": [HumanMessage("B 之后改走另一条路")]}, target.config)
print("分支结果消息数:", len(branch["messages"]))  # 在历史点基础上接续,而非从最新点

# 3) 原始最新点不受影响:get_state 仍能拿到 fork 后的最新分支状态
#    LangGraph 用 checkpoint 的父子链表达分支关系,互不污染
for st in graph.get_state_history(config):
    cid = st.config["configurable"]["checkpoint_id"]
    print(cid[:10], "msgs=", len(st.values.get("messages", [])))
推荐做法
  • thread_id 用稳定唯一标识(user+session),并放在 config.configurable 里
  • 生产用 SqliteSaver / PostgresSaver,首次调用 setup() 建表
  • 调试 / 审计时用 get_state_history 配合 metadata['writes'] 定位状态变更
  • time-travel 时传完整的历史 config(含 checkpoint_id),而不是手拼字符串
不推荐
  • 把 InMemorySaver 带上生产——进程一挂记忆全丢
  • 多个用户共用同一个写死的 thread_id,会话会串台
  • 漏传 config 或漏掉 thread_id,导致报错或退回无记忆
  • 忘记 SqliteSaver/PostgresSaver 的 setup(),会因缺表而失败
常见误区
  • from_conn_string 返回的是 context manager,离开 with 块连接即关闭,别在块外继续用 graph
  • Sqlite 并发写能力有限,高并发请直接上 Postgres
  • checkpoint 会随轮次持续增长,长会话需有清理 / 归档策略

同一 thread_id 多轮 invoke 记忆接续、不同 thread_id 互不可见、进程重启后(Sqlite/Postgres)仍能 get_state 读回历史,即视为持久化层正确接入。

传了 checkpointer 但还是没记忆

典型表现
第二轮 invoke 拿不到第一轮消息,messages 长度不累积
判断标准
检查每次 invoke 是否带了相同 thread_id 的 config
解决方向
确保 config={'configurable': {'thread_id': '同一个值'}} 在每次调用都传入;不同轮必须用同一 thread_id

重启后记忆没了

典型表现
服务重启后 get_state 返回空 / 报找不到
判断标准
确认用的是落盘型 saver 而非 InMemorySaver
解决方向
换成 SqliteSaver.from_conn_string('xxx.sqlite') 或 PostgresSaver,并保证指向同一文件 / 库

PostgresSaver 报表不存在

典型表现
首次写入报 relation "checkpoints" does not exist
判断标准
是否调用过 setup()
解决方向
在 compile 前对 checkpointer 调用一次 checkpointer.setup() 创建所需表结构

time-travel 没回到过去

典型表现
传历史 config 后还是从最新点继续
判断标准
传入的 config 是否包含 checkpoint_id
解决方向
传 get_state_history 返回的 StateSnapshot.config(自带 checkpoint_id),不要只传 {'thread_id': ...}

持久化层打通后,图就能跨轮、跨进程记住状态了。下一章把这个能力推到极致:在节点中途用 interrupt 暂停图、等人审批或补充信息,再凭 checkpoint 从断点恢复——这正是 Human-in-the-loop 的基石,同时配合 stream 的 values / updates / messages 模式实时观察执行过程。