Your LangGraph agent works great in demos. But in production, every node's output needs to be validated before the next node acts on it. Here's how to add a validation step without writing custom checking logic.
LangGraph gives you fine-grained control over your agent's execution graph — you define nodes, edges, and conditional routing. But most LangGraph tutorials skip over one thing: what happens when a node produces bad data. The next node just receives it and either crashes or propagates the error downstream.
I ran into this when building an order processing pipeline with LangGraph. The extraction node would occasionally produce negative amounts, invalid currencies, or missing fields. The downstream nodes — pricing, invoicing, fulfillment — would silently process the bad data. By the time someone noticed, the damage was already in the database.
The typical fix is writing validation logic inside each node. That works, but it means every node carries its own schema checks, the validation rules are scattered across your codebase, and there's no central place to see what's failing and why.
So I hooked up Rynko Flow as an external validation step in the graph. The agent extracts data, Flow validates it against a schema and business rules, and only if it passes does the pipeline continue. If it fails, the agent gets structured errors it can use to self-correct.
What You'll Build
A LangGraph agent with three nodes:
- Extract — LLM extracts order data from a natural language request
- Validate — Submits the extracted data to a Rynko Flow gate
- Process — Handles the validated order (or routes back for correction)
The graph looks like this:
extract → validate → process
              ↓ (if failed)
          extract (retry with error context)
Prerequisites
pip install langgraph langchain-openai httpx
You'll also need:
- A Rynko account (free tier works)
- A Flow gate configured with your order schema
- An OpenAI API key (or any LangChain-compatible LLM)
Setting Up the Flow Gate
Create a gate in the Flow dashboard with this schema:
| Field | Type | Constraints |
|---|---|---|
| vendor | string | required, min 1 char |
| amount | number | required, >= 0 |
| currency | string | required, one of: USD, EUR, GBP, INR |
| po_number | string | optional |
Add a business rule: amount >= 10 with error message "Order amount must be at least $10."
If you already have a Pydantic model, you can import the schema directly — run YourModel.model_json_schema() and paste the output into the gate's Import Schema dialog. There's a tutorial for that.
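For reference, a hypothetical Pydantic v2 model matching the gate's schema above might look like this — the model name and fields are illustrative, but the constraint mapping (required, `min_length`, `ge`, allowed currencies) mirrors the table:

```python
from typing import Literal, Optional

from pydantic import BaseModel, Field


class Order(BaseModel):
    vendor: str = Field(min_length=1)          # required, min 1 char
    amount: float = Field(ge=0)                # required, >= 0
    currency: Literal["USD", "EUR", "GBP", "INR"]
    po_number: Optional[str] = None            # optional

# The JSON Schema you'd paste into the Import Schema dialog:
schema = Order.model_json_schema()
print(schema["required"])
```

Fields without defaults show up in the schema's `required` list, so `po_number` stays optional.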
Save and publish the gate. Note the gate ID — you'll need it in the code.
The Validation Client
First, a small wrapper around the Flow API. This is what the validate node will call:
import httpx
import os
RYNKO_BASE_URL = os.environ.get("RYNKO_BASE_URL", "https://api.rynko.dev/api")
RYNKO_API_KEY = os.environ["RYNKO_API_KEY"]
def validate_with_flow(gate_id: str, payload: dict) -> dict:
    """Submit a payload to a Flow gate and return the result."""
    response = httpx.post(
        f"{RYNKO_BASE_URL}/flow/gates/{gate_id}/runs",
        json={"payload": payload},
        headers={
            "Authorization": f"Bearer {RYNKO_API_KEY}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    return response.json()
This returns the full validation result — status, errors, validation ID, the works. The important fields are status (either "validated" or "validation_failed") and errors (an array of specific field-level issues when validation fails).
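Branching on that result is straightforward. The `result` dict below is a hand-written example shaped after the fields just described, not a captured API response:

```python
# Hand-written example of a failed run, per the fields described above
result = {
    "status": "validation_failed",
    "validation_id": None,
    "errors": [
        {"field": "currency", "message": "must be one of: USD, EUR, GBP, INR"},
    ],
}

if result["status"] == "validated":
    print("proceed:", result["validation_id"])
else:
    # Field-level issues are what we'll later feed back to the LLM
    for e in result["errors"]:
        print(f"{e['field']}: {e['message']}")
```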
Defining the Graph State
LangGraph uses a typed state that flows between nodes. Ours tracks the user request, extracted data, validation result, and retry count:
from typing import TypedDict, Optional
class OrderState(TypedDict):
    user_request: str
    extracted_data: Optional[dict]
    validation_result: Optional[dict]
    validation_errors: Optional[str]
    retry_count: int
    final_result: Optional[str]
The Three Nodes
Extract Node
The LLM extracts structured order data from the user's natural language request. If there were previous validation errors, they're included in the prompt so the LLM can correct its output:
from langchain_openai import ChatOpenAI
import json
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
GATE_ID = os.environ["FLOW_GATE_ID"] # Your gate ID
def extract_order(state: OrderState) -> dict:
    error_context = ""
    if state.get("validation_errors"):
        error_context = (
            f"\n\nYour previous extraction had validation errors:\n"
            f"{state['validation_errors']}\n"
            f"Fix these issues in your new extraction."
        )
    response = llm.invoke(
        f"Extract order data from this request as JSON with fields: "
        f"vendor (string), amount (number), currency (string, one of USD/EUR/GBP/INR), "
        f"po_number (string, optional).\n\n"
        f"Request: {state['user_request']}"
        f"{error_context}\n\n"
        f"Respond with ONLY valid JSON, no markdown."
    )
    try:
        extracted = json.loads(response.content)
    except json.JSONDecodeError:
        extracted = {"vendor": "", "amount": 0, "currency": ""}
    return {"extracted_data": extracted}
Validate Node
This is the Flow integration — submit the extracted data to the gate and capture the result:
def validate_order(state: OrderState) -> dict:
    result = validate_with_flow(GATE_ID, state["extracted_data"])
    if result.get("status") == "validation_failed":
        # Error details may arrive top-level ("errors") or nested ("error.details")
        errors = result.get("errors") or result.get("error", {}).get("details", [])
        error_text = "\n".join(
            f"- {e.get('field', e.get('rule_id', 'unknown'))}: {e.get('message', 'invalid')}"
            for e in errors
        ) or "validation failed (no error details returned)"
        return {
            "validation_result": result,
            "validation_errors": error_text,
            "retry_count": state.get("retry_count", 0) + 1,
        }
    return {
        "validation_result": result,
        "validation_errors": None,
    }
Process Node
If validation passed, the order moves forward. In a real system this would write to your database, trigger fulfillment, or call another API:
def process_order(state: OrderState) -> dict:
    validation_id = state["validation_result"].get("validation_id", "")
    return {
        "final_result": (
            f"Order processed successfully.\n"
            f"Vendor: {state['extracted_data']['vendor']}\n"
            f"Amount: {state['extracted_data']['amount']} {state['extracted_data']['currency']}\n"
            f"Validation ID: {validation_id}"
        )
    }
The validation_id is a tamper-proof token from Flow — your downstream systems can verify that the data passed validation and hasn't been modified since.
Wiring the Graph
Now connect the nodes with conditional routing. If validation fails and we haven't exhausted retries, route back to the extract node with the error context:
from langgraph.graph import StateGraph, END
def should_retry(state: OrderState) -> str:
    if state.get("validation_errors") and state.get("retry_count", 0) < 3:
        return "retry"
    elif state.get("validation_errors"):
        return "give_up"
    return "proceed"

# Build the graph
graph = StateGraph(OrderState)
graph.add_node("extract", extract_order)
graph.add_node("validate", validate_order)
graph.add_node("process", process_order)

graph.set_entry_point("extract")
graph.add_edge("extract", "validate")
graph.add_conditional_edges(
    "validate",
    should_retry,
    {
        "retry": "extract",    # Back to extraction with error context
        "proceed": "process",  # Validation passed
        "give_up": END,        # Max retries reached
    },
)
graph.add_edge("process", END)

app = graph.compile()
Running It
result = app.invoke({
    "user_request": "Process an order from Globex Corp for twelve thousand five hundred dollars USD, PO number PO-2026-042",
    "retry_count": 0,
})
print(result["final_result"])
Output:
Order processed successfully.
Vendor: Globex Corp
Amount: 12500.0 USD
Validation ID: v_abc123...
The Self-Correction Loop
The interesting part is what happens when the LLM makes a mistake. Say it extracts currency: "Dollars" instead of "USD". Flow returns:
{
  "status": "validation_failed",
  "errors": [
    {"field": "currency", "message": "must be one of: USD, EUR, GBP, INR"}
  ]
}
The graph routes back to the extract node, which now includes the error in its prompt. The LLM reads "currency must be one of: USD, EUR, GBP, INR", fixes its extraction to "USD", and the second attempt passes validation.
This happens automatically — no human intervention, no hardcoded fixes. The LLM uses the structured error feedback from Flow to correct itself.
In our testing, most validation issues resolve in one retry. The retry_count cap of 3 is a safety net — if the agent can't fix it in three attempts, something is fundamentally wrong with the input and it's better to fail explicitly.
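You can watch this control flow without an API key by stubbing both sides. The sketch below fakes an extractor that gets the currency wrong once and a validator that enforces the currency rule — it simulates the loop's routing logic only, not the real graph or the real Flow API:

```python
VALID_CURRENCIES = {"USD", "EUR", "GBP", "INR"}

# Scripted extractor outputs: wrong on the first try, corrected on the retry
attempts = [
    {"vendor": "Globex Corp", "amount": 12500, "currency": "Dollars"},
    {"vendor": "Globex Corp", "amount": 12500, "currency": "USD"},
]


def fake_extract(retry_count: int) -> dict:
    return attempts[min(retry_count, len(attempts) - 1)]


def fake_validate(payload: dict) -> list[str]:
    errors = []
    if payload["currency"] not in VALID_CURRENCIES:
        errors.append("currency: must be one of: USD, EUR, GBP, INR")
    return errors


# The same retry cap as should_retry above
retry_count = 0
while retry_count < 3:
    data = fake_extract(retry_count)
    errors = fake_validate(data)
    if not errors:
        break
    retry_count += 1

print(f"passed after {retry_count} retry: {data}")
```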
Why Not Just Use Pydantic in the Node?
You could validate with Pydantic directly in the extract node. For a single agent, that works fine. But Flow gives you a few things Pydantic doesn't:
Business rules that cross fields. Pydantic validates field types and constraints, but expressions like endDate > startDate or quantity * price == total need custom validators. Flow evaluates these as expressions — you configure them in the dashboard, no code changes needed.
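Those cross-field rules really are just expressions evaluated over the whole payload. A stdlib sketch of the idea (the rule names here are made up for illustration):

```python
payload = {
    "quantity": 3,
    "price": 4.50,
    "total": 13.50,
    "startDate": "2026-01-01",
    "endDate": "2026-02-15",
}

# Each rule sees the whole payload, not a single field
rules = {
    "total_consistent": lambda p: p["quantity"] * p["price"] == p["total"],
    "dates_ordered": lambda p: p["endDate"] > p["startDate"],  # ISO dates compare lexicographically
}

failed = [name for name, rule in rules.items() if not rule(payload)]
print("failed rules:", failed)
```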
Centralized validation across agents. If you have five different LangGraph pipelines submitting orders, they all validate against the same gate. Change a rule once, it applies everywhere. With Pydantic, you'd need to update the model in every repo.
Observability. Flow's analytics dashboard shows you which fields fail most often, which business rules trigger, and which agents (by session) are producing the most errors. When you're debugging why Agent C keeps submitting bad currencies, this is where you look.
Approval workflows. For high-value orders, add a human approval step on the gate. The pipeline pauses, a reviewer approves or rejects, and the graph resumes. You can't do this with a Pydantic validator.
Adding MCP for Direct Tool Access
If you want the LLM to call Flow tools directly (instead of going through a hardcoded REST call), you can use LangChain's MCP tool integration. Flow's MCP endpoint at https://api.rynko.dev/api/flow/mcp auto-generates a validate_{gate_slug} tool for each active gate in your workspace.
This means the LLM can discover available gates and submit payloads through tool calling, which is useful when the agent needs to decide which gate to validate against based on the input.
Local Development Setup
To set up a local LangGraph development environment:
# Create a project directory
mkdir langgraph-flow-demo && cd langgraph-flow-demo
# Set up a virtual environment
python -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install langgraph langchain-openai httpx python-dotenv
# Create .env file
cat > .env << 'EOF'
OPENAI_API_KEY=sk-...
RYNKO_API_KEY=your_api_key_here
FLOW_GATE_ID=your_gate_id_here
EOF
Create a main.py with the code from this tutorial, add from dotenv import load_dotenv; load_dotenv() at the top, and run it with python main.py.
For iterative development, LangGraph has a built-in visualization tool:
# Print the graph structure
app.get_graph().print_ascii()
# Or save as PNG (requires pygraphviz)
app.get_graph().draw_png("graph.png")
This shows you the nodes, edges, and conditional routing at a glance — useful for verifying the self-correction loop is wired correctly.
Full Working Example
The complete code for this tutorial — including the graph, Flow client, .env.example, and two test scenarios — is in our developer resources repo. Clone it, add your API keys, and run python src/main.py.
Resources:
- Rynko Flow documentation
- Flow API reference
- LangGraph documentation
- Sign up for free — 500 Flow runs/month, no credit card
- Self-correction demo (terminal recording)