多智能体编排的统一原语是 Command。一个节点 return Command(goto="next_node", update={...}) 等价于「同时完成状态更新 + 决定下一跳」,把原本要靠条件边表达的路由收进节点内部。supervisor 是「所有边都指回中心」的特例,swarm 是「节点之间任意 goto」的一般形态。理解了这一点,两种范式就只是同一机制的不同拓扑。

Supervisor 模式:中心路由分派

supervisor 模式有一个 router 节点,它读取当前 State(通常是 messages 或一个显式的 next 字段),用 LLM 或规则决定把控制权交给哪个专家 agent;专家执行完后控制权回到 supervisor,直到 supervisor 判定 FINISH。这是最易调试的多智能体拓扑:所有路由决策都集中在一处。

pip install -U langgraph langchain-openai
export OPENAI_API_KEY=sk-...
from typing import Annotated, Literal, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.types import Command

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 两个专家 agent + 一个 supervisor,专家集合
MEMBERS = ["researcher", "coder"]
OPTIONS = MEMBERS + ["FINISH"]

# supervisor 用结构化输出强制只返回 next 字段
class Route(TypedDict):
    next: Literal["researcher", "coder", "FINISH"]

SUP_PROMPT = (
    "You are a supervisor routing between workers: "
    f"{MEMBERS}. Given the conversation, respond with the worker "
    "to act next. When the task is fully done, respond FINISH."
)

# router 节点:返回 Command,goto 直接指向下一跳
def supervisor(state: MessagesState) -> Command[Literal["researcher", "coder", "__end__"]]:
    messages = [{"role": "system", "content": SUP_PROMPT}] + state["messages"]
    route: Route = llm.with_structured_output(Route).invoke(messages)
    nxt = route["next"]
    if nxt == "FINISH":
        return Command(goto=END)
    return Command(goto=nxt)

# 专家节点:干完活后控制权交回 supervisor
def researcher(state: MessagesState) -> Command[Literal["supervisor"]]:
    reply = llm.invoke(
        [{"role": "system", "content": "You are a researcher. Give concise facts."}]
        + state["messages"]
    )
    return Command(
        goto="supervisor",
        update={"messages": [AIMessage(content=reply.content, name="researcher")]},
    )

def coder(state: MessagesState) -> Command[Literal["supervisor"]]:
    reply = llm.invoke(
        [{"role": "system", "content": "You are a coder. Write minimal code."}]
        + state["messages"]
    )
    return Command(
        goto="supervisor",
        update={"messages": [AIMessage(content=reply.content, name="coder")]},
    )

builder = StateGraph(MessagesState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)
builder.add_edge(START, "supervisor")
# 注意:因为节点用 Command(goto=...) 自己决定下一跳,
# 这里不需要再手写 add_conditional_edges,路由已内聚在节点里
graph = builder.compile()

result = graph.invoke({"messages": [HumanMessage("先查一下快排的平均复杂度,再给个 Python 实现")]})
for m in result["messages"]:
    print(getattr(m, "name", "user"), "->", m.content[:60])

Swarm 模式:点对点直接移交

swarm 没有中心。每个 agent 自己判断「这事该不该我管」,不该自己管就用 Command(goto=另一个 agent) 把控制权连同更新后的 State 一起交出去。适合角色边界清晰、移交链条较深的场景(如「分诊 → 退款专员 → 账务专员」)。代价是路由逻辑分散在各节点,调试需要追踪移交路径。

from typing import Annotated, Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.types import Command

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# swarm: 客服分诊 agent 与退款 agent 互相移交
# 关键技巧:用一个 "handoff" 结构化决策让 agent 输出是否移交
from typing import TypedDict
class Decision(TypedDict):
    handoff_to: Literal["triage", "refund", "none"]
    reply: str

def triage(state: MessagesState) -> Command[Literal["refund", "__end__"]]:
    d: Decision = llm.with_structured_output(Decision).invoke(
        [{"role": "system", "content": (
            "You are triage. If the user asks for a refund, set handoff_to=refund. "
            "Otherwise answer directly and set handoff_to=none.")}]
        + state["messages"]
    )
    msg = AIMessage(content=d["reply"], name="triage")
    if d["handoff_to"] == "refund":
        # 把控制权交给退款 agent,同时把这一轮回复写进 State
        return Command(goto="refund", update={"messages": [msg]})
    return Command(goto=END, update={"messages": [msg]})

def refund(state: MessagesState) -> Command[Literal["triage", "__end__"]]:
    d: Decision = llm.with_structured_output(Decision).invoke(
        [{"role": "system", "content": (
            "You are the refund specialist. Process the refund and explain steps. "
            "If it's NOT actually about refunds, set handoff_to=triage to send it back.")}]
        + state["messages"]
    )
    msg = AIMessage(content=d["reply"], name="refund")
    if d["handoff_to"] == "triage":
        return Command(goto="triage", update={"messages": [msg]})
    return Command(goto=END, update={"messages": [msg]})

