DEV Community

matias yoon
matias yoon

Posted on

LangGraph 워크플로우 템플릿 (v38)

LangGraph 워크플로우 템플릿 (v38)

Python 개발자를 위한 LangGraph 기반 AI 에이전트 워크플로우 템플릿

LangChain과 LangGraph를 사용한 Python 기반 AI 에이전트 개발을 위한 실전 가이드입니다. 이 템플릿은 실제 개발자들이 겪는 문제를 해결하기 위해 설계되었습니다.

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 구성됩니다:

핵심 구성 요소:

  • Nodes: 워크플로우의 단계
  • Edges: 노드 간의 연결
  • State: 워크플로우의 상태 관리
  • Checkpointing: 상태 저장 및 복원
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]
    context: str

# 그래프 생성
workflow = StateGraph(GraphState)
Enter fullscreen mode Exit fullscreen mode

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

대부분의 LLM 응용 프로그램은 RAG(검색 기반 생성) 패턴을 기반으로 합니다:

from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

class RAGAgent:
    def __init__(self, llm):
        self.llm = llm
        self.retriever = self._setup_retriever()
        self.prompt = PromptTemplate.from_template(
            "주어진 컨텍스트를 사용하여 질문에 답하세요:\n\n{context}\n\n질문: {question}"
        )

    def retrieve(self, state):
        question = state["messages"][-1].content
        docs = self.retriever.invoke(question)
        context = "\n\n".join([doc.page_content for doc in docs])
        return {"context": context}

    def generate(self, state):
        chain = self.prompt | self.llm | StrOutputParser()
        result = chain.invoke({
            "question": state["messages"][-1].content,
            "context": state["context"]
        })
        return {"messages": [result]}

    def validate(self, state):
        # 간단한 검증 로직
        if len(state["messages"][-1].content) < 10:
            return {"messages": ["질문에 대한 충분한 정보가 없습니다."]}
        return {"messages": [state["messages"][-1].content]}

# 워크플로우 정의
def create_rag_workflow():
    workflow = StateGraph(GraphState)

    workflow.add_node("retrieve", RAGAgent.retrieve)
    workflow.add_node("generate", RAGAgent.generate)
    workflow.add_node("validate", RAGAgent.validate)

    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "validate")
    workflow.add_edge("validate", END)

    return workflow.compile()
Enter fullscreen mode Exit fullscreen mode

3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)

복잡한 작업을 처리하는 다중 도구 에이전트:

from langchain.tools import Tool
from langchain_core.messages import ToolMessage
import json

class MultiToolAgent:
    def __init__(self):
        # 예시 도구들
        self.tools = [
            Tool(
                name="weather",
                func=lambda location: f"{location}의 날씨는 맑습니다.",
                description="날씨 정보 조회"
            ),
            Tool(
                name="calculator",
                func=lambda expression: f"결과: {eval(expression)}",
                description="수학 계산"
            )
        ]

    def plan(self, state):
        # 작업 계획 생성
        plan = {
            "tasks": [
                {"name": "weather", "params": {"location": "서울"}},
                {"name": "calculator", "params": {"expression": "2+2"}}
            ]
        }
        return {"plan": plan}

    def execute(self, state):
        # 도구 실행
        results = []
        for task in state["plan"]["tasks"]:
            tool = next(t for t in self.tools if t.name == task["name"])
            result = tool.func(**task["params"])
            results.append(result)
        return {"execution_results": results}

    def observe(self, state):
        # 결과 관찰 및 분석
        return {"analysis": "모든 작업이 완료되었습니다."}

    def decide(self, state):
        # 결정 로직
        if len(state["execution_results"]) > 0:
            return {"messages": ["작업이 성공적으로 완료되었습니다."]}
        return {"messages": ["작업 실패."]}
Enter fullscreen mode Exit fullscreen mode

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

사람의 개입이 필요한 중요한 결정을 위한 워크플로우:

class HumanInLoopAgent:
    def __init__(self):
        self.user_feedback = None

    def pause_for_review(self, state):
        # 사용자 검토를 위한 일시정지
        return {"status": "pending_review"}

    def review(self, state):
        # 사용자 검토 처리
        if self.user_feedback is not None:
            return {"messages": [self.user_feedback]}
        return {"messages": ["사용자 검토가 필요합니다."]}

    def continue_workflow(self, state):
        # 워크플로우 재개
        return {"status": "processing"}

# 사용자 피드백 설정
def set_user_feedback(feedback):
    agent = HumanInLoopAgent()
    agent.user_feedback = feedback
    return agent
Enter fullscreen mode Exit fullscreen mode

5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)

대량 데이터를 병렬로 처리:

from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelAgent:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers

    def fan_out(self, state):
        # 입력 데이터 분기
        data_chunks = [
            {"chunk_id": i, "data": f"chunk_{i}"}
            for i in range(4)
        ]
        return {"chunks": data_chunks}

    def process_chunk(self, chunk_data):
        # 개별 청크 처리
        time.sleep(0.1)  # 시뮬레이션
        return {"result": f"processed_{chunk_data['data']}"}

    async def parallel_process(self, state):
        # 병렬 처리
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.process_chunk, chunk)
                for chunk in state["chunks"]
            ]
            results = [future.result() for future in futures]
        return {"processed_results": results}

    def aggregate(self, state):
        # 결과 집계
        aggregated = {
            "total_count": len(state["processed_results"]),
            "results": [r["result"] for r in state["processed_results"]]
        }
        return {"messages": [f"{aggregated['total_count']}개 처리 완료"]}
Enter fullscreen mode Exit fullscreen mode

6. 상태 관리 패턴

효율적인 상태 관리:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint import BaseCheckpointSaver

class StateManager:
    def __init__(self):
        # 메모리 체크포인트 사용
        self.checkpointer = MemorySaver()

    def get_state(self, thread_id):
        # 상태 조회
        return self.checkpointer.get(thread_id)

    def update_state(self, thread_id, new_state):
        # 상태 업데이트
        self.checkpointer.put(thread_id, new_state)

    def rollback(self, thread_id, version):
        # 이전 버전으로 롤백
        pass

# 상태 저장 설정
def setup_workflow_with_checkpoint():
    workflow = StateGraph(GraphState)
    workflow.add_checkpoint(MemorySaver())
    return workflow
Enter fullscreen mode Exit fullscreen mode

7. 스트리밍 및 실시간 업데이트

실시간 처리 지원:


python
from langchain_core.callbacks import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        print("체인 시작")

    def on_chain_end(self, outputs, **kwargs):
        print("체인 종료")

    def on_llm_new_token(self, token, **kwargs):
        print(f"토큰: {token}", end="", flush=True)

class StreamingAgent:
    def __init__(self):
        self.streaming_handler = StreamingHandler()

    def stream_response(self, query):
        # 스트리밍 응답
        llm = ChatOpenAI(callbacks=[self.streaming_handler])
        response = llm.invoke(query)
        return response

# 사용 예시
async def stream_example():


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)