DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v5)

LangGraph 워크플로우 템플릿 (v5)

개요: LangGraph 아키텍처 기초

LangGraph는 LangChain의 고급 워크플로우 엔진으로, 상태 기반 그래프를 기반으로 한 다중 에이전트 시스템을 구축하는 데 최적화되어 있습니다. 주요 구성 요소는 다음과 같습니다:

  • 노드 (Nodes): 각 작업 단계를 나타냄
  • 엣지 (Edges): 노드 간의 전이 조건을 정의
  • 상태 (State): 모든 노드가 공유하는 데이터 구조
  • 체크포인트 (Checkpointing): 상태 저장 및 복구 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
Enter fullscreen mode Exit fullscreen mode

템플릿 1: 단순 RAG 에이전트

이 템플릿은 문서 검색 → 생성 → 검증의 단순한 RAG 워크플로우를 구현합니다. 특히 문서 검색과 생성을 분리하여 효율성을 극대화합니다.

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnablePassthrough

class RAGAgentState(TypedDict):
    query: str
    retrieved_docs: list
    generated_response: str
    validation: bool

# 노드 정의
def retrieve_node(state):
    # 문서 검색
    vectorstore = FAISS.load_local("vectorstore", OpenAIEmbeddings())
    retriever = vectorstore.as_retriever()
    docs = retriever.invoke(state["query"])
    return {"retrieved_docs": docs}

def generate_node(state):
    # 문서 기반 생성
    template = PromptTemplate.from_template(
        "다음 문서를 기반으로 질문에 답변하세요:\n\n{context}\n\n질문: {question}"
    )
    llm = ChatOpenAI(model="gpt-4")
    chain = template | llm | StrOutputParser()

    context = "\n\n".join([doc.page_content for doc in state["retrieved_docs"]])
    response = chain.invoke({"context": context, "question": state["query"]})

    return {"generated_response": response}

def validate_node(state):
    # 응답 검증 (단순 예시)
    # 실제 구현에서는 더 복잡한 검증 로직이 필요
    return {"validation": len(state["generated_response"]) > 10}

# 워크플로우 정의
def create_rag_workflow():
    workflow = StateGraph(RAGAgentState)

    workflow.add_node("retrieve", retrieve_node)
    workflow.add_node("generate", generate_node)
    workflow.add_node("validate", validate_node)

    workflow.set_entry_point("retrieve")

    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "validate")

    # 검증 실패 시 재시도
    workflow.add_conditional_edges(
        "validate",
        lambda x: "retry" if not x["validation"] else "end",
        {"retry": "retrieve", "end": END}
    )

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 멀티-도구 에이전트

이 템플릿은 계획 → 실행 → 관찰 → 결정의 반복적인 워크플로우를 구현하여 복잡한 작업을 수행합니다. 다양한 도구를 사용할 수 있도록 설계되었습니다.

from langchain.tools import Tool
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

class ToolAgentState(TypedDict):
    tasks: list[str]
    current_task: str
    execution_result: str
    decision: str

# 도구 정의
@tool
def web_search(query: str):
    """웹 검색 도구"""
    # 실제 검색 로직 구현
    return f"검색 결과: {query}"

@tool
def file_operations(operation: str, path: str):
    """파일 조작 도구"""
    return f"파일 {operation} 완료: {path}"

# 노드 정의
def plan_node(state):
    # 작업 계획
    tasks = ["웹 검색", "데이터 수집", "분석"]
    return {"tasks": tasks}

def execute_node(state):
    # 현재 작업 실행
    current_task = state["tasks"][0] if state["tasks"] else ""

    if "웹 검색" in current_task:
        result = web_search.run(current_task)
    elif "파일" in current_task:
        result = file_operations.run(current_task)
    else:
        result = f"작업 실행: {current_task}"

    return {"execution_result": result, "current_task": current_task}

def observe_node(state):
    # 실행 결과 관찰
    # 결과 분석 및 다음 단계 결정
    return {"decision": "continue"}

def decide_node(state):
    # 결정: 다음 작업 선택
    if state["tasks"]:
        next_task = state["tasks"][1:]  # 다음 작업
        return {"tasks": next_task}
    return {"tasks": []}

# 워크플로우 정의
def create_tool_workflow():
    workflow = StateGraph(ToolAgentState)

    workflow.add_node("plan", plan_node)
    workflow.add_node("execute", execute_node)
    workflow.add_node("observe", observe_node)
    workflow.add_node("decide", decide_node)

    workflow.set_entry_point("plan")

    workflow.add_edge("plan", "execute")
    workflow.add_edge("execute", "observe")
    workflow.add_edge("observe", "decide")

    workflow.add_conditional_edges(
        "decide",
        lambda x: "end" if not x["tasks"] else "execute",
        {"end": END, "execute": "execute"}
    )

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

이 템플릿은 인간의 개입이 필요한 상황에서 사용되며, 실행 일시정지 → 검토 → 계속의 흐름을 제공합니다.

from langchain_core.messages import HumanMessage
import time

class HumanInLoopState(TypedDict):
    messages: list[BaseMessage]
    pending_review: str
    human_feedback: str

def pause_node(state):
    # 실행 일시정지 및 인간 검토 요청
    return {"pending_review": "필요한 검토가 있습니다"}

def review_node(state):
    # 인간 검토 노드 (예시: 가상 검토)
    # 실제 구현에서는 웹훅 또는 메시지 큐를 통해 검토 요청
    print("사용자 검토 요청:", state["pending_review"])
    time.sleep(1)  # 시뮬레이션
    return {"human_feedback": "검토 완료"}

def continue_node(state):
    # 검토 완료 후 계속 진행
    return {"pending_review": None}

def create_human_loop_workflow():
    workflow = StateGraph(HumanInLoopState)

    workflow.add_node("pause", pause_node)
    workflow.add_node("review", review_node)
    workflow.add_node("continue", continue_node)

    workflow.set_entry_point("pause")

    workflow.add_edge("pause", "review")
    workflow.add_edge("review", "continue")
    workflow.add_edge("continue", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 5: 병렬 실행 에이전트

이 템플릿은 하나의 입력을 여러 작업으로 분할 → 병렬 처리 → 결과 집계의 패턴을 제공합니다.


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(TypedDict):
    inputs: list[str]
    parallel_results: list[str]
    aggregated_result: str

def fan_out_node(state):
    # 입력 분할
    return {"parallel_results": []}

def process_node(state):
    # 각각의 입력 처리 (병렬 실행)
    def process_single(input_item):
        # 실제 처리 로직
        return f"처리 결과: {input_item}"

    inputs = state["inputs"]
    # 병렬 처리 (실제 구현에서는 ThreadPoolExecutor 사용)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_single, inputs))

    return {"parallel_results": results}

def aggregate_node(state):
    # 결과 집계
    aggregated = ", ".join(state["parallel_results"])
    return {"aggregated_result": aggregated}

def create_parallel_workflow():
    workflow = StateGraph(ParallelAgentState)

    workflow.add_node("fan_out", fan_out_node)
    workflow.add_node("process", process_node)
    workflow.add_node("aggregate", aggregate_node)

    workflow.set_entry_point("fan_out")

    workflow.add_edge("fan_out", "process")
    workflow.add_edge("process", "aggregate")
    workflow.add_edge("aggregate", END)


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)