LangGraph 워크플로우 템플릿 (v32)
LangGraph 아키텍처 개요
LangGraph는 LangChain의 고급 워크플로우 엔진으로, 상태 기반의 순환 그래프를 기반으로 합니다. 핵심 구성 요소는 다음과 같습니다:
노드 (Nodes)
각 노드는 상태를 입력으로 받아 처리하고 상태를 반환하는 함수입니다. 노드는 일반적으로 Runnable 또는 RunnableCallable 인스턴스입니다.
엣지 (Edges)
엣지는 노드 간의 흐름을 정의합니다. 정적 또는 동적 엣지를 사용할 수 있으며, 조건에 따라 다른 경로로 분기할 수 있습니다.
상태 (State)
상태는 모든 노드가 공유하는 데이터 구조입니다. BaseState를 상속받아 정의하며, 메시지, 변수, 기록 등을 포함합니다.
체크포인팅 (Checkpointing)
체크포인팅은 워크플로우 상태를 저장하고 복원할 수 있게 해줍니다. 이를 통해 장애 복구나 중단 후 재개가 가능합니다.
템플릿 1: 단순 RAG 에이전트
문제: 문서 검색 후 생성하는 RAG 워크플로우의 기본 구조
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableLambda
class RAGState(TypedDict):
question: str
context: str
answer: str
retrieved_docs: List[str]
def retrieve(state: RAGState):
# Simulate retrieval logic
docs = ["문서 1 내용", "문서 2 내용"] # 실제 검색 로직으로 교체
return {"retrieved_docs": docs, "context": "\n".join(docs)}
def generate(state: RAGState):
# 실제 LLM 호출 로직
prompt = f"질문: {state['question']}\n문맥: {state['context']}"
# 예: llm.invoke(prompt)
answer = "생성된 답변"
return {"answer": answer}
def validate(state: RAGState):
# 답변 검증 로직
if len(state["answer"]) < 10:
return {"answer": "검증 실패: 답변이 너무 짧습니다"}
return {"answer": state["answer"]}
# 워크플로우 정의
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
# 엣지 정의
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
rag_graph = workflow.compile()
템플릿 2: 다중 도구 에이전트
문제: 계획 → 실행 → 관찰 → 결정의 반복적 워크플로우 구현
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage
class ToolState(TypedDict):
input: str
plan: List[str]
execution: List[str]
observations: List[str]
final_answer: str
def plan(state: ToolState):
# 도구 사용 계획 생성
tools = ["web_search", "calculator", "database_query"]
plan = [f"도구 {tool} 실행" for tool in tools]
return {"plan": plan}
def execute(state: ToolState):
# 도구 실행 (실제 실행 로직)
execution_results = ["검색 결과", "계산 결과", "데이터 조회 결과"]
return {"execution": execution_results}
def observe(state: ToolState):
# 실행 결과 분석
observations = [
f"검색 결과: {result}" for result in state["execution"]
]
return {"observations": observations}
def decide(state: ToolState):
# 최종 결정
if len(state["observations"]) >= 2:
return {"final_answer": "실행 완료"}
return {"final_answer": "실행 실패"}
# 워크플로우 구현
workflow = StateGraph(ToolState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
tool_graph = workflow.compile()
템플릿 3: 인간-중개 워크플로우
문제: 인간 검토와 승인을 포함한 워크플로우 구현
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage
class HumanInLoopState(TypedDict):
input: str
generated_output: str
review_request: str
user_feedback: str
final_output: str
def generate(state: HumanInLoopState):
# AI 생성
return {"generated_output": "생성된 결과"}
def pause_for_review(state: HumanInLoopState):
# 사용자 검토 요청
review_prompt = f"검토 요청:\n{state['generated_output']}"
return {"review_request": review_prompt}
def process_feedback(state: HumanInLoopState):
# 사용자 피드백 처리
if state["user_feedback"]:
return {"final_output": f"수정된 결과: {state['user_feedback']}"}
else:
return {"final_output": state["generated_output"]}
# 사용자 입력 헬퍼 함수
def get_user_input():
return input("사용자 피드백을 입력하세요: ")
# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate", generate)
workflow.add_node("pause_for_review", pause_for_review)
workflow.add_node("process_feedback", process_feedback)
workflow.set_entry_point("generate")
workflow.add_edge("generate", "pause_for_review")
workflow.add_edge("pause_for_review", "process_feedback")
workflow.add_edge("process_feedback", END)
human_loop_graph = workflow.compile()
템플릿 4: 병렬 실행 에이전트
문제: 병렬로 여러 작업을 실행하고 결과를 집계하는 워크플로우
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelExecutionState(TypedDict):
input_data: List[str]
parallel_results: List[str]
aggregated_result: str
def fan_out(state: ParallelExecutionState):
# 입력 데이터를 병렬 처리
return {"parallel_results": state["input_data"]}
def process_parallel_item(item: str):
# 각 아이템 처리 (실제 AI 처리 로직)
return f"처리 결과: {item}"
async def parallel_process(state: ParallelExecutionState):
# 병렬 처리 로직
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=4) as executor:
tasks = [loop.run_in_executor(executor, process_parallel_item, item)
for item in state["parallel_results"]]
results = await asyncio.gather(*tasks)
return {"parallel_results": results}
def aggregate_results(state: ParallelExecutionState):
# 결과 집계
aggregated = " | ".join(state["parallel_results"])
return {"aggregated_result": aggregated}
# 병렬 워크플로우
workflow = StateGraph(ParallelExecutionState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("parallel_process", parallel_process)
workflow.add_node("aggregate_results", aggregate_results)
workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "parallel_process")
workflow.add_edge("parallel_process", "aggregate_results")
workflow.add_edge("aggregate_results", END)
parallel_graph = workflow.compile()
상태 관리 패턴
1. 상태 히스토리 관리
class HistoryState(TypedDict):
messages: List[dict]
current_step: int
history: List[dict]
def append_message(state: HistoryState, message: dict):
return {
"messages": state["messages"] + [message],
"current_step": state["current_step"] + 1
}
2. 상태 재사용 패턴
def state_reuse_pattern(state: dict, new_state: dict):
# 기존 상태와 새로운 상태 병합
merged = {**state, **new_state}
return merged
스트리밍 및 실시간 업데이트
python
from langchain_core.callbacks import StreamingCallbackHandler
class StreamCallback(StreamingCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs):
print(f"토큰: {token}")
async def stream_execution(graph, input_data):
#
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)