LangGraph 워크플로우 템플릿 (v15)
Overview
이 가이드는 LangChain/LangGraph 기반 AI 에이전트 개발을 위한 재사용 가능한 워크플로우 템플릿을 제공합니다. 각 템플릿은 실제 개발 문제를 해결하도록 설계되었으며, $3-$7 범위의 가치를 제공합니다.
1. LangGraph 아키텍처 개요
LangGraph는 다음과 같은 핵심 구성 요소로 구성됩니다:
- Nodes: 워크플로우의 각 단계 (함수 또는 클래스)
- Edges: 노드 간의 전이 조건
- State: 워크플로우 상태 관리
- Checkpointing: 상태 영속성 및 중단 복구
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
# 기본 워크플로우 구조
workflow = StateGraph(GraphState)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
이 템플릿은 문서 검색과 생성을 통합한 간단한 RAG 에이전트입니다:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnableLambda
class RAGAgent:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def retrieve(self, state):
question = state["messages"][-1].content
docs = self.vectorstore.similarity_search(question, k=3)
return {"context": "\n".join([doc.page_content for doc in docs])}
def generate(self, state):
prompt = PromptTemplate.from_template("""
다음 질문에 대해 제공된 문서를 기반으로 답변하세요:
Question: {question}
Context: {context}
Answer:
""")
chain = prompt | self.llm | StrOutputParser()
question = state["messages"][-1].content
answer = chain.invoke({"question": question, "context": state["context"]})
return {"answer": answer}
def validate(self, state):
# 간단한 검증 로직
if len(state["answer"]) < 10:
return {"validation": "failed", "retry": True}
return {"validation": "passed", "retry": False}
# 워크플로우 구성
rag_workflow = StateGraph(GraphState)
rag_workflow.add_node("retrieve", RunnableLambda(lambda state: RAGAgent(None, None).retrieve(state)))
rag_workflow.add_node("generate", RunnableLambda(lambda state: RAGAgent(None, None).generate(state)))
rag_workflow.add_node("validate", RunnableLambda(lambda state: RAGAgent(None, None).validate(state)))
rag_workflow.set_entry_point("retrieve")
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
# 재시도 로직 추가
def should_retry(state):
return state["validation"] == "failed"
rag_workflow.add_conditional_edges(
"validate",
should_retry,
{
True: "retrieve",
False: END
}
)
3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
이 템플릿은 작업 계획과 실행을 분리한 멀티-도구 에이전트입니다:
from typing import List, Dict, Any
class ToolAgent:
def __init__(self, tools):
self.tools = tools
def plan(self, state):
# 작업 계획 생성
prompt = PromptTemplate.from_template("""
사용자 요청: {question}
제공된 도구: {tools}
계획 생성:
""")
chain = prompt | self.llm | StrOutputParser()
question = state["messages"][-1].content
plan = chain.invoke({"question": question, "tools": [t.name for t in self.tools]})
return {"plan": plan}
def execute(self, state):
# 계획 실행
# 여기서는 간단한 예제로 도구 실행
return {"execution_result": "tool executed successfully"}
def observe(self, state):
# 실행 결과 관찰
return {"observation": "execution completed"}
def decide(self, state):
# 결정 로직
# 예: 성공 여부에 따라 다음 단계 결정
return {"next_step": "complete" if state["execution_result"] else "retry"}
# 도구 정의
class Tool:
def __init__(self, name, func):
self.name = name
self.func = func
tools = [
Tool("search", lambda x: f"search result for {x}"),
Tool("calculate", lambda x: f"calculation result for {x}")
]
# 워크플로우 구성
tool_workflow = StateGraph(GraphState)
tool_workflow.add_node("plan", RunnableLambda(lambda state: ToolAgent(tools).plan(state)))
tool_workflow.add_node("execute", RunnableLambda(lambda state: ToolAgent(tools).execute(state)))
tool_workflow.add_node("observe", RunnableLambda(lambda state: ToolAgent(tools).observe(state)))
tool_workflow.add_node("decide", RunnableLambda(lambda state: ToolAgent(tools).decide(state)))
tool_workflow.set_entry_point("plan")
tool_workflow.add_edge("plan", "execute")
tool_workflow.add_edge("execute", "observe")
tool_workflow.add_edge("observe", "decide")
# 조건부 엣지
def should_continue(state):
return state["next_step"] == "retry"
tool_workflow.add_conditional_edges(
"decide",
should_continue,
{
True: "plan",
False: END
}
)
4. 템플릿 3: 인간-중개 워크플로우 (중지 → 검토 → 계속)
이 템플릿은 인간 검토가 필요한 워크플로우를 구현합니다:
import asyncio
from datetime import datetime
class HumanInLoopAgent:
def __init__(self, llm):
self.llm = llm
def generate_response(self, state):
# AI 응답 생성
prompt = PromptTemplate.from_template("""
사용자 요청: {question}
AI 응답 생성:
""")
chain = prompt | self.llm | StrOutputParser()
question = state["messages"][-1].content
response = chain.invoke({"question": question})
return {"ai_response": response}
def pause_for_review(self, state):
# 인간 검토를 위한 중지
return {"status": "awaiting_review", "review_required": True}
def continue_after_review(self, state):
# 검토 후 계속
return {"status": "resumed"}
# 인간 검토 워크플로우
human_workflow = StateGraph(GraphState)
human_workflow.add_node("generate", RunnableLambda(lambda state: HumanInLoopAgent(None).generate_response(state)))
human_workflow.add_node("pause", RunnableLambda(lambda state: HumanInLoopAgent(None).pause_for_review(state)))
human_workflow.add_node("resume", RunnableLambda(lambda state: HumanInLoopAgent(None).continue_after_review(state)))
human_workflow.set_entry_point("generate")
human_workflow.add_edge("generate", "pause")
human_workflow.add_edge("pause", "resume")
# 사용자 입력 처리
class ReviewHandler:
def __init__(self):
self.pending_reviews = {}
def submit_review(self, user_id, decision):
if user_id in self.pending_reviews:
self.pending_reviews[user_id]["decision"] = decision
return True
return False
5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)
이 템플릿은 여러 작업을 병렬로 처리하고 결과를 집계합니다:
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgent:
def __init__(self, processors):
self.processors = processors
def fan_out(self, state):
# 작업 분기
return {"tasks": ["task_1", "task_2", "task_3"]}
def process(self, state):
# 병렬 처리
task = state["current_task"]
# 각각의 작업을 병렬로 처리
return {"processed_result": f"result_from_{task}"}
def aggregate(self, state):
# 결과 집계
return {"final_result": "aggregated results"}
# 병렬 워크플로우
parallel_workflow = StateGraph(GraphState)
parallel_workflow.add_node("fan_out", RunnableLambda(lambda state: ParallelAgent([]).fan_out(state)))
parallel_workflow.add_node("process", RunnableLambda(lambda state: ParallelAgent([]).process(state)))
parallel_workflow.add_node("aggregate", RunnableLambda(lambda state: ParallelAgent([]).aggregate(state)))
parallel_workflow.set
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)