DEV Community

Cover image for Validating CrewAI Agent Outputs with Rynko Flow
Srijith Kartha
Srijith Kartha

Posted on • Originally published at blog.rynko.dev

Validating CrewAI Agent Outputs with Rynko Flow

CrewAI's strength is that you define agents with roles, goals, and tools, and the framework handles the orchestration. An agent researches, another analyzes, a third writes the report. The problem shows up when the last agent in the chain produces the final output — a JSON payload that needs to be structurally valid, conform to business rules, and sometimes get human approval before it goes downstream.

Most CrewAI tutorials skip this part. The output comes back as a string, maybe you parse it as JSON, and you hope it's correct. In production, that hope turns into bugs.

I've been using Rynko Flow as the validation layer after CrewAI tasks. The agent does its work, the output goes through a Flow gate that checks schema and business rules, and only validated data moves forward. When validation fails, the error response is structured enough that the agent can fix itself and retry.

What We're Building

A CrewAI crew with two agents:

  1. Order Processor — Takes a natural language order request and extracts structured data
  2. Validator — Submits the extracted data to a Rynko Flow gate, handles errors, and retries if needed

The validator agent uses a custom tool that wraps the Flow API, so it gets structured validation errors directly in its tool response.

Setup

pip install crewai httpx
Enter fullscreen mode Exit fullscreen mode

