LangGraph 워크플로우 템플릿 (v19)
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소를 포함합니다:
노드 (Nodes): 각 작업 단계를 나타내는 함수
엣지 (Edges): 노드 간의 전이 조건
상태 (State): 워크플로우의 공유 상태
체크포인트 (Checkpointing): 상태 저장 및 복원 기능
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict, total=False):
    """Shared state schema for the workflow templates in this file.

    Declares every key that the node functions in this file read from or
    return into the state. ``total=False`` because a given workflow only
    populates the subset of keys its own nodes use.
    """

    # Core inputs. ``messages`` is annotated with ``operator.add`` so that
    # list updates are combined by concatenation (LangGraph reducer
    # convention for ``Annotated`` state fields).
    messages: Annotated[list, operator.add]
    user_input: str

    # RAG template: retrieve -> generate -> validate.
    retrieved_docs: list
    answer: str
    validated: bool

    # Multi-tool template: plan -> execute -> observe -> decide.
    plan: object
    tool_results: list
    observations: list
    next_step: str

    # Human-in-the-loop template: process -> review -> continue.
    ai_output: object
    status: str

    # Parallel template: fan_out -> process -> aggregate.
    tasks: list
    task_results: list
    final_output: dict
# Basic workflow structure: a StateGraph created over the GraphState schema.
from langgraph.graph import StateGraph
workflow = StateGraph(GraphState)
2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
class RAGAgent:
    """Agent whose methods serve as the retrieve / generate / validate nodes.

    Each node method receives the workflow state mapping and returns a dict
    of state updates.
    """

    def __init__(self, vectorstore, llm):
        """Keep references to the collaborators used by the node methods.

        Args:
            vectorstore: Object exposing ``similarity_search(query)``.
            llm: Model object that is piped after the prompt in ``generate``.
        """
        self.vectorstore = vectorstore
        self.llm = llm

    def retrieve(self, state):
        """Search the vector store with ``state["user_input"]``.

        Returns:
            ``{"retrieved_docs": docs}`` with whatever
            ``similarity_search`` returned.
        """
        question = state["user_input"]
        docs = self.vectorstore.similarity_search(question)
        return {"retrieved_docs": docs}

    def generate(self, state):
        """Answer the user's question from the retrieved documents.

        Joins ``page_content`` of every document in
        ``state["retrieved_docs"]`` with blank lines, then runs
        ``prompt | llm | StrOutputParser()``.

        Returns:
            ``{"answer": answer}`` with the chain's output.
        """
        docs = state["retrieved_docs"]
        context = "\n\n".join(doc.page_content for doc in docs)
        # Adjacent literals keep the prompt text free of source indentation,
        # so the template the model sees does not depend on code layout.
        prompt = PromptTemplate.from_template(
            "\n"
            "사용자 질문: {question}\n"
            "제공된 문서: {context}\n"
            "위 문서를 기반으로 질문에 답해주세요.\n"
        )
        chain = prompt | self.llm | StrOutputParser()
        answer = chain.invoke(
            {"question": state["user_input"], "context": context}
        )
        return {"answer": answer}

    def validate(self, state):
        """Mark the state as validated.

        Placeholder: no checks are performed yet; validation logic can be
        added here.
        """
        return {"validated": True}
