DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v9)

LangGraph 워크플로우 템플릿 (v9)

개요

LangGraph는 LangChain 생태계 위에서 동작하는 상태 기반(stateful) 워크플로우 엔진으로, 복잡한 AI 에이전트를 구현할 때 강력한 도구입니다. 이 가이드는 실전에서 바로 사용할 수 있는 4가지 핵심 워크플로우 템플릿(RAG, 다중 도구, Human-in-the-Loop, 병렬 실행)을 제공하여, Python 개발자가 AI 에이전트를 빠르게 구축하고 운영할 수 있도록 돕습니다.

1. LangGraph 아키텍처 개요

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class State(TypedDict):
    """Graph state for the architecture overview example."""

    # Annotated with operator.add as the channel reducer, following the
    # LangGraph convention for additive (append-style) state keys.
    messages: Annotated[list, operator.add]

# Node definitions
def node1(state: State):
    """Return a state update carrying a fixed assistant greeting from node1."""
    greeting = {"role": "assistant", "content": "Hello from node1"}
    return {"messages": [greeting]}

def node2(state: State):
    """Return a state update carrying a fixed assistant greeting from node2."""
    greeting = {"role": "assistant", "content": "Hello from node2"}
    return {"messages": [greeting]}

# Build the graph: register both nodes, then wire node1 -> node2.
graph = StateGraph(State)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
graph.set_entry_point("node1")  # execution starts at node1
graph.add_edge("node1", "node2")
graph.set_finish_point("node2")  # execution ends after node2
Enter fullscreen mode Exit fullscreen mode

2. 간단한 RAG 에이전트 템플릿

문서 검색 → 생성 → 검증의 간단한 워크플로우입니다.

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI

class RAGState(TypedDict):
    """State passed between the retrieve, generate and validate nodes."""

    # The user's question.
    query: str
    # Page-content strings written by retrieve_docs.
    retrieved_docs: list
    # Answer text written by generate_answer.
    generated_answer: str
    # Outcome flag written by validate_answer.
    validation_result: bool

def retrieve_docs(state: RAGState):
    """Look up documents similar to the query in the persisted Chroma store.

    Opens the store at ``./chroma_db``, runs a similarity search for
    ``state["query"]`` requesting the top 3 matches, and returns their text.

    Returns:
        A state update ``{"retrieved_docs": [...]}`` with one
        ``page_content`` string per hit.
    """
    # Document retrieval (e.g. ChromaDB)
    # NOTE(review): no embedding function is passed to Chroma here — confirm
    # the store was indexed with the same default the query will use.
    store = Chroma(persist_directory="./chroma_db")
    hits = store.similarity_search(state["query"], k=3)

    contents = []
    for hit in hits:
        contents.append(hit.page_content)
    return {"retrieved_docs": contents}

def generate_answer(state: RAGState):
    """Generate an answer to the query from the retrieved documents.

    Builds a prompt -> chat model -> string-parser chain and invokes it with
    the retrieved documents (joined by newlines) and the query.

    Returns:
        A state update ``{"generated_answer": <chain output>}``.
    """
    # Generate the answer with the LLM
    template = PromptTemplate.from_template("""
    Based on the following documents, answer the question:
    Documents: {docs}
    Question: {query}
    Answer:
    """)
    model = ChatOpenAI(model="gpt-4")

    # Pipe the prompt into the model and parse the reply to a plain string.
    pipeline = template | model | StrOutputParser()

    joined_docs = "\n".join(state["retrieved_docs"])
    inputs = {"docs": joined_docs, "query": state["query"]}
    return {"generated_answer": pipeline.invoke(inputs)}

def validate_answer(state: RAGState):
    """Validate the generated answer (placeholder: always reports success).

    Returns:
        A state update ``{"validation_result": True}``; the state is not
        inspected.
    """
    # Answer validation (e.g. accuracy check) — simplified stub
    passed = True
    return {"validation_result": passed}

