DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v34)

LangGraph 워크플로우 템플릿 (v34)

Python 개발자를 위한 LangChain/LangGraph 워크플로우 템플릿

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반의 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 작동합니다:

  • 노드 (Nodes): 각 단계의 작업을 정의하는 함수
  • 엣지 (Edges): 노드 간의 전이 조건
  • 상태 (State): 모든 노드에서 공유되는 데이터 구조
  • 체크포인팅 (Checkpointing): 상태를 저장하고 복구하는 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    user_input: str

# 기본 워크플로우 구성
workflow = StateGraph(AgentState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

실제 문제: 문서 검색 후 생성된 답변의 정확성 검증 필요

from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# RAG 워크플로우
def retrieve(state):
    # 문서 검색 로직
    vector_store = VectorStore()
    query = state["user_input"]
    retrieved_docs = vector_store.search(query, k=3)
    return {"retrieved_docs": retrieved_docs}

def generate(state):
    # 생성 로직
    prompt = PromptTemplate.from_template(
        "다음 문서를 기반으로 질문에 답하세요: {context}\n질문: {query}"
    )
    chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()

    context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
    answer = chain.invoke({"context": context, "query": state["user_input"]})

    return {"generated_answer": answer}

def validate(state):
    # 검증 로직
    prompt = PromptTemplate.from_template(
        "다음 답변이 질문에 답하고 있는지 검증하세요:\n질문: {query}\n답변: {answer}\n정답 여부: (예/아니오)"
    )
    chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()

    verdict = chain.invoke({
        "query": state["user_input"], 
        "answer": state["generated_answer"]
    })

    return {"validation": verdict, "valid": "" in verdict}

# 워크플로우 정의
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)

# 엣지 정의
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")

# 검증 결과에 따라 분기
def route_validation(state):
    if state["valid"]:
        return END
    else:
        return "retrieve"  # 재검색

workflow.add_conditional_edges(
    "validate",
    route_validation,
    {
        "retrieve": "retrieve",
        END: END
    }
)
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)

실제 문제: 여러 도구를 사용해야 하는 복잡한 작업 처리

from langchain.tools import Tool
from langchain_openai import OpenAI

# 도구 정의
tools = [
    Tool(
        name="search",
        func=lambda query: f"검색 결과: {query}",
        description="웹 검색을 위한 도구"
    ),
    Tool(
        name="calculator",
        func=lambda expression: f"계산 결과: {eval(expression)}",
        description="수학 계산을 위한 도구"
    )
]

class ToolAgentState(TypedDict):
    messages: Annotated[list, operator.add]
    plan: str
    execution_result: str
    observation: str

def plan(state):
    # 작업 계획 생성
    prompt = PromptTemplate.from_template(
        "다음 작업을 위한 계획을 세우세요:\n{task}\n도구 사용 계획:"
    )
    chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
    plan = chain.invoke({"task": state["user_input"]})
    return {"plan": plan}

def execute(state):
    # 도구 실행
    tools_dict = {tool.name: tool for tool in tools}
    # 간단한 실행 로직 (실제 구현 시 도구 선택 로직 포함)
    tool_result = tools_dict["search"].run(state["user_input"])
    return {"execution_result": tool_result}

def observe(state):
    # 실행 결과 관찰
    return {"observation": f"실행 결과: {state['execution_result']}"}

def decide(state):
    # 결정 로직
    prompt = PromptTemplate.from_template(
        "다음 작업을 고려하세요:\n{task}\n관찰 결과: {observation}\n결정:"
    )
    chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
    decision = chain.invoke({
        "task": state["user_input"],
        "observation": state["observation"]
    })
    return {"decision": decision}

# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

실제 문제: 인간의 판단이 필요한 작업에서 자동화와 인간 통제의 균형

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from datetime import datetime

class HumanInLoopState(TypedDict):
    messages: Annotated[list, operator.add]
    task: str
    human_review: str
    status: str  # "pending", "approved", "rejected"
    timestamp: str

def task_creation(state):
    # 작업 생성
    task = f"사용자 요청: {state['user_input']}"
    return {"task": task, "status": "pending", "timestamp": datetime.now().isoformat()}

def review_request(state):
    # 검토 요청
    return {"messages": state["messages"] + [{"role": "assistant", "content": 
        f"작업을 검토해 주세요:\n{state['task']}\n[승인/거부]"}]}

def human_review(state):
    # 인간 검토
    review = state["human_review"]  # 사용자 입력
    status = "approved" if "승인" in review else "rejected"
    return {"status": status, "messages": state["messages"] + [{"role": "user", "content": review}]}

def handle_rejection(state):
    # 거부 시 재작업
    return {"messages": state["messages"] + [{"role": "assistant", "content": 
        "작업이 거부되었습니다. 새로운 요청을 주세요"}]}

# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("create_task", task_creation)
workflow.add_node("request_review", review_request)
workflow.add_node("human_review", human_review)
workflow.add_node("handle_rejection", handle_rejection)

workflow.set_entry_point("create_task")
workflow.add_edge("create_task", "request_review")
workflow.add_edge("request_review", "human_review")

def route_review(state):
    if state["status"] == "approved":
        return "END"
    else:
        return "handle_rejection"

workflow.add_conditional_edges(
    "human_review",
    route_review,
    {
        "handle_rejection": "handle_rejection",
        "END": END
    }
)
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)

실제 문제: 여러 데이터 소스를 동시에 처리하고 결과를 통합하는 작업


python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelProcessingState(TypedDict):
    messages: Annotated[list, operator.add]
    data_sources: list
    processed_results: list
    aggregated_result: str

def fan_out(state):
    # 데이터 소스 분산
    sources = state["data_sources"]
    return {"processed_results": []}

async def process_async(data_source):
    # 비동기 처리 로직
    await asyncio.sleep(1)  # 실제 API 호출
    return f"처리된 {data_source}"

def process_parallel(state):
    # 병렬 처리
    with ThreadPoolExecutor(max

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)