LangGraph 워크플로우 템플릿 (v39)
LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우를 구현하기 위한 프레임워크로, 다음과 같은 핵심 구성 요소로 작동합니다:
Nodes (노드): 워크플로우의 각 단계. 각 노드는 특정 작업을 수행하고 상태를 업데이트합니다.
Edges (엣지): 노드 간의 전이 조건을 정의합니다.
State (상태): 워크플로우의 현재 상태를 유지합니다. 노드가 상태를 읽고 변경할 수 있습니다.
Checkpointing (체크포인팅): 상태를 저장하고 복원할 수 있어 중단 후 복구가 가능합니다.
from langgraph.graph import StateGraph
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
# 기본 워크플로우 구성
workflow = StateGraph(AgentState)
템플릿 1: 단순 RAG 에이전트
문서 검색 후 생성하고 검증하는 단순한 RAG 워크플로우입니다:
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
class RAGState(TypedDict):
query: str
documents: list
response: str
validated: bool
def retrieve(state: RAGState):
# 문서 검색 로직
documents = vector_store.similarity_search(state["query"])
return {"documents": documents}
def generate(state: RAGState):
# 생성 로직
prompt = PromptTemplate.from_template(
"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
)
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke([
("system", "You are a helpful assistant."),
("user", prompt.format(
context="\n".join([doc.page_content for doc in state["documents"]]),
query=state["query"]
))
])
return {"response": response.content}
def validate(state: RAGState):
# 응답 검증
# 간단한 예: 응답이 질문과 관련된 내용인지 확인
return {"validated": True}
# 워크플로우 생성
rag_workflow = StateGraph(RAGState)
rag_workflow.add_node("retrieve", retrieve)
rag_workflow.add_node("generate", generate)
rag_workflow.add_node("validate", validate)
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
rag_workflow.set_entry_point("retrieve")
rag_workflow.set_finish_point("validate")
rag_app = rag_workflow.compile()
템플릿 2: 다중 도구 에이전트
계획 → 실행 → 관찰 → 결정의 반복적인 루프:
from typing import List
import json
class ToolAgentState(TypedDict):
task: str
plan: List[str]
execution_history: List[dict]
final_answer: str
def plan(state: ToolAgentState):
# 작업 계획 생성
llm = ChatOpenAI(model="gpt-4")
plan_prompt = PromptTemplate.from_template(
"Break down the task '{task}' into specific steps. Format as JSON array of strings."
)
response = llm.invoke([
("system", "You are a task planning assistant."),
("user", plan_prompt.format(task=state["task"]))
])
try:
steps = json.loads(response.content)
return {"plan": steps}
except:
return {"plan": [state["task"]]}
def execute(state: ToolAgentState):
# 도구 실행
execution_results = []
for step in state["plan"]:
# 예시: 각 단계에 대한 도구 실행 로직
result = {"step": step, "status": "completed", "output": f"Result for {step}"}
execution_results.append(result)
return {"execution_history": execution_results}
def observe(state: ToolAgentState):
# 실행 결과 관찰
# 예: 성공/실패 여부 확인
failed_steps = [
step for step in state["execution_history"]
if step.get("status") == "failed"
]
return {"execution_history": state["execution_history"]}
def decide(state: ToolAgentState):
# 결정 로직
if not state["execution_history"]:
return {"final_answer": "No execution history available"}
all_success = all(step.get("status") == "completed"
for step in state["execution_history"])
if all_success:
return {"final_answer": "All tasks completed successfully"}
else:
return {"final_answer": "Some tasks failed"}
# 워크플로우 구성
tool_workflow = StateGraph(ToolAgentState)
tool_workflow.add_node("plan", plan)
tool_workflow.add_node("execute", execute)
tool_workflow.add_node("observe", observe)
tool_workflow.add_node("decide", decide)
tool_workflow.add_edge("plan", "execute")
tool_workflow.add_edge("execute", "observe")
tool_workflow.add_edge("observe", "decide")
tool_workflow.set_entry_point("plan")
tool_workflow.set_finish_point("decide")
tool_app = tool_workflow.compile()
템플릿 3: 인간-중개 워크플로우
사용자 검토 후 진행하는 인간 인터페이스:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
import asyncio
class HumanReviewState(MessagesState):
review_status: str
review_comment: str
def pause_for_review(state: HumanReviewState):
# 사용자 검토를 위한 일시정지
return {"review_status": "pending"}
def review_process(state: HumanReviewState):
# 검토 프로세스
return {"review_status": "approved"} # 또는 "rejected"
def continue_with_review(state: HumanReviewState):
# 검토 후 진행
if state["review_status"] == "rejected":
return {"messages": [{"role": "assistant", "content": "Process rejected by human review"}]}
return {"messages": [{"role": "assistant", "content": "Process continues after review"}]}
# 인간 검토 워크플로우
review_workflow = StateGraph(HumanReviewState)
review_workflow.add_node("pause", pause_for_review)
review_workflow.add_node("review", review_process)
review_workflow.add_node("continue", continue_with_review)
review_workflow.add_edge("pause", "review")
review_workflow.add_edge("review", "continue")
review_workflow.set_entry_point("pause")
review_workflow.set_finish_point("continue")
review_app = review_workflow.compile(checkpointer=MemorySaver())
템플릿 5: 병렬 실행 에이전트
파이프라인에서 병렬 처리 후 집계:
from concurrent.futures import ThreadPoolExecutor
import time
class ParallelAgentState(TypedDict):
data: List[str]
processed_results: List[dict]
aggregated_result: dict
def fan_out(state: ParallelAgentState):
# 데이터를 병렬로 분할
return {"processed_results": []}
def process_item(item: dict):
# 각 항목 처리
time.sleep(0.1) # 시뮬레이션
return {
"id": item["id"],
"processed": True,
"result": f"Processed {item['data']}"
}
def aggregate_results(state: ParallelAgentState):
# 결과 집계
results = state["processed_results"]
total = len(results)
successful = sum(1 for r in results if r.get("processed"))
return {
"aggregated_result": {
"total_items": total,
"successful": successful,
"failed": total - successful
}
}
# 병렬 처리 워크플로우
parallel_workflow = StateGraph(ParallelAgentState)
parallel_workflow.add_node("fan_out", fan_out)
parallel_workflow.add_node("aggregate", aggregate_results)
parallel_workflow.add_edge("fan_out", "aggregate")
parallel_workflow.set_entry_point("fan_out")
parallel_workflow.set_finish_point("aggregate")
parallel_app = parallel_workflow.compile()
상태 관리 패턴
1. 상태 초기화
def initialize_state():
return {
"messages": [],
"timestamp": time.time(),
"version": "v39",
"metadata": {}
}
# 상태 초기화 및 체크포인트
initial_state = initialize_state()
2. 상태 병합
def merge_state(current_state, new_state):
merged = current_state.copy()
for key, value in new_state.items():
if key in merged and isinstance(merged[key], list):
merged[key].extend(value)
else:
merged[key] = value
return merged
3. 상태 유효성 검사
python
def validate_state(state):
required_fields = ["messages", "timestamp"]
for field in required_fields:
if field not in state:
raise ValueError(f"Missing required field: {field}")
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)