流式的本质是可观测性,不是吞吐量。 LangGraph 的图本质是同步推进的状态机,stream() 只是在每个 super-step 边界把中间产物 yield 出来。你选择哪种 stream_mode,等于选择在这些边界上观察什么粒度的信息——状态快照、状态 diff,还是底层 LLM 正在吐的字。

三种 stream_mode 的语义

stream_mode每次 yield 的内容数据形态典型用途
values该步执行后的完整 State整个 state dict调试、需要每步全量快照、回放
updates该步节点写入 state 的增量{节点名: 该节点的返回 dict}日志、进度追踪、只关心改了什么
messagesLLM 流式产出的 token + 元数据(message_chunk, metadata) 元组聊天 UI 打字机效果、token 级前端推送
custom节点内用 get_stream_writer() 主动写入的任意数据你写入的对象进度百分比、自定义事件
debug每步的详细调试事件(含 checkpoint)事件 dict深度排障、理解执行细节
口诀values=全量、updates=增量、messages=token、custom=自定义、debug=全暴露

示例一:三种 stream_mode 对比 + 多模式组合

# 安装核心包与 OpenAI 集成(也可换成你用的任意 chat model)
pip install -U langgraph langchain-openai
export OPENAI_API_KEY="sk-..."
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class State(TypedDict):
    # add_messages reducer:节点返回 messages 时会追加而非覆盖
    messages: Annotated[list, add_messages]


llm = ChatOpenAI(model="gpt-4o-mini")


def chat_node(state: State):
    # LLM 调用本身会产生 token 流,messages 模式能捕获到
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


builder = StateGraph(State)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
graph = builder.compile()

inp = {"messages": [HumanMessage(content="用一句话解释什么是状态机")]}

# --- 模式 1: values,每步后的完整 state ---
print("===== values =====")
for chunk in graph.stream(inp, stream_mode="values"):
    # chunk 是完整 state dict,messages 字段是累积的全部消息
    print("messages count:", len(chunk["messages"]))

# --- 模式 2: updates,每步的增量 ---
print("===== updates =====")
for chunk in graph.stream(inp, stream_mode="updates"):
    # chunk 形如 {"chat": {"messages": [AIMessage(...)]}}
    for node_name, node_update in chunk.items():
        print(f"node={node_name}, update_keys={list(node_update.keys())}")

# --- 模式 3: messages,token 级输出(打字机效果)---
print("===== messages =====")
for token, metadata in graph.stream(inp, stream_mode="messages"):
    # token 是 AIMessageChunk,metadata 含 langgraph_node 等来源信息
    if token.content:
        print(token.content, end="", flush=True)
print()

# --- 多模式组合:传入 list,yield 出 (mode, chunk) 元组 ---
print("===== combined =====")
for mode, chunk in graph.stream(inp, stream_mode=["updates", "messages"]):
    # 用 mode 区分这一条来自哪种流
    if mode == "updates":
        print("\n[UPDATE]", list(chunk.keys()))
    elif mode == "messages":
        tok, meta = chunk
        if tok.content:
            print(tok.content, end="", flush=True)
print()

Human-in-the-loop:让人类成为图中的一个节点

interrupt() 不是抛异常,而是一次「带数据的暂停」。 调用 interrupt(value) 时,LangGraph 会把 value 通过中断信号抛给外部调用方,并借助 Checkpointer 把当前现场完整落盘。外部审查、决定后,用 Command(resume=人类的决策) 重新 invoke 同一个 thread_id,此时 interrupt()返回那个 resume 值,节点从中断处继续往下执行——就像一个被远程填值的 input()

示例二:需人工审批工具调用的完整闭环

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command


class State(TypedDict):
    messages: Annotated[list, add_messages]
    # 待执行的工具调用(模拟 LLM 决定要做的高风险动作)
    pending_tool: dict
    result: str


def plan_node(state: State):
    # 模拟 LLM 决定调用一个高风险工具:转账
    return {"pending_tool": {"name": "transfer", "args": {"to": "Alice", "amount": 5000}}}


