DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v27)

# LangGraph 워크플로우 템플릿 (v27)
# Python 개발자를 위한 실용적인 LangChain/LangGraph 워크플로우 템플릿 가이드

## 1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 시스템으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpointing)로 구성됩니다.

Enter fullscreen mode Exit fullscreen mode


python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
# ``langchain_core.messages`` exports no ``Message`` class; ``BaseMessage`` is
# the common base type and ``AIMessage`` a concrete, instantiable message.
from langchain_core.messages import AIMessage, BaseMessage
import operator


# State definition
class GraphState(TypedDict):
    """State shared by every node of the graph."""

    # ``operator.add`` is attached as the reducer for this key, so the list a
    # node returns is concatenated onto the existing list rather than
    # replacing it.
    messages: Annotated[list[BaseMessage], operator.add]
    # Add further state fields here.


# Node definitions
def retrieve_node(state):
    """Return a partial state update holding the retrieval result.

    Args:
        state: Current graph state (unused by this placeholder).

    Returns:
        A dict with a single-element ``messages`` list.
    """
    # Retrieval logic goes here.
    return {"messages": [AIMessage(content="검색 결과")]}


def generate_node(state):
    """Return a partial state update holding the generation result.

    Args:
        state: Current graph state (unused by this placeholder).

    Returns:
        A dict with a single-element ``messages`` list.
    """
    # Generation logic goes here.
    return {"messages": [AIMessage(content="생성 결과")]}


# Workflow construction
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
# A graph needs an explicit entry point before it can be compiled.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)


## 2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)

Enter fullscreen mode Exit fullscreen mode


python
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI

class RAGState(TypedDict):
    """State carried through the retrieve -> generate -> validate workflow."""

    # Question text; read by the retrieve and generate steps.
    question: str
    # Retrieved document contents joined into one string.
    context: str
    # Content of the LLM response.
    answer: str
    # Outcome of the answer-quality check.
    validation: bool

class RAGAgent:
    """Node implementations for a simple retrieve -> generate -> validate flow.

    Each public step takes the current state mapping and returns a dict
    containing only the keys it updates.
    """

    def __init__(self, llm, vector_store):
        """Store the collaborators used by the steps.

        Args:
            llm: Object whose ``invoke(prompt)`` returns a value exposing
                ``.content``.
            vector_store: Object whose ``similarity_search(query, k=...)``
                returns items exposing ``.page_content``.
        """
        self.llm = llm
        self.vector_store = vector_store

    def retrieve(self, state):
        """Look up the three most similar documents for ``state["question"]``.

        Returns:
            ``{"context": str}`` with the documents' contents joined by
            newlines (an empty string when nothing is found).
        """
        docs = self.vector_store.similarity_search(state["question"], k=3)
        return {"context": "\n".join(doc.page_content for doc in docs)}

    def generate(self, state):
        """Ask the LLM to answer the question using the retrieved context.

        Returns:
            ``{"answer": str}`` holding the response content.
        """
        # Spelled out line by line so the prompt text does not depend on how
        # deeply this method happens to be indented.
        prompt = (
            "\n"
            f"    질문: {state['question']}\n"
            f"    컨텍스트: {state['context']}\n"
            "    "
        )
        response = self.llm.invoke(prompt)
        return {"answer": response.content}

    def validate(self, state):
        """Run the quality check on ``state["answer"]``.

        Returns:
            ``{"validation": bool}``.
        """
        return {"validation": self.check_answer_quality(state["answer"])}

    def check_answer_quality(self, answer, min_length=10):
        """Return True when ``answer`` has at least ``min_length`` characters.

        Args:
            answer: Answer text to check.
            min_length: Minimum accepted length; defaults to 10.
        """
        return len(answer) >= min_length
Enter fullscreen mode Exit fullscreen mode

워크플로우 정의

# Build the agent once so every node shares the same collaborators instead of
# constructing a fresh instance on each call.
# TODO: replace the ``None`` placeholders with a real LLM and vector store
# before invoking the compiled app.
rag_agent = RAGAgent(None, None)

# Workflow definition
rag_workflow = StateGraph(RAGState)
rag_workflow.add_node("retrieve", rag_agent.retrieve)
rag_workflow.add_node("generate", rag_agent.generate)
rag_workflow.add_node("validate", rag_agent.validate)

# Edge definitions
# The entry point must be declared explicitly before the graph is compiled.
rag_workflow.set_entry_point("retrieve")
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
rag_workflow.add_edge("validate", END)

# Compile into a runnable application
rag_app = rag_workflow.compile()


## 3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)

Enter fullscreen mode Exit fullscreen mode


python
from typing import List
from langchain.tools import Tool
from langchain_core.runnables import RunnableLambda

