流式的本质是可观测性,不是吞吐量。 LangGraph 的图本质是同步推进的状态机,
stream()只是在每个 super-step 边界把中间产物 yield 出来。你选择哪种stream_mode,等于选择在这些边界上观察什么粒度的信息——状态快照、状态 diff,还是底层 LLM 正在吐的字。
三种 stream_mode 的语义
| stream_mode | 每次 yield 的内容 | 数据形态 | 典型用途 |
|---|---|---|---|
| values | 该步执行后的完整 State | 整个 state dict | 调试、需要每步全量快照、回放 |
| updates | 该步节点写入 state 的增量 | {节点名: 该节点的返回 dict} | 日志、进度追踪、只关心改了什么 |
| messages | LLM 流式产出的 token + 元数据 | (message_chunk, metadata) 元组 | 聊天 UI 打字机效果、token 级前端推送 |
| custom | 节点内用 get_stream_writer() 主动写入的任意数据 | 你写入的对象 | 进度百分比、自定义事件 |
| debug | 每步的详细调试事件(含 checkpoint) | 事件 dict | 深度排障、理解执行细节 |
口诀values=全量、updates=增量、messages=token、custom=自定义、debug=全暴露
示例一:三种 stream_mode 对比 + 多模式组合
# 安装核心包与 OpenAI 集成(也可换成你用的任意 chat model)
pip install -U langgraph langchain-openai
export OPENAI_API_KEY="sk-..."
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
class State(TypedDict):
# add_messages reducer:节点返回 messages 时会追加而非覆盖
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o-mini")
def chat_node(state: State):
# LLM 调用本身会产生 token 流,messages 模式能捕获到
response = llm.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(State)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
graph = builder.compile()
inp = {"messages": [HumanMessage(content="用一句话解释什么是状态机")]}
# --- 模式 1: values,每步后的完整 state ---
print("===== values =====")
for chunk in graph.stream(inp, stream_mode="values"):
# chunk 是完整 state dict,messages 字段是累积的全部消息
print("messages count:", len(chunk["messages"]))
# --- 模式 2: updates,每步的增量 ---
print("===== updates =====")
for chunk in graph.stream(inp, stream_mode="updates"):
# chunk 形如 {"chat": {"messages": [AIMessage(...)]}}
for node_name, node_update in chunk.items():
print(f"node={node_name}, update_keys={list(node_update.keys())}")
# --- 模式 3: messages,token 级输出(打字机效果)---
print("===== messages =====")
for token, metadata in graph.stream(inp, stream_mode="messages"):
# token 是 AIMessageChunk,metadata 含 langgraph_node 等来源信息
if token.content:
print(token.content, end="", flush=True)
print()
# --- 多模式组合:传入 list,yield 出 (mode, chunk) 元组 ---
print("===== combined =====")
for mode, chunk in graph.stream(inp, stream_mode=["updates", "messages"]):
# 用 mode 区分这一条来自哪种流
if mode == "updates":
print("\n[UPDATE]", list(chunk.keys()))
elif mode == "messages":
tok, meta = chunk
if tok.content:
print(tok.content, end="", flush=True)
print()
Human-in-the-loop:让人类成为图中的一个节点
interrupt()不是抛异常,而是一次「带数据的暂停」。 调用interrupt(value)时,LangGraph 会把value通过中断信号抛给外部调用方,并借助 Checkpointer 把当前现场完整落盘。外部审查、决定后,用Command(resume=人类的决策)重新 invoke 同一个thread_id,此时interrupt()会返回那个 resume 值,节点从中断处继续往下执行——就像一个被远程填值的input()。
示例二:需人工审批工具调用的完整闭环
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
class State(TypedDict):
messages: Annotated[list, add_messages]
# 待执行的工具调用(模拟 LLM 决定要做的高风险动作)
pending_tool: dict
result: str
def plan_node(state: State):
# 模拟 LLM 决定调用一个高风险工具:转账
return {"pending_tool": {"name": "transfer", "args": {"to": "Alice", "amount": 5000}}}
def human_approval_node(state: State):
tool = state["pending_tool"]
# interrupt() 把待审内容抛给外部,并暂停执行
# 外部 resume 传回的值会成为 decision
decision = interrupt(
{
"question": "是否批准以下工具调用?",
"tool": tool,
"hint": "回复 {'action':'approve'} 或 {'action':'edit','args':{...}} 或 {'action':'reject'}",
}
)
action = decision.get("action")
if action == "reject":
return {"result": "工具调用被拒绝,已取消", "pending_tool": {}}
if action == "edit":
# 人类修改了参数,覆盖 pending_tool 的 args
new_args = decision.get("args", {})
tool = {**tool, "args": new_args}
# approve 或 edit 后落地 tool
return {"pending_tool": tool}
def execute_node(state: State):
tool = state["pending_tool"]
if not tool:
return {}
# 真正执行工具(这里用打印模拟副作用)
args = tool["args"]
msg = f"已执行 transfer:向 {args['to']} 转账 {args['amount']}"
return {"result": msg}
builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("human", human_approval_node)
builder.add_node("execute", execute_node)
builder.add_edge(START, "plan")
builder.add_edge("plan", "human")
builder.add_edge("human", "execute")
builder.add_edge("execute", END)
# 关键:必须提供 checkpointer,否则 interrupt 无法恢复
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# thread_id 标识这一次会话,恢复时必须用同一个
config = {"configurable": {"thread_id": "approval-001"}}
# --- 第一次运行:跑到 human 节点的 interrupt 处暂停 ---
for chunk in graph.stream({"messages": []}, config, stream_mode="updates"):
print(chunk)
# 输出里会出现 {'__interrupt__': (Interrupt(value={...}),)}
# --- 检查当前现场:state 停在哪、待审值是什么 ---
state = graph.get_state(config)
print("next nodes:", state.next) # ('human',) 表示停在 human 节点
print("interrupts:", state.interrupts) # 取出 interrupt() 抛出的待审值
print("pending_tool:", state.values["pending_tool"])
# --- 人类决策:把金额从 5000 改成 1000 再批准(演示 edit 分支)---
resume_value = {"action": "edit", "args": {"to": "Alice", "amount": 1000}}
# --- 用 Command(resume=...) 恢复,传同一个 config ---
for chunk in graph.stream(Command(resume=resume_value), config, stream_mode="updates"):
print(chunk)
# --- 验证最终结果 ---
final = graph.get_state(config)
print("RESULT:", final.values["result"])
# RESULT: 已执行 transfer:向 Alice 转账 1000
在恢复前直接修改 state
除了用 Command(resume=...) 把值喂给 interrupt(),你还可以在恢复前用 graph.update_state() 直接改写现场 state,再无参恢复。这适用于人类想编辑的不是「interrupt 的返回值」而是「state 本身的某个字段」的场景。
# 假设图已停在某个 interrupt 处。人类想直接改 state 里的 pending_tool。
config = {"configurable": {"thread_id": "approval-001"}}
# 1) 直接覆盖 state 字段(as_node 可指定以哪个节点身份写入)
graph.update_state(
config,
{"pending_tool": {"name": "transfer", "args": {"to": "Bob", "amount": 1}}},
)
# 2) 仍需用 Command(resume=...) 满足 interrupt() 的返回值
for chunk in graph.stream(Command(resume={"action": "approve"}), config):
print(chunk)
# 提示:update_state 改的是 state 字段,Command(resume) 喂的是 interrupt() 的返回值,
# 两者是不同通道,按需要单用或合用。
静态断点 interrupt_before:编译期就钉住的暂停点
# 不在节点内写 interrupt(),而是在 compile 时声明:"进入 execute 节点前先停"
graph = builder.compile(
checkpointer=InMemorySaver(),
interrupt_before=["execute"], # 也有 interrupt_after=[...]
)
config = {"configurable": {"thread_id": "static-001"}}
# 跑到 execute 之前自动暂停
graph.invoke({"messages": []}, config)
print(graph.get_state(config).next) # ('execute',)
# 人类看完后,无参 invoke(None, ...) 即从断点继续
graph.invoke(None, config)
| 维度 | interrupt()(动态) | interrupt_before / after(静态) |
|---|---|---|
| 声明位置 | 节点函数内部,运行期触发 | compile() 参数,编译期固定 |
| 能否携带数据 | 能,抛出任意待审 value | 不能,只是单纯暂停 |
| 恢复方式 | Command(resume=值) | invoke(None, config) 无参续跑 |
| 条件暂停 | 支持,可写 if 决定是否 interrupt | 不支持,进/出该节点必停 |
| 适用场景 | 审批、动态向人要数据、按内容决定是否打断 | 固定调试断点、每次进某节点都人工确认 |
口诀要数据/要条件用 interrupt(),要固定卡点用 interrupt_before
✓推荐做法
- 前端聊天用 stream_mode='messages' 做打字机,后台监控用 'updates' 记进度
- interrupt() 之前只放纯计算与读操作,副作用一律放其后或下游节点
- 恢复前先 graph.get_state(config) 确认 .next 与 .interrupts,再决定 resume 值
- 生产环境用持久化 checkpointer(Postgres/SQLite),InMemorySaver 仅限本地
- 需要携带审批内容时用 interrupt(),只是想插个固定断点时用 interrupt_before
✗不推荐
- 不要在没有 checkpointer 的图里调用 interrupt()——直接报错
- 不要在 interrupt() 之前写扣款 / 发消息等不可逆副作用——节点会重跑导致重复执行
- 不要假设 resume 后整张图从头跑——已完成节点不重跑,但当前中断节点会重跑
- 不要用 stream_mode='values' 做 token 打字机——它给的是每步全量状态,不是 token
- 不要在多模式组合时忘记按 (mode, chunk) 解包——单模式和多模式的 yield 结构不同
⚠常见误区
- 误以为 interrupt() 像 input() 阻塞当前线程:它实际是把控制权交还给外部 stream/invoke 循环
- 多模式 stream(stream_mode=[...]) 时仍按单模式解包,导致拿到的是元组而非 dict
能在不重启进程的前提下,对同一 thread_id 完成 中断→检查 state→修改→resume 的闭环,且不可逆副作用只执行一次。
interrupt 后 resume 报 "no interrupt" 或行为异常
- 典型表现
- Command(resume=...) 后图没有从断点继续,或抱怨没有可恢复的中断
- 判断标准
- 确认 invoke/stream 用的是与中断时完全相同的 thread_id,且图编译时带了 checkpointer
- 解决方向
- 复用同一个 config(含相同 configurable.thread_id),编译时传 checkpointer;用 graph.get_state(config).interrupts 确认确有挂起的中断
审批节点的副作用被执行了两次
- 典型表现
- 恢复后发现日志/数据库里出现重复写入,扣款扣了两次
- 判断标准
- 副作用代码是否位于 interrupt() 调用之前
- 解决方向
- 把不可逆操作移到 interrupt() 之后,或拆到一个仅在 resume 后才会运行的下游节点;interrupt() 之前只保留幂等的读/算逻辑
stream_mode='messages' 收不到 token
- 典型表现
- 迭代 stream 没有 token 流出,或只在结束时拿到完整消息
- 判断标准
- 节点内是否真的调用了支持流式的 chat model,且未被 invoke 一次性返回吞掉
- 解决方向
- 确保用的是 LangChain ChatModel(其 .invoke 在 messages 模式下会被框架自动转为流式);前端用 astream 配 stream_mode='messages',按 (chunk, metadata) 解包并取 chunk.content
多模式组合时解包出错 too many values to unpack
- 典型表现
- for a, b in graph.stream(..., stream_mode=['updates','messages']) 抛解包错误
- 判断标准
- yield 结构:单模式 yield chunk,多模式 yield (mode, chunk)
- 解决方向
- 多模式下统一写成 for mode, chunk in ...,再按 mode 分支处理;其中 messages 的 chunk 本身又是 (message, metadata) 元组,需二次解包
把人类放进循环,不是给系统打补丁,而是承认有些决策本就该由人来做——LangGraph 让这个交接点变成图里一条可持久化、可恢复、可审计的边。
— 本页小结