builder = StateGraph(MessagesState)
builder.add_node("triage", triage)
builder.add_node("refund", refund)
builder.add_edge(START, "triage")  # 入口固定从分诊开始
swarm = builder.compile()

out = swarm.invoke({"messages": [HumanMessage("我上周买的耳机想退款")]})
for m in out["messages"]:
    print(getattr(m, "name", "user"), "->", m.content[:60])
维度Supervisor(中心路由)Swarm(点对点移交)
路由位置集中在一个 router 节点分散在每个 agent 节点
拓扑星形:边都连回 supervisor网状:节点间任意 goto
可调试性高,决策可单点观察低,需追踪移交链
适用场景任务可被一个调度者全局规划角色边界清晰、长移交链
共同原语Command(goto, update)Command(goto, update)
口诀中心调度选 supervisor,平级接力选 swarm,底层都是一个 Command。

子图:把 CompiledGraph 当节点嵌入

任何 builder.compile() 得到的 CompiledGraph 都实现了 Runnable 接口,因此能被当作父图的一个节点。子图封装分两种:当 父子共享 State key 时,直接把编译后的子图对象作为节点添加,LangGraph 自动透传匹配的字段;当 父子 State schema 不同 时,需要写一个包装函数,在其中 subgraph.invoke(...) 并手工做字段映射。

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# ---------- 情形 A:共享 State key,直接嵌入 ----------
class SharedState(TypedDict):
    value: int

def sub_double(state: SharedState) -> SharedState:
    return {"value": state["value"] * 2}

sub_builder = StateGraph(SharedState)
sub_builder.add_node("double", sub_double)
sub_builder.add_edge(START, "double")
sub_builder.add_edge("double", END)
subgraph = sub_builder.compile()

parent_a = StateGraph(SharedState)
parent_a.add_node("plus_one", lambda s: {"value": s["value"] + 1})
# 关键:编译后的 subgraph 直接作为节点,共享 key=value 自动透传
parent_a.add_node("sub", subgraph)
parent_a.add_edge(START, "plus_one")
parent_a.add_edge("plus_one", "sub")
parent_a.add_edge("sub", END)
graph_a = parent_a.compile()
print(graph_a.invoke({"value": 10}))   # (10+1)*2 -> {'value': 22}

# ---------- 情形 B:State schema 不同,包装函数做映射 ----------
class ParentState(TypedDict):
    text: str
    length: int

class InnerState(TypedDict):   # 子图用完全不同的字段名
    payload: str
    size: int

def inner_measure(state: InnerState) -> InnerState:
    return {"size": len(state["payload"])}

inner_builder = StateGraph(InnerState)
inner_builder.add_node("measure", inner_measure)
inner_builder.add_edge(START, "measure")
inner_builder.add_edge("measure", END)
inner_graph = inner_builder.compile()

# 包装函数:父 State -> 子 State -> 父 State 的双向映射
def call_inner(state: ParentState) -> ParentState:
    sub_in = {"payload": state["text"]}          # 入口字段映射
    sub_out = inner_graph.invoke(sub_in)          # 显式 invoke 子图
    return {"length": sub_out["size"]}            # 出口字段映射

parent_b = StateGraph(ParentState)
parent_b.add_node("inner", call_inner)
parent_b.add_edge(START, "inner")
parent_b.add_edge("inner", END)
graph_b = parent_b.compile()
print(graph_b.invoke({"text": "langgraph", "length": 0}))  # {'text': 'langgraph', 'length': 9}

工具集成:bind_tools 与 ToolNode

工具调用是一个两步闭环:模型用 bind_tools 后会在响应里产出 tool_callsToolNode 负责读取这些 tool_calls、实际执行对应的 @tool 函数、并把结果包成 ToolMessage 写回 messages。再用一条条件边判断「上一条 AI 消息还有没有 tool_calls」来决定是回到模型还是结束,就构成了手写版 ReAct 循环。

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition

@tool
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"{city} is sunny, 24C"

@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

tools = [get_weather, add]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)

def call_model(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))   # 自动执行 tool_calls -> ToolMessage
builder.add_edge(START, "agent")
# tools_condition 是 prebuilt 的现成路由:有 tool_calls 去 tools,否则 END
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")           # 工具结果回灌给模型,形成 ReAct 循环
graph = builder.compile()

out = graph.invoke({"messages": [("user", "北京天气怎么样?顺便算 3+5")]})
print(out["messages"][-1].content)

prebuilt:create_react_agent 一行起步

