DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v31)

LangGraph 워크플로우 템플릿 (v31)

LangGraph는 LangChain의 고급 워크플로우 관리 시스템으로, 복잡한 AI 에이전트 구축에 있어 강력한 기반을 제공합니다. 이 가이드에서는 실제 개발자가 활용할 수 있는 4가지 핵심 템플릿을 소개합니다. 각 템플릿은 3-7달러의 가치를 지니는 실용적인 솔루션으로, 로컬 AI 인프라 구축과 성능 최적화 문제를 해결합니다.

1. LangGraph 아키텍처 개요

LangGraph는 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpoint)로 구성된 그래프 구조를 기반으로 합니다:

  • 노드: 각 단계의 작업을 정의 (예: retrieve, generate)
  • 엣지: 노드 간 실행 흐름 정의 (예: retrieve → generate)
  • 상태: 워크플로우 내 모든 상태 관리 (예: messages, tool_calls)
  • 체크포인트: 실패 복구 및 상태 복원 기능 제공

2. 템플릿 1: 단순 RAG 에이전트

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
import operator

class State(TypedDict):
    """Shared state passed between the nodes of the simple RAG graph."""

    question: str  # user question; read by retrieve() and generate()
    context: str  # retrieval result written by retrieve()
    answer: str  # model output written by generate()
    # Written by validate(); declared here so that node's update targets a
    # key the state schema actually knows about.
    validation: str

def retrieve(state: State):
    """Fetch context for the current question.

    Passes ``state["question"]`` to ``search_vector_db`` (defined elsewhere)
    and returns the result as a partial state update under ``"context"``.
    """
    # Plug the real RAG retrieval logic in here.
    return {"context": search_vector_db(state["question"])}

def generate(state: State):
    """Ask the LLM to answer the question using the retrieved context.

    Builds a prompt from ``state["question"]`` and ``state["context"]`` and
    sends it to the module-level ``llm``.

    Returns:
        A partial state update ``{"answer": <model output>}``.
    """
    prompt = f"질문: {state['question']}\n컨텍스트: {state['context']}"
    response = llm.invoke(prompt)
    # Chat models return a message object whose text lives in ``.content``,
    # while plain completion LLMs return a ``str``. Normalise both so that
    # ``answer`` matches the ``str`` type declared on ``State``.
    return {"answer": getattr(response, "content", response)}

def validate(state: State):
    """Placeholder validation step; currently always reports ``"passed"``."""
    # Add the real validation logic here.
    verdict = "passed"
    return {"validation": verdict}

# Build the graph: register every step under its node name.
workflow = StateGraph(State)
for _node_name, _node_fn in (
    ("retrieve", retrieve),
    ("generate", generate),
    ("validate", validate),
):
    workflow.add_node(_node_name, _node_fn)

# Linear pipeline: retrieve -> generate -> validate -> END.
workflow.set_entry_point("retrieve")
for _source, _target in (
    ("retrieve", "generate"),
    ("generate", "validate"),
    ("validate", END),
):
    workflow.add_edge(_source, _target)

app = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티-도구 에이전트

from typing import List
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
import json

class AgentState(TypedDict):
    """State carried through the multi-tool agent graph."""

    # Conversation / tool-result entries; the last one is fed to the tools.
    messages: List[dict]
    # Names of the tools selected by the planning step.
    tools: List[str]
    # Name of the step each node nominates to run next.
    next: str

class ToolExecutor:
    """Registry that dispatches a tool call by name."""

    def __init__(self):
        # One instance per supported tool, keyed by the name used for lookup.
        self.tools = dict(
            calculator=CalculatorTool(),
            search=SearchTool(),
            weather=WeatherTool(),
        )

    def execute(self, tool_name: str, input_data: dict):
        """Run the named tool on ``input_data``.

        Returns the tool's ``run`` result, or an error dict when no tool is
        registered under ``tool_name``.
        """
        # Guard clause: unknown tools produce an error payload, not an exception.
        if tool_name not in self.tools:
            return {"error": "도구를 찾을 수 없음"}
        selected = self.tools[tool_name]
        return selected.run(input_data)

def plan(state: AgentState):
    """Choose the tools for this round and nominate ``execute`` as the next step.

    Tool selection is delegated to ``analyze_tools_needed`` (defined
    elsewhere), which receives the current messages.
    """
    # Tool planning
    return {
        "tools": analyze_tools_needed(state["messages"]),
        "next": "execute",
    }

