DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v20)

LangGraph 워크플로우 템플릿 (v20)

Python 개발자를 위한 LangChain/LangGraph 기반 AI 에이전트 워크플로우 템플릿

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 구현하기 위한 히트맵 기반 그래프 시스템입니다. 핵심 구성 요소는 다음과 같습니다:

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# 상태 정의
class State(TypedDict):
    messages: Annotated[list, operator.add]
    user_input: str
    context: dict

# 그래프 생성
graph = StateGraph(State)
graph.add_node("process", process_message)
graph.add_edge("process", END)
graph.set_entry_point("process")
Enter fullscreen mode Exit fullscreen mode

핵심 구성 요소:

  • Nodes: 각 작업 단계 (예: 검색, 생성, 검증)
  • Edges: 노드 간의 흐름 제어
  • State: 모든 워크플로우 상태를 포함하는 데이터 구조
  • Checkpointing: 상태 저장 및 복구 기능

2. 템플릿 1: 간단한 RAG 에이전트

문제: RAG 워크플로우에서 검색 → 생성 → 검증 단계의 간단한 흐름을 구현해야 함

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool
from langgraph.graph import StateGraph
from typing import Annotated
import operator

class RAGState(TypedDict):
    query: str
    retrieved_docs: list
    generated_response: str
    validation_result: bool

def retrieve_docs(state: RAGState):
    # 검색 기능
    vector_store = VectorStore()
    docs = vector_store.search(state["query"])
    return {"retrieved_docs": docs}

def generate_response(state: RAGState):
    # 생성 기능
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"""
    다음 문서를 기반으로 질문에 답변하세요:
    문서: {state['retrieved_docs']}
    질문: {state['query']}
    """
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"generated_response": response.content}

def validate_response(state: RAGState):
    # 검증 로직
    # 간단한 검증: 답변이 문서에 포함되는지 확인
    docs_content = " ".join([doc.page_content for doc in state["retrieved_docs"]])
    is_valid = state["generated_response"].lower() in docs_content.lower()
    return {"validation_result": is_valid}

# RAG 워크플로우 생성
rag_graph = StateGraph(RAGState)
rag_graph.add_node("retrieve", retrieve_docs)
rag_graph.add_node("generate", generate_response)
rag_graph.add_node("validate", validate_response)

# 엣지 연결
rag_graph.add_edge("retrieve", "generate")
rag_graph.add_edge("generate", "validate")
rag_graph.set_entry_point("retrieve")
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티툴 에이전트

문제: 복잡한 작업을 계획 → 실행 → 관찰 → 결정 단계로 분리해야 함

from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI

class MultiToolState(TypedDict):
    task: str
    plan: list
    execution_results: list
    decision: str

class ToolExecutor:
    def __init__(self):
        self.tools = {
            "search": Tool(name="search", func=self.search),
            "calculate": Tool(name="calculate", func=self.calculate),
            "summarize": Tool(name="summarize", func=self.summarize)
        }

    def search(self, query: str):
        # 검색 로직
        return f"검색 결과: {query}"

    def calculate(self, expression: str):
        # 계산 로직
        return f"계산 결과: {eval(expression)}"

    def summarize(self, text: str):
        # 요약 로직
        return f"요약: {text[:50]}..."

def create_plan(state: MultiToolState):
    # 작업 계획 생성
    tasks = [
        {"task": "검색", "tool": "search", "input": state["task"]},
        {"task": "계산", "tool": "calculate", "input": "2+2"},
        {"task": "요약", "tool": "summarize", "input": "내용"}
    ]
    return {"plan": tasks}

def execute_plan(state: MultiToolState):
    # 계획 실행
    executor = ToolExecutor()
    results = []

    for task in state["plan"]:
        tool = executor.tools[task["tool"]]
        result = tool.func(task["input"])
        results.append({"task": task["task"], "result": result})

    return {"execution_results": results}

def make_decision(state: MultiToolState):
    # 실행 결과 기반 결정
    if len(state["execution_results"]) > 0:
        return {"decision": "작업 완료"}
    return {"decision": "작업 실패"}

# 멀티툴 워크플로우
multi_tool_graph = StateGraph(MultiToolState)
multi_tool_graph.add_node("plan", create_plan)
multi_tool_graph.add_node("execute", execute_plan)
multi_tool_graph.add_node("decide", make_decision)

multi_tool_graph.add_edge("plan", "execute")
multi_tool_graph.add_edge("execute", "decide")
multi_tool_graph.set_entry_point("plan")
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간 인터페이스 워크플로우

문제: 인간의 검토와 승인을 포함한 워크플로우가 필요함

import asyncio
from typing import Optional

class HumanReviewState(TypedDict):
    task: str
    response: Optional[str]
    human_review: Optional[str]
    approved: Optional[bool]

async def generate_response_async(state: HumanReviewState):
    # 비동기 응답 생성
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"다음 요청에 대해 답변하세요: {state['task']}"
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"response": response.content}

async def human_review_node(state: HumanReviewState):
    # 인간 검토 노드 (예: API 호출 또는 사용자 입력 대기)
    print(f"사용자 검토 필요: {state['response']}")
    # 실제 구현에서는 사용자 인터페이스를 통해 검토를 요청
    return {"human_review": "사용자 검토 완료"}

async def approve_decision(state: HumanReviewState):
    # 승인 결정
    return {"approved": True}

# 인간 인터페이스 워크플로우
human_graph = StateGraph(HumanReviewState)
human_graph.add_node("generate", generate_response_async)
human_graph.add_node("review", human_review_node)
human_graph.add_node("approve", approve_decision)

human_graph.add_edge("generate", "review")
human_graph.add_edge("review", "approve")
human_graph.set_entry_point("generate")
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트

문제: 여러 작업을 병렬로 처리한 후 결과를 집계해야 함

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelState(TypedDict):
    data: list
    processed_results: list
    aggregated_result: dict

def parallel_processor(data_item: dict):
    # 병렬 처리 로직
    result = {
        "id": data_item["id"],
        "processed": f"처리됨: {data_item['value']}"
    }
    return result

async def fan_out_processing(state: ParallelState):
    # 데이터 분할 및 병렬 처리
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(parallel_processor, item) 
                  for item in state["data"]]
        results = [future.result() for future in futures]

    return {"processed_results": results}

def aggregate_results(state: ParallelState):
    # 결과 집계
    total = sum(item["id"] for item in state["processed_results"])
    return {"aggregated_result": {
        "total": total,
        "count": len(state["processed_results"]),
        "items": state["processed_results"]
    }}

# 병렬 실행 워크플로우
parallel_graph = StateGraph(ParallelState)
parallel_graph.add_node("process", fan_out_processing)
parallel_graph.add_node("aggregate", aggregate_results)

parallel_graph.add_edge("process", "aggregate")
parallel_graph.set_entry_point("process")

# 데이터 예시
data_example = [
    {"id": 1, "value": "a"},
    {"id": 2, "value": "b"},
    {"id": 3, "value": "c"}
]
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

문제: 복잡한


📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)

Top comments (0)