You'll need:

  • A Rynko account (free tier is fine)
  • A Flow gate with your schema (setup guide)
  • An OpenAI API key (CrewAI's default LLM)

The Flow Validation Tool

CrewAI agents use tools — Python functions decorated with @tool. Here's one that submits data to a Flow gate and returns the result in a format the LLM can reason about:

import os
import json
import httpx
from crewai.tools import tool

RYNKO_BASE_URL = os.environ.get("RYNKO_BASE_URL", "https://api.rynko.dev/api")
RYNKO_API_KEY = os.environ["RYNKO_API_KEY"]
GATE_ID = os.environ["FLOW_GATE_ID"]

@tool("validate_order")
def validate_order(order_json: str) -> str:
    """Validate an order payload against the Flow gate.
    Input must be a JSON string with fields: vendor (string),
    amount (number), currency (USD/EUR/GBP/INR), po_number (optional string).
    Returns validation result with status and any errors."""

    try:
        payload = json.loads(order_json)
    except json.JSONDecodeError as e:
        return json.dumps({"success": False, "error": f"Invalid JSON: {e}"})

    response = httpx.post(
        f"{RYNKO_BASE_URL}/flow/gates/{GATE_ID}/runs",
        json={"payload": payload},
        headers={
            "Authorization": f"Bearer {RYNKO_API_KEY}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )

    result = response.json()

    if result.get("status") == "validation_failed":
        errors = result.get("error", {}).get("details", [])
        error_lines = [f"- {e.get('field', e.get('rule_id', 'unknown'))}: {e.get('message')}" for e in errors]
        return json.dumps({
            "success": False,
            "status": "validation_failed",
            "errors": error_lines,
            "message": "Fix these errors and resubmit.",
        }, indent=2)

    return json.dumps({
        "success": True,
        "status": result.get("status"),
        "run_id": result.get("runId"),
        "validation_id": result.get("validation_id"),
    }, indent=2)
Enter fullscreen mode Exit fullscreen mode

The tool returns structured JSON in both success and failure cases. When validation fails, the error messages are specific enough — "currency must be one of: USD, EUR, GBP, INR" — that the LLM can fix the issue without guessing.

Defining the Agents

from crewai import Agent

order_processor = Agent(
    role="Order Processor",
    goal="Extract structured order data from customer requests accurately",
    backstory=(
        "You are an order processing specialist. You extract vendor name, "
        "amount, currency, and PO number from natural language requests. "
        "You output clean JSON with fields: vendor, amount, currency, po_number. "
        "Currency must be a 3-letter code (USD, EUR, GBP, or INR)."
    ),
    verbose=True,
    allow_delegation=False,
)

order_validator = Agent(
    role="Order Validator",
    goal="Validate extracted orders against business rules and fix any issues",
    backstory=(
        "You validate order data by submitting it to the validation gateway. "
        "If validation fails, you read the error messages carefully, fix each "
        "issue in the JSON, and resubmit. You keep trying until it passes or "
        "you've made 3 attempts. Always report the final validation status."
    ),
    tools=[validate_order],
    verbose=True,
    allow_delegation=False,
)
Enter fullscreen mode Exit fullscreen mode

The validator agent has the Flow tool and explicit instructions to read errors and retry. CrewAI agents follow their backstory closely, so the self-correction behavior comes from the backstory rather than from framework-level retry logic.

Defining the Tasks

from crewai import Task

extract_task = Task(
    description=(
        "Extract order data from this customer request:\n\n"
        "{user_request}\n\n"
        "Output a JSON object with fields: vendor (string), amount (number), "
        "currency (3-letter code: USD, EUR, GBP, or INR), po_number (string, optional). "
        "Output ONLY the JSON, nothing else."
    ),
    expected_output="A JSON object with vendor, amount, currency, and optional po_number",
    agent=order_processor,
)

validate_task = Task(
    description=(
        "Take the order JSON from the previous task and validate it using the "
        "validate_order tool. If validation fails, read the error messages, fix "
        "the JSON, and call the tool again with corrected data. "
        "Report the final run ID and validation status."
    ),
    expected_output="Validation result with run ID and status (validated or failed)",
    agent=order_validator,
    context=[extract_task],
)
Enter fullscreen mode Exit fullscreen mode

The context=[extract_task] tells CrewAI to pass the output of the extract task to the validator. The validator then takes that JSON and runs it through Flow.

Running the Crew

from crewai import Crew, Process

crew = Crew(
    agents=[order_processor, order_validator],
    tasks=[extract_task, validate_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff(
    inputs={
        "user_request": (
            "We need to process an order from Globex Corp for "
            "twelve thousand five hundred dollars, PO number PO-2026-042"
        )
    }
)

print("\n--- Final Result ---")
print(result)
Enter fullscreen mode Exit fullscreen mode

What Happens at Runtime

When you run this, the output shows the full agent reasoning:

[Order Processor] Extracting order data...
> {"vendor": "Globex Corp", "amount": 12500, "currency": "USD", "po_number": "PO-2026-042"}

[Order Validator] Validating order...
> Using tool: validate_order
> Tool result: {"success": true, "status": "validated", "run_id": "..."}

--- Final Result ---
Order validated successfully. Run ID: 550e8400-...
Enter fullscreen mode Exit fullscreen mode

Now here's the interesting case. Say the processor extracts currency: "Dollars":

[Order Validator] Validating order...
> Using tool: validate_order
> Tool result: {"success": false, "errors": ["- currency: must be one of: USD, EUR, GBP, INR"]}

[Order Validator] The currency is invalid. Fixing to "USD" and resubmitting...
> Using tool: validate_order
> Tool result: {"success": true, "status": "validated", "run_id": "..."}
Enter fullscreen mode Exit fullscreen mode

The validator reads the error, fixes the currency, and resubmits. One retry, no human involved.

Handling Multiple Agents Writing to the Same Gate

CrewAI shines when you have multiple specialized agents. In a more complex setup, you might have separate crews for different order types — one for domestic orders, one for international, one for recurring subscriptions. All three can validate against the same Flow gate.

# Different crews, same validation gate
domestic_crew = Crew(agents=[domestic_processor, validator], ...)
international_crew = Crew(agents=[intl_processor, validator], ...)
subscription_crew = Crew(agents=[sub_processor, validator], ...)
Enter fullscreen mode Exit fullscreen mode

The gate enforces consistent validation regardless of which crew produced the data. If you change a business rule — say, increasing the minimum order amount from $10 to $50 — you update it once in the Flow dashboard and every crew picks it up immediately.

Flow's analytics dashboard shows validation results by session, so you can see which crew or agent is producing the most errors and needs prompt tuning.

Adding Human Approval

For high-value orders, configure the gate's approval mode to require human review. When the validator submits a $50,000 order, Flow holds it in a review_required state instead of auto-approving. A reviewer gets an email, reviews the payload, and approves or rejects.

Your CrewAI task can poll for the approval result:

@tool("wait_for_approval")
def wait_for_approval(run_id: str) -> str:
    """Poll a Flow run until it reaches a terminal state."""
    for _ in range(60):
        response = httpx.get(
            f"{RYNKO_BASE_URL}/flow/runs/{run_id}",
            headers={"Authorization": f"Bearer {RYNKO_API_KEY}"},
            timeout=30,
        )
        status = response.json().get("status")
        if status in ("approved", "rejected", "completed", "delivered"):
            return json.dumps({"status": status, "run_id": run_id})
        time.sleep(5)
    return json.dumps({"status": "timeout", "run_id": run_id})
Enter fullscreen mode Exit fullscreen mode

Using MCP Instead of REST

If you prefer the agent to discover Flow gates dynamically through tool calling (rather than hardcoding the gate ID), you can connect CrewAI to Flow's MCP endpoint. Flow auto-generates a validate_{gate_slug} tool for each active gate, and the tool schema includes field types and constraints so the LLM knows what to submit.

This is useful when your agents work across multiple gates and need to pick the right one based on context.

Local Development Setup

# Create project
mkdir crewai-flow-demo && cd crewai-flow-demo
python -m venv .venv
source .venv/bin/activate

# Install
pip install crewai httpx python-dotenv

# Environment
cat > .env << 'EOF'
OPENAI_API_KEY=sk-...
RYNKO_API_KEY=your_api_key_here
FLOW_GATE_ID=your_gate_id_here
EOF
Enter fullscreen mode Exit fullscreen mode

Create main.py with the code above, add from dotenv import load_dotenv; load_dotenv() at the top, and run with python main.py. CrewAI's verbose=True shows you the full agent reasoning — useful for debugging prompt issues.

Full Working Example

The complete code — agents, tools, tasks, .env.example, and two test scenarios — is in our developer resources repo. Clone it, add your API keys, and run python src/main.py.


Resources:

Top comments (0)