DEV Community

lijesom9-create
lijesom9-create

Posted on

LangGraph Agent Development Guide

LangGraph Agent开发指南:构建有状态的AI工作流

LangGraph是LangChain团队推出的新框架,专门用于构建有状态、可循环的AI Agent。本文基于education-agent的LangGraph实现,带你深入理解LangGraph的核心概念和实战技巧。

前言

LangChain很强大,但有个问题:它不擅长处理循环和条件分支

现实世界的Agent需要:

  • 根据结果决定下一步
  • 循环执行直到满足条件
  • 在多个Agent之间协作

这就是LangGraph要解决的问题。

LangGraph核心概念

图(Graph)

LangGraph的核心是有向图

from langgraph.graph import StateGraph

# 创建状态图
workflow = StateGraph(AgentState)
Enter fullscreen mode Exit fullscreen mode

节点(Node)

节点是图中的处理单元:

def agent_node(state: AgentState):
    """Agent节点"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def tool_node(state: AgentState):
    """工具节点"""
    # 执行工具调用
    results = []
    for tool_call in state["messages"][-1].tool_calls:
        result = tools[tool_call["name"]].invoke(tool_call["args"])
        results.append(result)
    return {"messages": results}

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
Enter fullscreen mode Exit fullscreen mode

边(Edge)

边定义节点之间的连接:

# 普通边
workflow.add_edge("agent", "tools")

# 条件边
def should_continue(state: AgentState):
    """决定是否继续"""
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

workflow.add_conditional_edges("agent", should_continue)
Enter fullscreen mode Exit fullscreen mode

状态(State)

状态是节点之间传递的数据:

from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]  # 消息列表,自动合并
    next_step: str                   # 下一步
Enter fullscreen mode Exit fullscreen mode

完整的Agent实现

基础Agent

# langgraph_agent/basic_agent.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

class AgentState(TypedDict):
    messages: Annotated[list, add]

def create_basic_agent(tools: list):
    """创建基础Agent"""
    llm = ChatOpenAI(model="gpt-4").bind_tools(tools)

    # 定义节点
    def agent_node(state: AgentState):
        response = llm.invoke(state["messages"])
        return {"messages": [response]}

    def tool_node(state: AgentState):
        results = []
        for tool_call in state["messages"][-1].tool_calls:
            tool = next(t for t in tools if t.name == tool_call["name"])
            result = tool.invoke(tool_call["args"])
            results.append(ToolMessage(content=str(result), tool_call_id=tool_call["id"]))
        return {"messages": results}

    # 定义条件边
    def should_continue(state: AgentState):
        if state["messages"][-1].tool_calls:
            return "tools"
        return END

    # 构建图
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_node)

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue)
    workflow.add_edge("tools", "agent")

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

带记忆的Agent

# langgraph_agent/memory_agent.py
from langgraph.checkpoint.memory import MemorySaver

def create_memory_agent(tools: list):
    """创建带记忆的Agent"""
    agent = create_basic_agent(tools)

    # 添加检查点
    memory = MemorySaver()
    agent = agent.with_config({"checkpointer": memory})

    return agent

# 使用
agent = create_memory_agent(tools)

# 带thread_id的调用(持久化对话)
config = {"configurable": {"thread_id": "user_123"}}
result = agent.invoke({"messages": [HumanMessage(content="你好")]}, config)
Enter fullscreen mode Exit fullscreen mode

Education-Agent的LangGraph实现

RAG Agent

# langgraph_agent/rag_agent.py
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END

class RAGState(TypedDict):
    question: str
    documents: list
    generation: str
    retrieval_needed: bool

def create_rag_agent(retriever, llm):
    """创建RAG Agent"""

    def retrieve(state: RAGState):
        """检索文档"""
        question = state["question"]
        documents = retriever.invoke(question)
        return {"documents": documents}

    def generate(state: RAGState):
        """生成答案"""
        question = state["question"]
        documents = state["documents"]

        prompt = f"""基于以下文档回答问题:

文档:{documents}

问题:{question}

