LangGraph 워크플로우 템플릿 (v22)
재사용 가능한 LangGraph 템플릿 가이드
Python 개발자들을 위한 LangChain/LangGraph 기반 AI 에이전트 구축을 위한 실용적인 워크플로우 템플릿 모음
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우를 위한 고급 프레임워크로, 다음과 같은 핵심 구성 요소를 포함합니다:
Nodes (노드): 각각의 작업 단위. 함수나 클래스로 정의됩니다.
Edges (엣지): 노드 간의 실행 흐름을 정의합니다.
State (상태): 워크플로우에서 공유되는 상태 데이터 구조.
Checkpointing (체크포인트): 상태 저장 및 복원 기능.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
graph = StateGraph(State)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
실제 문서 검색 및 생성 시나리오에서 유용한 기본 RAG 워크플로우입니다:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
def retrieve(state):
# 검색 로직
query = state["messages"][-1].content
# 여기서 벡터 데이터베이스 검색 로직 추가
return {"retrieved_docs": ["문서 내용 1", "문서 내용 2"]}
def generate(state):
# 생성 로직
docs = state["retrieved_docs"]
prompt = PromptTemplate.from_template(
"다음 문서를 기반으로 질문에 답하세요: {docs} 질문: {query}"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
response = chain.invoke({
"docs": "\n".join(docs),
"query": state["messages"][-1].content
})
return {"generated_response": response}
def validate(state):
# 응답 검증 로직
response = state["generated_response"]
# 간단한 유효성 검사
if len(response) < 10:
return {"validated": False, "error": "응답이 너무 짧습니다"}
return {"validated": True}
# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_conditional_edges(
"validate",
lambda x: "validated" in x and x["validated"],
{"validated": END, "invalid": "generate"}
)
3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
복잡한 작업을 분석하고 여러 도구를 순차적으로 사용하는 에이전트:
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
def plan(state):
# 작업 계획 생성
prompt = "다음 작업을 수행할 계획을 세우세요: {query}"
plan_result = ChatOpenAI(model="gpt-4").invoke(prompt)
return {"plan": plan_result.content}
def execute(state):
# 계획 실행 (도구 호출)
plan = state["plan"]
# 도구 실행 로직 (예: API 호출 등)
return {"executed": True, "tool_output": "도구 실행 결과"}
def observe(state):
# 실행 결과 관찰
tool_output = state["tool_output"]
return {"observation": f"도구 출력: {tool_output}"}
def decide(state):
# 결정 로직
observation = state["observation"]
# 판단 로직 (예: 성공/실패 여부)
return {"decision": "continue" if "성공" in observation else "retry"}
# 도구 정의
tools = [
Tool(
name="search",
func=lambda x: f"검색 결과: {x}",
description="웹 검색을 수행합니다"
)
]
# 워크플로우 구성
workflow = StateGraph(State)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_conditional_edges(
"decide",
lambda x: x["decision"],
{"continue": END, "retry": "plan"}
)
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
사용자 검토가 필요한 민감한 작업이나 중요한 결정 시 사용:
import asyncio
from typing import Optional
class HumanReviewState(TypedDict):
messages: list
pending_review: bool
review_approved: Optional[bool]
async def pause_for_review(state):
# 리뷰 대기
print("사용자 검토를 위해 일시정지...")
return {"pending_review": True}
async def review_and_continue(state):
# 사용자 검토 처리
# 이 부분은 실제 UI나 외부 시스템 통신으로 연결됨
user_input = input("승인하시겠습니까? (y/n): ")
approved = user_input.lower() == 'y'
return {"review_approved": approved, "pending_review": False}
async def human_in_the_loop_workflow():
workflow = StateGraph(HumanReviewState)
workflow.add_node("process", process_task)
workflow.add_node("pause", pause_for_review)
workflow.add_node("review", review_and_continue)
workflow.set_entry_point("process")
workflow.add_edge("process", "pause")
workflow.add_edge("pause", "review")
workflow.add_conditional_edges(
"review",
lambda x: x["review_approved"],
{"True": END, "False": "process"} # 재시작
)
return workflow.compile()
async def process_task(state):
# 작업 처리 로직
return {"messages": state["messages"] + ["작업 완료"]}
5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)
하나의 입력을 여러 경로로 분기하여 병렬로 처리 후 결과 집계:
from concurrent.futures import ThreadPoolExecutor
import asyncio
def fan_out(state):
# 입력 분기
query = state["messages"][-1].content
# 여러 하위 작업 생성
return {"sub_tasks": [
{"task": "분석1", "input": query},
{"task": "분석2", "input": query},
{"task": "분석3", "input": query}
]}
def process_parallel(state):
# 병렬 처리
sub_tasks = state["sub_tasks"]
def worker(task):
# 각각의 작업 처리
return f"{task['task']} 결과"
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(worker, sub_tasks))
return {"results": results}
def aggregate(state):
# 결과 집계
results = state["results"]
final_result = " | ".join(results)
return {"final_output": final_result}
# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("process_parallel", process_parallel)
workflow.add_node("aggregate", aggregate)
workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.add_edge("aggregate", END)
6. 상태 관리 패턴
상태를 효과적으로 관리하기 위한 패턴:
# 상태 정의
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
step_count: int
error_count: int
last_error: Optional[str]
# 상태 업데이트 함수
def update_state(state, updates):
return {**state, **updates}
# 체크포인트 저장
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
# 체크포인트 복원
for checkpoint in graph.stream(input_state, config={"configurable": {"thread_id": "1"}}):
print(checkpoint)
7. 스트리밍 및 실시간 업데이트
실시간 응답을 제공하기 위한 스트리밍 기능:
python
from langchain_core.callbacks import BaseCallbackHandler
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)