DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v29)

LangGraph 워크플로우 템플릿 (v29)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 엔진으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpointing)로 구성됩니다.

핵심 구성 요소:

  • Node: 작업 단위 (예: RAG 검색, 툴 실행)
  • Edge: 노드 간 흐름 제어 (조건부 경로)
  • State: 전체 워크플로우 상태 (context, history, variables)
  • Checkpointing: 실패 복구 및 상태 저장
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]
    context: str
    step: int

workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트

실제 개발에서 가장 흔한 패턴. 검색 → 생성 → 검증 단계로 구성됩니다.

from langchain_core.messages import HumanMessage
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate

class RAGState(TypedDict):
    question: str
    context: str
    answer: str
    retrieved_docs: list

def retrieve(state: RAGState):
    # Chroma 검색
    vectorstore = Chroma(persist_directory="./chroma_db")
    docs = vectorstore.similarity_search(state["question"], k=3)
    return {"context": "\n".join([doc.page_content for doc in docs])}

def generate(state: RAGState):
    prompt = PromptTemplate.from_template("""
    다음 질문에 답해주세요:
    {question}

    참고 문서:
    {context}

    답변:
    """)

    # 실제 LLM 호출
    llm = ChatOpenAI(model="gpt-4")
    response = llm.invoke(prompt.format(question=state["question"], context=state["context"]))
    return {"answer": response.content}

def validate(state: RAGState):
    # 답변 검증 로직
    if len(state["answer"]) < 20:
        return {"answer": "답변이 너무 짧습니다. 다시 시도해주세요."}
    return {}

# 워크플로우 정의
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 다중 툴 에이전트

계획 → 실행 → 관찰 → 결정 프로세스로 구성되는 복잡한 에이전트입니다.

from langchain.tools import Tool
from typing import List
import json

class ToolAgentState(TypedDict):
    task: str
    plan: List[str]
    execution_results: List[str]
    current_tool: str
    tool_input: str
    decision: str

class ToolExecutor:
    def __init__(self):
        self.tools = {
            "calculator": Tool(
                name="calculator",
                func=lambda x: str(eval(x)) if x.replace(" ", "").replace(".", "").isdigit() else "Invalid input",
                description="수학 계산기"
            ),
            "web_search": Tool(
                name="web_search",
                func=lambda q: f"검색 결과: {q}",
                description="웹 검색"
            )
        }

def plan(state: ToolAgentState):
    # 작업 분석 및 계획 수립
    if "calculate" in state["task"]:
        plan = ["calculator", "validate_result"]
    else:
        plan = ["web_search", "validate_result"]
    return {"plan": plan}

def execute(state: ToolAgentState):
    # 다음 툴 실행
    tool_name = state["plan"][0]
    tool = ToolExecutor().tools[tool_name]

    # 툴 실행
    result = tool.run(state["task"])
    return {
        "execution_results": [result],
        "current_tool": tool_name,
        "tool_input": state["task"]
    }

def observe(state: ToolAgentState):
    # 실행 결과 분석
    if state["current_tool"] == "calculator":
        try:
            float(state["execution_results"][0])
            return {"decision": "success"}
        except:
            return {"decision": "retry"}
    return {"decision": "success"}

def decide(state: ToolAgentState):
    if state["decision"] == "retry":
        return "execute"
    return END

# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)

workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_conditional_edges("observe", decide, {"execute": "execute", "end": END})
workflow.set_entry_point("plan")
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우

사람의 검토와 승인을 포함한 중요한 결정 프로세스입니다.

from enum import Enum
from datetime import datetime

class ApprovalStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

class HumanInLoopState(TypedDict):
    task: str
    generated_output: str
    user_review: str
    approval_status: ApprovalStatus
    timestamp: datetime

def generate_with_review(state: HumanInLoopState):
    # 초기 생성
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"다음 작업을 수행해주세요:\n{state['task']}"
    result = llm.invoke(prompt)

    return {
        "generated_output": result.content,
        "approval_status": ApprovalStatus.PENDING
    }

def human_review(state: HumanInLoopState):
    # 인간 리뷰 대기 (실제 구현에서는 API 또는 이벤트 시스템 사용)
    # 이 부분은 실제 애플리케이션에서 웹훅 또는 DB 업데이트로 처리
    return {"approval_status": ApprovalStatus.PENDING}

def process_approval(state: HumanInLoopState):
    # 승인/거부 처리
    if state["approval_status"] == ApprovalStatus.APPROVED:
        return {"generated_output": state["generated_output"]}
    else:
        return {"generated_output": "사용자에 의해 거부된 작업"}

# 인간-중개 워크플로우
workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate", generate_with_review)
workflow.add_node("review", human_review)
workflow.add_node("process", process_approval)

workflow.add_edge("generate", "review")
workflow.add_conditional_edges("review", 
    lambda s: "approved" if s["approval_status"] == ApprovalStatus.APPROVED else "rejected",
    {"approved": "process", "rejected": "generate"}
)
workflow.set_entry_point("generate")
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트

Fan-out → 프로세싱 → 집계 구조로 빠른 처리가 필요한 경우 사용합니다.

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelProcessingState(TypedDict):
    tasks: List[str]
    results: List[dict]
    aggregated_result: dict

def fan_out(state: ParallelProcessingState):
    # 작업 분할 (fan-out)
    tasks = state["tasks"]
    return {"results": []}

def process_task(state: ParallelProcessingState, task: str):
    # 각 작업 처리
    llm = ChatOpenAI(model="gpt-4")
    result = llm.invoke(f"처리 작업: {task}")
    return {"task": task, "output": result.content}

async def parallel_process(state: ParallelProcessingState):
    # 병렬 처리
    tasks = state["tasks"]

    # 병렬로 실행 (ThreadPoolExecutor 사용)
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_task, state, task) for task in tasks]
        results = [future.result() for future in futures]

    return {"results": results}

def aggregate_results(state: ParallelProcessingState):
    # 결과 집계
    aggregated = {}
    for result in state["results"]:
        if result["task"] in aggregated:
            aggregated[result["task"]].append(result["output"])
        else:
            aggregated[result["task"]] = [result["output"]]

    return {"aggregated_result": aggregated}

# 병렬 처리 워크플로우
workflow = StateGraph(ParallelProcessingState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("parallel_process", parallel_process)
workflow.add_node("aggregate", aggregate_results)

workflow.add_edge("fan_out", "parallel_process")
workflow.add_edge("parallel_process", "aggregate")
workflow.set_entry_point("fan_out")
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

실제


📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)

Top comments (0)