LangGraph 워크플로우 템플릿 (v30)
LangGraph 아키텍처 개요
LangGraph는 그래프 기반 워크플로우를 구축하기 위한 강력한 프레임워크입니다. 핵심 구성 요소는 다음과 같습니다:
노드 (Nodes)
워크플로우의 각 작업 단계입니다. 일반적으로 함수 형태로 구현됩니다.
엣지 (Edges)
노드 간의 실행 흐름을 정의합니다.
상태 (State)
워크플로우 전체에서 공유되는 데이터 구조입니다.
체크포인트 (Checkpointing)
중단된 위치에서 다시 시작할 수 있도록 상태를 저장하는 기능입니다.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
user_input: str
context: str
# 기본 워크플로우 구조
workflow = StateGraph(GraphState)
템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 유효성 검사)
이 템플릿은 검색기반 생성(RAG) 시스템을 구현합니다.
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph, END
from typing import TypedDict
import operator
class RagState(TypedDict):
query: str
context: str
generated_response: str
validation_result: bool
# 검색 노드
def retrieve_node(state: RagState):
# 실제 검색 로직 (예: ChromaDB, Elasticsearch)
context = search_vector_db(state["query"])
return {"context": context}
# 생성 노드
def generate_node(state: RagState):
prompt = PromptTemplate.from_template(
"다음 문맥을 기반으로 질문에 답하세요:\n\n{context}\n\n질문: {query}"
)
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke(prompt.format(context=state["context"], query=state["query"]))
return {"generated_response": response.content}
# 유효성 검사 노드
def validate_node(state: RagState):
# 응답 유효성 검사 로직
is_valid = check_response_quality(state["generated_response"])
return {"validation_result": is_valid}
# 워크플로우 정의
workflow = StateGraph(RagState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
# 유효성 검사에 따라 분기
def route_validation(state: RagState):
return "generate" if not state["validation_result"] else END
workflow.add_conditional_edges("validate", route_validation)
템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
복잡한 작업을 계획하고 실행하는 에이전트입니다.
import json
from typing import List, Dict
class ToolAgentState(TypedDict):
task: str
plan: List[str]
execution_history: List[Dict]
final_answer: str
# 계획 노드
def plan_node(state: ToolAgentState):
prompt = PromptTemplate.from_template(
"""
다음 작업을 수행하기 위한 단계별 계획을 생성하세요:
{task}
계획은 JSON 형식으로 반환하세요.
"""
)
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke(prompt.format(task=state["task"]))
plan = json.loads(response.content)
return {"plan": plan}
# 실행 노드
def execute_node(state: ToolAgentState):
# 현재 실행할 단계 추출
current_step = state["plan"][len(state["execution_history"])]
# 도구 실행 (예: API 호출, 파일 조작)
result = execute_tool(current_step)
# 실행 기록 업데이트
new_history = state["execution_history"] + [{"step": current_step, "result": result}]
return {"execution_history": new_history}
# 관찰 노드
def observe_node(state: ToolAgentState):
# 실행 결과 검토
last_result = state["execution_history"][-1]["result"]
observations = analyze_result(last_result)
return {"observations": observations}
# 결정 노드
def decide_node(state: ToolAgentState):
# 실행 완료 여부 판단
if len(state["execution_history"]) >= len(state["plan"]):
return {"final_answer": compile_final_answer(state)}
else:
return {"next_step": "execute"}
# 워크플로우 정의
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
def route_decide(state: ToolAgentState):
if len(state["execution_history"]) >= len(state["plan"]):
return END
else:
return "execute"
workflow.add_conditional_edges("decide", route_decide)
템플릿 3: 인간-중개 워크플로우 (일시 정지 → 검토 → 진행)
사람의 개입이 필요한 워크플로우를 구현합니다.
from datetime import datetime
import time
class HumanInLoopState(TypedDict):
task: str
response: str
user_review: str
status: str
timestamp: str
# 작업 생성 노드
def create_task_node(state: HumanInLoopState):
# 작업 생성
task_description = generate_task_description(state["task"])
return {
"response": task_description,
"status": "awaiting_review",
"timestamp": datetime.now().isoformat()
}
# 일시 정지 노드
def pause_node(state: HumanInLoopState):
# 사용자에게 검토 요청
print(f"사용자 검토 필요: {state['response']}")
time.sleep(2) # 실제 구현에서는 웹훅 또는 이벤트 시스템 사용
return {"status": "reviewed"}
# 검토 노드
def review_node(state: HumanInLoopState):
# 사용자 검토 결과 처리
user_feedback = get_user_feedback() # 실제 구현에서 이벤트 처리
return {"user_review": user_feedback}
# 진행 노드
def continue_node(state: HumanInLoopState):
# 사용자 피드백에 따라 처리
if "reject" in state["user_review"].lower():
return {"status": "rejected"}
else:
return {"status": "approved"}
# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("create_task", create_task_node)
workflow.add_node("pause", pause_node)
workflow.add_node("review", review_node)
workflow.add_node("continue", continue_node)
workflow.set_entry_point("create_task")
workflow.add_edge("create_task", "pause")
workflow.add_edge("pause", "review")
workflow.add_edge("review", "continue")
def route_continue(state: HumanInLoopState):
if state["status"] == "rejected":
return "create_task" # 재시작
else:
return END
workflow.add_conditional_edges("continue", route_continue)
템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)
여러 작업을 동시에 처리하고 결과를 집계합니다.
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(TypedDict):
tasks: List[str]
parallel_results: List[Dict]
aggregated_result: str
# 분기 노드
def fan_out_node(state: ParallelAgentState):
# 작업 분기
tasks = state["tasks"]
return {"parallel_results": []}
# 병렬 처리 노드
def process_parallel_node(state: ParallelAgentState):
def process_task(task):
# 각 작업 병렬 처리
result = execute_task(task)
return {"task": task, "result": result}
# 병렬 실행
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_task, task) for task in state["tasks"]]
results = [future.result() for future in futures]
return {"parallel_results": results}
# 집계 노드
def aggregate_node(state: ParallelAgentState):
# 결과 집계
aggregated = aggregate_results(state["parallel_results"])
return {"aggregated_result": aggregated}
# 워크플로우 정의
workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("process", process_parallel
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)