多智能体编排的统一原语是 Command。一个节点
return Command(goto="next_node", update={...})等价于「同时完成状态更新 + 决定下一跳」,把原本要靠条件边表达的路由收进节点内部。supervisor 是「所有边都指回中心」的特例,swarm 是「节点之间任意 goto」的一般形态。理解了这一点,两种范式就只是同一机制的不同拓扑。
Supervisor 模式:中心路由分派
supervisor 模式有一个 router 节点,它读取当前 State(通常是 messages 或一个显式的 next 字段),用 LLM 或规则决定把控制权交给哪个专家 agent;专家执行完后控制权回到 supervisor,直到 supervisor 判定 FINISH。这是最易调试的多智能体拓扑:所有路由决策都集中在一处。
pip install -U langgraph langchain-openai
export OPENAI_API_KEY=sk-...
from typing import Annotated, Literal, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.types import Command
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 两个专家 agent + 一个 supervisor,专家集合
MEMBERS = ["researcher", "coder"]
OPTIONS = MEMBERS + ["FINISH"]
# supervisor 用结构化输出强制只返回 next 字段
class Route(TypedDict):
next: Literal["researcher", "coder", "FINISH"]
SUP_PROMPT = (
"You are a supervisor routing between workers: "
f"{MEMBERS}. Given the conversation, respond with the worker "
"to act next. When the task is fully done, respond FINISH."
)
# router 节点:返回 Command,goto 直接指向下一跳
def supervisor(state: MessagesState) -> Command[Literal["researcher", "coder", "__end__"]]:
messages = [{"role": "system", "content": SUP_PROMPT}] + state["messages"]
route: Route = llm.with_structured_output(Route).invoke(messages)
nxt = route["next"]
if nxt == "FINISH":
return Command(goto=END)
return Command(goto=nxt)
# 专家节点:干完活后控制权交回 supervisor
def researcher(state: MessagesState) -> Command[Literal["supervisor"]]:
reply = llm.invoke(
[{"role": "system", "content": "You are a researcher. Give concise facts."}]
+ state["messages"]
)
return Command(
goto="supervisor",
update={"messages": [AIMessage(content=reply.content, name="researcher")]},
)
def coder(state: MessagesState) -> Command[Literal["supervisor"]]:
reply = llm.invoke(
[{"role": "system", "content": "You are a coder. Write minimal code."}]
+ state["messages"]
)
return Command(
goto="supervisor",
update={"messages": [AIMessage(content=reply.content, name="coder")]},
)
builder = StateGraph(MessagesState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)
builder.add_edge(START, "supervisor")
# 注意:因为节点用 Command(goto=...) 自己决定下一跳,
# 这里不需要再手写 add_conditional_edges,路由已内聚在节点里
graph = builder.compile()
result = graph.invoke({"messages": [HumanMessage("先查一下快排的平均复杂度,再给个 Python 实现")]})
for m in result["messages"]:
print(getattr(m, "name", "user"), "->", m.content[:60])
Swarm 模式:点对点直接移交
swarm 没有中心。每个 agent 自己判断「这事该不该我管」,不该自己管就用 Command(goto=另一个 agent) 把控制权连同更新后的 State 一起交出去。适合角色边界清晰、移交链条较深的场景(如「分诊 → 退款专员 → 账务专员」)。代价是路由逻辑分散在各节点,调试需要追踪移交路径。
from typing import Annotated, Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.types import Command
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# swarm: 客服分诊 agent 与退款 agent 互相移交
# 关键技巧:用一个 "handoff" 结构化决策让 agent 输出是否移交
from typing import TypedDict
class Decision(TypedDict):
handoff_to: Literal["triage", "refund", "none"]
reply: str
def triage(state: MessagesState) -> Command[Literal["refund", "__end__"]]:
d: Decision = llm.with_structured_output(Decision).invoke(
[{"role": "system", "content": (
"You are triage. If the user asks for a refund, set handoff_to=refund. "
"Otherwise answer directly and set handoff_to=none.")}]
+ state["messages"]
)
msg = AIMessage(content=d["reply"], name="triage")
if d["handoff_to"] == "refund":
# 把控制权交给退款 agent,同时把这一轮回复写进 State
return Command(goto="refund", update={"messages": [msg]})
return Command(goto=END, update={"messages": [msg]})
def refund(state: MessagesState) -> Command[Literal["triage", "__end__"]]:
d: Decision = llm.with_structured_output(Decision).invoke(
[{"role": "system", "content": (
"You are the refund specialist. Process the refund and explain steps. "
"If it's NOT actually about refunds, set handoff_to=triage to send it back.")}]
+ state["messages"]
)
msg = AIMessage(content=d["reply"], name="refund")
if d["handoff_to"] == "triage":
return Command(goto="triage", update={"messages": [msg]})
return Command(goto=END, update={"messages": [msg]})
builder = StateGraph(MessagesState)
builder.add_node("triage", triage)
builder.add_node("refund", refund)
builder.add_edge(START, "triage") # 入口固定从分诊开始
swarm = builder.compile()
out = swarm.invoke({"messages": [HumanMessage("我上周买的耳机想退款")]})
for m in out["messages"]:
print(getattr(m, "name", "user"), "->", m.content[:60])
| 维度 | Supervisor(中心路由) | Swarm(点对点移交) |
|---|---|---|
| 路由位置 | 集中在一个 router 节点 | 分散在每个 agent 节点 |
| 拓扑 | 星形:边都连回 supervisor | 网状:节点间任意 goto |
| 可调试性 | 高,决策可单点观察 | 低,需追踪移交链 |
| 适用场景 | 任务可被一个调度者全局规划 | 角色边界清晰、长移交链 |
| 共同原语 | Command(goto, update) | Command(goto, update) |
子图:把 CompiledGraph 当节点嵌入
任何 builder.compile() 得到的 CompiledGraph 都实现了 Runnable 接口,因此能被当作父图的一个节点。子图封装分两种:当 父子共享 State key 时,直接把编译后的子图对象作为节点添加,LangGraph 自动透传匹配的字段;当 父子 State schema 不同 时,需要写一个包装函数,在其中 subgraph.invoke(...) 并手工做字段映射。
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
# ---------- 情形 A:共享 State key,直接嵌入 ----------
class SharedState(TypedDict):
value: int
def sub_double(state: SharedState) -> SharedState:
return {"value": state["value"] * 2}
sub_builder = StateGraph(SharedState)
sub_builder.add_node("double", sub_double)
sub_builder.add_edge(START, "double")
sub_builder.add_edge("double", END)
subgraph = sub_builder.compile()
parent_a = StateGraph(SharedState)
parent_a.add_node("plus_one", lambda s: {"value": s["value"] + 1})
# 关键:编译后的 subgraph 直接作为节点,共享 key=value 自动透传
parent_a.add_node("sub", subgraph)
parent_a.add_edge(START, "plus_one")
parent_a.add_edge("plus_one", "sub")
parent_a.add_edge("sub", END)
graph_a = parent_a.compile()
print(graph_a.invoke({"value": 10})) # (10+1)*2 -> {'value': 22}
# ---------- 情形 B:State schema 不同,包装函数做映射 ----------
class ParentState(TypedDict):
text: str
length: int
class InnerState(TypedDict): # 子图用完全不同的字段名
payload: str
size: int
def inner_measure(state: InnerState) -> InnerState:
return {"size": len(state["payload"])}
inner_builder = StateGraph(InnerState)
inner_builder.add_node("measure", inner_measure)
inner_builder.add_edge(START, "measure")
inner_builder.add_edge("measure", END)
inner_graph = inner_builder.compile()
# 包装函数:父 State -> 子 State -> 父 State 的双向映射
def call_inner(state: ParentState) -> ParentState:
sub_in = {"payload": state["text"]} # 入口字段映射
sub_out = inner_graph.invoke(sub_in) # 显式 invoke 子图
return {"length": sub_out["size"]} # 出口字段映射
parent_b = StateGraph(ParentState)
parent_b.add_node("inner", call_inner)
parent_b.add_edge(START, "inner")
parent_b.add_edge("inner", END)
graph_b = parent_b.compile()
print(graph_b.invoke({"text": "langgraph", "length": 0})) # {'text': 'langgraph', 'length': 9}
工具集成:bind_tools 与 ToolNode
工具调用是一个两步闭环:模型用 bind_tools 后会在响应里产出 tool_calls;ToolNode 负责读取这些 tool_calls、实际执行对应的 @tool 函数、并把结果包成 ToolMessage 写回 messages。再用一条条件边判断「上一条 AI 消息还有没有 tool_calls」来决定是回到模型还是结束,就构成了手写版 ReAct 循环。
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
@tool
def get_weather(city: str) -> str:
"""Return the current weather for a city."""
return f"{city} is sunny, 24C"
@tool
def add(a: int, b: int) -> int:
"""Add two integers."""
return a + b
tools = [get_weather, add]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)
def call_model(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools)) # 自动执行 tool_calls -> ToolMessage
builder.add_edge(START, "agent")
# tools_condition 是 prebuilt 的现成路由:有 tool_calls 去 tools,否则 END
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent") # 工具结果回灌给模型,形成 ReAct 循环
graph = builder.compile()
out = graph.invoke({"messages": [("user", "北京天气怎么样?顺便算 3+5")]})
print(out["messages"][-1].content)
prebuilt:create_react_agent 一行起步
上面那套 agent + ToolNode + tools_condition 的循环极其常见,LangGraph 把它封进了 create_react_agent。它返回的就是一个普通 CompiledGraph,所以照样能 .invoke / .stream,也照样能传 checkpointer 启用 thread 持久化、传 interrupt_before 接入 HITL。
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver
@tool
def get_weather(city: str) -> str:
"""Return the current weather for a city."""
return f"{city} is sunny, 24C"
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
checkpointer = InMemorySaver()
# 一行构建 ReAct agent:内部自带 agent/tools 循环
agent = create_react_agent(
model=model,
tools=[get_weather],
prompt="You are a concise weather assistant.", # system prompt
checkpointer=checkpointer, # 接入持久化
interrupt_before=["tools"], # HITL:执行工具前中断
)
config = {"configurable": {"thread_id": "u-1"}}
# 第一次跑会在进入 tools 节点前停下(interrupt_before)
first = agent.invoke({"messages": [("user", "上海天气如何?")]}, config)
snap = agent.get_state(config)
print("被中断,下一步要执行:", snap.next) # ('tools',)
# 人工审核通过后,用 None 继续从中断点恢复
resumed = agent.invoke(None, config)
print(resumed["messages"][-1].content)
✓推荐做法
- 先用 create_react_agent 把单 agent 跑通,确认工具与 prompt 正确,再考虑拆成多 agent
- supervisor 的 router 用 with_structured_output 强制输出固定的 next 字段,避免自由文本路由不稳定
- 把多 agent 共享的会话历史放在 MessagesState 的 messages 上,靠 add_messages reducer 累积
- 子图复用前先确认 State schema 是否兼容:兼容直接嵌入,不兼容写映射包装函数
✗不推荐
- 不要在一个巨型 agent 里堆 20+ 工具和超长 system prompt,工具选择准确率会断崖下降
- 不要在用 Command(goto=...) 的节点上又手写重复的 add_conditional_edges,路由会冲突且难调试
- 不要忘了给专家节点写回 Command 的 goto,否则控制权无处可去导致图卡住或提前 END
- 不要假设 parent.stream 默认能看到子图内部事件——必须显式 subgraphs=True
⚠常见误区
- Command 节点漏写 Literal 类型注解:图能跑但 draw_mermaid 缺边,拓扑不可视化
- swarm 两个 agent 互相 handoff 形成死循环:需要在 prompt/逻辑里设终止条件或加跳数上限(recursion_limit)
- 子图与父图都定义了同名 key 但 reducer 不一致:合并行为会出乎意料,务必让共享 key 的 Annotated reducer 一致
能画出系统的节点拓扑图、说清每个 Command 的 goto 去向、并在断点处用 get_state().next 看到正确的下一跳,多智能体编排才算搭对。
GraphRecursionError
- 典型表现
- swarm 或 supervisor 运行时抛 GraphRecursionError: Recursion limit of 25 reached
- 判断标准
- agent 之间反复 handoff / supervisor 反复分派同一专家,没有收敛到 FINISH/END
- 解决方向
- 在 prompt 里加明确的终止判据;或 invoke 时传 config={"recursion_limit": 50} 临时放宽,但根因是路由逻辑不收敛,要修决策而非单纯调大上限
子图状态没透传
- 典型表现
- 把 compiled subgraph 当节点嵌入后,子图拿不到父图传入的字段或更新没写回父图
- 判断标准
- 父子 State schema 字段名不一致,却用了情形 A 的直接嵌入写法
- 解决方向
- 改用情形 B:写 call_inner 包装函数,显式 subgraph.invoke(映射后的输入),再把子图输出映射回父图字段
ToolNode 不执行工具
- 典型表现
- 模型回复里像要调工具,但 ToolNode 节点没产出 ToolMessage,循环空转
- 判断标准
- 模型没 bind_tools,或返回的是自由文本而非结构化 tool_calls
- 解决方向
- 确认 llm = ChatOpenAI(...).bind_tools(tools);条件边用 prebuilt 的 tools_condition 检测 tool_calls,而不是自己 split 字符串
create_react_agent 中断不生效
- 典型表现
- 传了 interrupt_before=['tools'] 但图一口气跑完,没有停在工具前
- 判断标准
- create_react_agent 没传 checkpointer,或 invoke 时没带 thread_id
- 解决方向
- 构建时传 checkpointer=InMemorySaver(),invoke 时带 config={'configurable': {'thread_id': '...'}},中断与恢复依赖持久化
多智能体不是把一个 agent 复制多份,而是把控制流显式化:谁决策、交给谁、带着什么状态——这正是 Command 在做的事。
— LangGraph 设计哲学
下一章进入 踩坑与生产部署:recursion_limit 与无限循环、checkpointer 选型(InMemory vs Sqlite vs Postgres)、状态膨胀与 messages 裁剪、并发写同一 thread 的竞态,以及把图部署为长期服务时的可观测性与回滚策略。