DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v3)

LangGraph 워크플로우 템플릿 (v3)

1. LangGraph 아키텍처 개요

LangGraph는 그래프 기반의 AI 워크플로우 엔진으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Chckpointing)로 구성됩니다.

핵심 구성 요소:

  • Nodes: 각각의 작업 단위 (함수 또는 클래스)
  • Edges: 노드 간의 실행 흐름 정의
  • State: 워크플로우 내 모든 데이터 저장
  • Checkpointing: 상태 저장 및 복구 기능
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_node: str

# 그래프 생성
workflow = StateGraph(AgentState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 단순 RAG 에이전트

사용 사례: 문서 검색 및 요약 생성

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate

class RAGAgentState(TypedDict):
    query: str
    retrieved_docs: list
    generated_response: str

def retrieve(state):
    # 문서 검색
    vectorstore = Chroma(persist_directory="./chroma_db")
    retriever = vectorstore.as_retriever()
    docs = retriever.invoke(state["query"])
    return {"retrieved_docs": docs}

def generate(state):
    # 응답 생성
    llm = ChatOpenAI(model="gpt-4")
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("human", "{query}\n\nContext: {context}")
    ])

    chain = prompt | llm
    context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
    response = chain.invoke({
        "query": state["query"],
        "context": context
    })

    return {"generated_response": response.content}

def validate_response(state):
    # 응답 검증
    if not state["generated_response"]:
        return {"generated_response": "Error: No response generated"}
    return {}

# 워크플로우 정의
rag_workflow = StateGraph(RAGAgentState)
rag_workflow.add_node("retrieve", retrieve)
rag_workflow.add_node("generate", generate)
rag_workflow.add_node("validate", validate_response)

# 엣지 정의
rag_workflow.add_edge("retrieve", "generate")
rag_workflow.add_edge("generate", "validate")
rag_workflow.set_entry_point("retrieve")
rag_workflow.set_finish_point("validate")
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티-도구 에이전트

사용 사례: 복잡한 작업 계획 및 실행 (Plan-Execute-Observe-Decide)

from typing import List, Dict
from langchain.tools import Tool
from langchain_openai import OpenAI

class MultiToolAgentState(TypedDict):
    task: str
    plan: List[str]
    execution_history: List[Dict]
    current_step: int
    final_result: str

def plan_task(state):
    llm = OpenAI()
    prompt = f"Plan the following task in steps: {state['task']}"
    plan = llm.invoke(prompt)
    steps = plan.split('\n')
    return {"plan": steps}

def execute_step(state):
    # 단계별 실행
    current_step = state["plan"][state["current_step"]]

    # 예시 도구들
    tools = [
        Tool(name="search", func=lambda x: f"Search result for {x}"),
        Tool(name="calculate", func=lambda x: f"Calculation result: {x}"),
        Tool(name="summarize", func=lambda x: f"Summary: {x}")
    ]

    # 도구 실행 로직 (실제 구현은 도구에 따라 다름)
    tool = tools[0]  # 간단한 예시
    result = tool.run(current_step)

    return {
        "execution_history": [
            *state["execution_history"],
            {"step": current_step, "result": result}
        ],
        "current_step": state["current_step"] + 1
    }

def decide_next(state):
    # 다음 단계 결정
    if state["current_step"] >= len(state["plan"]):
        return {"final_result": "Task completed"}
    return {"current_step": state["current_step"]}

# 워크플로우 정의
multi_tool_workflow = StateGraph(MultiToolAgentState)
multi_tool_workflow.add_node("plan", plan_task)
multi_tool_workflow.add_node("execute", execute_step)
multi_tool_workflow.add_node("decide", decide_next)

multi_tool_workflow.add_edge("plan", "execute")
multi_tool_workflow.add_edge("execute", "decide")
multi_tool_workflow.add_conditional_edges(
    "decide",
    lambda x: "continue" if x["current_step"] < len(x["plan"]) else "finish",
    {
        "continue": "execute",
        "finish": END
    }
)
multi_tool_workflow.set_entry_point("plan")
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우

사용 사례: 인간 검토 후 실행 (중요한 결정이나 보안 이슈)

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

class HumanInLoopState(TypedDict):
    task: str
    decision_required: bool
    human_review: str
    processed_data: str

def auto_process(state):
    # 자동 처리
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"Process this task: {state['task']}"
    result = llm.invoke(prompt)
    return {"processed_data": result.content, "decision_required": True}

def human_review(state):
    # 인간 검토 (일시 정지)
    return {"human_review": "Review required"}

def manual_decision(state):
    # 인간 결정
    return {"decision_required": False}

# 체크포인트 설정
memory = MemorySaver()
human_loop_workflow = StateGraph(HumanInLoopState)
human_loop_workflow.add_node("auto_process", auto_process)
human_loop_workflow.add_node("human_review", human_review)
human_loop_workflow.add_node("manual_decision", manual_decision)

human_loop_workflow.add_edge("auto_process", "human_review")
human_loop_workflow.add_edge("human_review", "manual_decision")
human_loop_workflow.set_entry_point("auto_process")
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트

사용 사례: 여러 작업을 동시에 처리 후 집계

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgentState(TypedDict):
    inputs: List[str]
    processed_results: List[Dict]
    aggregated_result: str

def fan_out_process(state):
    # fan-out: 여러 입력 처리
    inputs = state["inputs"]
    results = []

    # 병렬 처리
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_single_input, inp) for inp in inputs]
        for future in futures:
            results.append(future.result())

    return {"processed_results": results}

def process_single_input(input_data):
    # 각 입력 처리
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"Process: {input_data}"
    result = llm.invoke(prompt)
    return {"input": input_data, "output": result.content}

def aggregate_results(state):
    # 결과 집계
    results = state["processed_results"]
    aggregated = "\n".join([f"{r['input']}: {r['output']}" for r in results])
    return {"aggregated_result": aggregated}

# 워크플로우 정의
parallel_workflow = StateGraph(ParallelAgentState)
parallel_workflow.add_node("fan_out", fan_out_process)
parallel_workflow.add_node("aggregate", aggregate_results)

parallel_workflow.add_edge("fan_out", "aggregate")
parallel_workflow.set_entry_point("fan_out")
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴


python
from langgraph.checkpoint import Checkpoint
from typing import Any, Dict

class StateManager:
    def __init__(self, checkpoint_path: str):
        self.checkpoint_path = checkpoint_path

    def save_state(self, state: Dict[str, Any], thread_id: str):
        """상태 저장"""
        checkpoint = Checkpoint(
            thread_id=thread_id,
            state=state,
            metadata={"timestamp": time.time()}
        )
        # 실제 저장 로직
        with open(f"{self.checkpoint_path}/{thread_id}.json", "w") as f:
            json.dump(checkpoint.dict(), f)

    def load_state(self, thread_id: str) -> Dict[str, Any]:
        """상태 로드"""
        try:
            with open(f"{self.checkpoint_path}/{thread_id}.json", "r") as f:
                checkpoint_data = json.load(f)
                return checkpoint_data["state"]
        except FileNotFoundError:
            return {}

# 상태 초기화 및 관

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)