DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v39)

LangGraph 워크플로우 템플릿 (v39)

LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 구현하기 위한 프레임워크로, 다음과 같은 핵심 구성 요소로 이루어져 있습니다:

Nodes (노드): 워크플로우의 각 단계. 각 노드는 특정 작업을 수행하고 상태를 업데이트합니다.

Edges (엣지): 노드 간의 전이 조건을 정의합니다.

State (상태): 워크플로우의 현재 상태를 유지합니다. 노드가 상태를 읽고 변경할 수 있습니다.

Checkpointing (체크포인팅): 상태를 저장하고 복원할 수 있어 중단 후 복구가 가능합니다.

from langgraph.graph import StateGraph
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    """Minimal graph state holding the running list of messages."""

    # operator.add is attached as Annotated metadata; presumably LangGraph
    # uses it as the reducer so node updates are concatenated onto the list
    # instead of replacing it — confirm against the LangGraph state docs.
    messages: Annotated[list, operator.add]

# Basic workflow setup: a graph builder that uses AgentState as its schema
workflow = StateGraph(AgentState)
Enter fullscreen mode Exit fullscreen mode

템플릿 1: 단순 RAG 에이전트

문서 검색 후 생성하고 검증하는 단순한 RAG 워크플로우입니다:

from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

class RAGState(TypedDict):
    """State shared by the nodes of the simple RAG workflow."""

    query: str  # read by retrieve() and generate()
    documents: list  # written by retrieve(); generate() reads page_content of each item
    response: str  # written by generate() from the LLM reply's content
    validated: bool  # written by validate()

def retrieve(state: RAGState):
    """Fetch documents similar to the query and store them in the state.

    Relies on a module-level ``vector_store`` object that is not defined in
    this snippet; it must expose ``similarity_search(query)``.

    Returns:
        A partial state update with the search hits under ``"documents"``.
    """
    search = vector_store.similarity_search
    hits = search(state["query"])
    return {"documents": hits}

def generate(state: RAGState):
    """Generate an answer to the query from the retrieved documents.

    The ``page_content`` of every document is joined with newlines to form
    the context, which is sent to the chat model together with the query.

    Returns:
        A partial state update with the model reply's text under
        ``"response"``.
    """
    template = PromptTemplate.from_template(
        "Context: {context}\n\nQuestion: {query}\n\nAnswer:"
    )
    model = ChatOpenAI(model="gpt-4")

    context_text = "\n".join(doc.page_content for doc in state["documents"])
    user_prompt = template.format(context=context_text, query=state["query"])

    conversation = [
        ("system", "You are a helpful assistant."),
        ("user", user_prompt),
    ]
    reply = model.invoke(conversation)
    return {"response": reply.content}

def validate(state: RAGState):
    """Mark the generated response as validated.

    Placeholder check: the state is not inspected and the response is always
    accepted. A real implementation would verify that the response is
    relevant to the question.

    Returns:
        A partial state update ``{"validated": True}``.
    """
    return dict(validated=True)

# Assemble the RAG graph as a linear chain: retrieve -> generate -> validate
rag_workflow = StateGraph(RAGState)
_rag_steps = [
    ("retrieve", retrieve),
    ("generate", generate),
    ("validate", validate),
]
for _name, _node in _rag_steps:
    rag_workflow.add_node(_name, _node)

# Connect each step to the one that follows it
for (_src, _), (_dst, _) in zip(_rag_steps, _rag_steps[1:]):
    rag_workflow.add_edge(_src, _dst)
rag_workflow.set_entry_point("retrieve")
rag_workflow.set_finish_point("validate")

rag_app = rag_workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 2: 다중 도구 에이전트

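계획 → 실행 → 관찰 → 결정 단계로 이어지는 에이전트 워크플로우입니다. 아래 예시는 각 단계를 한 번씩 순차 실행하며, 실제 반복 루프를 만들려면 `decide` 뒤에 조건부 엣지(`add_conditional_edges`)를 추가해야 합니다: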

from typing import List
import json

class ToolAgentState(TypedDict):
    """State shared by the plan / execute / observe / decide nodes."""

    task: str  # read by plan()
    plan: List[str]  # written by plan(); iterated by execute()
    # One dict per step with "step", "status" and "output" keys, written by execute()
    execution_history: List[dict]
    final_answer: str  # written by decide()

def plan(state: ToolAgentState):
    """Ask the LLM to break the task into a list of steps.

    The model is instructed to answer with a JSON array of strings, which is
    decoded and stored as the plan.

    Returns:
        A partial state update ``{"plan": [...]}``. When the reply is not
        valid JSON, or decodes to something other than a list, the plan
        falls back to a single step containing the original task.
    """
    llm = ChatOpenAI(model="gpt-4")
    plan_prompt = PromptTemplate.from_template(
        "Break down the task '{task}' into specific steps. Format as JSON array of strings."
    )
    response = llm.invoke([
        ("system", "You are a task planning assistant."),
        ("user", plan_prompt.format(task=state["task"]))
    ])

    fallback = {"plan": [state["task"]]}
    try:
        steps = json.loads(response.content)
    except (json.JSONDecodeError, TypeError):
        # JSONDecodeError: the reply is not JSON.
        # TypeError: the content is not a str/bytes payload.
        # Anything else (e.g. KeyboardInterrupt) must propagate.
        return fallback

    # Valid JSON that is not an array would break execute(), which iterates
    # over the plan as a list of steps.
    if not isinstance(steps, list):
        return fallback
    return {"plan": steps}

def execute(state: ToolAgentState):
    """Run every step of the plan and record one result per step.

    This is a placeholder: no tool is actually invoked. Each step is
    reported as completed with a synthetic output string.

    Returns:
        A partial state update with the result dicts under
        ``"execution_history"``, in plan order.
    """
    history = [
        {"step": step, "status": "completed", "output": f"Result for {step}"}
        for step in state["plan"]
    ]
    return {"execution_history": history}

def observe(state: ToolAgentState):
    """Pass the execution history on to the decision step unchanged.

    Success or failure of the individual steps is evaluated in ``decide``;
    this node only re-emits the history it received.

    Returns:
        A partial state update carrying the same ``"execution_history"``
        list that was read from the state.
    """
    return {"execution_history": state["execution_history"]}

def decide(state: ToolAgentState):
    """Summarise the execution history into a final answer.

    Returns:
        A partial state update with ``"final_answer"`` set to one of three
        messages: no history available, all steps completed, or at least
        one step not completed.
    """
    history = state["execution_history"]
    if not history:
        return {"final_answer": "No execution history available"}

    # Any entry whose status is not "completed" counts as a failure
    has_failure = any(entry.get("status") != "completed" for entry in history)
    verdict = "Some tasks failed" if has_failure else "All tasks completed successfully"
    return {"final_answer": verdict}

# Assemble the tool-agent graph as a linear chain:
# plan -> execute -> observe -> decide
tool_workflow = StateGraph(ToolAgentState)
_tool_steps = [
    ("plan", plan),
    ("execute", execute),
    ("observe", observe),
    ("decide", decide),
]
for _name, _node in _tool_steps:
    tool_workflow.add_node(_name, _node)

# Connect each step to the one that follows it
for (_src, _), (_dst, _) in zip(_tool_steps, _tool_steps[1:]):
    tool_workflow.add_edge(_src, _dst)
tool_workflow.set_entry_point("plan")
tool_workflow.set_finish_point("decide")

tool_app = tool_workflow.compile()
Enter fullscreen mode Exit fullscreen mode

템플릿 3: 인간-중개 워크플로우

사람의 검토를 거친 뒤 다음 단계로 진행하는 휴먼-인-더-루프(Human-in-the-Loop) 워크플로우입니다:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
import asyncio

class HumanReviewState(MessagesState):
    """MessagesState extended with human-review bookkeeping fields."""

    # Set to "pending" by pause_for_review() and "approved" by
    # review_process(); continue_with_review() checks for "rejected".
    review_status: str
    # Not written or read by any node visible in this snippet.
    review_comment: str

def pause_for_review(state: HumanReviewState):
    """Flag the workflow as waiting for a human review.

    Returns:
        A partial state update setting ``"review_status"`` to ``"pending"``.
    """
    update = {"review_status": "pending"}
    return update

def review_process(state: HumanReviewState):
    """Record the outcome of the review.

    Placeholder: the review is always approved. A real implementation would
    return ``"rejected"`` when the reviewer declines.

    Returns:
        A partial state update setting ``"review_status"`` to ``"approved"``.
    """
    return dict(review_status="approved")

def continue_with_review(state: HumanReviewState):
    """Emit an assistant message reflecting the review outcome.

    Returns:
        A partial state update whose ``"messages"`` list holds one assistant
        message: a rejection notice when ``review_status`` is ``"rejected"``,
        otherwise a notice that processing continues.
    """
    rejected = state["review_status"] == "rejected"
    content = (
        "Process rejected by human review"
        if rejected
        else "Process continues after review"
    )
    return {"messages": [{"role": "assistant", "content": content}]}

# Assemble the human-review graph as a linear chain:
# pause -> review -> continue
review_workflow = StateGraph(HumanReviewState)
_review_steps = [
    ("pause", pause_for_review),
    ("review", review_process),
    ("continue", continue_with_review),
]
for _name, _node in _review_steps:
    review_workflow.add_node(_name, _node)

# Connect each step to the one that follows it
for (_src, _), (_dst, _) in zip(_review_steps, _review_steps[1:]):
    review_workflow.add_edge(_src, _dst)
review_workflow.set_entry_point("pause")
review_workflow.set_finish_point("continue")

# NOTE(review): only a checkpointer is passed here; no interrupt is
# configured, so the graph presumably runs straight through without waiting
# for a human — confirm whether an interrupt before "review" is intended.
review_app = review_workflow.compile(checkpointer=MemorySaver())
Enter fullscreen mode Exit fullscreen mode

템플릿 4: 병렬 실행 에이전트

파이프라인에서 병렬 처리 후 집계:

from concurrent.futures import ThreadPoolExecutor
import time

class ParallelAgentState(TypedDict):
    """State shared by the fan-out and aggregate nodes."""

    data: List[str]  # input items to process
    processed_results: List[dict]  # written by fan_out(); read by aggregate_results()
    # Summary dict with "total_items", "successful" and "failed" counts,
    # written by aggregate_results()
    aggregated_result: dict

def fan_out(state: ParallelAgentState):
    """Process every input item in parallel and collect the results.

    Each entry of ``state["data"]`` is wrapped as ``{"id": index, "data":
    value}``, the shape ``process_item`` reads, and handed to a thread pool.
    Results are returned in input order.

    Returns:
        A partial state update with one result dict per input item under
        ``"processed_results"``; an empty list when there is no input.

    Raises:
        Any exception raised by ``process_item`` for an item propagates.
    """
    # A missing or empty "data" entry yields an empty result list.
    values = state.get("data") or []
    if not values:
        return {"processed_results": []}

    work_items = [{"id": index, "data": value} for index, value in enumerate(values)]
    with ThreadPoolExecutor() as pool:
        # Executor.map preserves the order of work_items
        results = list(pool.map(process_item, work_items))
    return {"processed_results": results}

def process_item(item: dict):
    """Process a single work item.

    Args:
        item: Mapping with an ``"id"`` and a ``"data"`` entry.

    Returns:
        A dict echoing the id, with ``"processed"`` set to ``True`` and a
        ``"result"`` string derived from the item's data.
    """
    time.sleep(0.1)  # simulate the cost of real work
    item_id = item["id"]
    payload = item["data"]
    return {"id": item_id, "processed": True, "result": f"Processed {payload}"}

def aggregate_results(state: ParallelAgentState):
    """Summarise the processed results into counts.

    An entry counts as successful when its ``"processed"`` value is truthy.

    Returns:
        A partial state update whose ``"aggregated_result"`` dict holds the
        ``"total_items"``, ``"successful"`` and ``"failed"`` counts.
    """
    outcomes = state["processed_results"]
    item_count = len(outcomes)
    succeeded = len([entry for entry in outcomes if entry.get("processed")])

    summary = {
        "total_items": item_count,
        "successful": succeeded,
        "failed": item_count - succeeded,
    }
    return {"aggregated_result": summary}

# Assemble the parallel-processing graph: fan_out -> aggregate
parallel_workflow = StateGraph(ParallelAgentState)
for _name, _node in (("fan_out", fan_out), ("aggregate", aggregate_results)):
    parallel_workflow.add_node(_name, _node)

parallel_workflow.add_edge("fan_out", "aggregate")
parallel_workflow.set_entry_point("fan_out")
parallel_workflow.set_finish_point("aggregate")

parallel_app = parallel_workflow.compile()
Enter fullscreen mode Exit fullscreen mode

상태 관리 패턴

1. 상태 초기화

def initialize_state():
    """Build a fresh initial state.

    Returns:
        A new dict with an empty ``"messages"`` list, the current time under
        ``"timestamp"``, the template version string and an empty
        ``"metadata"`` dict.
    """
    state = dict(
        messages=[],
        timestamp=time.time(),
        version="v39",
        metadata={},
    )
    return state

# Create the initial state at import time (no checkpoint is written here)
initial_state = initialize_state()
Enter fullscreen mode Exit fullscreen mode

2. 상태 병합

def merge_state(current_state, new_state):
    """Return a new state combining ``current_state`` with ``new_state``.

    For keys whose current value is a list, the new value's items are
    appended to it; every other key is overwritten (or added) with the new
    value. Neither input dict, nor any list held by ``current_state``, is
    modified.

    Args:
        current_state: The existing state mapping.
        new_state: The update to apply on top of it.

    Returns:
        The merged dict.

    Raises:
        TypeError: If the current value is a list but the new value is not
            iterable.
    """
    merged = current_state.copy()
    for key, value in new_state.items():
        existing = merged.get(key)
        if isinstance(existing, list):
            # dict.copy() is shallow: extending in place would also mutate
            # the list owned by current_state, so build a fresh list.
            merged[key] = [*existing, *value]
        else:
            merged[key] = value
    return merged
Enter fullscreen mode Exit fullscreen mode

3. 상태 유효성 검사

def validate_state(state):
    """Check that the state contains every required field.

    Args:
        state: Mapping to check.

    Raises:
        ValueError: Naming the first required field (``"messages"``, then
            ``"timestamp"``) that is absent from ``state``.
    """
    required = ("messages", "timestamp")
    first_missing = next((name for name in required if name not in state), None)
    if first_missing is not None:
        raise ValueError(f"Missing required field: {first_missing}")


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)