LangGraph 워크플로우 템플릿 (v9)
개요
LangGraph는 LangChain 기반의 상태 기반 워크플로우 엔진으로, 복잡한 AI 에이전트를 구현하는 데 강력한 도구입니다. 이 가이드는 실전에서 바로 사용할 수 있는 4가지 핵심 워크플로우 템플릿을 제공하며, Python 개발자들이 빠르게 AI 에이전트를 구축하고 운영할 수 있도록 돕습니다.
1. LangGraph 아키텍처 개요
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
# 노드 정의
def node1(state: State):
return {"messages": [{"role": "assistant", "content": "Hello from node1"}]}
def node2(state: State):
return {"messages": [{"role": "assistant", "content": "Hello from node2"}]}
# 그래프 생성
graph = StateGraph(State)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
graph.add_edge("node1", "node2")
graph.set_entry_point("node1")
graph.set_finish_point("node2")
2. 간단한 RAG 에이전트 템플릿
문서 검색 → 생성 → 검증의 간단한 워크플로우입니다.
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
class RAGState(TypedDict):
query: str
retrieved_docs: list
generated_answer: str
validation_result: bool
def retrieve_docs(state: RAGState):
# 문서 검색 (예: ChromaDB)
vectorstore = Chroma(persist_directory="./chroma_db")
docs = vectorstore.similarity_search(state["query"], k=3)
return {"retrieved_docs": [doc.page_content for doc in docs]}
def generate_answer(state: RAGState):
# LLM로 답변 생성
prompt = PromptTemplate.from_template("""
Based on the following documents, answer the question:
Documents: {docs}
Question: {query}
Answer:
""")
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm | StrOutputParser()
answer = chain.invoke({
"docs": "\n".join(state["retrieved_docs"]),
"query": state["query"]
})
return {"generated_answer": answer}
def validate_answer(state: RAGState):
# 답변 검증 (예: 정확성 체크)
return {"validation_result": True} # 간소화된 검증
# 워크플로우 정의
def create_rag_graph():
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_docs)
workflow.add_node("generate", generate_answer)
workflow.add_node("validate", validate_answer)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")
return workflow.compile()
3. 다중 도구 에이전트 템플릿
계획 → 실행 → 관찰 → 결정의 반복 워크플로우입니다.
from typing import Literal
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
class ToolAgentState(TypedDict):
messages: list
plan: str
execution_result: str
next_step: Literal["execute", "decide", "finish"]
# 도구 정의
def weather_tool(query: str) -> str:
return f"Weather for {query}: Sunny, 25°C"
def calendar_tool(query: str) -> str:
return f"Meeting scheduled for {query}: 10:00 AM"
# 도구 목록
tools = [
Tool(name="weather", func=weather_tool, description="Get weather information"),
Tool(name="calendar", func=calendar_tool, description="Get calendar information")
]
def create_plan(state: ToolAgentState):
# 계획 생성
return {"plan": "Check weather and calendar for user's needs"}
def execute_tools(state: ToolAgentState):
# 도구 실행
tool_name = "weather" # 실제 구현에서는 계획 기반
tool = next(t for t in tools if t.name == tool_name)
result = tool.func(state["messages"][-1]["content"])
return {"execution_result": result}
def observe_and_decide(state: ToolAgentState):
# 실행 결과 관찰 및 결정
if len(state["messages"]) > 5:
return {"next_step": "finish"}
return {"next_step": "execute"}
# 워크플로우 정의
def create_tool_agent_graph():
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", create_plan)
workflow.add_node("execute", execute_tools)
workflow.add_node("observe", observe_and_decide)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_conditional_edges(
"observe",
lambda x: x["next_step"],
{"execute": "execute", "finish": END}
)
workflow.set_entry_point("plan")
return workflow.compile()
4. 인간-중개 워크플로우 템플릿
사용자 검토를 포함한 일시정지/계속 흐름입니다.
from langgraph.checkpoint.memory import MemorySaver
class HumanLoopState(TypedDict):
messages: list
user_review: str
approval_status: bool
current_step: Literal["generate", "review", "continue"]
def generate_content(state: HumanLoopState):
# 콘텐츠 생성
return {"messages": [{"role": "assistant", "content": "Generated content for review"}]}
def pause_for_review(state: HumanLoopState):
# 사용자 리뷰 대기
return {"current_step": "review"}
def get_user_approval(state: HumanLoopState):
# 사용자 승인 받기
# 실제 구현에서는 외부 API 또는 DB에서 승인 상태 가져오기
approval = input("Approve content? (y/n): ")
return {
"approval_status": approval.lower() == 'y',
"current_step": "continue"
}
def continue_workflow(state: HumanLoopState):
# 워크플로우 재개
if state["approval_status"]:
return {"messages": [{"role": "assistant", "content": "Content approved and continuing"}]}
else:
return {"messages": [{"role": "assistant", "content": "Content rejected"}]}
# 체크포인트 사용
def create_human_loop_graph():
checkpointer = MemorySaver()
workflow = StateGraph(HumanLoopState)
workflow.add_node("generate", generate_content)
workflow.add_node("pause", pause_for_review)
workflow.add_node("review", get_user_approval)
workflow.add_node("continue", continue_workflow)
workflow.add_edge("generate", "pause")
workflow.add_edge("pause", "review")
workflow.add_edge("review", "continue")
workflow.add_edge("continue", END)
workflow.set_entry_point("generate")
return workflow.compile(checkpointer=checkpointer)
5. 병렬 실행 에이전트 템플릿
팬아웃 → 처리 → 집계 구조입니다.
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(TypedDict):
tasks: list
results: dict
aggregated_result: str
def fan_out_tasks(state: ParallelAgentState):
# 작업 분산
tasks = [
{"id": 1, "action": "process_data", "data": "data1"},
{"id": 2, "action": "process_data", "data": "data2"},
{"id": 3, "action": "process_data", "data": "data3"}
]
return {"tasks": tasks, "results": {}}
def process_task(state: ParallelAgentState, task_id: int):
# 개별 작업 처리
task = next(t for t in state["tasks"] if t["id"] == task_id)
# 실제 작업 처리 로직
return {"task_id": task_id, "result": f"Processed {task['data']}"}
def aggregate_results(state: ParallelAgentState):
# 결과 집계
results = list(state["results"].values())
return {"aggregated_result": f"Aggregated: {', '.join(results)}"}
# 병렬 처리
def parallel_workflow():
workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_tasks)
workflow.add_node("aggregate", aggregate_results)
workflow.add_edge("fan_out", "aggregate")
workflow.set_entry_point("fan_out")
return workflow.compile()
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)