LangGraph 워크플로우 템플릿 (v46)
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우를 구축하기 위한 고급 프레임워크입니다. 핵심 구성 요소는 다음과 같습니다:
노드(Node): 각 워크플로우 단계를 나타내는 함수
엣지(Edge): 노드 간의 전이 조건
상태(State): 워크플로우 상태를 저장하는 Pydantic 모델
체크포인트(Checkpoint): 상태 저장 및 복구 메커니즘
from typing import Annotated
from langgraph.graph import StateGraph, END
from pydantic import BaseModel
from typing import List, Dict, Any
# 상태 정의
class GraphState(BaseModel):
messages: List[Dict[str, Any]]
context: Dict[str, Any]
step: int = 0
# 노드 정의
def retrieve_node(state: GraphState):
# RAG 검색 로직
return {"messages": [{"role": "assistant", "content": "검색 결과"}]}
def generate_node(state: GraphState):
# 생성 로직
return {"messages": [{"role": "assistant", "content": "생성된 텍스트"}]}
# 그래프 생성
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
2. 템플릿 1: 단순 RAG 에이전트
이 템플릿은 검색-생성-검증 워크플로우를 구현합니다:
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
class RAGAgent:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def retrieve(self, query):
docs = self.vectorstore.similarity_search(query, k=3)
return "\n".join([doc.page_content for doc in docs])
def generate(self, query, context):
prompt = f"질문: {query}\n참고 자료: {context}"
response = self.llm.invoke([HumanMessage(content=prompt)])
return response.content
def validate(self, response, query):
# 응답 검증 로직
return True
# LangGraph 구현
class RAGState(BaseModel):
query: str
context: str
response: str
validated: bool = False
def retrieve_node(state: RAGState):
agent = RAGAgent(vectorstore, llm)
context = agent.retrieve(state.query)
return {"context": context}
def generate_node(state: RAGState):
agent = RAGAgent(vectorstore, llm)
response = agent.generate(state.query, state.context)
return {"response": response}
def validate_node(state: RAGState):
agent = RAGAgent(vectorstore, llm)
validated = agent.validate(state.response, state.query)
return {"validated": validated}
# 워크플로우 구성
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
# 반복 조건
def should_retry(state: RAGState):
if not state.validated:
return "retry"
return "end"
workflow.add_conditional_edges(
"validate",
should_retry,
{
"retry": "retrieve",
"end": END
}
)
3. 템플릿 2: 멀티 툴 에이전트
계획-실행-관찰-결정 워크플로우를 구현합니다:
from typing import Literal
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
class ToolAgentState(BaseModel):
plan: str
execution_result: str
observation: str
decision: str
tools: List[str]
# 사용자 정의 도구
def search_tool(query: str) -> str:
return f"검색 결과: {query}"
def calculator_tool(expression: str) -> str:
try:
result = eval(expression)
return f"계산 결과: {result}"
except:
return "계산 오류"
# 도구 정의
tools = [
Tool(
name="search",
func=search_tool,
description="웹 검색을 위한 도구"
),
Tool(
name="calculator",
func=calculator_tool,
description="수학 계산을 위한 도구"
)
]
def plan_node(state: ToolAgentState):
# 계획 생성
plan = "검색 및 계산 수행"
return {"plan": plan}
def execute_node(state: ToolAgentState):
# 도구 실행
tool_results = []
for tool_name in state.tools:
tool = next((t for t in tools if t.name == tool_name), None)
if tool:
result = tool.func("test query")
tool_results.append(f"{tool_name}: {result}")
return {"execution_result": "\n".join(tool_results)}
def observe_node(state: ToolAgentState):
# 관찰 및 평가
observation = f"실행 결과: {state.execution_result}"
return {"observation": observation}
def decide_node(state: ToolAgentState):
# 결정
if "오류" in state.execution_result:
decision = "retry"
else:
decision = "complete"
return {"decision": decision}
# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_conditional_edges(
"decide",
lambda state: state.decision,
{
"retry": "plan",
"complete": END
}
)
4. 템플릿 3: 인간-인-라인 워크플로우
사용자 검토 및 승인을 포함한 워크플로우:
from enum import Enum
from typing import Optional
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
class HumanInLoopState(BaseModel):
task: str
result: str
approval_status: ApprovalStatus = ApprovalStatus.PENDING
user_review: Optional[str] = None
def task_execution_node(state: HumanInLoopState):
# 작업 실행
result = f"작업 '{state.task}' 완료"
return {"result": result}
def human_review_node(state: HumanInLoopState):
# 사용자 검토 대기
return {"approval_status": ApprovalStatus.PENDING}
def process_approval_node(state: HumanInLoopState):
# 승인 처리
if state.approval_status == ApprovalStatus.APPROVED:
return {"approval_status": ApprovalStatus.APPROVED}
else:
return {"approval_status": ApprovalStatus.REJECTED}
# 워크플로우 구성
workflow = StateGraph(HumanInLoopState)
workflow.add_node("execute", task_execution_node)
workflow.add_node("review", human_review_node)
workflow.add_node("process_approval", process_approval_node)
workflow.add_edge("execute", "review")
workflow.add_edge("review", "process_approval")
workflow.add_conditional_edges(
"process_approval",
lambda state: state.approval_status,
{
ApprovalStatus.APPROVED: END,
ApprovalStatus.REJECTED: "execute" # 재시작
}
)
# 체크포인트 기능 추가
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
5. 템플릿 5: 병렬 실행 에이전트
팬아웃-프로세스-집계 워크플로우:
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(BaseModel):
tasks: List[str]
results: List[str] = []
aggregation: str = ""
def fan_out_node(state: ParallelAgentState):
# 작업 분할
return {"tasks": ["task1", "task2", "task3"]}
def process_parallel_node(state: ParallelAgentState):
# 병렬 처리
def worker(task):
return f"{task} 처리 완료"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(worker, task) for task in state.tasks]
results = [future.result() for future in futures]
return {"results": results}
def aggregate_node(state
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)