要求:引用来源。"""

        generation = llm.invoke(prompt)
        return {"generation": generation}

    def grade_documents(state: RAGState):
        """评估文档相关性"""
        question = state["question"]
        documents = state["documents"]

        prompt = f"""评估以下文档是否与问题相关:

问题:{question}
文档:{documents}

返回:relevant 或 irrelevant"""

        grade = llm.invoke(prompt)
        return {"retrieval_needed": grade == "irrelevant"}

    def decide_next(state: RAGState):
        """决定下一步"""
        if state.get("retrieval_needed"):
            return "retrieve"
        return "generate"

    # 构建图
    workflow = StateGraph(RAGState)
    workflow.add_node("retrieve", retrieve)
    workflow.add_node("grade", grade_documents)
    workflow.add_node("generate", generate)

    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "grade")
    workflow.add_conditional_edges("grade", decide_next)
    workflow.add_edge("generate", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

Multi-Agent协作

# langgraph_agent/multi_agent.py
from langgraph.graph import StateGraph, END

class MultiAgentState(TypedDict):
    task: str
    current_agent: str
    results: dict
    messages: list

def create_multi_agent_system():
    """创建多Agent系统"""

    def coordinator(state: MultiAgentState):
        """协调者Agent"""
        task = state["task"]

        # 决定由哪个Agent处理
        prompt = f"""决定任务应该由哪个Agent处理:

任务:{task}

可选Agent:
- researcher: 研究和搜索
- coder: 编写代码
- reviewer: 代码审查

返回Agent名称。"""

        next_agent = llm.invoke(prompt)
        return {"current_agent": next_agent}

    def researcher(state: MultiAgentState):
        """研究Agent"""
        task = state["task"]
        # 执行研究任务
        result = research_task(task)
        return {"results": {"researcher": result}}

    def coder(state: MultiAgentState):
        """编码Agent"""
        task = state["task"]
        # 执行编码任务
        result = code_task(task)
        return {"results": {"coder": result}}

    def reviewer(state: MultiAgentState):
        """审查Agent"""
        task = state["task"]
        results = state["results"]
        # 审查结果
        review = review_task(task, results)
        return {"results": {"reviewer": review}}

    def route_agent(state: MultiAgentState):
        """路由到对应Agent"""
        return state["current_agent"]

    # 构建图
    workflow = StateGraph(MultiAgentState)
    workflow.add_node("coordinator", coordinator)
    workflow.add_node("researcher", researcher)
    workflow.add_node("coder", coder)
    workflow.add_node("reviewer", reviewer)

    workflow.set_entry_point("coordinator")
    workflow.add_conditional_edges("coordinator", route_agent)
    workflow.add_edge("researcher", END)
    workflow.add_edge("coder", END)
    workflow.add_edge("reviewer", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

状态管理

状态定义

from typing import TypedDict, Annotated, Union
from operator import add

class AgentState(TypedDict):
    # 消息列表(自动合并)
    messages: Annotated[list, add]

    # 当前步骤
    current_step: str

    # 中间结果
    intermediate_results: dict

    # 最终输出
    final_output: str
Enter fullscreen mode Exit fullscreen mode

状态更新

def update_state(state: AgentState, updates: dict) -> AgentState:
    """更新状态"""
    new_state = state.copy()

    for key, value in updates.items():
        if key == "messages":
            # 消息追加
            new_state["messages"] = state["messages"] + value
        else:
            # 其他字段覆盖
            new_state[key] = value

    return new_state
Enter fullscreen mode Exit fullscreen mode

条件路由

基于状态的路由

def route_based_on_state(state: AgentState) -> str:
    """基于状态决定路由"""
    if state.get("needs_search"):
        return "search_agent"
    elif state.get("needs_code"):
        return "code_agent"
    else:
        return END
Enter fullscreen mode Exit fullscreen mode

基于LLM的路由

def route_based_on_llm(state: AgentState) -> str:
    """用LLM决定路由"""
    task = state["task"]

    prompt = f"""决定任务类型:

任务:{task}

类型:
- search: 需要搜索信息
- code: 需要编写代码
- analysis: 需要分析数据

