LangGraph 워크플로우 템플릿 (v31)
LangGraph는 LangChain의 고급 워크플로우 관리 시스템으로, 복잡한 AI 에이전트 구축에 있어 강력한 기반을 제공합니다. 이 가이드에서는 실제 개발자가 활용할 수 있는 4가지 핵심 템플릿을 소개합니다. 각 템플릿은 3-7달러의 가치를 지니는 실용적인 솔루션으로, 로컬 AI 인프라 구축과 성능 최적화 문제를 해결합니다.
1. LangGraph 아키텍처 개요
LangGraph는 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Chckpoint)로 구성된 그래프 구조를 기반으로 합니다:
- 노드: 각 단계의 작업을 정의 (예: retrieve, generate)
- 엣지: 노드 간 실행 흐름 정의 (예: retrieve → generate)
- 상태: 워크플로우 내 모든 상태 관리 (예: messages, tool_calls)
- 체크포인트: 실패 복구 및 상태 복원 기능 제공
2. 템플릿 1: 단순 RAG 에이전트
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
import operator
class State(TypedDict):
question: str
context: str
answer: str
def retrieve(state: State):
# 실제 RAG 검색 로직 구현
context = search_vector_db(state["question"])
return {"context": context}
def generate(state: State):
prompt = f"질문: {state['question']}\n컨텍스트: {state['context']}"
answer = llm.invoke(prompt)
return {"answer": answer}
def validate(state: State):
# 검증 로직 추가
return {"validation": "passed"}
# 그래프 생성
workflow = StateGraph(State)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
app = workflow.compile()
3. 템플릿 2: 멀티-도구 에이전트
from typing import List
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
import json
class AgentState(TypedDict):
messages: List[dict]
tools: List[str]
next: str
class ToolExecutor:
def __init__(self):
self.tools = {
"calculator": CalculatorTool(),
"search": SearchTool(),
"weather": WeatherTool()
}
def execute(self, tool_name: str, input_data: dict):
if tool_name in self.tools:
return self.tools[tool_name].run(input_data)
return {"error": "도구를 찾을 수 없음"}
def plan(state: AgentState):
# 도구 계획
tools_needed = analyze_tools_needed(state["messages"])
return {"tools": tools_needed, "next": "execute"}
def execute(state: AgentState):
# 도구 실행
tool_executor = ToolExecutor()
results = []
for tool_name in state["tools"]:
result = tool_executor.execute(tool_name, state["messages"][-1])
results.append(result)
return {"messages": results, "next": "observe"}
def observe(state: AgentState):
# 실행 결과 검토
if not state["messages"]:
return {"next": "plan"}
return {"next": "decide"}
def decide(state: AgentState):
# 결정 로직
if should_continue(state["messages"]):
return {"next": "plan"}
return {"next": END}
# 워크플로우 설정
workflow = StateGraph(AgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
4. 템플릿 3: 인간-참여 워크플로우
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
class HumanReviewState(MessagesState):
review_required: bool
review_status: str
def human_review_node(state: HumanReviewState):
# 리뷰 요청
return {
"review_required": True,
"messages": state["messages"] + [
{"type": "human_review", "prompt": "리뷰를 진행해 주세요"}
]
}
def review_handler(state: HumanReviewState):
# 리뷰 완료 처리
if state["review_status"] == "approved":
return {"review_required": False, "messages": state["messages"]}
else:
return {"review_required": True, "messages": state["messages"]}
# 체크포인트 설정
memory = MemorySaver()
# 워크플로우 정의
workflow = StateGraph(HumanReviewState)
workflow.add_node("process", process_node)
workflow.add_node("review", human_review_node)
workflow.add_node("handle_review", review_handler)
workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "handle_review")
workflow.add_edge("handle_review", END)
app = workflow.compile(checkpointer=memory)
5. 템플릿 5: 병렬 실행 에이전트
from concurrent.futures import ThreadPoolExecutor
from typing import List
class ParallelState(TypedDict):
inputs: List[str]
results: List[dict]
processed: int
def fan_out(state: ParallelState):
# 입력 분산
return {"results": [], "processed": 0}
def process_parallel(state: ParallelState, input_item: str):
# 병렬 처리
result = process_item(input_item)
return result
def aggregate_results(state: ParallelState):
# 결과 집계
return {"final_result": aggregate(state["results"])}
# 병렬 처리 실행
def parallel_workflow():
inputs = ["task1", "task2", "task3"]
results = []
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_parallel, input_item)
for input_item in inputs]
for future in futures:
results.append(future.result())
return {"final_results": results}
6. 상태 관리 패턴
# 상태 저장 및 복원 패턴
class StateManager:
def __init__(self, checkpoint_path: str):
self.checkpoint_path = checkpoint_path
def save_state(self, state: dict, thread_id: str):
import pickle
with open(f"{self.checkpoint_path}/{thread_id}.pkl", "wb") as f:
pickle.dump(state, f)
def load_state(self, thread_id: str):
import pickle
with open(f"{self.checkpoint_path}/{thread_id}.pkl", "rb") as f:
return pickle.load(f)
# 상태 최적화
class OptimizedState(TypedDict):
# 필요한 필드만 저장
messages: Annotated[List[dict], operator.add]
metadata: dict
timestamp: int
7. 스트리밍 및 실시간 업데이트
import asyncio
from langchain_core.messages import AIMessage
async def stream_response(messages: List[dict]):
"""실시간 스트리밍 응답"""
async for chunk in llm.astream(messages):
yield chunk
def setup_streaming_graph():
"""스트리밍 워크플로우 설정"""
workflow = StateGraph(StreamingState)
workflow.add_node("process", process_with_streaming)
workflow.add_node("stream", stream_results)
workflow.set_entry_point("process")
workflow.add_edge("process", "stream")
return workflow.compile()
# 스트리밍 상태 정의
class StreamingState(TypedDict):
input: str
output: List[str]
stream_id: str
8. 오류 복구 및 재시도 로직
python
import time
from functools import wraps
class RetryHandler:
def __init__(self, max_retries: int = 3, backoff_factor: int = 1):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
def retry_on_failure(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == self.max_retries - 1:
raise e
wait_time = self.backoff_factor * (2 ** attempt)
time.sleep(wait_time)
return None
return wrapper
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)