def human_approval_node(state: State):
    tool = state["pending_tool"]
    # interrupt() 把待审内容抛给外部,并暂停执行
    # 外部 resume 传回的值会成为 decision
    decision = interrupt(
        {
            "question": "是否批准以下工具调用?",
            "tool": tool,
            "hint": "回复 {'action':'approve'} 或 {'action':'edit','args':{...}} 或 {'action':'reject'}",
        }
    )

    action = decision.get("action")
    if action == "reject":
        return {"result": "工具调用被拒绝,已取消", "pending_tool": {}}
    if action == "edit":
        # 人类修改了参数,覆盖 pending_tool 的 args
        new_args = decision.get("args", {})
        tool = {**tool, "args": new_args}
    # approve 或 edit 后落地 tool
    return {"pending_tool": tool}


def execute_node(state: State):
    tool = state["pending_tool"]
    if not tool:
        return {}
    # 真正执行工具(这里用打印模拟副作用)
    args = tool["args"]
    msg = f"已执行 transfer:向 {args['to']} 转账 {args['amount']}"
    return {"result": msg}


builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("human", human_approval_node)
builder.add_node("execute", execute_node)
builder.add_edge(START, "plan")
builder.add_edge("plan", "human")
builder.add_edge("human", "execute")
builder.add_edge("execute", END)

# 关键:必须提供 checkpointer,否则 interrupt 无法恢复
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# thread_id 标识这一次会话,恢复时必须用同一个
config = {"configurable": {"thread_id": "approval-001"}}

# --- 第一次运行:跑到 human 节点的 interrupt 处暂停 ---
for chunk in graph.stream({"messages": []}, config, stream_mode="updates"):
    print(chunk)
# 输出里会出现 {'__interrupt__': (Interrupt(value={...}),)}

# --- 检查当前现场:state 停在哪、待审值是什么 ---
state = graph.get_state(config)
print("next nodes:", state.next)            # ('human',) 表示停在 human 节点
print("interrupts:", state.interrupts)       # 取出 interrupt() 抛出的待审值
print("pending_tool:", state.values["pending_tool"])

# --- 人类决策:把金额从 5000 改成 1000 再批准(演示 edit 分支)---
resume_value = {"action": "edit", "args": {"to": "Alice", "amount": 1000}}

# --- 用 Command(resume=...) 恢复,传同一个 config ---
for chunk in graph.stream(Command(resume=resume_value), config, stream_mode="updates"):
    print(chunk)

# --- 验证最终结果 ---
final = graph.get_state(config)
print("RESULT:", final.values["result"])
# RESULT: 已执行 transfer:向 Alice 转账 1000

在恢复前直接修改 state

除了用 Command(resume=...) 把值喂给 interrupt(),你还可以在恢复前用 graph.update_state() 直接改写现场 state,再无参恢复。这适用于人类想编辑的不是「interrupt 的返回值」而是「state 本身的某个字段」的场景。

# 假设图已停在某个 interrupt 处。人类想直接改 state 里的 pending_tool。
config = {"configurable": {"thread_id": "approval-001"}}

# 1) 直接覆盖 state 字段(as_node 可指定以哪个节点身份写入)
graph.update_state(
    config,
    {"pending_tool": {"name": "transfer", "args": {"to": "Bob", "amount": 1}}},
)

# 2) 仍需用 Command(resume=...) 满足 interrupt() 的返回值
for chunk in graph.stream(Command(resume={"action": "approve"}), config):
    print(chunk)

# 提示:update_state 改的是 state 字段,Command(resume) 喂的是 interrupt() 的返回值,
# 两者是不同通道,按需要单用或合用。

静态断点 interrupt_before:编译期就钉住的暂停点

# 不在节点内写 interrupt(),而是在 compile 时声明:"进入 execute 节点前先停"
graph = builder.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["execute"],   # 也有 interrupt_after=[...]
)

config = {"configurable": {"thread_id": "static-001"}}

# 跑到 execute 之前自动暂停
graph.invoke({"messages": []}, config)
print(graph.get_state(config).next)   # ('execute',)