上面那套 agent + ToolNode + tools_condition 的循环极其常见,LangGraph 把它封进了 create_react_agent。它返回的就是一个普通 CompiledGraph,所以照样能 .invoke / .stream,也照样能传 checkpointer 启用 thread 持久化、传 interrupt_before 接入 HITL。

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

@tool
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"{city} is sunny, 24C"

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
checkpointer = InMemorySaver()

# 一行构建 ReAct agent:内部自带 agent/tools 循环
agent = create_react_agent(
    model=model,
    tools=[get_weather],
    prompt="You are a concise weather assistant.",   # system prompt
    checkpointer=checkpointer,                        # 接入持久化
    interrupt_before=["tools"],                       # HITL:执行工具前中断
)

config = {"configurable": {"thread_id": "u-1"}}
# 第一次跑会在进入 tools 节点前停下(interrupt_before)
first = agent.invoke({"messages": [("user", "上海天气如何?")]}, config)
snap = agent.get_state(config)
print("被中断,下一步要执行:", snap.next)   # ('tools',)

# 人工审核通过后,用 None 继续从中断点恢复
resumed = agent.invoke(None, config)
print(resumed["messages"][-1].content)
推荐做法
  • 先用 create_react_agent 把单 agent 跑通,确认工具与 prompt 正确,再考虑拆成多 agent
  • supervisor 的 router 用 with_structured_output 强制输出固定的 next 字段,避免自由文本路由不稳定
  • 把多 agent 共享的会话历史放在 MessagesState 的 messages 上,靠 add_messages reducer 累积
  • 子图复用前先确认 State schema 是否兼容:兼容直接嵌入,不兼容写映射包装函数
不推荐
  • 不要在一个巨型 agent 里堆 20+ 工具和超长 system prompt,工具选择准确率会断崖下降
  • 不要在用 Command(goto=...) 的节点上又手写重复的 add_conditional_edges,路由会冲突且难调试
  • 不要忘了给专家节点写回 Command 的 goto,否则控制权无处可去导致图卡住或提前 END
  • 不要假设 parent.stream 默认能看到子图内部事件——必须显式 subgraphs=True
常见误区
  • Command 节点漏写 Literal 类型注解:图能跑但 draw_mermaid 缺边,拓扑不可视化
  • swarm 两个 agent 互相 handoff 形成死循环:需要在 prompt/逻辑里设终止条件或加跳数上限(recursion_limit)
  • 子图与父图都定义了同名 key 但 reducer 不一致:合并行为会出乎意料,务必让共享 key 的 Annotated reducer 一致

能画出系统的节点拓扑图、说清每个 Command 的 goto 去向、并在断点处用 get_state().next 看到正确的下一跳,多智能体编排才算搭对。

GraphRecursionError

典型表现
swarm 或 supervisor 运行时抛 GraphRecursionError: Recursion limit of 25 reached
判断标准
agent 之间反复 handoff / supervisor 反复分派同一专家,没有收敛到 FINISH/END
解决方向
在 prompt 里加明确的终止判据;或 invoke 时传 config={"recursion_limit": 50} 临时放宽,但根因是路由逻辑不收敛,要修决策而非单纯调大上限

子图状态没透传

典型表现
把 compiled subgraph 当节点嵌入后,子图拿不到父图传入的字段或更新没写回父图
判断标准
父子 State schema 字段名不一致,却用了情形 A 的直接嵌入写法
解决方向
改用情形 B:写 call_inner 包装函数,显式 subgraph.invoke(映射后的输入),再把子图输出映射回父图字段

ToolNode 不执行工具

典型表现
模型回复里像要调工具,但 ToolNode 节点没产出 ToolMessage,循环空转
判断标准
模型没 bind_tools,或返回的是自由文本而非结构化 tool_calls
解决方向
确认 llm = ChatOpenAI(...).bind_tools(tools);条件边用 prebuilt 的 tools_condition 检测 tool_calls,而不是自己 split 字符串

create_react_agent 中断不生效

典型表现
传了 interrupt_before=['tools'] 但图一口气跑完,没有停在工具前
判断标准
create_react_agent 没传 checkpointer,或 invoke 时没带 thread_id
解决方向
构建时传 checkpointer=InMemorySaver(),invoke 时带 config={'configurable': {'thread_id': '...'}},中断与恢复依赖持久化

多智能体不是把一个 agent 复制多份,而是把控制流显式化:谁决策、交给谁、带着什么状态——这正是 Command 在做的事。

— LangGraph 设计哲学

下一章进入 踩坑与生产部署:recursion_limit 与无限循环、checkpointer 选型(InMemory vs Sqlite vs Postgres)、状态膨胀与 messages 裁剪、并发写同一 thread 的竞态,以及把图部署为长期服务时的可观测性与回滚策略。