DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v30)

LangGraph 워크플로우 템플릿 (v30)

LangGraph 아키텍처 개요

LangGraph는 그래프 기반 워크플로우를 구축하기 위한 강력한 프레임워크입니다. 핵심 구성 요소는 다음과 같습니다:

노드 (Nodes)

워크플로우의 각 작업 단계입니다. 일반적으로 함수 형태로 구현됩니다.

엣지 (Edges)

노드 간의 실행 흐름을 정의합니다.

상태 (State)

워크플로우 전체에서 공유되는 데이터 구조입니다.

체크포인트 (Checkpointing)

중단된 위치에서 다시 시작할 수 있도록 상태를 저장하는 기능입니다.

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]
    user_input: str
    context: str

# 기본 워크플로우 구조
workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 유효성 검사)

이 템플릿은 검색기반 생성(RAG) 시스템을 구현합니다.

from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph, END
from typing import TypedDict
import operator

class RagState(TypedDict):
    query: str
    context: str
    generated_response: str
    validation_result: bool

# 검색 노드
def retrieve_node(state: RagState):
    # 실제 검색 로직 (예: ChromaDB, Elasticsearch)
    context = search_vector_db(state["query"])
    return {"context": context}

# 생성 노드
def generate_node(state: RagState):
    prompt = PromptTemplate.from_template(
        "다음 문맥을 기반으로 질문에 답하세요:\n\n{context}\n\n질문: {query}"
    )
    llm = ChatOpenAI(model="gpt-4")
    response = llm.invoke(prompt.format(context=state["context"], query=state["query"]))
    return {"generated_response": response.content}

# 유효성 검사 노드
def validate_node(state: RagState):
    # 응답 유효성 검사 로직
    is_valid = check_response_quality(state["generated_response"])
    return {"validation_result": is_valid}

# 워크플로우 정의
workflow = StateGraph(RagState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)

workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")

# 유효성 검사에 따라 분기
def route_validation(state: RagState):
    return "generate" if not state["validation_result"] else END

workflow.add_conditional_edges("validate", route_validation)
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)

복잡한 작업을 계획하고 실행하는 에이전트입니다.

import json
from typing import List, Dict

class ToolAgentState(TypedDict):
    task: str
    plan: List[str]
    execution_history: List[Dict]
    final_answer: str

# 계획 노드
def plan_node(state: ToolAgentState):
    prompt = PromptTemplate.from_template(
        """
        다음 작업을 수행하기 위한 단계별 계획을 생성하세요:
        {task}

        계획은 JSON 형식으로 반환하세요.
        """
    )

    llm = ChatOpenAI(model="gpt-4")
    response = llm.invoke(prompt.format(task=state["task"]))

    plan = json.loads(response.content)
    return {"plan": plan}

# 실행 노드
def execute_node(state: ToolAgentState):
    # 현재 실행할 단계 추출
    current_step = state["plan"][len(state["execution_history"])]

    # 도구 실행 (예: API 호출, 파일 조작)
    result = execute_tool(current_step)

    # 실행 기록 업데이트
    new_history = state["execution_history"] + [{"step": current_step, "result": result}]
    return {"execution_history": new_history}

# 관찰 노드
def observe_node(state: ToolAgentState):
    # 실행 결과 검토
    last_result = state["execution_history"][-1]["result"]
    observations = analyze_result(last_result)
    return {"observations": observations}

# 결정 노드
def decide_node(state: ToolAgentState):
    # 실행 완료 여부 판단
    if len(state["execution_history"]) >= len(state["plan"]):
        return {"final_answer": compile_final_answer(state)}
    else:
        return {"next_step": "execute"}

# 워크플로우 정의
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")

def route_decide(state: ToolAgentState):
    if len(state["execution_history"]) >= len(state["plan"]):
        return END
    else:
        return "execute"

workflow.add_conditional_edges("decide", route_decide)
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우 (일시 정지 → 검토 → 진행)

사람의 개입이 필요한 워크플로우를 구현합니다.

from datetime import datetime
import time

class HumanInLoopState(TypedDict):
    task: str
    response: str
    user_review: str
    status: str
    timestamp: str

# 작업 생성 노드
def create_task_node(state: HumanInLoopState):
    # 작업 생성
    task_description = generate_task_description(state["task"])
    return {
        "response": task_description,
        "status": "awaiting_review",
        "timestamp": datetime.now().isoformat()
    }

# 일시 정지 노드
def pause_node(state: HumanInLoopState):
    # 사용자에게 검토 요청
    print(f"사용자 검토 필요: {state['response']}")
    time.sleep(2)  # 실제 구현에서는 웹훅 또는 이벤트 시스템 사용
    return {"status": "reviewed"}

# 검토 노드
def review_node(state: HumanInLoopState):
    # 사용자 검토 결과 처리
    user_feedback = get_user_feedback()  # 실제 구현에서 이벤트 처리
    return {"user_review": user_feedback}

# 진행 노드
def continue_node(state: HumanInLoopState):
    # 사용자 피드백에 따라 처리
    if "reject" in state["user_review"].lower():
        return {"status": "rejected"}
    else:
        return {"status": "approved"}

# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("create_task", create_task_node)
workflow.add_node("pause", pause_node)
workflow.add_node("review", review_node)
workflow.add_node("continue", continue_node)

workflow.set_entry_point("create_task")
workflow.add_edge("create_task", "pause")
workflow.add_edge("pause", "review")
workflow.add_edge("review", "continue")

def route_continue(state: HumanInLoopState):
    if state["status"] == "rejected":
        return "create_task"  # 재시작
    else:
        return END

workflow.add_conditional_edges("continue", route_continue)
Enter fullscreen mode Exit fullscreen mode

템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)

여러 작업을 동시에 처리하고 결과를 집계합니다.


python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(TypedDict):
    tasks: List[str]
    parallel_results: List[Dict]
    aggregated_result: str

# 분기 노드
def fan_out_node(state: ParallelAgentState):
    # 작업 분기
    tasks = state["tasks"]
    return {"parallel_results": []}

# 병렬 처리 노드
def process_parallel_node(state: ParallelAgentState):
    def process_task(task):
        # 각 작업 병렬 처리
        result = execute_task(task)
        return {"task": task, "result": result}

    # 병렬 실행
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_task, task) for task in state["tasks"]]
        results = [future.result() for future in futures]

    return {"parallel_results": results}

# 집계 노드
def aggregate_node(state: ParallelAgentState):
    # 결과 집계
    aggregated = aggregate_results(state["parallel_results"])
    return {"aggregated_result": aggregated}

# 워크플로우 정의
workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("process", process_parallel

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)