LangGraph 워크플로우 템플릿 (v29)
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 엔진으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpointing)로 구성됩니다.
핵심 구성 요소:
- Node: 작업 단위 (예: RAG 검색, 툴 실행)
- Edge: 노드 간 흐름 제어 (조건부 경로)
- State: 전체 워크플로우 상태 (context, history, variables)
- Checkpointing: 실패 복구 및 상태 저장
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
context: str
step: int
workflow = StateGraph(GraphState)
2. 템플릿 1: 간단한 RAG 에이전트
실제 개발에서 가장 흔한 패턴. 검색 → 생성 → 검증 단계로 구성됩니다.
from langchain_core.messages import HumanMessage
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate
class RAGState(TypedDict):
question: str
context: str
answer: str
retrieved_docs: list
def retrieve(state: RAGState):
# Chroma 검색
vectorstore = Chroma(persist_directory="./chroma_db")
docs = vectorstore.similarity_search(state["question"], k=3)
return {"context": "\n".join([doc.page_content for doc in docs])}
def generate(state: RAGState):
prompt = PromptTemplate.from_template("""
다음 질문에 답해주세요:
{question}
참고 문서:
{context}
답변:
""")
# 실제 LLM 호출
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke(prompt.format(question=state["question"], context=state["context"]))
return {"answer": response.content}
def validate(state: RAGState):
# 답변 검증 로직
if len(state["answer"]) < 20:
return {"answer": "답변이 너무 짧습니다. 다시 시도해주세요."}
return {}
# 워크플로우 정의
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")
3. 템플릿 2: 다중 툴 에이전트
계획 → 실행 → 관찰 → 결정 프로세스로 구성되는 복잡한 에이전트입니다.
from langchain.tools import Tool
from typing import List
import json
class ToolAgentState(TypedDict):
task: str
plan: List[str]
execution_results: List[str]
current_tool: str
tool_input: str
decision: str
class ToolExecutor:
def __init__(self):
self.tools = {
"calculator": Tool(
name="calculator",
func=lambda x: str(eval(x)) if x.replace(" ", "").replace(".", "").isdigit() else "Invalid input",
description="수학 계산기"
),
"web_search": Tool(
name="web_search",
func=lambda q: f"검색 결과: {q}",
description="웹 검색"
)
}
def plan(state: ToolAgentState):
# 작업 분석 및 계획 수립
if "calculate" in state["task"]:
plan = ["calculator", "validate_result"]
else:
plan = ["web_search", "validate_result"]
return {"plan": plan}
def execute(state: ToolAgentState):
# 다음 툴 실행
tool_name = state["plan"][0]
tool = ToolExecutor().tools[tool_name]
# 툴 실행
result = tool.run(state["task"])
return {
"execution_results": [result],
"current_tool": tool_name,
"tool_input": state["task"]
}
def observe(state: ToolAgentState):
# 실행 결과 분석
if state["current_tool"] == "calculator":
try:
float(state["execution_results"][0])
return {"decision": "success"}
except:
return {"decision": "retry"}
return {"decision": "success"}
def decide(state: ToolAgentState):
if state["decision"] == "retry":
return "execute"
return END
# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_conditional_edges("observe", decide, {"execute": "execute", "end": END})
workflow.set_entry_point("plan")
4. 템플릿 3: 인간-중개 워크플로우
사람의 검토와 승인을 포함한 중요한 결정 프로세스입니다.
from enum import Enum
from datetime import datetime
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
class HumanInLoopState(TypedDict):
task: str
generated_output: str
user_review: str
approval_status: ApprovalStatus
timestamp: datetime
def generate_with_review(state: HumanInLoopState):
# 초기 생성
llm = ChatOpenAI(model="gpt-4")
prompt = f"다음 작업을 수행해주세요:\n{state['task']}"
result = llm.invoke(prompt)
return {
"generated_output": result.content,
"approval_status": ApprovalStatus.PENDING
}
def human_review(state: HumanInLoopState):
# 인간 리뷰 대기 (실제 구현에서는 API 또는 이벤트 시스템 사용)
# 이 부분은 실제 애플리케이션에서 웹훅 또는 DB 업데이트로 처리
return {"approval_status": ApprovalStatus.PENDING}
def process_approval(state: HumanInLoopState):
# 승인/거부 처리
if state["approval_status"] == ApprovalStatus.APPROVED:
return {"generated_output": state["generated_output"]}
else:
return {"generated_output": "사용자에 의해 거부된 작업"}
# 인간-중개 워크플로우
workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate", generate_with_review)
workflow.add_node("review", human_review)
workflow.add_node("process", process_approval)
workflow.add_edge("generate", "review")
workflow.add_conditional_edges("review",
lambda s: "approved" if s["approval_status"] == ApprovalStatus.APPROVED else "rejected",
{"approved": "process", "rejected": "generate"}
)
workflow.set_entry_point("generate")
5. 템플릿 5: 병렬 실행 에이전트
Fan-out → 프로세싱 → 집계 구조로 빠른 처리가 필요한 경우 사용합니다.
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelProcessingState(TypedDict):
tasks: List[str]
results: List[dict]
aggregated_result: dict
def fan_out(state: ParallelProcessingState):
# 작업 분할 (fan-out)
tasks = state["tasks"]
return {"results": []}
def process_task(state: ParallelProcessingState, task: str):
# 각 작업 처리
llm = ChatOpenAI(model="gpt-4")
result = llm.invoke(f"처리 작업: {task}")
return {"task": task, "output": result.content}
async def parallel_process(state: ParallelProcessingState):
# 병렬 처리
tasks = state["tasks"]
# 병렬로 실행 (ThreadPoolExecutor 사용)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_task, state, task) for task in tasks]
results = [future.result() for future in futures]
return {"results": results}
def aggregate_results(state: ParallelProcessingState):
# 결과 집계
aggregated = {}
for result in state["results"]:
if result["task"] in aggregated:
aggregated[result["task"]].append(result["output"])
else:
aggregated[result["task"]] = [result["output"]]
return {"aggregated_result": aggregated}
# 병렬 처리 워크플로우
workflow = StateGraph(ParallelProcessingState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("parallel_process", parallel_process)
workflow.add_node("aggregate", aggregate_results)
workflow.add_edge("fan_out", "parallel_process")
workflow.add_edge("parallel_process", "aggregate")
workflow.set_entry_point("fan_out")
6. 상태 관리 패턴
실제
📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)
Top comments (0)