DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v22)

LangGraph 워크플로우 템플릿 (v22)

재사용 가능한 LangGraph 템플릿 가이드

Python 개발자들을 위한 LangChain/LangGraph 기반 AI 에이전트 구축을 위한 실용적인 워크플로우 템플릿 모음


1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우를 위한 고급 프레임워크로, 다음과 같은 핵심 구성 요소를 포함합니다:

Nodes (노드): 각각의 작업 단위. 함수나 클래스로 정의됩니다.
Edges (엣지): 노드 간의 실행 흐름을 정의합니다.
State (상태): 워크플로우에서 공유되는 상태 데이터 구조.
Checkpointing (체크포인트): 상태 저장 및 복원 기능.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]

graph = StateGraph(State)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

실제 문서 검색 및 생성 시나리오에서 유용한 기본 RAG 워크플로우입니다:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

def retrieve(state):
    # 검색 로직
    query = state["messages"][-1].content
    # 여기서 벡터 데이터베이스 검색 로직 추가
    return {"retrieved_docs": ["문서 내용 1", "문서 내용 2"]}

def generate(state):
    # 생성 로직
    docs = state["retrieved_docs"]
    prompt = PromptTemplate.from_template(
        "다음 문서를 기반으로 질문에 답하세요: {docs} 질문: {query}"
    )
    chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
    response = chain.invoke({
        "docs": "\n".join(docs),
        "query": state["messages"][-1].content
    })
    return {"generated_response": response}

def validate(state):
    # 응답 검증 로직
    response = state["generated_response"]
    # 간단한 유효성 검사
    if len(response) < 10:
        return {"validated": False, "error": "응답이 너무 짧습니다"}
    return {"validated": True}

# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)

workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_conditional_edges(
    "validate",
    lambda x: "validated" in x and x["validated"],
    {"validated": END, "invalid": "generate"}
)
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)

복잡한 작업을 분석하고 여러 도구를 순차적으로 사용하는 에이전트:

from langchain.tools import Tool
from langchain_openai import ChatOpenAI

def plan(state):
    # 작업 계획 생성
    prompt = "다음 작업을 수행할 계획을 세우세요: {query}"
    plan_result = ChatOpenAI(model="gpt-4").invoke(prompt)
    return {"plan": plan_result.content}

def execute(state):
    # 계획 실행 (도구 호출)
    plan = state["plan"]
    # 도구 실행 로직 (예: API 호출 등)
    return {"executed": True, "tool_output": "도구 실행 결과"}

def observe(state):
    # 실행 결과 관찰
    tool_output = state["tool_output"]
    return {"observation": f"도구 출력: {tool_output}"}

def decide(state):
    # 결정 로직
    observation = state["observation"]
    # 판단 로직 (예: 성공/실패 여부)
    return {"decision": "continue" if "성공" in observation else "retry"}

# 도구 정의
tools = [
    Tool(
        name="search",
        func=lambda x: f"검색 결과: {x}",
        description="웹 검색을 수행합니다"
    )
]

# 워크플로우 구성
workflow = StateGraph(State)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_conditional_edges(
    "decide",
    lambda x: x["decision"],
    {"continue": END, "retry": "plan"}
)
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

사용자 검토가 필요한 민감한 작업이나 중요한 결정 시 사용:

import asyncio
from typing import Optional

class HumanReviewState(TypedDict):
    messages: list
    pending_review: bool
    review_approved: Optional[bool]

async def pause_for_review(state):
    # 리뷰 대기
    print("사용자 검토를 위해 일시정지...")
    return {"pending_review": True}

async def review_and_continue(state):
    # 사용자 검토 처리
    # 이 부분은 실제 UI나 외부 시스템 통신으로 연결됨
    user_input = input("승인하시겠습니까? (y/n): ")
    approved = user_input.lower() == 'y'
    return {"review_approved": approved, "pending_review": False}

async def human_in_the_loop_workflow():
    workflow = StateGraph(HumanReviewState)
    workflow.add_node("process", process_task)
    workflow.add_node("pause", pause_for_review)
    workflow.add_node("review", review_and_continue)

    workflow.set_entry_point("process")
    workflow.add_edge("process", "pause")
    workflow.add_edge("pause", "review")
    workflow.add_conditional_edges(
        "review",
        lambda x: x["review_approved"],
        {"True": END, "False": "process"}  # 재시작
    )
    return workflow.compile()

async def process_task(state):
    # 작업 처리 로직
    return {"messages": state["messages"] + ["작업 완료"]}
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)

하나의 입력을 여러 경로로 분기하여 병렬로 처리 후 결과 집계:

from concurrent.futures import ThreadPoolExecutor
import asyncio

def fan_out(state):
    # 입력 분기
    query = state["messages"][-1].content
    # 여러 하위 작업 생성
    return {"sub_tasks": [
        {"task": "분석1", "input": query},
        {"task": "분석2", "input": query},
        {"task": "분석3", "input": query}
    ]}

def process_parallel(state):
    # 병렬 처리
    sub_tasks = state["sub_tasks"]

    def worker(task):
        # 각각의 작업 처리
        return f"{task['task']} 결과"

    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(worker, sub_tasks))

    return {"results": results}

def aggregate(state):
    # 결과 집계
    results = state["results"]
    final_result = " | ".join(results)
    return {"final_output": final_result}

# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("process_parallel", process_parallel)
workflow.add_node("aggregate", aggregate)

workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.add_edge("aggregate", END)
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

상태를 효과적으로 관리하기 위한 패턴:

# 상태 정의
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    step_count: int
    error_count: int
    last_error: Optional[str]

# 상태 업데이트 함수
def update_state(state, updates):
    return {**state, **updates}

# 체크포인트 저장
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# 체크포인트 복원
for checkpoint in graph.stream(input_state, config={"configurable": {"thread_id": "1"}}):
    print(checkpoint)
Enter fullscreen mode Exit fullscreen mode

7. 스트리밍 및 실시간 업데이트

실시간 응답을 제공하기 위한 스트리밍 기능:


python
from langchain_core.callbacks import BaseCallbackHandler



---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)