# LangGraph 워크플로우 템플릿 (v27)
# Python 개발자를 위한 실용적인 LangChain/LangGraph 워크플로우 템플릿 가이드
## 1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 시스템으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpointing)로 구성됩니다.
python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import Message
import operator
# 상태 정의
class GraphState(TypedDict):
    """Shared state carried between the nodes of the example graph."""

    # Message history. The ``operator.add`` annotation is the reducer the
    # graph uses for this key: lists returned by nodes are concatenated onto
    # the existing list rather than replacing it.
    # NOTE(review): confirm that ``Message`` is really exported by
    # ``langchain_core.messages`` in the installed version; the documented
    # base class there is ``BaseMessage``.
    messages: Annotated[list[Message], operator.add]
    # Additional state fields go here.
# 노드 정의
def retrieve_node(state: dict) -> dict:
    """Return a placeholder retrieval result as a partial state update.

    Args:
        state: Current graph state. This stub does not read it.

    Returns:
        A dict whose ``messages`` list holds a single ``Message`` with the
        fixed content ``"검색 결과"``.
    """
    # Placeholder: real retrieval logic goes here.
    return {"messages": [Message(content="검색 결과")]}
def generate_node(state: dict) -> dict:
    """Return a placeholder generation result as a partial state update.

    Args:
        state: Current graph state. This stub does not read it.

    Returns:
        A dict whose ``messages`` list holds a single ``Message`` with the
        fixed content ``"생성 결과"``.
    """
    # Placeholder: real generation logic goes here.
    return {"messages": [Message(content="생성 결과")]}
# 워크플로우 생성
# Build the two-step example graph: retrieve -> generate -> END.
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
# Declare where execution starts. Only edges between nodes were defined
# before, which leaves the graph without an entry point.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
## 2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)
python
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
class RAGState(TypedDict):
    """State shared by the retrieve -> generate -> validate RAG pipeline."""

    # The user's question; read by the retrieve and generate steps.
    question: str
    # Retrieved document text, joined into one string by the retrieve step.
    context: str
    # Text produced by the generate step.
    answer: str
    # Outcome of the validate step's quality check.
    validation: bool
class RAGAgent:
    """Node callables for a retrieve -> generate -> validate RAG pipeline."""

    def __init__(self, llm, vector_store):
        """Store the collaborators used by the node methods.

        Args:
            llm: Object whose ``invoke(prompt)`` result exposes ``.content``.
            vector_store: Object exposing ``similarity_search(query, k=...)``
                returning items that have a ``page_content`` attribute.
        """
        self.llm = llm
        self.vector_store = vector_store

    def retrieve(self, state: dict) -> dict:
        """Fetch the three most similar documents for the question.

        Args:
            state: Mapping that contains ``"question"``.

        Returns:
            ``{"context": text}`` where ``text`` is the documents'
            ``page_content`` values joined by newlines.
        """
        docs = self.vector_store.similarity_search(state["question"], k=3)
        return {"context": "\n".join(doc.page_content for doc in docs)}

    def generate(self, state: dict) -> dict:
        """Ask the LLM to answer the question using the retrieved context.

        Args:
            state: Mapping that contains ``"question"`` and ``"context"``.

        Returns:
            ``{"answer": content}`` taken from the LLM response's ``content``.
        """
        # The prompt keeps its leading and trailing newline so that the text
        # sent to the model is unchanged.
        prompt = (
            "\n"
            f"질문: {state['question']}\n"
            f"컨텍스트: {state['context']}\n"
        )
        response = self.llm.invoke(prompt)
        return {"answer": response.content}

    def validate(self, state: dict) -> dict:
        """Run the quality check on the generated answer.

        Args:
            state: Mapping that contains ``"answer"``.

        Returns:
            ``{"validation": bool}`` with the result of the quality check.
        """
        return {"validation": self.check_answer_quality(state["answer"])}

    def check_answer_quality(self, answer, min_length: int = 10) -> bool:
        """Return whether ``answer`` is at least ``min_length`` items long.

        Args:
            answer: The generated answer; anything supporting ``len()``.
            min_length: Minimum acceptable length. Defaults to 10.
        """
        return len(answer) >= min_length
