LangGraph 워크플로우 템플릿 (v35)
LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우를 구현하는 라이브러리로, 다음 구성 요소로 이루어져 있습니다:
Nodes (노드): 워크플로우의 각 단계. 입력 상태를 받아 출력 상태를 반환하는 함수.
Edges (엣지): 노드 간의 실행 흐름을 정의. 조건부 또는 고정된 경로를 가질 수 있음.
State (상태): 워크플로우 전체에서 공유되는 데이터 구조. 입력, 출력, 중간 상태 모두 포함.
Checkpointing (체크포인팅): 실행 상태를 저장하고 복구할 수 있는 기능.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
import operator
# 상태 정의
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], operator.add]
# 추가 상태 필드...
# 노드 정의
def retrieve_node(state):
# 검색 로직
pass
def generate_node(state):
# 생성 로직
pass
# 워크플로우 생성
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
템플릿 1: 간단한 RAG 에이전트
문제: 문서 검색 후 생성하는 일반적인 RAG 패턴을 쉽게 구현할 수 있는 템플릿
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import Chroma
class RAGAgent:
def __init__(self, vector_store, llm):
self.vector_store = vector_store
self.llm = llm
def retrieve(self, state):
# 질문에 기반하여 문서 검색
query = state["messages"][-1].content
docs = self.vector_store.similarity_search(query, k=3)
return {"context": docs}
def generate(self, state):
# 검색된 문서를 기반으로 응답 생성
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="You are a helpful assistant"),
HumanMessage(content="Context: {context}\n\nQuestion: {question}")
])
chain = prompt | self.llm
response = chain.invoke({
"context": "\n\n".join([doc.page_content for doc in state["context"]]),
"question": state["messages"][-1].content
})
return {"messages": [response]}
def validate(self, state):
# 생성된 응답 검증
return {"validated": True}
# 사용 예시
def create_rag_workflow():
agent = RAGAgent(vector_store, llm)
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", agent.retrieve)
workflow.add_node("generate", agent.generate)
workflow.add_node("validate", agent.validate)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
return workflow.compile()
템플릿 2: 멀티-도구 에이전트
문제: 여러 도구를 순차적으로 사용하여 문제를 해결해야 하는 복잡한 작업을 관리하는 템플릿
from langchain.tools import Tool
from langchain_core.output_parsers import JsonOutputParser
class MultiToolAgent:
def __init__(self, tools):
self.tools = tools
def plan(self, state):
# 문제 해결 계획 생성
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="You are a helpful assistant that plans tasks"),
HumanMessage(content="Plan tasks for: {question}\nAvailable tools: {tools}")
])
chain = prompt | self.llm | JsonOutputParser()
plan = chain.invoke({
"question": state["messages"][-1].content,
"tools": [t.name for t in self.tools]
})
return {"plan": plan}
def execute(self, state):
# 계획된 작업 실행
plan = state["plan"]
results = []
for task in plan["tasks"]:
tool = next((t for t in self.tools if t.name == task["tool"]), None)
if tool:
result = tool.run(task["input"])
results.append(result)
return {"execution_results": results}
def observe(self, state):
# 실행 결과 관찰 및 피드백 수집
return {"feedback": "Completed successfully"}
def decide(self, state):
# 다음 단계 결정
feedback = state["feedback"]
if feedback == "Completed successfully":
return "continue"
else:
return "retry"
# 사용 예시
def create_multitool_workflow():
agent = MultiToolAgent(tools)
workflow = StateGraph(AgentState)
workflow.add_node("plan", agent.plan)
workflow.add_node("execute", agent.execute)
workflow.add_node("observe", agent.observe)
workflow.add_node("decide", agent.decide)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_conditional_edges("decide",
lambda x: x["decision"],
{"continue": END, "retry": "plan"}
)
return workflow.compile()
템플릿 3: 인간-중개 워크플로우
문제: 인간의 개입이 필요한 상황에서 자동화를 유지하면서 검토 및 승인을 처리하는 템플릿
from langgraph.checkpoint.memory import MemorySaver
from datetime import datetime
class HumanInLoopAgent:
def __init__(self, llm):
self.llm = llm
self.checkpointer = MemorySaver()
def pause_for_review(self, state):
# 검토를 위해 일시 정지
timestamp = datetime.now().isoformat()
return {
"status": "awaiting_review",
"review_required": True,
"review_timestamp": timestamp
}
def review(self, state):
# 인간 검토 로직
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="You are a reviewer"),
HumanMessage(content="Review this: {content}")
])
chain = prompt | self.llm
review_result = chain.invoke({"content": state["messages"][-1].content})
return {"review_result": review_result, "reviewed": True}
def continue_workflow(self, state):
# 검토 후 계속 진행
return {"status": "continue"}
# 사용 예시
def create_human_in_loop_workflow():
agent = HumanInLoopAgent(llm)
workflow = StateGraph(AgentState)
workflow.add_node("generate", agent.generate)
workflow.add_node("pause", agent.pause_for_review)
workflow.add_node("review", agent.review)
workflow.add_node("continue", agent.continue_workflow)
workflow.add_edge("generate", "pause")
workflow.add_edge("pause", "review")
workflow.add_edge("review", "continue")
workflow.add_edge("continue", END)
return workflow.compile()
템플릿 5: 병렬 실행 에이전트
문제: 여러 작업을 동시에 처리하여 성능을 향상시키는 병렬 처리 워크플로우
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgent:
def __init__(self, llm):
self.llm = llm
def fan_out(self, state):
# 작업 분산
tasks = [
{"id": "task1", "input": "process data 1"},
{"id": "task2", "input": "process data 2"},
{"id": "task3", "input": "process data 3"}
]
return {"tasks": tasks}
def process(self, state):
# 병렬 처리
def process_task(task):
prompt = ChatPromptTemplate.from_messages([
HumanMessage(content=f"Process: {task['input']}")
])
chain = prompt | self.llm
result = chain.invoke({})
return {"task_id": task["id"], "result": result}
# 병렬 실행
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(process_task, state["tasks"]))
return {"processed_results": results}
def aggregate(self, state):
# 결과 집계
aggregated = {
"results": [r["result"] for r in state["processed_results"]]
}
return {"aggregated_output": aggregated}
# 사용 예시
def create_parallel_workflow():
agent = ParallelAgent(llm)
workflow = StateGraph(AgentState)
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)