LangGraph 워크플로우 템플릿 (v13)
LangGraph은 복잡한 AI 에이전트 워크플로우를 구축하기 위한 강력한 프레임워크로, Python 개발자들에게 뛰어난 유연성과 확장성을 제공합니다. 이 가이드는 실제 문제 해결에 초점을 맞춘 재사용 가능한 템플릿들을 제공합니다.
1. LangGraph 아키텍처 개요
LangGraph는 다음과 같은 핵심 구성 요소로 작동합니다:
- Nodes: 워크플로우의 각 단계
- Edges: 노드 간의 연결 및 흐름 제어
- State: 워크플로우의 상태 관리
- Checkpointing: 상태 지속성과 재시작 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
# 기본 워크플로우 구조
workflow = StateGraph(AgentState)
2. 템플릿 1: 간단한 RAG 에이전트
검색 기반 생성(RAG) 워크플로우는 정보 검색과 생성을 결합합니다.
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
class RAGState(TypedDict):
query: str
context: str
response: str
def retrieve_node(state: RAGState):
# Chroma 벡터 스토어에서 검색
vectorstore = Chroma(persist_directory="./chroma_db")
retriever = vectorstore.as_retriever()
docs = retriever.invoke(state["query"])
context = "\n".join([doc.page_content for doc in docs])
return {"context": context}
def generate_node(state: RAGState):
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("user", "Context: {context}\n\nQuestion: {query}")
])
model = ChatOpenAI(model="gpt-4")
chain = prompt | model | StrOutputParser()
response = chain.invoke({
"context": state["context"],
"query": state["query"]
})
return {"response": response}
def validate_node(state: RAGState):
# 응답 검증 로직
if len(state["response"]) < 20:
return {"response": "Error: Response too short"}
return {"response": state["response"]}
# 워크플로우 정의
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
rag_app = workflow.compile()
3. 템플릿 2: 멀티-도구 에이전트
도구 실행과 계획을 기반으로 한 에이전트는 복잡한 작업을 수행할 수 있습니다.
from langchain.tools import Tool
from langchain_core.tools import tool
from typing import List
class MultiToolState(TypedDict):
input: str
plan: List[str]
execution: List[dict]
observation: str
final_response: str
class CalculatorTool:
@staticmethod
@tool
def calculator(query: str) -> str:
"""계산기 도구"""
try:
result = eval(query)
return f"Result: {result}"
except:
return "Error: Invalid calculation"
class WebSearchTool:
@staticmethod
@tool
def web_search(query: str) -> str:
"""웹 검색 도구"""
return f"Search results for '{query}'"
def plan_node(state: MultiToolState):
# 작업 계획 생성
plan = [
"search information about the query",
"perform calculations if needed",
"generate final response"
]
return {"plan": plan}
def execute_node(state: MultiToolState):
# 도구 실행
tools = [CalculatorTool.calculator, WebSearchTool.web_search]
execution_results = []
for tool in tools:
try:
result = tool.run(state["input"])
execution_results.append({"tool": tool.name, "result": result})
except Exception as e:
execution_results.append({"tool": tool.name, "error": str(e)})
return {"execution": execution_results}
def observe_node(state: MultiToolState):
# 실행 결과 관찰
observations = []
for exec_result in state["execution"]:
observations.append(f"{exec_result['tool']}: {exec_result.get('result', 'Error')}")
return {"observation": "\n".join(observations)}
def decide_node(state: MultiToolState):
# 결정 노드 - 최종 응답 생성
return {"final_response": f"Analysis complete. Observations:\n{state['observation']}"}
# 워크플로우 구성
workflow = StateGraph(MultiToolState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
multi_tool_app = workflow.compile()
4. 템플릿 3: 인간-중개 워크플로우
사람의 검토와 승인을 포함한 워크플로우는 신뢰성 있는 작업을 요구할 때 유용합니다.
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
class HumanLoopState(TypedDict):
input: str
analysis: str
review: str
final_decision: Literal["approved", "rejected", "pending"]
def analyze_node(state: HumanLoopState):
return {"analysis": f"Analysis of '{state['input']}' completed"}
def review_node(state: HumanLoopState):
return {"review": "Waiting for human review"}
def approve_node(state: HumanLoopState):
# 사용자 승인 처리
return {"final_decision": "approved"}
def reject_node(state: HumanLoopState):
return {"final_decision": "rejected"}
# 체크포인트 사용
memory = MemorySaver()
workflow = StateGraph(HumanLoopState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("review", review_node)
workflow.add_node("approve", approve_node)
workflow.add_node("reject", reject_node)
workflow.set_entry_point("analyze")
workflow.add_edge("analyze", "review")
workflow.add_edge("review", END) # 대기 상태
# 인간 검토 상태 전환
def human_decision_router(state: HumanLoopState):
if state["final_decision"] == "approved":
return "approve"
elif state["final_decision"] == "rejected":
return "reject"
else:
return "review"
workflow.add_conditional_edges(
"review",
human_decision_router,
{
"approve": "approve",
"reject": "reject",
"review": "review"
}
)
human_loop_app = workflow.compile(checkpointer=memory)
5. 템플릿 5: 병렬 실행 에이전트
파이프라인에서 병렬 처리는 성능 최적화에 중요합니다.
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelState(TypedDict):
input_data: list
processing_results: list
aggregated_result: dict
def fan_out_node(state: ParallelState):
# 데이터 파이프라인 분할
return {"processing_results": []}
def process_item(item: dict) -> dict:
"""개별 아이템 처리"""
# 실제 작업 수행
result = {"id": item["id"], "processed": True, "data": item["data"] * 2}
return result
def parallel_process_node(state: ParallelState):
# 병렬 처리
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_item, item) for item in state["input_data"]]
results = [future.result() for future in futures]
return {"processing_results": results}
def aggregate_node(state: ParallelState):
# 결과 집계
total_items = len(state["processing_results"])
success_count = sum(1 for r in state["processing_results"] if r.get("processed"))
return {
"aggregated_result": {
"total_items": total_items,
"success_count": success_count,
"details": state["processing_results"]
}
}
# 병렬 워크플로우 구성
workflow = StateGraph(ParallelState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("parallel_process", parallel_process_node)
workflow.add_node("aggregate
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
Top comments (0)