# Workflow definition
def create_rag_graph():
    """Build and compile the linear retrieve -> generate -> validate graph.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    builder = StateGraph(RAGState)

    # Register each pipeline stage under its node name.
    stages = (
        ("retrieve", retrieve_docs),
        ("generate", generate_answer),
        ("validate", validate_answer),
    )
    for stage_name, stage_fn in stages:
        builder.add_node(stage_name, stage_fn)

    # Wire the stages into a straight line from entry to finish.
    builder.set_entry_point("retrieve")
    builder.add_edge("retrieve", "generate")
    builder.add_edge("generate", "validate")
    builder.set_finish_point("validate")

    return builder.compile()
Enter fullscreen mode Exit fullscreen mode

3. 다중 도구 에이전트 템플릿

계획 → 실행 → 관찰 → 결정의 반복 워크플로우입니다.

from typing import Literal
from langchain.tools import Tool
from langchain_openai import ChatOpenAI

class ToolAgentState(TypedDict):
    """State shared by the plan, execute and observe nodes."""

    # Conversation messages; execute_tools reads the last entry's "content".
    messages: list
    # Plan text written by create_plan.
    plan: str
    # Output of the most recent tool call, written by execute_tools.
    execution_result: str
    # Routing decision written by observe_and_decide.
    next_step: Literal["execute", "decide", "finish"]

# Tool definitions
def weather_tool(query: str) -> str:
    """Return a canned weather report that echoes the query."""
    report = "Sunny, 25°C"
    return f"Weather for {query}: {report}"

def calendar_tool(query: str) -> str:
    """Return a canned calendar entry that echoes the query."""
    slot = "10:00 AM"
    return f"Meeting scheduled for {query}: {slot}"

# Tool registry: each entry is (name, callable, description).
tools = [
    Tool(name=tool_name, func=tool_func, description=tool_description)
    for tool_name, tool_func, tool_description in (
        ("weather", weather_tool, "Get weather information"),
        ("calendar", calendar_tool, "Get calendar information"),
    )
]

def create_plan(state: ToolAgentState):
    """Produce the plan for the agent (placeholder: a fixed plan string).

    Returns:
        A state update ``{"plan": <plan text>}``; the state is not inspected.
    """
    # Plan generation
    plan_text = "Check weather and calendar for user's needs"
    return {"plan": plan_text}

def execute_tools(state: ToolAgentState):
    """Run the selected tool and record its output in the state.

    The query is the content of the most recent message that is not itself a
    tool result; with no such message the tool is called with an empty string.
    The tool output is appended to the message history as a ``"tool"``
    message. This matters because ``observe_and_decide`` only finishes once
    there are more than five messages, so a history that never grows would
    keep the execute/observe loop running forever.

    Returns:
        A state update with ``execution_result`` (the tool output) and
        ``messages`` (the full history including the new tool message).
        ``ToolAgentState.messages`` declares no reducer, so the complete list
        is returned rather than only the new entry.

    Raises:
        ValueError: If no registered tool has the selected name.
    """
    # Tool execution
    tool_name = "weather"  # A real implementation would derive this from the plan
    tool = next((t for t in tools if t.name == tool_name), None)
    if tool is None:
        raise ValueError(f"Unknown tool: {tool_name!r}")

    history = list(state.get("messages") or [])

    # Skip tool outputs appended by earlier iterations so a previous result is
    # never fed back in as the next query.
    query = next(
        (m["content"] for m in reversed(history) if m.get("role") != "tool"),
        "",
    )

    result = tool.func(query)
    history.append({"role": "tool", "content": result})
    return {"execution_result": result, "messages": history}

def observe_and_decide(state: ToolAgentState):
    """Choose the next step from the current number of messages.

    Returns:
        ``{"next_step": "finish"}`` when there are more than five messages,
        otherwise ``{"next_step": "execute"}``.
    """
    # Observe the execution result and decide what to do next
    message_count = len(state["messages"])
    decision = "finish" if message_count > 5 else "execute"
    return {"next_step": decision}

# Workflow definition
def create_tool_agent_graph():
    """Build and compile the plan -> execute -> observe agent graph.

    After ``observe`` runs, the graph routes on the state's ``next_step``:
    ``"execute"`` goes back to the execute node and ``"finish"`` ends the run.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    builder = StateGraph(ToolAgentState)

    builder.add_node("plan", create_plan)
    builder.add_node("execute", execute_tools)
    builder.add_node("observe", observe_and_decide)

    builder.set_entry_point("plan")
    builder.add_edge("plan", "execute")
    builder.add_edge("execute", "observe")

    # Map each routing value produced by observe to its destination.
    routes = {"execute": "execute", "finish": END}
    builder.add_conditional_edges(
        "observe",
        lambda current: current["next_step"],
        routes,
    )

    return builder.compile()
