# LangGraph 워크플로우 템플릿 (v43)
# Practical LangGraph Patterns for Python Developers
## 1. LangGraph 아키텍처 개요
LangGraph는 상태 기반의 워크플로우 엔진으로, 노드(Node), 엣지(Edge), 상태(State), 체크포인트(Checkpointing)로 구성됩니다.
### 주요 구성 요소:
- **노드**: 각각의 작업 단위 (LLM 호출, 도구 실행, 데이터 처리 등)
- **엣지**: 실행 순서 및 조건 (예: 조건부 경로)
- **상태**: 모든 노드 간 공유되는 데이터 구조
- **체크포인트**: 실패 시 복구를 위한 상태 저장
## 2. 템플릿 1: 간단한 RAG 에이전트
python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class State(TypedDict):
question: str
context: str
answer: str
def retrieve(state: State):
# 벡터DB에서 관련 문서 검색
docs = vector_store.similarity_search(state["question"])
return {"context": "\n".join([doc.page_content for doc in docs])}
def generate(state: State):
# LLM으로 답변 생성
prompt = f"질문: {state['question']}\n문맥: {state['context']}"
response = llm.invoke(prompt)
return {"answer": response}
def validate(state: State):
# 답변 검증 로직
if len(state["answer"]) < 10:
return {"answer": "죄송합니다. 답변을 생성할 수 없습니다."}
return {}
워크플로우 정의
workflow = StateGraph(State)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
app = workflow.compile()
## 3. 템플릿 2: 멀티-도구 에이전트
python
from typing import List
from langchain_core.messages import AIMessage
class AgentState(TypedDict):
messages: Annotated[List, operator.add]
steps: List[str]
tool_calls: List[dict]
def plan(state: AgentState):
# 실행 계획 생성
prompt = f"다음 작업을 수행할 계획을 세우세요: {state['messages'][-1].content}"
response = llm.invoke(prompt)
return {"steps": response.split("\n")}
def execute(state: AgentState):
# 도구 실행
tool_calls = []
for step in state["steps"]:
if "search" in step.lower():
result = search_tool.run(step)
tool_calls.append({"name": "search", "result": result})
elif "calculator" in step.lower():
result = calculator_tool.run(step)
tool_calls.append({"name": "calculator", "result": result})
return {"tool_calls": tool_calls}
def observe(state: AgentState):
# 실행 결과 관찰 및 판단
if len(state["tool_calls"]) == 0:
return {"messages": [AIMessage(content="실행할 도구가 없습니다.")]}
return {}
조건부 엣지 정의
def should_continue(state: AgentState):
if len(state["tool_calls"]) > 0:
return "execute"
return "observe"
workflow = StateGraph(AgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_conditional_edges(
"observe",
should_continue,
{"execute": "execute", "observe": "observe"}
)
workflow.add_edge("observe", END)
app = workflow.compile()
## 4. 템플릿 3: 인간-통합 워크플로우
python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
class HumanWorkflowState(TypedDict):
question: str
answer: str
approval: bool
feedback: str
def generate_answer(state: HumanWorkflowState):
response = llm.invoke(state["question"])
return {"answer": response}
def human_review(state: HumanWorkflowState):
# 인간 검토를 위한 일시정지
print("답변 검토를 위해 일시정지 중...")
print(f"생성된 답변: {state['answer']}")
approval = input("답변을 승인하시겠습니까? (y/n): ")
return {"approval": approval.lower() == 'y'}
def update_answer(state: HumanWorkflowState):
if not state["approval"]:
# 인간이 승인하지 않으면 수정 요청
feedback = input("개선 요청 사항을 입력하세요: ")
return {"feedback": feedback, "answer": "수정 중..."}
return {}
workflow = StateGraph(HumanWorkflowState)
workflow.add_node("generate", generate_answer)
workflow.add_node("review", human_review)
workflow.add_node("update", update_answer)
workflow.set_entry_point("generate")
workflow.add_edge("generate", "review")
workflow.add_edge("review", "update")
workflow.add_edge("update", END)
체크포인트 저장
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
## 5. 템플릿 5: 병렬 실행 에이전트
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelState(TypedDict):
input_data: List[dict]
processed_data: List[dict]
results: List[dict]
def fan_out(state: ParallelState):
# 입력 데이터를 병렬로 분할
return {"processed_data": state["input_data"]}
def process_item(item: dict):
# 개별 데이터 처리
try:
result = llm.invoke(f"처리: {item['data']}")
return {"id": item["id"], "result": result, "status": "success"}
except Exception as e:
return {"id": item["id"], "result": str(e), "status": "error"}
def aggregate_results(state: ParallelState):
# 병렬 처리 결과 집계
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_item, item) for item in state["processed_data"]]
results = [future.result() for future in futures]
return {"results": results}
병렬 워크플로우
workflow = StateGraph(ParallelState)
workflow.add_node("fan_out", fan_out)
workflow.add_node("aggregate", aggregate_results)
workflow.set_entry_point("fan_out")
workflow.add_edge("fan_out", "aggregate")
workflow.add_edge("aggregate", END)
app = workflow.compile()
## 6. 상태 관리 패턴
python
상태 필터링 패턴
class FilteredState(TypedDict):
messages: List[dict]
filtered_messages: List[dict]
def filter_messages(state: FilteredState):
# 중요한 메시지만 필터링
important = [msg for msg in state["messages"] if msg["importance"] > 0.8]
return {"filtered_messages": important}
상태 압축 패턴
class CompressedState(TypedDict):
long_history: List[dict]
compressed: dict
def compress_history(state: CompressedState):
# 오래된 메시지 압축
if len(state["long_history"]) > 100:
# 마지막 10개만 유지
compressed = {"summary": "최근 10개 메시지 요약", "recent": state["long_history"][-10:]}
return {"compressed": compressed}
return {}
상태 저장 및 로드 패턴
def save_state(state: dict, checkpoint_id: str):
import json
with open(f"checkpoints/{checkpoint_id}.json", "w") as f:
json.dump(state, f)
def load_state(checkpoint_id: str) -> dict:
import json
with open(f"checkpoints/{checkpoint_id}.json", "r") as f:
return json.load(f)
## 7. 스트리밍 및 실시간 업데이트
python
from langchain_core.callbacks import BaseCallbackHandler
class StreamingCallback(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs):
print(token, end="", flush=True)
스트리밍 워크플로우
def streaming_agent(state: State):
# 스트리밍 응답 생성
stream = llm.stream(state["question"])
for chunk in stream:
yield chunk
실시간 상태 업데이트
class RealtimeState(TypedDict):
current_task: str
progress: int
status: str
def update_progress(state: RealtimeState):
# 진척도 업데이트
progress = min(state["progress"] + 10, 100)
📥 Get the full guide on Gumroad: https://gumroad.com/l/auto ($5)
Top comments (0)