LangGraph 워크플로우 템플릿 (v24)
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 이루어져 있습니다:
Nodes: 워크플로우의 각 단계
Edges: 노드 간의 전이 조건
State: 워크플로우 내에서 공유되는 상태
Checkpointing: 상태 저장 및 복구 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
workflow = StateGraph(GraphState)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
이 템플릿은 문서 검색 및 생성을 위한 기본적인 RAG 워크플로우입니다:
import json
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
class RAGState(TypedDict):
query: str
context: str
response: str
validation: bool
def retrieve_node(state: RAGState):
# 간단한 문서 검색 로직
# 실제 구현에서는 vector store 검색 사용
return {"context": "검색된 문서 내용"}
def generate_node(state: RAGState):
prompt = PromptTemplate.from_template(
"질문: {query}\n문맥: {context}\n답변:"
)
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm | StrOutputParser()
response = chain.invoke({
"query": state["query"],
"context": state["context"]
})
return {"response": response}
def validate_node(state: RAGState):
# 답변 검증 로직
# 예: 관련성 점수 계산
return {"validation": True}
# 워크플로우 구성
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")
3. 템플릿 2: 다중 툴 에이전트 (계획 → 실행 → 관찰 → 결정)
다중 도구를 활용한 에이전트 워크플로우:
from langchain.tools import Tool
from langchain_core.messages import ToolMessage
from typing import List
class ToolAgentState(TypedDict):
task: str
plan: List[str]
execution: str
observation: str
decision: str
def plan_node(state: ToolAgentState):
# 작업 계획 생성
plan = ["분석 시작", "도구 선택", "실행", "결과 검토"]
return {"plan": plan}
def execute_node(state: ToolAgentState):
# 도구 실행
# 실제 도구 호출 로직
tools = [Tool(name="search", func=lambda x: f"검색 결과: {x}")]
tool = tools[0]
result = tool.func(state["task"])
return {"execution": result}
def observe_node(state: ToolAgentState):
# 실행 결과 관찰
return {"observation": f"관찰 결과: {state['execution']}"}
def decide_node(state: ToolAgentState):
# 결정 로직
if "결과" in state["observation"]:
return {"decision": "성공"}
return {"decision": "실패"}
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.set_entry_point("plan")
workflow.set_finish_point("decide")
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
사람의 개입이 필요한 경우 사용:
import asyncio
from datetime import datetime
class HumanInLoopState(TypedDict):
task: str
status: str
human_review: str
final_result: str
def process_task_node(state: HumanInLoopState):
# 작업 처리
return {"status": "processing"}
def pause_for_review_node(state: HumanInLoopState):
# 인간 검토를 위한 일시정지
print(f"작업 일시정지: {state['task']}")
# 실제 구현에서는 API 호출 또는 메시지 대기
return {"status": "awaiting_review"}
def review_node(state: HumanInLoopState):
# 인간 검토
# 이 부분은 외부 인터페이스를 통해 처리
return {"human_review": "검토 완료"}
def continue_processing_node(state: HumanInLoopState):
# 검토 후 처리 계속
return {"status": "completed", "final_result": "최종 결과"}
workflow = StateGraph(HumanInLoopState)
workflow.add_node("process_task", process_task_node)
workflow.add_node("pause_for_review", pause_for_review_node)
workflow.add_node("review", review_node)
workflow.add_node("continue_processing", continue_processing_node)
workflow.add_edge("process_task", "pause_for_review")
workflow.add_edge("pause_for_review", "review")
workflow.add_edge("review", "continue_processing")
workflow.set_entry_point("process_task")
workflow.set_finish_point("continue_processing")
5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)
병렬 작업 처리:
from concurrent.futures import ThreadPoolExecutor
import threading
class ParallelAgentState(TypedDict):
tasks: List[str]
results: List[str]
aggregated_result: str
def fan_out_node(state: ParallelAgentState):
# 작업 분할
tasks = ["task_1", "task_2", "task_3"]
return {"tasks": tasks, "results": []}
def process_parallel_node(state: ParallelAgentState):
# 병렬 처리
def process_task(task):
# 각 작업 병렬 처리
return f"{task}_결과"
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(process_task, state["tasks"]))
return {"results": results}
def aggregate_node(state: ParallelAgentState):
# 결과 집계
aggregated = ", ".join(state["results"])
return {"aggregated_result": aggregated}
workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("process_parallel", process_parallel_node)
workflow.add_node("aggregate", aggregate_node)
workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.set_entry_point("fan_out")
workflow.set_finish_point("aggregate")
6. 상태 관리 패턴
상태를 효율적으로 관리하는 방법:
from langgraph.checkpoint import Checkpoint
from langgraph.checkpoint.memory import MemorySaver
# 메모리 체크포인트 사용
memory = MemorySaver()
# 사용자별 상태 저장
class UserState(TypedDict):
user_id: str
conversation_history: List[dict]
preferences: dict
def save_checkpoint(state: UserState):
# 사용자별 체크포인트 저장
checkpoint = {
"user_id": state["user_id"],
"conversation_history": state["conversation_history"],
"preferences": state["preferences"],
"timestamp": datetime.now().isoformat()
}
return checkpoint
7. 스트리밍 및 실시간 업데이트
실시간 처리 및 스트리밍:
from langchain_core.callbacks import BaseCallbackHandler
class StreamingCallback(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs):
print(token, end="", flush=True)
def streaming_node(state: StreamingState):
llm = ChatOpenAI(streaming=True, callbacks=[StreamingCallback()])
# 스트리밍 처리 로직
return {"streaming_output": "스트리밍 결과"}
# 스트리밍 워크플로우
workflow = StateGraph(StreamingState)
workflow.add_node("streaming", streaming_node)
workflow.add_edge("streaming", END)
8. 오류 복구 및 재시도 로직
안정적인 워크플로우를 위한 오류 처리:
python
import time
from functools import wraps
def retry_on_failure(max_retries=3, delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries -
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)