# 워크플로우 정의
# Build the RAG graph: retrieve -> generate -> validate -> END.
rag_workflow = StateGraph(RAGState)
# Each node builds a RAGAgent with placeholder (None) dependencies at call
# time; replace the None arguments with a real LLM and vector store.
rag_workflow.add_node("retrieve", lambda s: RAGAgent(None, None).retrieve(s))
rag_workflow.add_node("generate", lambda s: RAGAgent(None, None).generate(s))
rag_workflow.add_node("validate", lambda s: RAGAgent(None, None).validate(s))

# Edges. The entry point must be declared before compiling; the original
# snippet only connected nodes to each other and never said where to start.
rag_workflow.set_entry_point("retrieve")
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
rag_workflow.add_edge("validate", END)

# Compile the graph into a runnable application.
rag_app = rag_workflow.compile()
## 3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
python
from typing import List
from langchain.tools import Tool
from langchain_core.runnables import RunnableLambda
class ToolAgentState(TypedDict):
    """State shared by the plan -> execute -> observe -> decide pipeline."""

    # Ordered step descriptions written by the plan step.
    plan: List[str]
    # One entry per tool call (result or failure) from the execute step.
    execution_log: List[str]
    # Numbered restatements of the execution log from the observe step.
    observations: List[str]
    # Final verdict string written by the decide step.
    final_decision: str
class ToolAgent:
    """Node callables for a plan -> execute -> observe -> decide flow."""

    def __init__(self, tools: List["Tool"]):
        """Store the tools that ``execute`` will call.

        Args:
            tools: Objects exposing ``name`` and ``invoke(argument)``.
        """
        self.tools = tools

    def plan(self, state: dict) -> dict:
        """Return the fixed four-step plan.

        Args:
            state: Current state. It is not read.

        Returns:
            ``{"plan": [...]}`` with the four step descriptions.
        """
        return {
            "plan": [
                "1. 정보 수집",
                "2. 도구 실행",
                "3. 결과 분석",
                "4. 최종 결정",
            ]
        }

    def execute(self, state: dict) -> dict:
        """Invoke the first two tools and log each outcome.

        A failing tool does not stop the run: the exception text is logged
        and the next tool is tried.

        Args:
            state: Current state. It is not read.

        Returns:
            ``{"execution_log": [...]}`` with one entry per invoked tool.
        """
        execution_log = []
        # Only the first two tools are run, to keep the example simple.
        for tool in self.tools[:2]:
            try:
                result = tool.invoke("실행 매개변수")
            except Exception as e:
                # Deliberately broad: any tool failure is recorded, not raised.
                execution_log.append(f"{tool.name}: 실패 - {e}")
            else:
                execution_log.append(f"{tool.name}: {result}")
        return {"execution_log": execution_log}

    def observe(self, state: dict) -> dict:
        """Turn each execution-log entry into a numbered observation.

        Args:
            state: Mapping that contains ``"execution_log"``.

        Returns:
            ``{"observations": [...]}`` numbered from 0.
        """
        return {
            "observations": [
                f"관찰 {i}: {log}"
                for i, log in enumerate(state["execution_log"])
            ]
        }

    def decide(self, state: dict) -> dict:
        """Report completion when the execution log is non-empty.

        Failure entries also make the log non-empty, so this only reports
        "작업 실패" when no tool was invoked at all.

        Args:
            state: Mapping that contains ``"execution_log"``.

        Returns:
            ``{"final_decision": "작업 완료"}`` or
            ``{"final_decision": "작업 실패"}``.
        """
        decision = "작업 완료" if state["execution_log"] else "작업 실패"
        return {"final_decision": decision}
