DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v36)

LangGraph 워크플로우 템플릿 (v36)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 엔진으로, 다음과 같은 핵심 구성 요소로 작동합니다:

  • Nodes (노드): 각 단계의 처리 로직
  • Edges (엣지): 노드 간의 전이 조건
  • State (상태): 워크플로우 내 모든 정보 저장
  • Checkpointing (체크포인팅): 상태 저장/복원 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]

# 기본 워크플로우 구조
workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

실제 문서 검색 후 답변 생성 시스템:

from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser

class RAGAgent:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm

    def retrieve(self, state):
        question = state["messages"][-1]["content"]
        docs = self.vectorstore.similarity_search(question, k=3)
        return {"retrieved_docs": docs}

    def generate(self, state):
        context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
        prompt = PromptTemplate.from_template(
            "다음 문맥을 기반으로 질문에 답변하세요:\n{context}\n질문: {question}"
        )
        chain = prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": state["messages"][-1]["content"]
        })
        return {"answer": answer}

    def validate(self, state):
        # 답변 검증 로직
        return {"validated": True}

    def build_graph(self):
        workflow = StateGraph(GraphState)

        workflow.add_node("retrieve", self.retrieve)
        workflow.add_node("generate", self.generate)
        workflow.add_node("validate", self.validate)

        workflow.set_entry_point("retrieve")
        workflow.add_edge("retrieve", "generate")
        workflow.add_edge("generate", "validate")
        workflow.add_edge("validate", END)

        return workflow.compile()

# 사용 예시
rag_agent = RAGAgent(vectorstore, ChatOpenAI(model="gpt-4"))
graph = rag_agent.build_graph()
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)

다양한 도구를 활용한 에이전트 시스템:

from langchain.tools import Tool
from langchain_core.tools import tool
import json

class ToolAgent:
    def __init__(self, tools):
        self.tools = tools

    def plan(self, state):
        # 계획 생성
        return {"plan": ["tool1", "tool2"]}

    def execute(self, state):
        # 도구 실행
        executed_tools = []
        for tool_name in state["plan"]:
            tool = next((t for t in self.tools if t.name == tool_name), None)
            if tool:
                result = tool.run(state["messages"][-1]["content"])
                executed_tools.append({"tool": tool_name, "result": result})
        return {"executed_tools": executed_tools}

    def observe(self, state):
        # 실행 결과 관찰
        return {"observations": "실행 완료"}

    def decide(self, state):
        # 결정 로직
        return {"final_decision": "continue"}

    def build_graph(self):
        workflow = StateGraph(GraphState)

        workflow.add_node("plan", self.plan)
        workflow.add_node("execute", self.execute)
        workflow.add_node("observe", self.observe)
        workflow.add_node("decide", self.decide)

        workflow.set_entry_point("plan")
        workflow.add_edge("plan", "execute")
        workflow.add_edge("execute", "observe")
        workflow.add_edge("observe", "decide")
        workflow.add_edge("decide", END)

        return workflow.compile()

# 도구 정의 예시
tools = [
    Tool(
        name="search",
        func=lambda x: f"검색 결과: {x}",
        description="검색 도구"
    ),
    Tool(
        name="calculate",
        func=lambda x: f"계산 결과: {eval(x)}",
        description="계산 도구"
    )
]
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (중지 → 검토 → 계속)

사용자 검토를 포함한 작업 흐름:

import asyncio
from datetime import datetime

class HumanInLoopAgent:
    def __init__(self, llm):
        self.llm = llm

    def process(self, state):
        # 기본 처리
        return {"processed_data": "처리된 데이터"}

    def pause_for_review(self, state):
        # 중지 및 검토
        return {"status": "awaiting_review"}

    def review(self, state):
        # 검토 상태
        return {"review_status": "approved"}

    def continue_processing(self, state):
        # 계속 진행
        return {"status": "processing"}

    def build_graph(self):
        workflow = StateGraph(GraphState)

        workflow.add_node("process", self.process)
        workflow.add_node("pause", self.pause_for_review)
        workflow.add_node("review", self.review)
        workflow.add_node("continue", self.continue_processing)

        workflow.set_entry_point("process")
        workflow.add_edge("process", "pause")
        workflow.add_edge("pause", "review")
        workflow.add_edge("review", "continue")
        workflow.add_edge("continue", END)

        return workflow.compile()

# 인간 검토를 위한 특별 처리
def human_review_callback(state):
    # 외부 시스템과 통신
    print("사용자 검토 요청:", state["messages"][-1]["content"])
    # 실제 구현에서는 API 호출이나 이메일 알림 필요
    return {"review_requested": True}
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)

병렬 작업 처리 및 결과 집계:

class ParallelAgent:
    def __init__(self, llm):
        self.llm = llm

    def fan_out(self, state):
        # 작업 분기
        return {"tasks": ["task1", "task2", "task3"]}

    def process_parallel(self, state):
        # 병렬 처리
        tasks = state["tasks"]
        results = []
        for task in tasks:
            # 각 작업 병렬 처리
            result = f"결과: {task}"
            results.append({"task": task, "result": result})
        return {"results": results}

    def aggregate(self, state):
        # 결과 집계
        aggregated = "\n".join([r["result"] for r in state["results"]])
        return {"aggregated_result": aggregated}

    def build_graph(self):
        workflow = StateGraph(GraphState)

        workflow.add_node("fan_out", self.fan_out)
        workflow.add_node("process_parallel", self.process_parallel)
        workflow.add_node("aggregate", self.aggregate)

        workflow.set_entry_point("fan_out")
        workflow.add_edge("fan_out", "process_parallel")
        workflow.add_edge("process_parallel", "aggregate")
        workflow.add_edge("aggregate", END)

        return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

상태를 효율적으로 관리하고 저장하는 방법:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.runnables import RunnableConfig

class StateManager:
    def __init__(self, checkpoint_backend=None):
        self.checkpoint_backend = checkpoint_backend or MemorySaver()

    def create_workflow_with_checkpoint(self, workflow_builder):
        return workflow_builder.with_config(
            configurable={
                "thread_id": "12345"
            }
        )

    def get_state(self, graph, thread_id):
        # 현재 상태 조회
        return graph.get_state({"configurable": {"thread_id": thread_id}})

    def update_state(self, graph, thread_id, update_data):
        # 상태 업데이트
        return graph.update_state(
            {"configurable": {"thread_id": thread_id}},
            update_data
        )

# 사용 예시
state_manager = StateManager(SqliteSaver("checkpoints.db"))
Enter fullscreen mode Exit fullscreen mode

7. 스트리밍 및 실시간 업데이트

실시간 데이터 스트리밍 처리:


python
import asyncio
from typing import AsyncGenerator

class StreamingAgent:
    async def stream_process(self, state):
        # 스트리밍 처리
        messages = state

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)