DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v7)

LangGraph 워크플로우 템플릿 (v7)

LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 구현하는 강력한 프레임워크입니다. 핵심 구성 요소는 다음과 같습니다:

  • Nodes: 각 단계별 작업 (함수 또는 클래스)
  • Edges: 노드 간의 전이 조건
  • State: 워크플로우 상태 관리 (Pydantic 모델)
  • Checkpointing: 상태 저장/복원 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]
    context: dict

# 기본 워크플로우 구조
graph = StateGraph(State)
Enter fullscreen mode Exit fullscreen mode

템플릿 1: 간단한 RAG 에이전트

문제: 문서 검색 후 생성 및 검증이 필요한 경우

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

class SimpleRAGAgent:
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm

    def retrieve(self, state):
        query = state["messages"][-1].content
        docs = self.vector_store.similarity_search(query, k=3)
        return {"context": docs}

    def generate(self, state):
        prompt = PromptTemplate.from_template("""
        다음 문서를 기반으로 질문에 답하세요:
        {context}
        질문: {question}
        """)

        chain = prompt | self.llm | StrOutputParser()
        response = chain.invoke({
            "context": "\n".join([doc.page_content for doc in state["context"]]),
            "question": state["messages"][-1].content
        })

        return {"messages": [response]}

    def validate(self, state):
        # 단순 검증: 응답이 관련성 있는지 확인
        if len(state["messages"][-1].content) < 20:
            return {"messages": ["요청이 너무 짧습니다. 다시 시도해 주세요."]}
        return state

    def build_graph(self):
        workflow = StateGraph(State)

        workflow.add_node("retrieve", self.retrieve)
        workflow.add_node("generate", self.generate)
        workflow.add_node("validate", self.validate)

        workflow.set_entry_point("retrieve")
        workflow.add_edge("retrieve", "generate")
        workflow.add_edge("generate", "validate")
        workflow.add_edge("validate", END)

        return workflow.compile()

# 사용 예시
rag_agent = SimpleRAGAgent(vector_store, ChatOpenAI(model="gpt-4"))
app = rag_agent.build_graph()
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 멀티-도구 에이전트

문제: 복잡한 작업을 계획하고 실행하는 경우

import json
from typing import List
from langchain_core.tools import Tool

class MultiToolAgent:
    def __init__(self, tools: List[Tool]):
        self.tools = {tool.name: tool for tool in tools}

    def plan(self, state):
        # 작업 계획 생성
        prompt = PromptTemplate.from_template("""
        다음 작업을 수행하기 위한 계획을 세우세요:
        {question}
        사용 가능한 도구: {tools}
        """)

        plan = self.llm.invoke({
            "question": state["messages"][-1].content,
            "tools": list(self.tools.keys())
        })

        return {"plan": plan.content}

    def execute(self, state):
        # 계획 실행
        plan = json.loads(state["plan"])
        results = []

        for step in plan["steps"]:
            tool_name = step["tool"]
            tool_input = step["input"]

            if tool_name in self.tools:
                try:
                    result = self.tools[tool_name].run(tool_input)
                    results.append(f"{tool_name}: {result}")
                except Exception as e:
                    results.append(f"{tool_name}: 오류 - {str(e)}")

        return {"execution_results": results}

    def observe(self, state):
        # 실행 결과 관찰
        return {"status": "completed"}

    def decide(self, state):
        # 다음 단계 결정
        if any("오류" in result for result in state["execution_results"]):
            return {"next_step": "retry"}
        return {"next_step": "continue"}

# 사용 예시
tools = [
    Tool(name="search", func=search_function, description="웹 검색"),
    Tool(name="calculate", func=calculate_function, description="계산")
]

multi_tool_agent = MultiToolAgent(tools)
workflow = StateGraph(State)
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

문제: 인간 검토가 필요한 중요한 결정을 내려야 할 때

from enum import Enum
from datetime import datetime

class ApprovalStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

class HumanInLoopAgent:
    def __init__(self, llm, approval_required=True):
        self.llm = llm
        self.approval_required = approval_required
        self.approval_requests = {}

    def pause_for_review(self, state):
        # 검토를 위해 일시 중지
        task_id = f"task_{datetime.now().timestamp()}"
        self.approval_requests[task_id] = {
            "state": state,
            "timestamp": datetime.now(),
            "status": ApprovalStatus.PENDING
        }

        return {
            "messages": [
                f"검토가 필요합니다. Task ID: {task_id}"
            ],
            "approval_required": True
        }

    def review(self, state):
        # 검토 처리
        task_id = state.get("task_id")
        if task_id in self.approval_requests:
            approval = state.get("approval_decision")
            if approval == "approve":
                self.approval_requests[task_id]["status"] = ApprovalStatus.APPROVED
            else:
                self.approval_requests[task_id]["status"] = ApprovalStatus.REJECTED

        return {"review_complete": True}

    def continue_workflow(self, state):
        # 워크플로우 계속 진행
        return {"messages": ["검토 완료, 작업 재개"]}

# 사용 예시
human_agent = HumanInLoopAgent(ChatOpenAI(model="gpt-4"))
workflow = StateGraph(State)
Enter fullscreen mode Exit fullscreen mode

템플릿 4: 병렬 실행 에이전트

문제: 여러 작업을 동시에 실행하고 결과를 집계할 때

from asyncio import gather
import asyncio

class ParallelExecutionAgent:
    def __init__(self, workers: List[callable]):
        self.workers = workers

    async def fan_out(self, state):
        # 병렬 실행 준비
        tasks = []
        for worker in self.workers:
            task = asyncio.create_task(worker(state))
            tasks.append(task)

        return {"parallel_tasks": tasks, "status": "running"}

    async def process(self, state):
        # 병렬 작업 처리
        tasks = state["parallel_tasks"]
        results = await gather(*tasks, return_exceptions=True)

        return {"results": results}

    def aggregate(self, state):
        # 결과 집계
        results = state["results"]
        summary = {}

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                summary[f"worker_{i}"] = f"오류: {str(result)}"
            else:
                summary[f"worker_{i}"] = result

        return {"aggregated_results": summary}

# 사용 예시
async def worker1(state):
    return {"worker1_result": "완료"}

async def worker2(state):
    return {"worker2_result": "완료"}

parallel_agent = ParallelExecutionAgent([worker1, worker2])
Enter fullscreen mode Exit fullscreen mode

상태 관리 패턴

문제: 복잡한 상태를 효율적으로 관리해야 할 때

from typing import Optional, Any
from langgraph.checkpoint.memory import MemorySaver

class StateManager:
    def __init__(self, memory_saver: MemorySaver):
        self.memory_saver = memory_saver
        self.checkpoint_manager = CheckpointManager()

    def save_checkpoint(self, state: dict, thread_id: str):
        """체크포인트 저장"""
        self.memory_saver.save(state, thread_id)

    def load_checkpoint(self, thread_id: str) -> Optional[dict]:
        """체크포인트 로드"""
        return self.memory_saver.load(thread_id)

    def update_state(self, state: dict, updates: dict):
        """상태 업데이트"""
        for key, value in updates.items():
            if key in state:
                state[key] = value
        return state

# 상태 예시
class WorkflowState(TypedDict):
    messages: list
    metadata: dict
    errors: list
    checkpoints: dict
Enter fullscreen mode Exit fullscreen mode

스트리밍과 실시간 업데이트

문제: 실시간 피드백이 필요한


📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)

Top comments (0)