Enter fullscreen mode Exit fullscreen mode

4. 휴먼 인 더 루프(Human-in-the-Loop) 워크플로우 템플릿

사용자 검토를 포함한 일시정지/계속 흐름입니다.

from langgraph.checkpoint.memory import MemorySaver

class HumanLoopState(TypedDict):
    """State carried through the human-review workflow."""

    # Additive channel: the nodes in this workflow return only their new
    # message, so operator.add is attached as the reducer (as in ``State``)
    # to append to the history instead of replacing it on every update.
    messages: Annotated[list, operator.add]
    # Free-text feedback from the reviewer.
    user_review: str
    # True when the reviewer approved the content (set by get_user_approval).
    approval_status: bool
    # Marker of the workflow phase, updated by the pause and review nodes.
    current_step: Literal["generate", "review", "continue"]

def generate_content(state: HumanLoopState):
    """Produce the content to be reviewed (placeholder: a fixed message).

    Returns:
        A state update with a single assistant message under ``messages``.
    """
    # Content generation
    draft = {"role": "assistant", "content": "Generated content for review"}
    return {"messages": [draft]}

def pause_for_review(state: HumanLoopState):
    """Mark the workflow as waiting for review.

    Returns:
        A state update setting ``current_step`` to ``"review"``.
    """
    # Waiting for the user's review
    phase = "review"
    return {"current_step": phase}

def get_user_approval(state: HumanLoopState):
    """Ask on standard input whether the content is approved.

    The answer is compared case-insensitively after stripping surrounding
    whitespace; ``y`` and ``yes`` count as approval, anything else as
    rejection.

    Returns:
        A state update with ``approval_status`` (bool) and ``current_step``
        set to ``"continue"``.
    """
    # Obtain the user's approval.
    # A real implementation would fetch the approval status from an external
    # API or database instead of blocking on input().
    answer = input("Approve content? (y/n): ")

    # Normalize first so stray spaces or a spelled-out "yes" are not
    # misread as a rejection.
    approved = answer.strip().lower() in {"y", "yes"}
    return {
        "approval_status": approved,
        "current_step": "continue",
    }

def continue_workflow(state: HumanLoopState):
    """Resume after review and report the outcome as an assistant message.

    Returns:
        A state update with one assistant message whose text depends on
        ``state["approval_status"]``.
    """
    # Resume the workflow
    outcome = (
        "Content approved and continuing"
        if state["approval_status"]
        else "Content rejected"
    )
    return {"messages": [{"role": "assistant", "content": outcome}]}

