DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v24)

LangGraph 워크플로우 템플릿 (v24)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 이루어져 있습니다:

Nodes: 워크플로우의 각 단계
Edges: 노드 간의 전이 조건
State: 워크플로우 내에서 공유되는 상태
Checkpointing: 상태 저장 및 복구 기능

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]

workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

이 템플릿은 문서 검색 및 생성을 위한 기본적인 RAG 워크플로우입니다:

import json
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

class RAGState(TypedDict):
    query: str
    context: str
    response: str
    validation: bool

def retrieve_node(state: RAGState):
    # 간단한 문서 검색 로직
    # 실제 구현에서는 vector store 검색 사용
    return {"context": "검색된 문서 내용"}

def generate_node(state: RAGState):
    prompt = PromptTemplate.from_template(
        "질문: {query}\n문맥: {context}\n답변:"
    )
    llm = ChatOpenAI(model="gpt-4")
    chain = prompt | llm | StrOutputParser()
    response = chain.invoke({
        "query": state["query"],
        "context": state["context"]
    })
    return {"response": response}

def validate_node(state: RAGState):
    # 답변 검증 로직
    # 예: 관련성 점수 계산
    return {"validation": True}

# 워크플로우 구성
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 다중 툴 에이전트 (계획 → 실행 → 관찰 → 결정)

다중 도구를 활용한 에이전트 워크플로우:

from langchain.tools import Tool
from langchain_core.messages import ToolMessage
from typing import List

class ToolAgentState(TypedDict):
    task: str
    plan: List[str]
    execution: str
    observation: str
    decision: str

def plan_node(state: ToolAgentState):
    # 작업 계획 생성
    plan = ["분석 시작", "도구 선택", "실행", "결과 검토"]
    return {"plan": plan}

def execute_node(state: ToolAgentState):
    # 도구 실행
    # 실제 도구 호출 로직
    tools = [Tool(name="search", func=lambda x: f"검색 결과: {x}")]
    tool = tools[0]
    result = tool.func(state["task"])
    return {"execution": result}

def observe_node(state: ToolAgentState):
    # 실행 결과 관찰
    return {"observation": f"관찰 결과: {state['execution']}"}

def decide_node(state: ToolAgentState):
    # 결정 로직
    if "결과" in state["observation"]:
        return {"decision": "성공"}
    return {"decision": "실패"}

workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)

workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.set_entry_point("plan")
workflow.set_finish_point("decide")
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

사람의 개입이 필요한 경우 사용:

import asyncio
from datetime import datetime

class HumanInLoopState(TypedDict):
    task: str
    status: str
    human_review: str
    final_result: str

def process_task_node(state: HumanInLoopState):
    # 작업 처리
    return {"status": "processing"}

def pause_for_review_node(state: HumanInLoopState):
    # 인간 검토를 위한 일시정지
    print(f"작업 일시정지: {state['task']}")
    # 실제 구현에서는 API 호출 또는 메시지 대기
    return {"status": "awaiting_review"}

def review_node(state: HumanInLoopState):
    # 인간 검토
    # 이 부분은 외부 인터페이스를 통해 처리
    return {"human_review": "검토 완료"}

def continue_processing_node(state: HumanInLoopState):
    # 검토 후 처리 계속
    return {"status": "completed", "final_result": "최종 결과"}

workflow = StateGraph(HumanInLoopState)
workflow.add_node("process_task", process_task_node)
workflow.add_node("pause_for_review", pause_for_review_node)
workflow.add_node("review", review_node)
workflow.add_node("continue_processing", continue_processing_node)

workflow.add_edge("process_task", "pause_for_review")
workflow.add_edge("pause_for_review", "review")
workflow.add_edge("review", "continue_processing")
workflow.set_entry_point("process_task")
workflow.set_finish_point("continue_processing")
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)

병렬 작업 처리:

from concurrent.futures import ThreadPoolExecutor
import threading

class ParallelAgentState(TypedDict):
    tasks: List[str]
    results: List[str]
    aggregated_result: str

def fan_out_node(state: ParallelAgentState):
    # 작업 분할
    tasks = ["task_1", "task_2", "task_3"]
    return {"tasks": tasks, "results": []}

def process_parallel_node(state: ParallelAgentState):
    # 병렬 처리
    def process_task(task):
        # 각 작업 병렬 처리
        return f"{task}_결과"

    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(process_task, state["tasks"]))

    return {"results": results}

def aggregate_node(state: ParallelAgentState):
    # 결과 집계
    aggregated = ", ".join(state["results"])
    return {"aggregated_result": aggregated}

workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("process_parallel", process_parallel_node)
workflow.add_node("aggregate", aggregate_node)

workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.set_entry_point("fan_out")
workflow.set_finish_point("aggregate")
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

상태를 효율적으로 관리하는 방법:

from langgraph.checkpoint import Checkpoint
from langgraph.checkpoint.memory import MemorySaver

# 메모리 체크포인트 사용
memory = MemorySaver()

# 사용자별 상태 저장
class UserState(TypedDict):
    user_id: str
    conversation_history: List[dict]
    preferences: dict

def save_checkpoint(state: UserState):
    # 사용자별 체크포인트 저장
    checkpoint = {
        "user_id": state["user_id"],
        "conversation_history": state["conversation_history"],
        "preferences": state["preferences"],
        "timestamp": datetime.now().isoformat()
    }
    return checkpoint
Enter fullscreen mode Exit fullscreen mode

7. 스트리밍 및 실시간 업데이트

실시간 처리 및 스트리밍:

from langchain_core.callbacks import BaseCallbackHandler

class StreamingCallback(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)

def streaming_node(state: StreamingState):
    llm = ChatOpenAI(streaming=True, callbacks=[StreamingCallback()])
    # 스트리밍 처리 로직
    return {"streaming_output": "스트리밍 결과"}

# 스트리밍 워크플로우
workflow = StateGraph(StreamingState)
workflow.add_node("streaming", streaming_node)
workflow.add_edge("streaming", END)
Enter fullscreen mode Exit fullscreen mode

8. 오류 복구 및 재시도 로직

안정적인 워크플로우를 위한 오류 처리:


python
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)