DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v28)

LangGraph 워크플로우 템플릿 (v28)

개요: LangGraph 아키텍처 이해

LangGraph는 LangChain과 함께 사용되는 강력한 상태 기반 워크플로우 엔진입니다. 그 핵심 구성 요소는 다음과 같습니다:

  • Nodes: 각 단계의 작업 (함수 또는 클래스)
  • Edges: 노드 간의 흐름 조정
  • State: 공유 상태 관리
  • Checkpointing: 상태 저장/복원 기능

템플릿 1: 간단한 RAG 에이전트

실제 개발에서 가장 흔한 패턴 중 하나입니다. 검색 → 생성 → 유효성 검사 순서로 작동합니다.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnableLambda

class State(TypedDict):
    messages: Annotated[list, operator.add]
    context: str
    query: str

class RAGAgent:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm

    def retrieve(self, state: State):
        docs = self.vectorstore.similarity_search(state["query"])
        context = "\n\n".join([doc.page_content for doc in docs])
        return {"context": context}

    def generate(self, state: State):
        prompt = f"""주어진 컨텍스트를 기반으로 질문에 답하세요:

Context: {state['context']}
Question: {state['query']}

답변:"""
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return {"messages": [response]}

    def validate(self, state: State):
        # 간단한 유효성 검사 예시
        if len(state["messages"][-1].content) < 10:
            return {"messages": [AIMessage(content="답변이 너무 짧습니다. 다시 시도해주세요.")]}
        return {}

# 실행 코드
graph = StateGraph(State)
graph.add_node("retrieve", RunnableLambda(RAGAgent(vectorstore, llm).retrieve))
graph.add_node("generate", RunnableLambda(RAGAgent(vectorstore, llm).generate))
graph.add_node("validate", RunnableLambda(RAGAgent(vectorstore, llm).validate))

graph.set_entry_point("retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", "validate")
graph.add_edge("validate", END)
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 다중 도구 에이전트

계획 → 실행 → 관찰 → 결정 구조. 복잡한 작업을 분할하고 조정하는 데 유용합니다.

import json
from typing import Dict, Any

class ToolAgent:
    def __init__(self):
        self.tools = {
            "web_search": self.web_search,
            "calculator": self.calculator,
            "weather": self.weather
        }

    def plan(self, state: State):
        # 사용자 입력을 기반으로 작업 계획 생성
        plan_prompt = f"""다음 작업을 수행하기 위해 필요한 도구를 선택하세요:

사용자 요청: {state['query']}
가능한 도구: {list(self.tools.keys())}

JSON 형식으로 응답:
{{
  "plan": [
    {{
      "tool": "도구명",
      "input": "입력값"
    }}
  ]
}}"""
        response = self.llm.invoke([HumanMessage(content=plan_prompt)])
        plan = json.loads(response.content)
        return {"plan": plan}

    def execute(self, state: State):
        results = []
        for step in state["plan"]["plan"]:
            tool = self.tools[step["tool"]]
            result = tool(step["input"])
            results.append({"tool": step["tool"], "result": result})
        return {"execution_results": results}

    def observe(self, state: State):
        # 실행 결과 분석
        analysis = f"""실행 결과 요약:
{json.dumps(state['execution_results'], indent=2, ensure_ascii=False)}"""
        return {"analysis": analysis}

    def decide(self, state: State):
        # 최종 결정
        decision_prompt = f"""다음 실행 결과를 바탕으로 최종 응답을 생성하세요:

분석: {state['analysis']}
사용자 요청: {state['query']}

최종 답변:"""
        response = self.llm.invoke([HumanMessage(content=decision_prompt)])
        return {"messages": [response]}

# 실행 코드
graph = StateGraph(State)
graph.add_node("plan", RunnableLambda(ToolAgent().plan))
graph.add_node("execute", RunnableLambda(ToolAgent().execute))
graph.add_node("observe", RunnableLambda(ToolAgent().observe))
graph.add_node("decide", RunnableLambda(ToolAgent().decide))

graph.set_entry_point("plan")
graph.add_edge("plan", "execute")
graph.add_edge("execute", "observe")
graph.add_edge("observe", "decide")
graph.add_edge("decide", END)
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

사용자 리뷰 및 승인을 포함한 보안 작업 흐름입니다.

from datetime import datetime
import time

class HumanInLoopAgent:
    def __init__(self):
        self.pending_reviews = {}

    def process(self, state: State):
        # 작업 처리
        response = self.llm.invoke([HumanMessage(content=state["query"])])
        task_id = str(time.time())
        self.pending_reviews[task_id] = {
            "task": state["query"],
            "response": response.content,
            "status": "pending"
        }
        return {
            "messages": [response],
            "task_id": task_id,
            "status": "paused"
        }

    def review(self, state: State):
        # 리뷰 단계
        task_id = state.get("task_id")
        if not task_id:
            return {}

        # 실제 리뷰 로직은 외부 시스템에서 호출
        review_result = self.manual_review(task_id)
        self.pending_reviews[task_id]["status"] = review_result["status"]
        return {"review_result": review_result}

    def manual_review(self, task_id: str):
        # 시뮬레이션
        return {
            "status": "approved",
            "comment": "사용자 승인됨"
        }

# 실행 코드
graph = StateGraph(State)
graph.add_node("process", RunnableLambda(HumanInLoopAgent().process))
graph.add_node("review", RunnableLambda(HumanInLoopAgent().review))

graph.set_entry_point("process")
graph.add_edge("process", "review")
graph.add_edge("review", END)
Enter fullscreen mode Exit fullscreen mode

템플릿 5: 병렬 실행 에이전트

fan-out → process → aggregate 패턴으로 여러 작업을 동시에 처리합니다.

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4")

    def fan_out(self, state: State):
        # 병렬 작업 분할
        tasks = [
            {"name": "task1", "input": "분석 데이터 1"},
            {"name": "task2", "input": "분석 데이터 2"},
            {"name": "task3", "input": "분석 데이터 3"}
        ]
        return {"tasks": tasks}

    def process(self, state: State):
        # 각 작업 병렬 실행
        def process_task(task):
            prompt = f"작업 '{task['name']}' 처리: {task['input']}"
            response = self.llm.invoke([HumanMessage(content=prompt)])
            return {"task": task["name"], "result": response.content}

        # 병렬 처리
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(process_task, task) for task in state["tasks"]]
            results = [future.result() for future in futures]

        return {"processed_results": results}

    def aggregate(self, state: State):
        # 결과 집계
        summary = "\n".join([
            f"{r['task']}: {r['result']}" 
            for r in state["processed_results"]
        ])
        return {"messages": [AIMessage(content=f"집계 결과:\n{summary}")]}
Enter fullscreen mode Exit fullscreen mode

상태 관리 패턴

상태 저장 및 복원


python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver

# 메모리 체크포인트 (테스트용)
memory = MemorySaver()

# SQLite 체크포인트 (프로덕션용)
sqlite_checkpoint = SqliteSaver.from_conn_string("checkpoints.db")

# 상태 관리 클래스
class StateManager:
    def __init__(self, graph, checkpoint):
        self.graph = graph
        self.checkpoint = checkpoint

    def get_state(self, thread_id):
        return self.graph.get_state(config={"configurable": {"thread_id":

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)