# Using a checkpointer
def create_human_loop_graph():
    """Build and compile the generate -> pause -> review -> continue graph.

    The graph is compiled with an in-memory ``MemorySaver`` checkpointer.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    builder = StateGraph(HumanLoopState)

    # Register the nodes in execution order.
    steps = (
        ("generate", generate_content),
        ("pause", pause_for_review),
        ("review", get_user_approval),
        ("continue", continue_workflow),
    )
    for step_name, step_fn in steps:
        builder.add_node(step_name, step_fn)

    # Straight-line flow from the entry node to END.
    builder.set_entry_point("generate")
    builder.add_edge("generate", "pause")
    builder.add_edge("pause", "review")
    builder.add_edge("review", "continue")
    builder.add_edge("continue", END)

    return builder.compile(checkpointer=MemorySaver())
Enter fullscreen mode Exit fullscreen mode

5. 병렬 실행 에이전트 템플릿

팬아웃 → 처리 → 집계 구조입니다.


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(TypedDict):
    """State for the fan-out / aggregate workflow."""

    # Task descriptors written by fan_out_tasks.
    tasks: list
    # Per-task results; aggregate_results joins this dict's values.
    results: dict
    # Combined summary string written by aggregate_results.
    aggregated_result: str

def fan_out_tasks(state: ParallelAgentState):
    """Create the list of tasks to process and reset the results.

    Returns:
        A state update with three ``process_data`` task dicts under ``tasks``
        (ids 1 to 3) and an empty ``results`` dict.
    """
    # Distribute the work
    payloads = ("data1", "data2", "data3")
    pending = [
        {"id": number, "action": "process_data", "data": payload}
        for number, payload in enumerate(payloads, start=1)
    ]
    return {"tasks": pending, "results": {}}

def process_task(state: ParallelAgentState, task_id: int):
    """Process the task with the given id and return its result record.

    Args:
        state: Workflow state whose ``tasks`` list holds dicts with ``id``
            and ``data`` keys.
        task_id: Identifier of the task to process.

    Returns:
        ``{"task_id": task_id, "result": "Processed <data>"}``.

    Raises:
        ValueError: If no task in ``state["tasks"]`` has ``task_id``.
    """
    # Process a single task.
    # A bare next() would raise StopIteration for an unknown id, which can be
    # swallowed or turned into RuntimeError when this function is driven from
    # map()/generator code, so look up with a default and fail explicitly.
    task = next((t for t in state["tasks"] if t["id"] == task_id), None)
    if task is None:
        raise ValueError(f"No task with id {task_id!r}")

    # The actual task-processing logic goes here.
    return {"task_id": task_id, "result": f"Processed {task['data']}"}

def aggregate_results(state: ParallelAgentState):
    """Join every per-task result into one summary string.

    Returns:
        A state update ``{"aggregated_result": "Aggregated: a, b, ..."}``
        built from the values of ``state["results"]``.
    """
    # Aggregate the results
    combined = ", ".join(state["results"].values())
    return {"aggregated_result": f"Aggregated: {combined}"}

# Parallel processing
def parallel_workflow():
    """Build and compile the fan-out -> process -> aggregate graph.

    The ``process`` node runs ``process_task`` for every task on a thread
    pool and stores each outcome in ``results`` keyed by task id, so that
    ``aggregate`` has something to combine. The ``aggregate`` node is
    connected to ``END`` so the graph has an explicit finish point.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """

    def process_all(state: ParallelAgentState):
        """Process all tasks concurrently and collect results by task id."""
        tasks = state["tasks"]
        if not tasks:
            # Nothing to do; skip creating a pool.
            return {"results": {}}
        with ThreadPoolExecutor() as pool:
            # pool.map yields outcomes in the same order as ``tasks``.
            outcomes = list(
                pool.map(lambda task: process_task(state, task["id"]), tasks)
            )
        return {
            "results": {
                outcome["task_id"]: outcome["result"] for outcome in outcomes
            }
        }

    workflow = StateGraph(ParallelAgentState)
    workflow.add_node("fan_out", fan_out_tasks)
    workflow.add_node("process", process_all)
    workflow.add_node("aggregate", aggregate_results)

    workflow.set_entry_point("fan_out")
    workflow.add_edge("fan_out", "process")
    workflow.add_edge("process", "aggregate")
    workflow.add_edge("aggregate", END)

    return workflow.compile()


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)