DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v32)

LangGraph 워크플로우 템플릿 (v32)

LangGraph 아키텍처 개요

LangGraph는 LangChain의 고급 워크플로우 엔진으로, 상태 기반의 순환 그래프를 기반으로 합니다. 핵심 구성 요소는 다음과 같습니다:

노드 (Nodes)

각 노드는 상태를 입력으로 받아 처리하고 상태를 반환하는 함수입니다. 노드는 일반적으로 Runnable 또는 RunnableCallable 인스턴스입니다.

엣지 (Edges)

엣지는 노드 간의 흐름을 정의합니다. 정적 또는 동적 엣지를 사용할 수 있으며, 조건에 따라 다른 경로로 분기할 수 있습니다.

상태 (State)

상태는 모든 노드가 공유하는 데이터 구조입니다. BaseState를 상속받아 정의하며, 메시지, 변수, 기록 등을 포함합니다.

체크포인팅 (Checkpointing)

체크포인팅은 워크플로우 상태를 저장하고 복원할 수 있게 해줍니다. 이를 통해 장애 복구나 중단 후 재개가 가능합니다.

템플릿 1: 단순 RAG 에이전트

문제: 문서 검색 후 생성하는 RAG 워크플로우의 기본 구조

from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableLambda

class RAGState(TypedDict):
    question: str
    context: str
    answer: str
    retrieved_docs: List[str]

def retrieve(state: RAGState):
    # Simulate retrieval logic
    docs = ["문서 1 내용", "문서 2 내용"]  # 실제 검색 로직으로 교체
    return {"retrieved_docs": docs, "context": "\n".join(docs)}

def generate(state: RAGState):
    # 실제 LLM 호출 로직
    prompt = f"질문: {state['question']}\n문맥: {state['context']}"
    # 예: llm.invoke(prompt)
    answer = "생성된 답변"
    return {"answer": answer}

def validate(state: RAGState):
    # 답변 검증 로직
    if len(state["answer"]) < 10:
        return {"answer": "검증 실패: 답변이 너무 짧습니다"}
    return {"answer": state["answer"]}

# 워크플로우 정의
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)

# 엣지 정의
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)

rag_graph = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 다중 도구 에이전트

문제: 계획 → 실행 → 관찰 → 결정의 반복적 워크플로우 구현

from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage

class ToolState(TypedDict):
    input: str
    plan: List[str]
    execution: List[str]
    observations: List[str]
    final_answer: str

def plan(state: ToolState):
    # 도구 사용 계획 생성
    tools = ["web_search", "calculator", "database_query"]
    plan = [f"도구 {tool} 실행" for tool in tools]
    return {"plan": plan}

def execute(state: ToolState):
    # 도구 실행 (실제 실행 로직)
    execution_results = ["검색 결과", "계산 결과", "데이터 조회 결과"]
    return {"execution": execution_results}

def observe(state: ToolState):
    # 실행 결과 분석
    observations = [
        f"검색 결과: {result}" for result in state["execution"]
    ]
    return {"observations": observations}

def decide(state: ToolState):
    # 최종 결정
    if len(state["observations"]) >= 2:
        return {"final_answer": "실행 완료"}
    return {"final_answer": "실행 실패"}

# 워크플로우 구현
workflow = StateGraph(ToolState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)

tool_graph = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

문제: 인간 검토와 승인을 포함한 워크플로우 구현

from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import HumanMessage

class HumanInLoopState(TypedDict):
    input: str
    generated_output: str
    review_request: str
    user_feedback: str
    final_output: str

def generate(state: HumanInLoopState):
    # AI 생성
    return {"generated_output": "생성된 결과"}

def pause_for_review(state: HumanInLoopState):
    # 사용자 검토 요청
    review_prompt = f"검토 요청:\n{state['generated_output']}"
    return {"review_request": review_prompt}

def process_feedback(state: HumanInLoopState):
    # 사용자 피드백 처리
    if state["user_feedback"]:
        return {"final_output": f"수정된 결과: {state['user_feedback']}"}
    else:
        return {"final_output": state["generated_output"]}

# 사용자 입력 헬퍼 함수
def get_user_input():
    return input("사용자 피드백을 입력하세요: ")

# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("generate", generate)
workflow.add_node("pause_for_review", pause_for_review)
workflow.add_node("process_feedback", process_feedback)

workflow.set_entry_point("generate")
workflow.add_edge("generate", "pause_for_review")
workflow.add_edge("pause_for_review", "process_feedback")
workflow.add_edge("process_feedback", END)

human_loop_graph = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 4: 병렬 실행 에이전트

문제: 병렬로 여러 작업을 실행하고 결과를 집계하는 워크플로우

from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelExecutionState(TypedDict):
    input_data: List[str]
    parallel_results: List[str]
    aggregated_result: str

def fan_out(state: ParallelExecutionState):
    # 입력 데이터를 병렬 처리
    return {"parallel_results": state["input_data"]}

def process_parallel_item(item: str):
    # 각 아이템 처리 (실제 AI 처리 로직)
    return f"처리 결과: {item}"

async def parallel_process(state: ParallelExecutionState):
    # 병렬 처리 로직
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        tasks = [loop.run_in_executor(executor, process_parallel_item, item) 
                for item in state["parallel_results"]]
        results = await asyncio.gather(*tasks)
    return {"parallel_results": results}

def aggregate_results(state: ParallelExecutionState):
    # 결과 집계
    aggregated = " | ".join(state["parallel_results"])
    return {"aggregated_result": aggregated}

# 병렬 워크플로우
workflow = StateGraph(ParallelExecutionState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("parallel_process", parallel_process)
workflow.add_node("aggregate_results", aggregate_results)

workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "parallel_process")
workflow.add_edge("parallel_process", "aggregate_results")
workflow.add_edge("aggregate_results", END)

parallel_graph = workflow.compile()
Enter fullscreen mode Exit fullscreen mode

상태 관리 패턴

1. 상태 히스토리 관리

class HistoryState(TypedDict):
    messages: List[dict]
    current_step: int
    history: List[dict]

def append_message(state: HistoryState, message: dict):
    return {
        "messages": state["messages"] + [message],
        "current_step": state["current_step"] + 1
    }
Enter fullscreen mode Exit fullscreen mode

2. 상태 재사용 패턴

def state_reuse_pattern(state: dict, new_state: dict):
    # 기존 상태와 새로운 상태 병합
    merged = {**state, **new_state}
    return merged
Enter fullscreen mode Exit fullscreen mode

스트리밍 및 실시간 업데이트


python
from langchain_core.callbacks import StreamingCallbackHandler

class StreamCallback(StreamingCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(f"토큰: {token}")

async def stream_execution(graph, input_data):
    #

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)