# 워크플로우 정의
# Build the tool graph: plan -> execute -> observe -> decide -> END.
tool_workflow = StateGraph(ToolAgentState)
# Each node builds a ToolAgent with an empty tool list at call time;
# replace ``[]`` with the real tools.
tool_workflow.add_node("plan", lambda s: ToolAgent([]).plan(s))
tool_workflow.add_node("execute", lambda s: ToolAgent([]).execute(s))
tool_workflow.add_node("observe", lambda s: ToolAgent([]).observe(s))
tool_workflow.add_node("decide", lambda s: ToolAgent([]).decide(s))
# Declare where execution starts; the original snippet never did.
tool_workflow.set_entry_point("plan")
tool_workflow.add_edge("plan", "execute")
tool_workflow.add_edge("execute", "observe")
tool_workflow.add_edge("observe", "decide")
tool_workflow.add_edge("decide", END)
## 4. 템플릿 3: 휴먼 인 더 루프(Human-in-the-Loop) 워크플로우 (일시정지 → 검토 → 계속)
python
from langchain_core.messages import ToolMessage
import asyncio
class HumanInLoopState(TypedDict):
    """State shared by the process -> human review loop."""

    # Prompt describing the work; sent to the LLM by the process step.
    task_description: str
    # LLM output awaiting review.
    intermediate_result: str
    # Raw feedback recorded by the review step.
    human_feedback: str
    # True when the feedback approves the result.
    is_approved: bool
class HumanInLoopAgent:
    """Async node callables for a process -> human review loop."""

    def __init__(self, llm):
        """Store the LLM used by ``process_task``.

        Args:
            llm: Object whose ``invoke(prompt)`` result exposes ``.content``.
        """
        self.llm = llm

    async def process_task(self, state: dict) -> dict:
        """Send the task description to the LLM.

        Args:
            state: Mapping that contains ``"task_description"``.

        Returns:
            ``{"intermediate_result": content}`` from the LLM response.
        """
        # ``invoke`` is a synchronous call; run it on a worker thread so the
        # event loop is not blocked while the model responds.
        response = await asyncio.to_thread(
            self.llm.invoke, state["task_description"]
        )
        return {"intermediate_result": response.content}

    async def await_human_feedback(self, state: dict) -> dict:
        """Show the intermediate result and record the reviewer's decision.

        The feedback is currently hard-coded to ``"y"`` for testing; a real
        implementation would obtain it from a webhook or a queue.

        Args:
            state: Mapping that contains ``"intermediate_result"``.

        Returns:
            ``{"human_feedback": str, "is_approved": bool}``; approval is a
            case-insensitive ``"y"``.
        """
        print("작업 결과를 검토하세요:")
        print(state["intermediate_result"])
        print("승인 (y) 또는 거절 (n): ")
        feedback = "y"  # Test stand-in for real reviewer input.
        return {
            "human_feedback": feedback,
            "is_approved": feedback.lower() == "y",
        }
# 워크플로우 정의
# Build the review loop: process -> wait_for_feedback -> (END | process).
#
# The agent methods are coroutines. Wrapping them in a plain lambda would
# hand the graph an un-awaited coroutine object instead of a state update,
# so each node is an ``async def`` that awaits the method. Run the compiled
# graph through its async entry points.
async def _human_loop_process(state):
    """Await the agent's task-processing step."""
    return await HumanInLoopAgent(None).process_task(state)


async def _human_loop_feedback(state):
    """Await the agent's feedback step."""
    return await HumanInLoopAgent(None).await_human_feedback(state)


