LangGraph 워크플로우 템플릿 (v3)
1. LangGraph 아키텍처 개요
LangGraph는 그래프 기반의 AI 워크플로우 엔진으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Chckpointing)로 구성됩니다.
핵심 구성 요소:
- Nodes: 각각의 작업 단위 (함수 또는 클래스)
- Edges: 노드 간의 실행 흐름 정의
- State: 워크플로우 내 모든 데이터 저장
- Checkpointing: 상태 저장 및 복구 기능
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
current_node: str
# 그래프 생성
workflow = StateGraph(AgentState)
2. 템플릿 1: 단순 RAG 에이전트
사용 사례: 문서 검색 및 요약 생성
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
class RAGAgentState(TypedDict):
query: str
retrieved_docs: list
generated_response: str
def retrieve(state):
# 문서 검색
vectorstore = Chroma(persist_directory="./chroma_db")
retriever = vectorstore.as_retriever()
docs = retriever.invoke(state["query"])
return {"retrieved_docs": docs}
def generate(state):
# 응답 생성
llm = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{query}\n\nContext: {context}")
])
chain = prompt | llm
context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
response = chain.invoke({
"query": state["query"],
"context": context
})
return {"generated_response": response.content}
def validate_response(state):
# 응답 검증
if not state["generated_response"]:
return {"generated_response": "Error: No response generated"}
return {}
# 워크플로우 정의
rag_workflow = StateGraph(RAGAgentState)
rag_workflow.add_node("retrieve", retrieve)
rag_workflow.add_node("generate", generate)
rag_workflow.add_node("validate", validate_response)
# 엣지 정의
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
rag_workflow.set_entry_point("retrieve")
rag_workflow.set_finish_point("validate")
3. 템플릿 2: 멀티-도구 에이전트
사용 사례: 복잡한 작업 계획 및 실행 (Plan-Execute-Observe-Decide)
from typing import List, Dict
from langchain.tools import Tool
from langchain_openai import OpenAI
class MultiToolAgentState(TypedDict):
task: str
plan: List[str]
execution_history: List[Dict]
current_step: int
final_result: str
def plan_task(state):
llm = OpenAI()
prompt = f"Plan the following task in steps: {state['task']}"
plan = llm.invoke(prompt)
steps = plan.split('\n')
return {"plan": steps}
def execute_step(state):
# 단계별 실행
current_step = state["plan"][state["current_step"]]
# 예시 도구들
tools = [
Tool(name="search", func=lambda x: f"Search result for {x}"),
Tool(name="calculate", func=lambda x: f"Calculation result: {x}"),
Tool(name="summarize", func=lambda x: f"Summary: {x}")
]
# 도구 실행 로직 (실제 구현은 도구에 따라 다름)
tool = tools[0] # 간단한 예시
result = tool.run(current_step)
return {
"execution_history": [
*state["execution_history"],
{"step": current_step, "result": result}
],
"current_step": state["current_step"] + 1
}
def decide_next(state):
# 다음 단계 결정
if state["current_step"] >= len(state["plan"]):
return {"final_result": "Task completed"}
return {"current_step": state["current_step"]}
# 워크플로우 정의
multi_tool_workflow = StateGraph(MultiToolAgentState)
multi_tool_workflow.add_node("plan", plan_task)
multi_tool_workflow.add_node("execute", execute_step)
multi_tool_workflow.add_node("decide", decide_next)
multi_tool_workflow.add_edge("plan", "execute")
multi_tool_workflow.add_edge("execute", "decide")
multi_tool_workflow.add_conditional_edges(
"decide",
lambda x: "continue" if x["current_step"] < len(x["plan"]) else "finish",
{
"continue": "execute",
"finish": END
}
)
multi_tool_workflow.set_entry_point("plan")
4. 템플릿 3: 인간-중개 워크플로우
사용 사례: 인간 검토 후 실행 (중요한 결정이나 보안 이슈)
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
class HumanInLoopState(TypedDict):
task: str
decision_required: bool
human_review: str
processed_data: str
def auto_process(state):
# 자동 처리
llm = ChatOpenAI(model="gpt-4")
prompt = f"Process this task: {state['task']}"
result = llm.invoke(prompt)
return {"processed_data": result.content, "decision_required": True}
def human_review(state):
# 인간 검토 (일시 정지)
return {"human_review": "Review required"}
def manual_decision(state):
# 인간 결정
return {"decision_required": False}
# 체크포인트 설정
memory = MemorySaver()
human_loop_workflow = StateGraph(HumanInLoopState)
human_loop_workflow.add_node("auto_process", auto_process)
human_loop_workflow.add_node("human_review", human_review)
human_loop_workflow.add_node("manual_decision", manual_decision)
human_loop_workflow.add_edge("auto_process", "human_review")
human_loop_workflow.add_edge("human_review", "manual_decision")
human_loop_workflow.set_entry_point("auto_process")
5. 템플릿 5: 병렬 실행 에이전트
사용 사례: 여러 작업을 동시에 처리 후 집계
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(TypedDict):
inputs: List[str]
processed_results: List[Dict]
aggregated_result: str
def fan_out_process(state):
# fan-out: 여러 입력 처리
inputs = state["inputs"]
results = []
# 병렬 처리
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_single_input, inp) for inp in inputs]
for future in futures:
results.append(future.result())
return {"processed_results": results}
def process_single_input(input_data):
# 각 입력 처리
llm = ChatOpenAI(model="gpt-4")
prompt = f"Process: {input_data}"
result = llm.invoke(prompt)
return {"input": input_data, "output": result.content}
def aggregate_results(state):
# 결과 집계
results = state["processed_results"]
aggregated = "\n".join([f"{r['input']}: {r['output']}" for r in results])
return {"aggregated_result": aggregated}
# 워크플로우 정의
parallel_workflow = StateGraph(ParallelAgentState)
parallel_workflow.add_node("fan_out", fan_out_process)
parallel_workflow.add_node("aggregate", aggregate_results)
parallel_workflow.add_edge("fan_out", "aggregate")
parallel_workflow.set_entry_point("fan_out")
6. 상태 관리 패턴
python
from langgraph.checkpoint import Checkpoint
from typing import Any, Dict
class StateManager:
def __init__(self, checkpoint_path: str):
self.checkpoint_path = checkpoint_path
def save_state(self, state: Dict[str, Any], thread_id: str):
"""상태 저장"""
checkpoint = Checkpoint(
thread_id=thread_id,
state=state,
metadata={"timestamp": time.time()}
)
# 실제 저장 로직
with open(f"{self.checkpoint_path}/{thread_id}.json", "w") as f:
json.dump(checkpoint.dict(), f)
def load_state(self, thread_id: str) -> Dict[str, Any]:
"""상태 로드"""
try:
with open(f"{self.checkpoint_path}/{thread_id}.json", "r") as f:
checkpoint_data = json.load(f)
return checkpoint_data["state"]
except FileNotFoundError:
return {}
# 상태 초기화 및 관
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)