# Workflow assembly
def create_rag_workflow():
    """Build and compile the retrieve -> generate -> validate graph.

    NOTE(review): ``vectorstore`` and ``llm`` are read from module scope and
    are not defined in the visible part of this file — confirm they exist
    before this function is called.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    agent = RAGAgent(vectorstore, llm)
    workflow = StateGraph(GraphState)

    workflow.add_node("retrieve", agent.retrieve)
    workflow.add_node("generate", agent.generate)
    workflow.add_node("validate", agent.validate)

    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "validate")
    # Declare the last step as the finish point so the run terminates
    # explicitly after validation instead of leaving a dangling node.
    workflow.set_finish_point("validate")

    return workflow.compile()
3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
from typing import List
from langchain_core.tools import Tool
class MultiToolAgent:
    """Agent whose methods serve as the plan / execute / observe / decide nodes.

    Each node method receives the workflow state mapping and returns a dict
    of state updates.
    """

    def __init__(self, tools: "List[Tool]", llm):
        """Keep the available tools and the model.

        Args:
            tools: Tools that ``execute`` may run; each is looked up by its
                ``name`` and run through ``invoke``.
            llm: Model object called through ``invoke(prompt)`` in ``plan``.
        """
        self.tools = tools
        self.llm = llm

    def plan(self, state):
        """Ask the model which tools to use and in what order.

        Returns:
            ``{"plan": response}`` where ``response`` is the unmodified
            return value of ``llm.invoke``.
        """
        # Adjacent literals keep the prompt text free of source indentation.
        prompt = (
            "\n"
            f"사용자 요청: {state['user_input']}\n"
            f"사용 가능한 도구들: {[tool.name for tool in self.tools]}\n"
            "어떤 도구를 사용해야 하는지 결정하고, 실행 순서를 정해주세요.\n"
        )
        response = self.llm.invoke(prompt)
        # TODO: parse the response into the actual tool-call order.
        # NOTE(review): ``execute`` subscripts the plan with "tools" and
        # "arguments", but the raw response is stored here unparsed.
        return {"plan": response}

    def execute(self, state):
        """Run every tool named in ``state["plan"]["tools"]`` in order.

        Each tool is invoked with ``state["plan"]["arguments"]``.

        Returns:
            ``{"tool_results": results}`` with one entry per tool name.

        Raises:
            ValueError: If the plan names a tool that is not in ``self.tools``.
        """
        plan = state["plan"]
        # Index tools by name once; ``setdefault`` keeps the first tool when
        # names repeat, matching a first-match linear search.
        tools_by_name = {}
        for tool in self.tools:
            tools_by_name.setdefault(tool.name, tool)

        results = []
        for tool_name in plan["tools"]:
            if tool_name not in tools_by_name:
                # A clear error instead of a bare StopIteration leaking out
                # of a failed lookup.
                raise ValueError(
                    f"Unknown tool {tool_name!r}; "
                    f"available tools: {sorted(tools_by_name)}"
                )
            results.append(tools_by_name[tool_name].invoke(plan["arguments"]))
        return {"tool_results": results}

    def observe(self, state):
        """Expose the tool results under the ``observations`` key."""
        return {"observations": state["tool_results"]}

    def decide(self, state):
        """Choose the next step; currently always ``"continue"``."""
        return {"next_step": "continue"}
# Usage example
def create_multi_tool_workflow():
    """Build and compile the plan -> execute -> observe -> decide graph.

    Registers two tools, "search" and "calculate".

    NOTE(review): ``search_function``, ``calculate_function`` and ``llm`` are
    read from module scope and are not defined in the visible part of this
    file — confirm they exist before this function is called.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    tools = [
        Tool(name="search", func=search_function, description="웹 검색"),
        Tool(name="calculate", func=calculate_function, description="수학 계산"),
    ]
    agent = MultiToolAgent(tools, llm)
    workflow = StateGraph(GraphState)

    workflow.add_node("plan", agent.plan)
    workflow.add_node("execute", agent.execute)
    workflow.add_node("observe", agent.observe)
    workflow.add_node("decide", agent.decide)

    workflow.set_entry_point("plan")
    workflow.add_edge("plan", "execute")
    workflow.add_edge("execute", "observe")
    workflow.add_edge("observe", "decide")
    # Declare the last step as the finish point so the run terminates
    # explicitly after the decision step.
    workflow.set_finish_point("decide")

    return workflow.compile()
4. 템플릿 3: 휴먼 인 더 루프(Human-in-the-Loop) 워크플로우 (중지 → 검토 → 계속)
from langgraph.checkpoint.memory import MemorySaver
class HumanInLoopAgent:
    """Agent whose methods serve as the process / review / continue nodes.

    Each node method receives the workflow state mapping and returns a dict
    of state updates.
    """

    def __init__(self, llm):
        """Keep the model that ``process`` calls through ``invoke``."""
        self.llm = llm

    def process(self, state):
        """Run the AI step on ``state["user_input"]``.

        Returns:
            ``{"ai_output": response}`` with the return value of
            ``llm.invoke``.
        """
        prompt = f"사용자 요청: {state['user_input']}"
        response = self.llm.invoke(prompt)
        return {"ai_output": response}

    def review(self, state):
        """Request a user review by setting the status to waiting."""
        return {"status": "waiting_for_review"}

    def continue_process(self, state):
        """Proceed after user approval by setting the status to continue."""
        return {"status": "continue"}
