DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v19)

LangGraph 워크플로우 템플릿 (v19)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소를 포함합니다:

노드 (Nodes): 각 작업 단계를 나타내는 함수
엣지 (Edges): 노드 간의 전이 조건
상태 (State): 워크플로우의 공유 상태
체크포인트 (Checkpointing): 상태 저장 및 복원 기능

from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]
    user_input: str

# 기본 워크플로우 구조
from langgraph.graph import StateGraph

workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

class RAGAgent:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm

    def retrieve(self, state):
        question = state["user_input"]
        docs = self.vectorstore.similarity_search(question)
        return {"retrieved_docs": docs}

    def generate(self, state):
        docs = state["retrieved_docs"]
        context = "\n\n".join([doc.page_content for doc in docs])

        prompt = PromptTemplate.from_template("""
        사용자 질문: {question}
        제공된 문서: {context}

        위 문서를 기반으로 질문에 답해주세요.
        """)

        chain = prompt | self.llm | StrOutputParser()
        answer = chain.invoke({"question": state["user_input"], "context": context})
        return {"answer": answer}

    def validate(self, state):
        # 검증 로직 추가 가능
        return {"validated": True}

# 워크플로우 구성
def create_rag_workflow():
    agent = RAGAgent(vectorstore, llm)

    workflow = StateGraph(GraphState)

    workflow.add_node("retrieve", agent.retrieve)
    workflow.add_node("generate", agent.generate)
    workflow.add_node("validate", agent.validate)

    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "validate")

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)

from typing import List
from langchain_core.tools import Tool

class MultiToolAgent:
    def __init__(self, tools: List[Tool], llm):
        self.tools = tools
        self.llm = llm

    def plan(self, state):
        # 도구 사용 계획 생성
        prompt = f"""
        사용자 요청: {state["user_input"]}
        사용 가능한 도구들: {[tool.name for tool in self.tools]}

        어떤 도구를 사용해야 하는지 결정하고, 실행 순서를 정해주세요.
        """

        response = self.llm.invoke(prompt)
        # 실제 도구 호출 순서 파싱 로직 구현
        return {"plan": response}

    def execute(self, state):
        # 계획된 도구 실행
        plan = state["plan"]
        results = []

        for tool_name in plan["tools"]:
            tool = next(t for t in self.tools if t.name == tool_name)
            result = tool.invoke(plan["arguments"])
            results.append(result)

        return {"tool_results": results}

    def observe(self, state):
        # 실행 결과 관찰
        return {"observations": state["tool_results"]}

    def decide(self, state):
        # 다음 단계 결정
        return {"next_step": "continue"}

# 사용 예시
def create_multi_tool_workflow():
    tools = [
        Tool(name="search", func=search_function, description="웹 검색"),
        Tool(name="calculate", func=calculate_function, description="수학 계산")
    ]

    agent = MultiToolAgent(tools, llm)

    workflow = StateGraph(GraphState)
    workflow.add_node("plan", agent.plan)
    workflow.add_node("execute", agent.execute)
    workflow.add_node("observe", agent.observe)
    workflow.add_node("decide", agent.decide)

    workflow.set_entry_point("plan")
    workflow.add_edge("plan", "execute")
    workflow.add_edge("execute", "observe")
    workflow.add_edge("observe", "decide")

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (중지 → 검토 → 계속)

from langgraph.checkpoint.memory import MemorySaver

class HumanInLoopAgent:
    def __init__(self, llm):
        self.llm = llm

    def process(self, state):
        # AI 처리
        prompt = f"사용자 요청: {state['user_input']}"
        response = self.llm.invoke(prompt)
        return {"ai_output": response}

    def review(self, state):
        # 사용자 검토 요청
        return {"status": "waiting_for_review"}

    def continue_process(self, state):
        # 사용자 승인 후 진행
        return {"status": "continue"}

# 인간-중개 워크플로우
def create_human_in_loop_workflow():
    agent = HumanInLoopAgent(llm)

    workflow = StateGraph(GraphState)
    workflow.add_node("process", agent.process)
    workflow.add_node("review", agent.review)
    workflow.add_node("continue", agent.continue_process)

    workflow.set_entry_point("process")
    workflow.add_edge("process", "review")

    # 중지 및 재개를 위한 커스텀 로직
    def should_continue(state):
        if state["status"] == "waiting_for_review":
            return "review"
        else:
            return "continue"

    workflow.add_conditional_edges(
        "review",
        should_continue,
        {
            "review": "review",
            "continue": "continue"
        }
    )

    return workflow.compile(checkpointer=MemorySaver())

# 사용 예시
graph = create_human_in_loop_workflow()
result = graph.invoke({"user_input": "사용자 요청"}, config={"configurable": {"thread_id": "123"}})
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgent:
    def __init__(self, llm):
        self.llm = llm

    def fan_out(self, state):
        # 병렬 처리를 위한 작업 분기
        tasks = [
            {"name": "task1", "data": state["user_input"]},
            {"name": "task2", "data": state["user_input"]},
        ]
        return {"tasks": tasks}

    def process_parallel(self, task):
        # 각 작업 병렬 실행
        prompt = f"처리할 작업: {task['data']} (작업명: {task['name']})"
        result = self.llm.invoke(prompt)
        return {"task": task["name"], "result": result}

    def aggregate(self, state):
        # 결과 집계
        results = state["task_results"]
        aggregated = {
            "summary": f"{len(results)}개 작업 완료",
            "details": results
        }
        return {"final_output": aggregated}

# 병렬 실행 워크플로우
def create_parallel_workflow():
    agent = ParallelAgent(llm)

    workflow = StateGraph(GraphState)
    workflow.add_node("fan_out", agent.fan_out)
    workflow.add_node("process", agent.process_parallel)
    workflow.add_node("aggregate", agent.aggregate)

    workflow.set_entry_point("fan_out")
    workflow.add_edge("fan_out", "process")

    # 병렬 작업 처리 후 집계
    def process_tasks(state):
        tasks = state["tasks"]
        with ThreadPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(agent.process_parallel, tasks))
        return {"task_results": results}

    workflow.add_node("process_parallel", process_tasks)
    workflow.add_edge("process_parallel", "aggregate")

    return workflow.compile()

# 실제 병렬 실행
parallel_graph = create_parallel_workflow()
result = parallel_graph.invoke({"user_input": "병렬 처리할 데이터"})
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴


python
from langgraph.checkpoint import Checkpoint
from langgraph.checkpoint.base import BaseCheckpointSaver

class CustomCheckpointSaver(BaseCheckpointSaver):
    def __init__(self):
        self.checkpoints = {}

    def get(self, config):
        thread_id = config["configurable"]["thread_id"]
        return self.checkpoints.get(thread_id, {})

    def put(self, config, checkpoint):
        thread_id = config["configurable"]["thread_id"]
        self.check

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)