DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v44)

LangGraph 워크플로우 템플릿 (v44)

Reusable LangGraph templates for Python developers building AI agents

Overview

LangGraph는 LangChain과 함께 사용되는 고급 워크플로우 엔진으로, 복잡한 AI 에이전트를 쉽게 구축할 수 있게 해줍니다. 이 문서는 실제 개발자가 문제를 해결할 수 있도록 설계된 4가지 핵심 템플릿을 제공합니다.

1. LangGraph 구조 개요

LangGraph는 세 가지 핵심 구성 요소로 이루어져 있습니다:

  • Nodes: 각각의 작업 단계 (예: 검색, 생성, 검증)
  • Edges: 노드 간의 흐름 정의 (조건부 또는 고정)
  • State: 워크플로우 상태 관리
  • Checkpointing: 상태 저장 및 복구 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]

graph = StateGraph(State)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 단순 RAG 에이전트 (검색 → 생성 → 검증)

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END

class SimpleRAGState(TypedDict):
    question: str
    context: str
    answer: str
    validation: bool

def retrieve(state: SimpleRAGState):
    # 검색 로직 구현
    # 예: ChromaDB 또는 Pinecone에서 유사 문서 검색
    return {"context": "검색된 문서 내용"}

def generate(state: SimpleRAGState):
    # LLM 기반 생성
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"질문: {state['question']}\n문맥: {state['context']}"
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"answer": response.content}

def validate(state: SimpleRAGState):
    # 생성된 답변의 유효성 검증
    # 예: 관련성 점수 계산 또는 키워드 검사
    return {"validation": True}  # 실제 구현에서는 더 복잡한 로직

workflow = StateGraph(SimpleRAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)

app = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)

from typing import List
from langchain_core.tools import tool
import json

class MultiToolAgentState(TypedDict):
    plan: List[str]
    execution_log: List[str]
    observations: List[str]
    final_answer: str

@tool
def search_web(query: str) -> str:
    """웹 검색 도구"""
    return f"검색 결과: {query}"

@tool
def calculate_math(expression: str) -> str:
    """수학 계산 도구"""
    return f"계산 결과: {eval(expression)}"

@tool
def get_weather(location: str) -> str:
    """날씨 정보 도구"""
    return f"{location} 날씨 정보"

def plan(state: MultiToolAgentState):
    # 초기 계획 생성
    return {"plan": ["search_web", "calculate_math"]}

def execute(state: MultiToolAgentState):
    # 계획 실행 및 관찰
    tools = [search_web, calculate_math]
    execution_log = []
    observations = []

    for tool_name in state["plan"]:
        tool_instance = next(t for t in tools if t.name == tool_name)
        result = tool_instance.run(f"도구 실행: {tool_name}")
        execution_log.append(f"{tool_name}: {result}")
        observations.append(result)

    return {"execution_log": execution_log, "observations": observations}

def decide(state: MultiToolAgentState):
    # 관찰 결과 분석 후 최종 결정
    return {"final_answer": "최종 답변"}

workflow = StateGraph(MultiToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("decide", decide)

workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "decide")
workflow.add_edge("decide", END)

app = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END, START

class HumanInLoopState(TypedDict):
    user_request: str
    intermediate_response: str
    final_answer: str
    needs_review: bool

def generate_initial(state: HumanInLoopState):
    # 초기 생성
    return {"intermediate_response": "초기 답변 생성"}

def check_review_needed(state: HumanInLoopState):
    # 검토 필요 여부 확인 (예: 복잡한 요청)
    needs_review = len(state["user_request"]) > 100  # 간단한 조건
    return {"needs_review": needs_review}

def human_review(state: HumanInLoopState):
    # 인간 검토 절차
    print("사용자 검토 대기...")
    # 실제 구현에서는 사용자 인터페이스 또는 외부 시스템과 연동
    return {"final_answer": "사용자 검토 결과"}

def final_generate(state: HumanInLoopState):
    # 최종 생성
    return {"final_answer": "최종 답변"}

workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate_initial", generate_initial)
workflow.add_node("check_review", check_review_needed)
workflow.add_node("human_review", human_review)
workflow.add_node("final_generate", final_generate)

workflow.add_edge(START, "generate_initial")
workflow.add_edge("generate_initial", "check_review")
workflow.add_conditional_edges(
    "check_review",
    lambda x: "needs_review" if x["needs_review"] else "final_generate",
    {
        "needs_review": "human_review",
        "final_generate": "final_generate"
    }
)
workflow.add_edge("human_review", "final_generate")
workflow.add_edge("final_generate", END)

app = workflow.compile(checkpointer=MemorySaver())
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(TypedDict):
    tasks: List[str]
    results: List[dict]
    aggregated_result: str

def fan_out(state: ParallelAgentState):
    # 작업 분할
    tasks = ["task_1", "task_2", "task_3"]
    return {"tasks": tasks}

def process_task(state: ParallelAgentState, task: str):
    # 개별 작업 처리
    # 예: API 호출, 데이터 처리 등
    return {task: f"결과_{task}"}

def aggregate_results(state: ParallelAgentState):
    # 결과 집계
    results = [f"결과_{task}" for task in state["tasks"]]
    return {"aggregated_result": " | ".join(results)}

# 병렬 처리 구현
async def parallel_process(state: ParallelAgentState):
    tasks = state["tasks"]
    # 비동기 작업 처리
    results = []
    for task in tasks:
        result = process_task(state, task)
        results.append(result)

    # 결과 집계
    return {"results": results, "aggregated_result": " | ".join([str(r) for r in results])}

workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("process_parallel", parallel_process)
workflow.add_node("aggregate", aggregate_results)

workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.add_edge("aggregate", END)

app = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

6.1 상태 업데이트 패턴

from typing import Annotated
import operator

class UpdateState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: int
    status: str

def update_with_logging(state: UpdateState):
    # 상태 업데이트 및 로깅
    current_step = state.get("current_step", 0) + 1
    return {
        "current_step": current_step,
        "status": f"처리 중 (단계 {current_step})"
    }
Enter fullscreen mode Exit fullscreen mode

6.2 상태 저장 및 복구


python
from langgraph.checkpoint.sqlite import SqliteSaver

# 체크포인트 저장
checkpointer = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=checkpointer)

# 상태 저장
config = {"configurable": {"thread_id": "thread_123"}}
app

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)