返回类型名称。"""

    task_type = llm.invoke(prompt)

    return {
        "search": "search_agent",
        "code": "code_agent",
        "analysis": "analysis_agent"
    }.get(task_type, END)
Enter fullscreen mode Exit fullscreen mode

流式输出

async def stream_agent(agent, input_data):
    """流式执行Agent"""
    async for event in agent.astream_events(input_data):
        kind = event["event"]

        if kind == "on_chat_model_stream":
            # 流式输出LLM响应
            content = event["data"]["chunk"].content
            print(content, end="", flush=True)

        elif kind == "on_tool_start":
            # 工具开始执行
            print(f"\n🔧 调用工具: {event['name']}")

        elif kind == "on_tool_end":
            # 工具执行完成
            print(f"✅ 工具结果: {event['data'].content[:100]}")
Enter fullscreen mode Exit fullscreen mode

错误处理

from langgraph.graph import StateGraph, END

def create_resilient_agent(tools: list):
    """创建有容错能力的Agent"""

    def agent_node(state: AgentState):
        try:
            response = llm.invoke(state["messages"])
            return {"messages": [response]}
        except Exception as e:
            return {"messages": [AIMessage(content=f"Error: {e}")]}

    def tool_node(state: AgentState):
        results = []
        for tool_call in state["messages"][-1].tool_calls:
            try:
                tool = next(t for t in tools if t.name == tool_call["name"])
                result = tool.invoke(tool_call["args"])
                results.append(ToolMessage(content=str(result), tool_call_id=tool_call["id"]))
            except Exception as e:
                results.append(ToolMessage(content=f"Error: {e}", tool_call_id=tool_call["id"]))
        return {"messages": results}

    def should_continue(state: AgentState):
        last_message = state["messages"][-1]

        # 检查是否有错误
        if "Error" in last_message.content:
            return "error_handler"

        # 检查是否有工具调用
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"

        return END

    def error_handler(state: AgentState):
        """错误处理"""
        error_msg = state["messages"][-1].content
        return {"messages": [AIMessage(content=f"遇到错误: {error_msg},请重试。")]}

    # 构建图
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_node)
    workflow.add_node("error_handler", error_handler)

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue)
    workflow.add_edge("tools", "agent")
    workflow.add_edge("error_handler", "agent")

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

实战:构建RAG + Agent系统

# complete_rag_agent.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_community.tools import Tool

class RAGAgentState(TypedDict):
    question: str
    documents: list
    generation: str
    iterations: int

def create_complete_rag_agent(vector_store, llm):
    """创建完整的RAG Agent"""

    # 定义工具
    def search_docs(query: str) -> str:
        results = vector_store.search(query, top_k=3)
        return "\n".join([r["text"] for r in results])

    tools = [Tool(name="search_docs", func=search_docs, description="搜索文档")]

    # 定义节点
    def retrieve(state: RAGAgentState):
        question = state["question"]
        documents = search_docs(question)
        return {"documents": documents, "iterations": state.get("iterations", 0) + 1}

    def generate(state: RAGAgentState):
        question = state["question"]
        documents = state["documents"]

        prompt = f"""基于以下文档回答问题:

文档:{documents}

问题:{question}

如果文档不足以回答,请说明需要更多信息。"""

        generation = llm.invoke(prompt)
        return {"generation": generation}

    def should_continue(state: RAGAgentState):
        # 检查是否需要更多检索
        if "信息不足" in state["generation"] and state["iterations"] < 3:
            return "retrieve"
        return END

    # 构建图
    workflow = StateGraph(RAGAgentState)
    workflow.add_node("retrieve", retrieve)
    workflow.add_node("generate", generate)

    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_conditional_edges("generate", should_continue)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

总结

LangGraph的核心优势:

特性 说明 应用场景
有状态 节点间传递状态 多步推理
条件路由 动态决定下一步 智能决策
循环支持 支持循环执行 重试、迭代
检查点 状态持久化 长对话
流式输出 实时反馈 用户体验

参考资料


LangGraph是构建复杂Agent的最佳选择。掌握了它,你就能构建真正智能的AI系统。

tags: langgraph, agent, workflow, state-machine, python
series: rag-knowledge-system

Top comments (0)