class ToolAgentState(TypedDict):
    """State carried through the plan -> execute -> observe -> decide workflow."""

    # Ordered plan steps.
    plan: List[str]
    # One entry per attempted tool call.
    execution_log: List[str]
    # Observations derived from the execution log.
    observations: List[str]
    # Final outcome text.
    final_decision: str

class ToolAgent:
    """Node implementations for a plan -> execute -> observe -> decide flow.

    Each step takes the current state mapping and returns a dict containing
    only the keys it updates.
    """

    def __init__(self, tools: List[Tool], max_tools: int = 2):
        """Store the tools available to the agent.

        Args:
            tools: Tools to run; each must expose ``name`` and
                ``invoke(argument)``.
            max_tools: How many tools (from the front of ``tools``) a single
                ``execute`` call runs. Defaults to 2.
        """
        self.tools = tools
        self.max_tools = max_tools

    def plan(self, state):
        """Return the fixed four-step plan as ``{"plan": [...]}``."""
        return {
            "plan": [
                "1. 정보 수집",
                "2. 도구 실행",
                "3. 결과 분석",
                "4. 최종 결정",
            ]
        }

    def execute(self, state):
        """Invoke the first ``max_tools`` tools and log each outcome.

        A failing tool does not abort the run: the failure is written to the
        log and the next tool is tried.

        Returns:
            ``{"execution_log": list[str]}`` with one entry per attempted tool.
        """
        execution_log = []
        for tool in self.tools[: self.max_tools]:
            try:
                result = tool.invoke("실행 매개변수")
            except Exception as e:  # deliberate best-effort: log and continue
                execution_log.append(f"{tool.name}: 실패 - {e}")
            else:
                execution_log.append(f"{tool.name}: {result}")
        return {"execution_log": execution_log}

    def observe(self, state):
        """Turn every execution-log entry into a numbered observation.

        Returns:
            ``{"observations": list[str]}``, numbered from 0.
        """
        observations = [
            f"관찰 {i}: {log}" for i, log in enumerate(state["execution_log"])
        ]
        return {"observations": observations}

    def decide(self, state):
        """Report completion when at least one tool was attempted.

        Returns:
            ``{"final_decision": str}``.
        """
        decision = "작업 완료" if state["execution_log"] else "작업 실패"
        return {"final_decision": decision}
Enter fullscreen mode Exit fullscreen mode

워크플로우 정의

# Build the agent once and register its bound methods as nodes, rather than
# constructing a new agent inside every node call.
# TODO: pass the real tool list; with an empty list ``execute`` runs nothing.
tool_agent = ToolAgent([])

tool_workflow = StateGraph(ToolAgentState)
tool_workflow.add_node("plan", tool_agent.plan)
tool_workflow.add_node("execute", tool_agent.execute)
tool_workflow.add_node("observe", tool_agent.observe)
tool_workflow.add_node("decide", tool_agent.decide)

# The entry point must be declared explicitly before the graph is compiled.
tool_workflow.set_entry_point("plan")
tool_workflow.add_edge("plan", "execute")
tool_workflow.add_edge("execute", "observe")
tool_workflow.add_edge("observe", "decide")
tool_workflow.add_edge("decide", END)


## 4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

Enter fullscreen mode Exit fullscreen mode


python
from langchain_core.messages import ToolMessage
import asyncio

class HumanInLoopState(TypedDict):
    """State carried through the process -> review workflow."""

    # Task text sent to the LLM.
    task_description: str
    # Content of the LLM response awaiting review.
    intermediate_result: str
    # Raw feedback string from the reviewer.
    human_feedback: str
    # True when the feedback was an approval.
    is_approved: bool

class HumanInLoopAgent:
    """Async node implementations for a process -> human review flow."""

    def __init__(self, llm, feedback_provider=None):
        """Store the collaborators used by the steps.

        Args:
            llm: Object whose awaitable ``ainvoke(prompt)`` resolves to a
                value exposing ``.content``.
            feedback_provider: Optional callable ``(state) -> str`` supplying
                the reviewer's answer. When omitted, the built-in test stub
                answers ``"y"``.
        """
        self.llm = llm
        self.feedback_provider = feedback_provider

    async def process_task(self, state):
        """Send ``state["task_description"]`` to the LLM.

        Returns:
            ``{"intermediate_result": str}`` holding the response content.
        """
        # Await the async API so the event loop is not blocked by a
        # synchronous call inside this coroutine.
        response = await self.llm.ainvoke(state["task_description"])
        return {"intermediate_result": response.content}

    async def await_human_feedback(self, state):
        """Show the intermediate result and collect the reviewer's verdict.

        Returns:
            ``{"human_feedback": str, "is_approved": bool}``; approval means
            the feedback is ``"y"`` ignoring case and surrounding whitespace.
        """
        print("작업 결과를 검토하세요:")
        print(state["intermediate_result"])
        print("승인 (y) 또는 거절 (n): ")

        # A real implementation would use a webhook or a queue system here.
        if self.feedback_provider is None:
            feedback = "y"  # test stub
        else:
            feedback = self.feedback_provider(state)
        is_approved = feedback.strip().lower() == "y"

        return {"human_feedback": feedback, "is_approved": is_approved}
