LangGraph 워크플로우 템플릿 (v44)
Reusable LangGraph templates for Python developers building AI agents
Overview
LangGraph는 LangChain과 함께 사용되는 고급 워크플로우 엔진으로, 복잡한 AI 에이전트를 쉽게 구축할 수 있게 해줍니다. 이 문서는 실제 개발자가 문제를 해결할 수 있도록 설계된 4가지 핵심 템플릿을 제공합니다.
1. LangGraph 구조 개요
LangGraph는 세 가지 핵심 구성 요소로 이루어져 있습니다:
- Nodes: 각각의 작업 단계 (예: 검색, 생성, 검증)
- Edges: 노드 간의 흐름 정의 (조건부 또는 고정)
- State: 워크플로우 상태 관리
- Checkpointing: 상태 저장 및 복구 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
graph = StateGraph(State)
2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END
class SimpleRAGState(TypedDict):
question: str
context: str
answer: str
validation: bool
def retrieve(state: SimpleRAGState):
# 검색 로직 구현
# 예: ChromaDB 또는 Pinecone에서 유사 문서 검색
return {"context": "검색된 문서 내용"}
def generate(state: SimpleRAGState):
# LLM 기반 생성
llm = ChatOpenAI(model="gpt-4")
prompt = f"질문: {state['question']}\n문맥: {state['context']}"
response = llm.invoke([HumanMessage(content=prompt)])
return {"answer": response.content}
def validate(state: SimpleRAGState):
# 생성된 답변의 유효성 검증
# 예: 관련성 점수 계산 또는 키워드 검사
return {"validation": True} # 실제 구현에서는 더 복잡한 로직
workflow = StateGraph(SimpleRAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
app = workflow.compile()
3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
from typing import List
from langchain_core.tools import tool
import json
class MultiToolAgentState(TypedDict):
plan: List[str]
execution_log: List[str]
observations: List[str]
final_answer: str
@tool
def search_web(query: str) -> str:
"""웹 검색 도구"""
return f"검색 결과: {query}"
@tool
def calculate_math(expression: str) -> str:
"""수학 계산 도구"""
return f"계산 결과: {eval(expression)}"
@tool
def get_weather(location: str) -> str:
"""날씨 정보 도구"""
return f"{location} 날씨 정보"
def plan(state: MultiToolAgentState):
# 초기 계획 생성
return {"plan": ["search_web", "calculate_math"]}
def execute(state: MultiToolAgentState):
# 계획 실행 및 관찰
tools = [search_web, calculate_math]
execution_log = []
observations = []
for tool_name in state["plan"]:
tool_instance = next(t for t in tools if t.name == tool_name)
result = tool_instance.run(f"도구 실행: {tool_name}")
execution_log.append(f"{tool_name}: {result}")
observations.append(result)
return {"execution_log": execution_log, "observations": observations}
def decide(state: MultiToolAgentState):
# 관찰 결과 분석 후 최종 결정
return {"final_answer": "최종 답변"}
workflow = StateGraph(MultiToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("decide", decide)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "decide")
workflow.add_edge("decide", END)
app = workflow.compile()
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END, START
class HumanInLoopState(TypedDict):
user_request: str
intermediate_response: str
final_answer: str
needs_review: bool
def generate_initial(state: HumanInLoopState):
# 초기 생성
return {"intermediate_response": "초기 답변 생성"}
def check_review_needed(state: HumanInLoopState):
# 검토 필요 여부 확인 (예: 복잡한 요청)
needs_review = len(state["user_request"]) > 100 # 간단한 조건
return {"needs_review": needs_review}
def human_review(state: HumanInLoopState):
# 인간 검토 절차
print("사용자 검토 대기...")
# 실제 구현에서는 사용자 인터페이스 또는 외부 시스템과 연동
return {"final_answer": "사용자 검토 결과"}
def final_generate(state: HumanInLoopState):
# 최종 생성
return {"final_answer": "최종 답변"}
workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate_initial", generate_initial)
workflow.add_node("check_review", check_review_needed)
workflow.add_node("human_review", human_review)
workflow.add_node("final_generate", final_generate)
workflow.add_edge(START, "generate_initial")
workflow.add_edge("generate_initial", "check_review")
workflow.add_conditional_edges(
"check_review",
lambda x: "needs_review" if x["needs_review"] else "final_generate",
{
"needs_review": "human_review",
"final_generate": "final_generate"
}
)
workflow.add_edge("human_review", "final_generate")
workflow.add_edge("final_generate", END)
app = workflow.compile(checkpointer=MemorySaver())
5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(TypedDict):
tasks: List[str]
results: List[dict]
aggregated_result: str
def fan_out(state: ParallelAgentState):
# 작업 분할
tasks = ["task_1", "task_2", "task_3"]
return {"tasks": tasks}
def process_task(state: ParallelAgentState, task: str):
# 개별 작업 처리
# 예: API 호출, 데이터 처리 등
return {task: f"결과_{task}"}
def aggregate_results(state: ParallelAgentState):
# 결과 집계
results = [f"결과_{task}" for task in state["tasks"]]
return {"aggregated_result": " | ".join(results)}
# 병렬 처리 구현
async def parallel_process(state: ParallelAgentState):
tasks = state["tasks"]
# 비동기 작업 처리
results = []
for task in tasks:
result = process_task(state, task)
results.append(result)
# 결과 집계
return {"results": results, "aggregated_result": " | ".join([str(r) for r in results])}
workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("process_parallel", parallel_process)
workflow.add_node("aggregate", aggregate_results)
workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.add_edge("aggregate", END)
app = workflow.compile()
6. 상태 관리 패턴
6.1 상태 업데이트 패턴
from typing import Annotated
import operator
class UpdateState(TypedDict):
messages: Annotated[list, operator.add]
current_step: int
status: str
def update_with_logging(state: UpdateState):
# 상태 업데이트 및 로깅
current_step = state.get("current_step", 0) + 1
return {
"current_step": current_step,
"status": f"처리 중 (단계 {current_step})"
}
6.2 상태 저장 및 복구
python
from langgraph.checkpoint.sqlite import SqliteSaver
# 체크포인트 저장
checkpointer = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=checkpointer)
# 상태 저장
config = {"configurable": {"thread_id": "thread_123"}}
app
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)