DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v46)

LangGraph 워크플로우 템플릿 (v46)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 구축하기 위한 고급 프레임워크입니다. 핵심 구성 요소는 다음과 같습니다:

노드(Node): 각 워크플로우 단계를 나타내는 함수
엣지(Edge): 노드 간의 전이 조건
상태(State): 워크플로우 상태를 저장하는 Pydantic 모델
체크포인트(Checkpoint): 상태 저장 및 복구 메커니즘

from typing import Annotated
from langgraph.graph import StateGraph, END
from pydantic import BaseModel
from typing import List, Dict, Any

# 상태 정의
class GraphState(BaseModel):
    messages: List[Dict[str, Any]]
    context: Dict[str, Any]
    step: int = 0

# 노드 정의
def retrieve_node(state: GraphState):
    # RAG 검색 로직
    return {"messages": [{"role": "assistant", "content": "검색 결과"}]}

def generate_node(state: GraphState):
    # 생성 로직
    return {"messages": [{"role": "assistant", "content": "생성된 텍스트"}]}

# 그래프 생성
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 단순 RAG 에이전트

이 템플릿은 검색-생성-검증 워크플로우를 구현합니다:

from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class RAGAgent:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm

    def retrieve(self, query):
        docs = self.vectorstore.similarity_search(query, k=3)
        return "\n".join([doc.page_content for doc in docs])

    def generate(self, query, context):
        prompt = f"질문: {query}\n참고 자료: {context}"
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return response.content

    def validate(self, response, query):
        # 응답 검증 로직
        return True

# LangGraph 구현
class RAGState(BaseModel):
    query: str
    context: str
    response: str
    validated: bool = False

def retrieve_node(state: RAGState):
    agent = RAGAgent(vectorstore, llm)
    context = agent.retrieve(state.query)
    return {"context": context}

def generate_node(state: RAGState):
    agent = RAGAgent(vectorstore, llm)
    response = agent.generate(state.query, state.context)
    return {"response": response}

def validate_node(state: RAGState):
    agent = RAGAgent(vectorstore, llm)
    validated = agent.validate(state.response, state.query)
    return {"validated": validated}

# 워크플로우 구성
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)

# 반복 조건
def should_retry(state: RAGState):
    if not state.validated:
        return "retry"
    return "end"

workflow.add_conditional_edges(
    "validate",
    should_retry,
    {
        "retry": "retrieve",
        "end": END
    }
)
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티 툴 에이전트

계획-실행-관찰-결정 워크플로우를 구현합니다:

from typing import Literal
from langchain.tools import Tool
from langchain_openai import ChatOpenAI

class ToolAgentState(BaseModel):
    plan: str
    execution_result: str
    observation: str
    decision: str
    tools: List[str]

# 사용자 정의 도구
def search_tool(query: str) -> str:
    return f"검색 결과: {query}"

def calculator_tool(expression: str) -> str:
    try:
        result = eval(expression)
        return f"계산 결과: {result}"
    except:
        return "계산 오류"

# 도구 정의
tools = [
    Tool(
        name="search",
        func=search_tool,
        description="웹 검색을 위한 도구"
    ),
    Tool(
        name="calculator",
        func=calculator_tool,
        description="수학 계산을 위한 도구"
    )
]

def plan_node(state: ToolAgentState):
    # 계획 생성
    plan = "검색 및 계산 수행"
    return {"plan": plan}

def execute_node(state: ToolAgentState):
    # 도구 실행
    tool_results = []
    for tool_name in state.tools:
        tool = next((t for t in tools if t.name == tool_name), None)
        if tool:
            result = tool.func("test query")
            tool_results.append(f"{tool_name}: {result}")
    return {"execution_result": "\n".join(tool_results)}

def observe_node(state: ToolAgentState):
    # 관찰 및 평가
    observation = f"실행 결과: {state.execution_result}"
    return {"observation": observation}

def decide_node(state: ToolAgentState):
    # 결정
    if "오류" in state.execution_result:
        decision = "retry"
    else:
        decision = "complete"
    return {"decision": decision}

# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)

workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_conditional_edges(
    "decide",
    lambda state: state.decision,
    {
        "retry": "plan",
        "complete": END
    }
)
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-인-라인 워크플로우

사용자 검토 및 승인을 포함한 워크플로우:

from enum import Enum
from typing import Optional

class ApprovalStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

class HumanInLoopState(BaseModel):
    task: str
    result: str
    approval_status: ApprovalStatus = ApprovalStatus.PENDING
    user_review: Optional[str] = None

def task_execution_node(state: HumanInLoopState):
    # 작업 실행
    result = f"작업 '{state.task}' 완료"
    return {"result": result}

def human_review_node(state: HumanInLoopState):
    # 사용자 검토 대기
    return {"approval_status": ApprovalStatus.PENDING}

def process_approval_node(state: HumanInLoopState):
    # 승인 처리
    if state.approval_status == ApprovalStatus.APPROVED:
        return {"approval_status": ApprovalStatus.APPROVED}
    else:
        return {"approval_status": ApprovalStatus.REJECTED}

# 워크플로우 구성
workflow = StateGraph(HumanInLoopState)
workflow.add_node("execute", task_execution_node)
workflow.add_node("review", human_review_node)
workflow.add_node("process_approval", process_approval_node)

workflow.add_edge("execute", "review")
workflow.add_edge("review", "process_approval")
workflow.add_conditional_edges(
    "process_approval",
    lambda state: state.approval_status,
    {
        ApprovalStatus.APPROVED: END,
        ApprovalStatus.REJECTED: "execute"  # 재시작
    }
)

# 체크포인트 기능 추가
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트

팬아웃-프로세스-집계 워크플로우:


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(BaseModel):
    tasks: List[str]
    results: List[str] = []
    aggregation: str = ""

def fan_out_node(state: ParallelAgentState):
    # 작업 분할
    return {"tasks": ["task1", "task2", "task3"]}

def process_parallel_node(state: ParallelAgentState):
    # 병렬 처리
    def worker(task):
        return f"{task} 처리 완료"

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(worker, task) for task in state.tasks]
        results = [future.result() for future in futures]

    return {"results": results}

def aggregate_node(state

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)