human_loop_workflow = StateGraph(HumanInLoopState)
# The agents are built with a placeholder (None) LLM; supply a real one.
human_loop_workflow.add_node("process", _human_loop_process)
human_loop_workflow.add_node("wait_for_feedback", _human_loop_feedback)
# Declare where execution starts; the original snippet never did.
human_loop_workflow.set_entry_point("process")
human_loop_workflow.add_edge("process", "wait_for_feedback")
human_loop_workflow.add_conditional_edges(
    "wait_for_feedback",
    lambda s: "approved" if s["is_approved"] else "rejected",
    {
        "approved": END,
        "rejected": "process",  # Rejected results go back for another pass.
    },
)
## 5. 템플릿 4: 병렬 실행 에이전트 (분기 → 처리 → 집계)
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelExecutionState(TypedDict):
    """State shared by the fan-out -> process -> aggregate pipeline."""

    # Raw items to be processed.
    input_data: List[str]
    # Per-item outcome dicts; the aggregate step reads each one's "status".
    parallel_results: List[dict]
    # Summary written by the aggregate step.
    aggregated_result: dict
class ParallelAgent:
    """Node callables for a fan-out -> process -> aggregate pipeline."""

    def __init__(self, llm):
        """Store the LLM used by ``process_item``.

        Args:
            llm: Object whose ``invoke(prompt)`` result exposes ``.content``.
        """
        self.llm = llm

    def fan_out(self, state: dict) -> dict:
        """Reset the result list before items are processed.

        The original computed a slice of the first three inputs here but
        never used it; that dead local has been removed.

        Args:
            state: Current state. It is not read.

        Returns:
            ``{"parallel_results": []}``.
        """
        return {"parallel_results": []}

    def process_item(self, item) -> dict:
        """Send one item to the LLM and report the outcome.

        Errors are captured in the returned dict rather than raised, so a
        single failing item cannot abort a batch.

        Args:
            item: The value to embed in the request prompt.

        Returns:
            ``{"item", "result", "status"}`` where ``status`` is
            ``"success"`` with the response content, or ``"error"`` with
            the exception text.
        """
        try:
            response = self.llm.invoke(f"처리 요청: {item}")
            return {
                "item": item,
                "result": response.content,
                "status": "success",
            }
        except Exception as e:
            # Deliberately broad: every failure becomes an "error" record.
            return {"item": item, "result": str(e), "status": "error"}

    def aggregate(self, state: dict) -> dict:
        """Summarise the per-item results.

        Args:
            state: Mapping that contains ``"parallel_results"``.

        Returns:
            ``{"aggregated_result": {...}}`` with ``total``, ``success`` and
            ``errors`` counts plus the raw ``details`` list.
        """
        results = state["parallel_results"]
        total = len(results)
        succeeded = sum(1 for r in results if r["status"] == "success")
        return {
            "aggregated_result": {
                "total": total,
                "success": succeeded,
                "errors": total - succeeded,
                "details": results,
            }
        }
# 워크플로우 정의
# Build the parallel graph: fan_out -> process -> aggregate -> END.
def _parallel_process(state):
    """Run ``process_item`` over the first three inputs on a thread pool.

    The original node passed the whole state to ``process_item`` and
    returned its per-item dict, whose keys are not part of the state, so
    ``parallel_results`` was never filled.
    """
    agent = ParallelAgent(None)
    items = state["input_data"][:3]  # Only the first three items are handled.
    if not items:
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool.
        return {"parallel_results": []}
    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        # ``map`` yields results in input order.
        return {"parallel_results": list(pool.map(agent.process_item, items))}


parallel_workflow = StateGraph(ParallelExecutionState)
# The agents are built with a placeholder (None) LLM; supply a real one.
parallel_workflow.add_node("fan_out", lambda s: ParallelAgent(None).fan_out(s))
parallel_workflow.add_node("process", _parallel_process)
parallel_workflow.add_node("aggregate", lambda s: ParallelAgent(None).aggregate(s))
# Declare where execution starts; the original snippet never did.
parallel_workflow.set_entry_point("fan_out")
parallel_workflow.add_edge("fan_out", "process")
parallel_workflow.add_edge("process", "aggregate")
parallel_workflow.add_edge("aggregate", END)
## 6. 상태 관리 패턴
python
from langgraph.checkpoint.memory import MemoryS
📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)
Top comments (0)