def execute(state: AgentState):
    """Run every planned tool against the latest message.

    Returns a partial state update whose ``messages`` value is the list of
    tool results (one per entry in ``state["tools"]``) and whose ``next``
    value is ``"observe"``.
    """
    # Tool execution
    runner = ToolExecutor()
    outputs = [
        runner.execute(name, state["messages"][-1])
        for name in state["tools"]
    ]
    return {"messages": outputs, "next": "observe"}

def observe(state: AgentState):
    """Inspect the execution results and nominate the next step.

    Nominates ``"plan"`` when there are no messages, otherwise ``"decide"``.
    """
    # Review the execution results
    next_step = "decide" if state["messages"] else "plan"
    return {"next": next_step}

def decide(state: AgentState):
    """Decide whether to start another planning round or finish.

    ``should_continue`` (defined elsewhere) is consulted with the current
    messages; a truthy answer nominates ``"plan"``, anything else ``END``.
    """
    # Decision logic
    keep_going = should_continue(state["messages"])
    return {"next": "plan" if keep_going else END}

# Workflow setup
def _route_by_next(state: AgentState):
    """Return the step name the previous node stored under ``next``."""
    return state["next"]


workflow = StateGraph(AgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
# observe() and decide() choose their successor at runtime via state["next"].
# Static edges would ignore that value (decide could never loop back to
# plan), so both nodes are routed with conditional edges instead.
workflow.add_conditional_edges(
    "observe",
    _route_by_next,
    {"plan": "plan", "decide": "decide"},
)
workflow.add_conditional_edges(
    "decide",
    _route_by_next,
    {"plan": "plan", END: END},
)
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-참여 워크플로우

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState

class HumanReviewState(MessagesState):
    """Message state extended with human-review bookkeeping."""

    # True while a human review is still outstanding.
    review_required: bool
    # Review outcome; review_handler() compares it against "approved".
    review_status: str

def human_review_node(state: HumanReviewState):
    """Flag the state as needing review and append a review request entry."""
    # Review request
    request = {"type": "human_review", "prompt": "리뷰를 진행해 주세요"}
    history = state["messages"]
    return {
        "review_required": True,
        "messages": history + [request],
    }

def review_handler(state: HumanReviewState):
    """Clear the review flag once the review status is ``"approved"``.

    The messages are passed through unchanged in both outcomes; only
    ``review_required`` differs.
    """
    # Handle the completed review
    still_pending = state["review_status"] != "approved"
    return {"review_required": still_pending, "messages": state["messages"]}

# Checkpointer setup: keeps graph state in memory so a paused run can resume.
memory = MemorySaver()

# Workflow definition
workflow = StateGraph(HumanReviewState)
workflow.add_node("process", process_node)
workflow.add_node("review", human_review_node)
workflow.add_node("handle_review", review_handler)

workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "handle_review")
workflow.add_edge("handle_review", END)

# Pause before "handle_review" so a human can record review_status on the
# checkpointed state and then resume. Without an interrupt the graph runs
# straight through and review_handler() executes before any review happened.
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["handle_review"],
)
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 4: 병렬 실행 에이전트

from concurrent.futures import ThreadPoolExecutor
from typing import List

class ParallelState(TypedDict):
    """State for the parallel-execution template."""

    # Work items to be processed.
    inputs: List[str]
    # Per-item results; reset to an empty list by fan_out().
    results: List[dict]
    # Progress counter; reset to 0 by fan_out().
    processed: int

def fan_out(state: ParallelState):
    """Reset the result list and progress counter before work is distributed."""
    # Distribute the inputs
    fresh_results = []
    return {"results": fresh_results, "processed": 0}

def process_parallel(state: ParallelState, input_item: str):
    """Process a single work item and return its result.

    ``state`` is accepted but not read; the work itself is delegated to
    ``process_item`` (defined elsewhere).
    """
    # Parallel processing
    return process_item(input_item)

def aggregate_results(state: ParallelState):
    """Combine the collected results into one value under ``"final_result"``.

    The combining itself is done by ``aggregate`` (defined elsewhere).
    """
    # Aggregate the results
    combined = aggregate(state["results"])
    return {"final_result": combined}