# Human-in-the-loop workflow
def create_human_in_loop_workflow():
    """Build and compile the process -> review -> continue graph.

    The graph is compiled with a ``MemorySaver`` checkpointer and
    ``interrupt_before=["continue"]``, so a run stops after the ``review``
    node and before ``continue``. Per the LangGraph breakpoint convention the
    run is resumed by invoking the graph again with ``None`` as input and the
    same ``thread_id`` in the config.

    NOTE(review): ``llm`` is read from module scope and is not defined in the
    visible part of this file — confirm it exists before calling.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    agent = HumanInLoopAgent(llm)
    workflow = StateGraph(GraphState)

    workflow.add_node("process", agent.process)
    workflow.add_node("review", agent.review)
    workflow.add_node("continue", agent.continue_process)

    workflow.set_entry_point("process")
    workflow.add_edge("process", "review")
    # ``review`` always writes status "waiting_for_review", so routing on
    # that status out of ``review`` would send the run straight back into
    # ``review`` forever. The pause is expressed as an interrupt instead,
    # and the edge to ``continue`` is unconditional.
    workflow.add_edge("review", "continue")
    workflow.set_finish_point("continue")

    return workflow.compile(
        checkpointer=MemorySaver(),
        interrupt_before=["continue"],
    )
# Usage example: build the human-in-the-loop graph and invoke it once.
# The config passes a thread_id under the "configurable" key.
graph = create_human_in_loop_workflow()
result = graph.invoke({"user_input": "사용자 요청"}, config={"configurable": {"thread_id": "123"}})
5. 템플릿 4: 병렬 실행 에이전트 (분기 → 처리 → 집계)
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgent:
    """Agent providing the fan-out, per-task processing and aggregation steps."""

    def __init__(self, llm):
        """Keep the model that ``process_parallel`` calls through ``invoke``."""
        self.llm = llm

    def fan_out(self, state):
        """Split the work into tasks for parallel processing.

        Returns:
            ``{"tasks": tasks}`` with two tasks, "task1" and "task2", each
            carrying ``state["user_input"]`` as its data.
        """
        tasks = [
            {"name": "task1", "data": state["user_input"]},
            {"name": "task2", "data": state["user_input"]},
        ]
        return {"tasks": tasks}

    def process_parallel(self, task):
        """Process a single task with the model.

        Args:
            task: One task dict with ``"name"`` and ``"data"`` keys (a single
                element of the ``tasks`` list, not the workflow state).

        Returns:
            ``{"task": name, "result": response}``.
        """
        prompt = f"처리할 작업: {task['data']} (작업명: {task['name']})"
        result = self.llm.invoke(prompt)
        return {"task": task["name"], "result": result}

    def aggregate(self, state):
        """Combine ``state["task_results"]`` into a summary.

        Returns:
            ``{"final_output": {"summary": ..., "details": results}}``.
        """
        results = state["task_results"]
        aggregated = {
            "summary": f"총 {len(results)}개 작업 완료",
            "details": results,
        }
        return {"final_output": aggregated}
# Parallel execution workflow
def create_parallel_workflow(max_workers=4):
    """Build and compile the fan_out -> process_parallel -> aggregate graph.

    Args:
        max_workers: Size of the thread pool used to process the tasks.

    NOTE(review): ``llm`` is read from module scope and is not defined in the
    visible part of this file — confirm it exists before calling.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    agent = ParallelAgent(llm)

    def process_tasks(state):
        """Run ``agent.process_parallel`` over every task in a thread pool."""
        tasks = state["tasks"]
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # ``executor.map`` yields results in the order of ``tasks``.
            results = list(executor.map(agent.process_parallel, tasks))
        return {"task_results": results}

    workflow = StateGraph(GraphState)
    workflow.add_node("fan_out", agent.fan_out)
    # The node receives the whole state, so it must be the state-level
    # wrapper; ``agent.process_parallel`` itself takes a single task dict.
    workflow.add_node("process_parallel", process_tasks)
    workflow.add_node("aggregate", agent.aggregate)

    workflow.set_entry_point("fan_out")
    # Connect fan-out to the processing node so that it and the aggregation
    # step are actually reachable.
    workflow.add_edge("fan_out", "process_parallel")
    workflow.add_edge("process_parallel", "aggregate")
    workflow.set_finish_point("aggregate")

    return workflow.compile()
# Actual parallel run: build the parallel workflow and invoke it with a sample input.
parallel_graph = create_parallel_workflow()
result = parallel_graph.invoke({"user_input": "병렬 처리할 데이터"})
6. 상태 관리 패턴
python
from langgraph.checkpoint import Checkpoint
from langgraph.checkpoint.base import BaseCheckpointSaver
class CustomCheckpointSaver(BaseCheckpointSaver):
def __init__(self):
self.checkpoints = {}
def get(self, config):
thread_id = config["configurable"]["thread_id"]
return self.checkpoints.get(thread_id, {})
def put(self, config, checkpoint):
thread_id = config["configurable"]["thread_id"]
self.check
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)