LangGraph 워크플로우 템플릿 (v36)
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 엔진으로, 다음과 같은 핵심 구성 요소로 작동합니다:
- Nodes (노드): 각 단계의 처리 로직
- Edges (엣지): 노드 간의 전이 조건
- State (상태): 워크플로우 내 모든 정보 저장
- Checkpointing (체크포인팅): 상태 저장/복원 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
# 기본 워크플로우 구조
workflow = StateGraph(GraphState)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
실제 문서 검색 후 답변 생성 시스템:
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
class RAGAgent:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def retrieve(self, state):
question = state["messages"][-1]["content"]
docs = self.vectorstore.similarity_search(question, k=3)
return {"retrieved_docs": docs}
def generate(self, state):
context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
prompt = PromptTemplate.from_template(
"다음 문맥을 기반으로 질문에 답변하세요:\n{context}\n질문: {question}"
)
chain = prompt | self.llm | StrOutputParser()
answer = chain.invoke({
"context": context,
"question": state["messages"][-1]["content"]
})
return {"answer": answer}
def validate(self, state):
# 답변 검증 로직
return {"validated": True}
def build_graph(self):
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", self.retrieve)
workflow.add_node("generate", self.generate)
workflow.add_node("validate", self.validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
return workflow.compile()
# 사용 예시
rag_agent = RAGAgent(vectorstore, ChatOpenAI(model="gpt-4"))
graph = rag_agent.build_graph()
3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)
다양한 도구를 활용한 에이전트 시스템:
from langchain.tools import Tool
from langchain_core.tools import tool
import json
class ToolAgent:
def __init__(self, tools):
self.tools = tools
def plan(self, state):
# 계획 생성
return {"plan": ["tool1", "tool2"]}
def execute(self, state):
# 도구 실행
executed_tools = []
for tool_name in state["plan"]:
tool = next((t for t in self.tools if t.name == tool_name), None)
if tool:
result = tool.run(state["messages"][-1]["content"])
executed_tools.append({"tool": tool_name, "result": result})
return {"executed_tools": executed_tools}
def observe(self, state):
# 실행 결과 관찰
return {"observations": "실행 완료"}
def decide(self, state):
# 결정 로직
return {"final_decision": "continue"}
def build_graph(self):
workflow = StateGraph(GraphState)
workflow.add_node("plan", self.plan)
workflow.add_node("execute", self.execute)
workflow.add_node("observe", self.observe)
workflow.add_node("decide", self.decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
return workflow.compile()
# 도구 정의 예시
tools = [
Tool(
name="search",
func=lambda x: f"검색 결과: {x}",
description="검색 도구"
),
Tool(
name="calculate",
func=lambda x: f"계산 결과: {eval(x)}",
description="계산 도구"
)
]
4. 템플릿 3: 인간-중개 워크플로우 (중지 → 검토 → 계속)
사용자 검토를 포함한 작업 흐름:
import asyncio
from datetime import datetime
class HumanInLoopAgent:
def __init__(self, llm):
self.llm = llm
def process(self, state):
# 기본 처리
return {"processed_data": "처리된 데이터"}
def pause_for_review(self, state):
# 중지 및 검토
return {"status": "awaiting_review"}
def review(self, state):
# 검토 상태
return {"review_status": "approved"}
def continue_processing(self, state):
# 계속 진행
return {"status": "processing"}
def build_graph(self):
workflow = StateGraph(GraphState)
workflow.add_node("process", self.process)
workflow.add_node("pause", self.pause_for_review)
workflow.add_node("review", self.review)
workflow.add_node("continue", self.continue_processing)
workflow.set_entry_point("process")
workflow.add_edge("process", "pause")
workflow.add_edge("pause", "review")
workflow.add_edge("review", "continue")
workflow.add_edge("continue", END)
return workflow.compile()
# 인간 검토를 위한 특별 처리
def human_review_callback(state):
# 외부 시스템과 통신
print("사용자 검토 요청:", state["messages"][-1]["content"])
# 실제 구현에서는 API 호출이나 이메일 알림 필요
return {"review_requested": True}
5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)
병렬 작업 처리 및 결과 집계:
class ParallelAgent:
def __init__(self, llm):
self.llm = llm
def fan_out(self, state):
# 작업 분기
return {"tasks": ["task1", "task2", "task3"]}
def process_parallel(self, state):
# 병렬 처리
tasks = state["tasks"]
results = []
for task in tasks:
# 각 작업 병렬 처리
result = f"결과: {task}"
results.append({"task": task, "result": result})
return {"results": results}
def aggregate(self, state):
# 결과 집계
aggregated = "\n".join([r["result"] for r in state["results"]])
return {"aggregated_result": aggregated}
def build_graph(self):
workflow = StateGraph(GraphState)
workflow.add_node("fan_out", self.fan_out)
workflow.add_node("process_parallel", self.process_parallel)
workflow.add_node("aggregate", self.aggregate)
workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.add_edge("aggregate", END)
return workflow.compile()
6. 상태 관리 패턴
상태를 효율적으로 관리하고 저장하는 방법:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.runnables import RunnableConfig
class StateManager:
def __init__(self, checkpoint_backend=None):
self.checkpoint_backend = checkpoint_backend or MemorySaver()
def create_workflow_with_checkpoint(self, workflow_builder):
return workflow_builder.with_config(
configurable={
"thread_id": "12345"
}
)
def get_state(self, graph, thread_id):
# 현재 상태 조회
return graph.get_state({"configurable": {"thread_id": thread_id}})
def update_state(self, graph, thread_id, update_data):
# 상태 업데이트
return graph.update_state(
{"configurable": {"thread_id": thread_id}},
update_data
)
# 사용 예시
state_manager = StateManager(SqliteSaver("checkpoints.db"))
7. 스트리밍 및 실시간 업데이트
실시간 데이터 스트리밍 처리:
python
import asyncio
from typing import AsyncGenerator
class StreamingAgent:
async def stream_process(self, state):
# 스트리밍 처리
messages = state
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)