# Run the work items in parallel
def parallel_workflow():
    """Process a fixed list of tasks on a thread pool.

    Returns:
        ``{"final_results": [...]}`` with one result per task, in the same
        order as the inputs.
    """
    inputs = ["task1", "task2", "task3"]
    # process_parallel() takes (state, input_item). Submitting only the item
    # leaves input_item unbound, so every future would raise TypeError when
    # its result is read. Pass an explicit state alongside each item.
    state = {"inputs": inputs, "results": [], "processed": 0}

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_parallel, state, input_item)
            for input_item in inputs
        ]
        # Collect in submission order so results line up with inputs.
        results = [future.result() for future in futures]

    return {"final_results": results}
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

# State save/restore pattern
class StateManager:
    """Persist workflow state as one pickle file per thread ID.

    Files are stored as ``<checkpoint_path>/<thread_id>.pkl``.

    NOTE(security): ``pickle.load`` can execute arbitrary code. Only load
    checkpoint files that this application wrote itself; never point
    ``checkpoint_path`` at a location untrusted parties can write to.
    """

    def __init__(self, checkpoint_path: str):
        self.checkpoint_path = checkpoint_path

    def _state_file(self, thread_id: str) -> str:
        """Return the pickle file path used for ``thread_id``."""
        return f"{self.checkpoint_path}/{thread_id}.pkl"

    def save_state(self, state: dict, thread_id: str):
        """Write ``state`` to the checkpoint file for ``thread_id``.

        The target directory is created on demand, so the first save does
        not fail when the checkpoint directory does not exist yet.
        """
        import os
        import pickle

        target = self._state_file(thread_id)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "wb") as f:
            pickle.dump(state, f)

    def load_state(self, thread_id: str):
        """Return the state previously saved for ``thread_id``.

        Raises:
            FileNotFoundError: if nothing was saved for ``thread_id``.
        """
        import pickle

        with open(self._state_file(thread_id), "rb") as f:
            return pickle.load(f)

# State optimisation
class OptimizedState(TypedDict):
    """Slimmed-down state that stores only the fields it needs."""

    # Annotated with operator.add as the list-combining function.
    messages: Annotated[List[dict], operator.add]
    # Free-form auxiliary data.
    metadata: dict
    # Integer timestamp.
    timestamp: int
Enter fullscreen mode Exit fullscreen mode

7. 스트리밍 및 실시간 업데이트

import asyncio
from langchain_core.messages import AIMessage

async def stream_response(messages: List[dict]):
    """Yield the model's reply chunk by chunk as ``llm.astream`` produces it."""
    chunk_stream = llm.astream(messages)
    async for piece in chunk_stream:
        yield piece

def setup_streaming_graph():
    """Build and compile the two-step streaming workflow.

    The graph runs ``process`` followed by ``stream`` and then terminates.

    Returns:
        The compiled graph.
    """
    workflow = StateGraph(StreamingState)
    workflow.add_node("process", process_with_streaming)
    workflow.add_node("stream", stream_results)

    workflow.set_entry_point("process")
    workflow.add_edge("process", "stream")
    # Terminate explicitly after the last node, as the other templates do,
    # rather than leaving "stream" without an outgoing edge.
    workflow.add_edge("stream", END)

    return workflow.compile()

# Streaming state definition
class StreamingState(TypedDict):
    """State schema used by the streaming workflow."""

    # Input text for the run.
    input: str
    # Output pieces, stored as a list of strings.
    output: List[str]
    # Identifier of the stream.
    stream_id: str
Enter fullscreen mode Exit fullscreen mode

8. 오류 복구 및 재시도 로직


python
import time
from functools import wraps

class RetryHandler:
    """Retry a callable with exponential backoff between attempts.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
        backoff_factor: Base delay in seconds; the wait after attempt ``n``
            (0-based) is ``backoff_factor * 2 ** n``.
    """

    def __init__(self, max_retries: int = 3, backoff_factor: int = 1):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    def retry_on_failure(self, func):
        """Decorate ``func`` so failures are retried.

        Returns:
            A wrapper that returns ``func``'s result from the first
            successful attempt.

        Raises:
            Exception: whatever ``func`` raised on the final attempt,
                re-raised with its original traceback.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # With max_retries <= 0 the loop would never run and the call
            # would silently return None without invoking func at all.
            attempts = max(1, self.max_retries)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts - 1:
                        # Bare raise keeps the original traceback intact.
                        raise
                    # Exponential backoff before the next attempt.
                    time.sleep(self.backoff_factor * (2 ** attempt))
        return wrapper



---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)