Enter fullscreen mode Exit fullscreen mode

워크플로우 정의

# Register the coroutine methods themselves as nodes. Wrapping them in a
# plain lambda would hand the graph an un-awaited coroutine object instead of
# a state update.
# TODO: replace the ``None`` placeholder with a real LLM.
human_loop_agent = HumanInLoopAgent(None)

human_loop_workflow = StateGraph(HumanInLoopState)
human_loop_workflow.add_node("process", human_loop_agent.process_task)
human_loop_workflow.add_node("wait_for_feedback", human_loop_agent.await_human_feedback)

# The entry point must be declared explicitly before the graph is compiled.
human_loop_workflow.set_entry_point("process")
human_loop_workflow.add_edge("process", "wait_for_feedback")
human_loop_workflow.add_conditional_edges(
    "wait_for_feedback",
    lambda s: "approved" if s["is_approved"] else "rejected",
    {
        "approved": END,
        # Rejection routes back to "process"; the loop repeats until a run
        # is approved.
        "rejected": "process",
    },
)


## 5. 템플릿 4: 병렬 실행 에이전트 (분기 → 처리 → 집계)

Enter fullscreen mode Exit fullscreen mode


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelExecutionState(TypedDict):
    """State carried through the fan-out -> process -> aggregate workflow."""

    # Items to process.
    input_data: List[str]
    # One result record per processed item.
    parallel_results: List[dict]
    # Summary built from ``parallel_results``.
    aggregated_result: dict

class ParallelAgent:
    """Node implementations for a fan-out -> process -> aggregate flow."""

    def __init__(self, llm):
        """Store the LLM.

        Args:
            llm: Object whose ``invoke(prompt)`` returns a value exposing
                ``.content``.
        """
        self.llm = llm

    def fan_out(self, state):
        """Reset the result list before processing starts.

        This step does not select or dispatch items itself; it only returns
        an empty ``parallel_results`` list.

        Returns:
            ``{"parallel_results": []}``.
        """
        return {"parallel_results": []}

    def process_item(self, item):
        """Process a single item with the LLM.

        Args:
            item: The item to process (one element, not the whole state).

        Returns:
            A record ``{"item", "result", "status"}``. ``status`` is
            ``"success"`` with the response content in ``result``, or
            ``"error"`` with the exception text in ``result``.
        """
        try:
            result = self.llm.invoke(f"처리 요청: {item}")
            return {"item": item, "result": result.content, "status": "success"}
        except Exception as e:  # deliberate: failures become error records
            return {"item": item, "result": str(e), "status": "error"}

    def aggregate(self, state):
        """Summarise ``state["parallel_results"]``.

        Returns:
            ``{"aggregated_result": {...}}`` with the total, success and error
            counts plus the original records under ``details``.
        """
        results = state["parallel_results"]
        success_count = sum(1 for r in results if r["status"] == "success")

        return {
            "aggregated_result": {
                "total": len(results),
                "success": success_count,
                "errors": len(results) - success_count,
                "details": results,
            }
        }
Enter fullscreen mode Exit fullscreen mode

워크플로우 정의

# Build the agent once so every node shares it.
# TODO: replace the ``None`` placeholder with a real LLM.
parallel_agent = ParallelAgent(None)


def _process_items(state):
    """Run ``process_item`` on the first three inputs concurrently.

    ``process_item`` expects a single item, so it cannot be registered as a
    node directly (a node receives the whole state). This wrapper fans the
    items out over a thread pool and collects the records in input order.

    Returns:
        ``{"parallel_results": list[dict]}``.
    """
    items = state["input_data"][:3]  # only the first three are processed
    if not items:
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool entirely.
        return {"parallel_results": []}
    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        results = list(pool.map(parallel_agent.process_item, items))
    return {"parallel_results": results}


parallel_workflow = StateGraph(ParallelExecutionState)
parallel_workflow.add_node("fan_out", parallel_agent.fan_out)
parallel_workflow.add_node("process", _process_items)
parallel_workflow.add_node("aggregate", parallel_agent.aggregate)

# The entry point must be declared explicitly before the graph is compiled.
parallel_workflow.set_entry_point("fan_out")
parallel_workflow.add_edge("fan_out", "process")
parallel_workflow.add_edge("process", "aggregate")
parallel_workflow.add_edge("aggregate", END)


## 6. 상태 관리 패턴

Enter fullscreen mode Exit fullscreen mode


python
from langgraph.checkpoint.memory import MemoryS


📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)

Top comments (0)