DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v35)

LangGraph 워크플로우 템플릿 (v35)

LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 구현하는 라이브러리로, 다음 구성 요소로 이루어져 있습니다:

Nodes (노드): 워크플로우의 각 단계. 입력 상태를 받아 출력 상태를 반환하는 함수.

Edges (엣지): 노드 간의 실행 흐름을 정의. 조건부 또는 고정된 경로를 가질 수 있음.

State (상태): 워크플로우 전체에서 공유되는 데이터 구조. 입력, 출력, 중간 상태 모두 포함.

Checkpointing (체크포인팅): 실행 상태를 저장하고 복구할 수 있는 기능.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
import operator

# 상태 정의
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    # 추가 상태 필드...

# 노드 정의
def retrieve_node(state):
    # 검색 로직
    pass

def generate_node(state):
    # 생성 로직
    pass

# 워크플로우 생성
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
Enter fullscreen mode Exit fullscreen mode

템플릿 1: 간단한 RAG 에이전트

문제: 문서 검색 후 생성하는 일반적인 RAG 패턴을 쉽게 구현할 수 있는 템플릿

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import Chroma

class RAGAgent:
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm

    def retrieve(self, state):
        # 질문에 기반하여 문서 검색
        query = state["messages"][-1].content
        docs = self.vector_store.similarity_search(query, k=3)
        return {"context": docs}

    def generate(self, state):
        # 검색된 문서를 기반으로 응답 생성
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a helpful assistant"),
            HumanMessage(content="Context: {context}\n\nQuestion: {question}")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "context": "\n\n".join([doc.page_content for doc in state["context"]]),
            "question": state["messages"][-1].content
        })

        return {"messages": [response]}

    def validate(self, state):
        # 생성된 응답 검증
        return {"validated": True}

# 사용 예시
def create_rag_workflow():
    agent = RAGAgent(vector_store, llm)

    workflow = StateGraph(AgentState)
    workflow.add_node("retrieve", agent.retrieve)
    workflow.add_node("generate", agent.generate)
    workflow.add_node("validate", agent.validate)

    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "validate")
    workflow.add_edge("validate", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 멀티-도구 에이전트

문제: 여러 도구를 순차적으로 사용하여 문제를 해결해야 하는 복잡한 작업을 관리하는 템플릿

from langchain.tools import Tool
from langchain_core.output_parsers import JsonOutputParser

class MultiToolAgent:
    def __init__(self, tools):
        self.tools = tools

    def plan(self, state):
        # 문제 해결 계획 생성
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a helpful assistant that plans tasks"),
            HumanMessage(content="Plan tasks for: {question}\nAvailable tools: {tools}")
        ])

        chain = prompt | self.llm | JsonOutputParser()
        plan = chain.invoke({
            "question": state["messages"][-1].content,
            "tools": [t.name for t in self.tools]
        })

        return {"plan": plan}

    def execute(self, state):
        # 계획된 작업 실행
        plan = state["plan"]
        results = []

        for task in plan["tasks"]:
            tool = next((t for t in self.tools if t.name == task["tool"]), None)
            if tool:
                result = tool.run(task["input"])
                results.append(result)

        return {"execution_results": results}

    def observe(self, state):
        # 실행 결과 관찰 및 피드백 수집
        return {"feedback": "Completed successfully"}

    def decide(self, state):
        # 다음 단계 결정
        feedback = state["feedback"]
        if feedback == "Completed successfully":
            return "continue"
        else:
            return "retry"

# 사용 예시
def create_multitool_workflow():
    agent = MultiToolAgent(tools)

    workflow = StateGraph(AgentState)
    workflow.add_node("plan", agent.plan)
    workflow.add_node("execute", agent.execute)
    workflow.add_node("observe", agent.observe)
    workflow.add_node("decide", agent.decide)

    workflow.add_edge("plan", "execute")
    workflow.add_edge("execute", "observe")
    workflow.add_edge("observe", "decide")
    workflow.add_conditional_edges("decide", 
        lambda x: x["decision"], 
        {"continue": END, "retry": "plan"}
    )

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

문제: 인간의 개입이 필요한 상황에서 자동화를 유지하면서 검토 및 승인을 처리하는 템플릿

from langgraph.checkpoint.memory import MemorySaver
from datetime import datetime

class HumanInLoopAgent:
    def __init__(self, llm):
        self.llm = llm
        self.checkpointer = MemorySaver()

    def pause_for_review(self, state):
        # 검토를 위해 일시 정지
        timestamp = datetime.now().isoformat()

        return {
            "status": "awaiting_review",
            "review_required": True,
            "review_timestamp": timestamp
        }

    def review(self, state):
        # 인간 검토 로직
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a reviewer"),
            HumanMessage(content="Review this: {content}")
        ])

        chain = prompt | self.llm
        review_result = chain.invoke({"content": state["messages"][-1].content})

        return {"review_result": review_result, "reviewed": True}

    def continue_workflow(self, state):
        # 검토 후 계속 진행
        return {"status": "continue"}

# 사용 예시
def create_human_in_loop_workflow():
    agent = HumanInLoopAgent(llm)

    workflow = StateGraph(AgentState)
    workflow.add_node("generate", agent.generate)
    workflow.add_node("pause", agent.pause_for_review)
    workflow.add_node("review", agent.review)
    workflow.add_node("continue", agent.continue_workflow)

    workflow.add_edge("generate", "pause")
    workflow.add_edge("pause", "review")
    workflow.add_edge("review", "continue")
    workflow.add_edge("continue", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 5: 병렬 실행 에이전트

문제: 여러 작업을 동시에 처리하여 성능을 향상시키는 병렬 처리 워크플로우


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgent:
    def __init__(self, llm):
        self.llm = llm

    def fan_out(self, state):
        # 작업 분산
        tasks = [
            {"id": "task1", "input": "process data 1"},
            {"id": "task2", "input": "process data 2"},
            {"id": "task3", "input": "process data 3"}
        ]

        return {"tasks": tasks}

    def process(self, state):
        # 병렬 처리
        def process_task(task):
            prompt = ChatPromptTemplate.from_messages([
                HumanMessage(content=f"Process: {task['input']}")
            ])
            chain = prompt | self.llm
            result = chain.invoke({})
            return {"task_id": task["id"], "result": result}

        # 병렬 실행
        with ThreadPoolExecutor(max_workers=3) as executor:
            results = list(executor.map(process_task, state["tasks"]))

        return {"processed_results": results}

    def aggregate(self, state):
        # 결과 집계
        aggregated = {
            "results": [r["result"] for r in state["processed_results"]]
        }
        return {"aggregated_output": aggregated}

# 사용 예시
def create_parallel_workflow():
    agent = ParallelAgent(llm)

    workflow = StateGraph(AgentState)


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)