CrewAI's strength is that you define agents with roles, goals, and tools, and the framework handles the orchestration. An agent researches, another analyzes, a third writes the report. The problem shows up when the last agent in the chain produces the final output — a JSON payload that needs to be structurally valid, conform to business rules, and sometimes get human approval before it goes downstream.
Most CrewAI tutorials skip this part. The output comes back as a string, maybe you parse it as JSON, and you hope it's correct. In production, that hope turns into bugs.
I've been using Rynko Flow as the validation layer after CrewAI tasks. The agent does its work, the output goes through a Flow gate that checks schema and business rules, and only validated data moves forward. When validation fails, the error response is structured enough that the agent can fix itself and retry.
What We're Building
A CrewAI crew with two agents:
- Order Processor — Takes a natural language order request and extracts structured data
- Validator — Submits the extracted data to a Rynko Flow gate, handles errors, and retries if needed
The validator agent uses a custom tool that wraps the Flow API, so it gets structured validation errors directly in its tool response.
Setup
```shell
pip install crewai httpx
```
You'll need:
- A Rynko account (free tier is fine)
- A Flow gate with your schema (setup guide)
- An OpenAI API key (CrewAI's default LLM)
The Flow Validation Tool
CrewAI agents use tools — Python functions decorated with @tool. Here's one that submits data to a Flow gate and returns the result in a format the LLM can reason about:
```python
import os
import json

import httpx
from crewai.tools import tool

RYNKO_BASE_URL = os.environ.get("RYNKO_BASE_URL", "https://api.rynko.dev/api")
RYNKO_API_KEY = os.environ["RYNKO_API_KEY"]
GATE_ID = os.environ["FLOW_GATE_ID"]


@tool("validate_order")
def validate_order(order_json: str) -> str:
    """Validate an order payload against the Flow gate.

    Input must be a JSON string with fields: vendor (string),
    amount (number), currency (USD/EUR/GBP/INR), po_number (optional string).
    Returns validation result with status and any errors."""
    try:
        payload = json.loads(order_json)
    except json.JSONDecodeError as e:
        return json.dumps({"success": False, "error": f"Invalid JSON: {e}"})

    response = httpx.post(
        f"{RYNKO_BASE_URL}/flow/gates/{GATE_ID}/runs",
        json={"payload": payload},
        headers={
            "Authorization": f"Bearer {RYNKO_API_KEY}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    result = response.json()

    if result.get("status") == "validation_failed":
        errors = result.get("error", {}).get("details", [])
        error_lines = [
            f"- {e.get('field', e.get('rule_id', 'unknown'))}: {e.get('message')}"
            for e in errors
        ]
        return json.dumps({
            "success": False,
            "status": "validation_failed",
            "errors": error_lines,
            "message": "Fix these errors and resubmit.",
        }, indent=2)

    return json.dumps({
        "success": True,
        "status": result.get("status"),
        "run_id": result.get("runId"),
        "validation_id": result.get("validation_id"),
    }, indent=2)
```
The tool returns structured JSON in both success and failure cases. When validation fails, the error messages are specific enough — "currency must be one of: USD, EUR, GBP, INR" — that the LLM can fix the issue without guessing.
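To make the failure shape concrete, here's the failure branch run against a hypothetical gate response. The field names in `gate_response` are illustrative stand-ins, not confirmed API output — check your gate's actual error format:

```python
import json

# Hypothetical gate response, shaped the way the validation_failed branch
# above expects. Field names here are illustrative, not confirmed API output.
gate_response = {
    "status": "validation_failed",
    "error": {
        "details": [
            {"field": "currency", "message": "must be one of: USD, EUR, GBP, INR"},
            {"rule_id": "min_amount", "message": "amount must be at least 10"},
        ]
    },
}

# Same formatting logic as the tool: prefer the field name, fall back to the
# rule ID, and render each error as a bullet the LLM can act on.
errors = gate_response.get("error", {}).get("details", [])
error_lines = [
    f"- {e.get('field', e.get('rule_id', 'unknown'))}: {e.get('message')}"
    for e in errors
]
tool_reply = json.dumps(
    {"success": False, "status": "validation_failed", "errors": error_lines},
    indent=2,
)
print(tool_reply)
```

The agent never sees the raw HTTP response — only this compact bullet list, which is what makes the self-correction loop reliable.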
Defining the Agents
```python
from crewai import Agent

order_processor = Agent(
    role="Order Processor",
    goal="Extract structured order data from customer requests accurately",
    backstory=(
        "You are an order processing specialist. You extract vendor name, "
        "amount, currency, and PO number from natural language requests. "
        "You output clean JSON with fields: vendor, amount, currency, po_number. "
        "Currency must be a 3-letter code (USD, EUR, GBP, or INR)."
    ),
    verbose=True,
    allow_delegation=False,
)

order_validator = Agent(
    role="Order Validator",
    goal="Validate extracted orders against business rules and fix any issues",
    backstory=(
        "You validate order data by submitting it to the validation gateway. "
        "If validation fails, you read the error messages carefully, fix each "
        "issue in the JSON, and resubmit. You keep trying until it passes or "
        "you've made 3 attempts. Always report the final validation status."
    ),
    tools=[validate_order],
    verbose=True,
    allow_delegation=False,
)
```
The validator agent has the Flow tool and explicit instructions to read errors and retry. CrewAI agents follow their backstory closely, so the self-correction behavior comes from the backstory rather than from framework-level retry logic.
Defining the Tasks
```python
from crewai import Task

extract_task = Task(
    description=(
        "Extract order data from this customer request:\n\n"
        "{user_request}\n\n"
        "Output a JSON object with fields: vendor (string), amount (number), "
        "currency (3-letter code: USD, EUR, GBP, or INR), po_number (string, optional). "
        "Output ONLY the JSON, nothing else."
    ),
    expected_output="A JSON object with vendor, amount, currency, and optional po_number",
    agent=order_processor,
)

validate_task = Task(
    description=(
        "Take the order JSON from the previous task and validate it using the "
        "validate_order tool. If validation fails, read the error messages, fix "
        "the JSON, and call the tool again with corrected data. "
        "Report the final run ID and validation status."
    ),
    expected_output="Validation result with run ID and status (validated or failed)",
    agent=order_validator,
    context=[extract_task],
)
```
The context=[extract_task] tells CrewAI to pass the output of the extract task to the validator. The validator then takes that JSON and runs it through Flow.
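One practical wrinkle: even with "Output ONLY the JSON" in the prompt, models sometimes wrap their answer in markdown code fences. A small defensive parser keeps the validator's tool call from choking on that. This is a hypothetical helper — not part of CrewAI or Flow — that you could use inside the validation tool before json.loads:

```python
import json
import re


def extract_json(text: str) -> dict:
    """Parse JSON from LLM output, tolerating markdown code fences.

    Hypothetical helper -- not part of CrewAI or Flow. Models sometimes wrap
    "JSON only" output in ```json fences despite explicit instructions.
    """
    text = text.strip()
    # Strip a surrounding code fence if present (```json ... ``` or ``` ... ```).
    fence = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fence:
        text = fence.group(1)
    return json.loads(text)
```

With this in place, the tool accepts both a bare `{"vendor": "Globex Corp", ...}` and the fenced variant the model occasionally produces.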
Running the Crew
```python
from crewai import Crew, Process

crew = Crew(
    agents=[order_processor, order_validator],
    tasks=[extract_task, validate_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff(
    inputs={
        "user_request": (
            "We need to process an order from Globex Corp for "
            "twelve thousand five hundred dollars, PO number PO-2026-042"
        )
    }
)

print("\n--- Final Result ---")
print(result)
```
What Happens at Runtime
When you run this, the output shows the full agent reasoning:
```
[Order Processor] Extracting order data...
> {"vendor": "Globex Corp", "amount": 12500, "currency": "USD", "po_number": "PO-2026-042"}

[Order Validator] Validating order...
> Using tool: validate_order
> Tool result: {"success": true, "status": "validated", "run_id": "..."}

--- Final Result ---
Order validated successfully. Run ID: 550e8400-...
```
Now here's the interesting case. Say the processor extracts currency: "Dollars":
```
[Order Validator] Validating order...
> Using tool: validate_order
> Tool result: {"success": false, "errors": ["- currency: must be one of: USD, EUR, GBP, INR"]}

[Order Validator] The currency is invalid. Fixing to "USD" and resubmitting...
> Using tool: validate_order
> Tool result: {"success": true, "status": "validated", "run_id": "..."}
```
The validator reads the error, fixes the currency, and resubmits. One retry, no human involved.
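Backstory-driven retries work well in practice, but they depend on the model following instructions. If you want the attempt cap enforced in code rather than in the prompt, here's a minimal sketch. The `submit` and `fix` callables are hypothetical stand-ins for the tool call and the LLM's correction step:

```python
# Sketch of a deterministic retry cap outside the LLM. `submit` returns a dict
# shaped like validate_order's JSON reply; `fix` repairs the payload using the
# structured error list. Both are hypothetical -- wire in your own callables.
def validate_with_retries(order: dict, submit, fix, max_attempts: int = 3) -> dict:
    """Submit `order`, applying `fix(order, errors)` between failed attempts."""
    result = {}
    for attempt in range(1, max_attempts + 1):
        result = submit(order)
        if result.get("success"):
            return {"attempts": attempt, **result}
        order = fix(order, result.get("errors", []))
    return {"attempts": max_attempts, **result}
```

The agent still does the reasoning inside `fix`; the loop just guarantees you never burn more than `max_attempts` gate runs per order, regardless of how persistent the model feels like being.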
Handling Multiple Agents Writing to the Same Gate
CrewAI shines when you have multiple specialized agents. In a more complex setup, you might have separate crews for different order types — one for domestic orders, one for international, one for recurring subscriptions. All three can validate against the same Flow gate.
```python
# Different crews, same validation gate
domestic_crew = Crew(agents=[domestic_processor, validator], ...)
international_crew = Crew(agents=[intl_processor, validator], ...)
subscription_crew = Crew(agents=[sub_processor, validator], ...)
```
The gate enforces consistent validation regardless of which crew produced the data. If you change a business rule — say, increasing the minimum order amount from $10 to $50 — you update it once in the Flow dashboard and every crew picks it up immediately.
Flow's analytics dashboard shows validation results by session, so you can see which crew or agent is producing the most errors and needs prompt tuning.
Adding Human Approval
For high-value orders, configure the gate's approval mode to require human review. When the validator submits a $50,000 order, Flow holds it in a review_required state instead of auto-approving. A reviewer gets an email, reviews the payload, and approves or rejects.
Your CrewAI task can poll for the approval result:
```python
import time  # needed for the polling sleep below


@tool("wait_for_approval")
def wait_for_approval(run_id: str) -> str:
    """Poll a Flow run until it reaches a terminal state."""
    for _ in range(60):  # up to ~5 minutes at 5-second intervals
        response = httpx.get(
            f"{RYNKO_BASE_URL}/flow/runs/{run_id}",
            headers={"Authorization": f"Bearer {RYNKO_API_KEY}"},
            timeout=30,
        )
        status = response.json().get("status")
        if status in ("approved", "rejected", "completed", "delivered"):
            return json.dumps({"status": status, "run_id": run_id})
        time.sleep(5)
    return json.dumps({"status": "timeout", "run_id": run_id})
```
Using MCP Instead of REST
If you prefer the agent to discover Flow gates dynamically through tool calling (rather than hardcoding the gate ID), you can connect CrewAI to Flow's MCP endpoint. Flow auto-generates a validate_{gate_slug} tool for each active gate, and the tool schema includes field types and constraints so the LLM knows what to submit.
This is useful when your agents work across multiple gates and need to pick the right one based on context.
Local Development Setup
```shell
# Create project
mkdir crewai-flow-demo && cd crewai-flow-demo
python -m venv .venv
source .venv/bin/activate

# Install
pip install crewai httpx python-dotenv

# Environment
cat > .env << 'EOF'
OPENAI_API_KEY=sk-...
RYNKO_API_KEY=your_api_key_here
FLOW_GATE_ID=your_gate_id_here
EOF
```
Create main.py with the code above, add from dotenv import load_dotenv; load_dotenv() at the top, and run with python main.py. CrewAI's verbose=True shows you the full agent reasoning — useful for debugging prompt issues.
Full Working Example
The complete code — agents, tools, tasks, .env.example, and two test scenarios — is in our developer resources repo. Clone it, add your API keys, and run python src/main.py.
Resources:
- Rynko Flow documentation
- CrewAI documentation
- Sign up for free — 500 Flow runs/month, no credit card
- Self-correction demo (terminal recording)