# 人类看完后,无参 invoke(None, ...) 即从断点继续
graph.invoke(None, config)
维度interrupt()(动态)interrupt_before / after(静态)
声明位置节点函数内部,运行期触发compile() 参数,编译期固定
能否携带数据能,抛出任意待审 value不能,只是单纯暂停
恢复方式Command(resume=值)invoke(None, config) 无参续跑
条件暂停支持,可写 if 决定是否 interrupt不支持,进/出该节点必停
适用场景审批、动态向人要数据、按内容决定是否打断固定调试断点、每次进某节点都人工确认
口诀要数据/要条件用 interrupt(),要固定卡点用 interrupt_before
推荐做法
  • 前端聊天用 stream_mode='messages' 做打字机,后台监控用 'updates' 记进度
  • interrupt() 之前只放纯计算与读操作,副作用一律放其后或下游节点
  • 恢复前先 graph.get_state(config) 确认 .next 与 .interrupts,再决定 resume 值
  • 生产环境用持久化 checkpointer(Postgres/SQLite),InMemorySaver 仅限本地
  • 需要携带审批内容时用 interrupt(),只是想插个固定断点时用 interrupt_before
不推荐
  • 不要在没有 checkpointer 的图里调用 interrupt()——直接报错
  • 不要在 interrupt() 之前写扣款 / 发消息等不可逆副作用——节点会重跑导致重复执行
  • 不要假设 resume 后整张图从头跑——已完成节点不重跑,但当前中断节点会重跑
  • 不要用 stream_mode='values' 做 token 打字机——它给的是每步全量状态,不是 token
  • 不要在多模式组合时忘记按 (mode, chunk) 解包——单模式和多模式的 yield 结构不同
常见误区
  • 误以为 interrupt() 像 input() 阻塞当前线程:它实际是把控制权交还给外部 stream/invoke 循环
  • 多模式 stream(stream_mode=[...]) 时仍按单模式解包,导致拿到的是元组而非 dict

能在不重启进程的前提下,对同一 thread_id 完成 中断→检查 state→修改→resume 的闭环,且不可逆副作用只执行一次。

interrupt 后 resume 报 "no interrupt" 或行为异常

典型表现
Command(resume=...) 后图没有从断点继续,或抱怨没有可恢复的中断
判断标准
确认 invoke/stream 用的是与中断时完全相同的 thread_id,且图编译时带了 checkpointer
解决方向
复用同一个 config(含相同 configurable.thread_id),编译时传 checkpointer;用 graph.get_state(config).interrupts 确认确有挂起的中断

审批节点的副作用被执行了两次

典型表现
恢复后发现日志/数据库里出现重复写入,扣款扣了两次
判断标准
副作用代码是否位于 interrupt() 调用之前
解决方向
把不可逆操作移到 interrupt() 之后,或拆到一个仅在 resume 后才会运行的下游节点;interrupt() 之前只保留幂等的读/算逻辑

stream_mode='messages' 收不到 token

典型表现
迭代 stream 没有 token 流出,或只在结束时拿到完整消息
判断标准
节点内是否真的调用了支持流式的 chat model,且未被 invoke 一次性返回吞掉
解决方向
确保用的是 LangChain ChatModel(其 .invoke 在 messages 模式下会被框架自动转为流式);前端用 astream 配 stream_mode='messages',按 (chunk, metadata) 解包并取 chunk.content

多模式组合时解包出错 too many values to unpack

典型表现
for a, b in graph.stream(..., stream_mode=['updates','messages']) 抛解包错误
判断标准
yield 结构:单模式 yield chunk,多模式 yield (mode, chunk)
解决方向
多模式下统一写成 for mode, chunk in ...,再按 mode 分支处理;其中 messages 的 chunk 本身又是 (message, metadata) 元组,需二次解包

把人类放进循环,不是给系统打补丁,而是承认有些决策本就该由人来做——LangGraph 让这个交接点变成图里一条可持久化、可恢复、可审计的边。

— 本页小结