DEV Community

Cover image for Teaching Agents My Actual Engineering Workflow: Secure Adaptive Orchestration
Kowshik Jallipalli
Kowshik Jallipalli

Posted on

Teaching Agents My Actual Engineering Workflow: Secure Adaptive Orchestration

Chat interfaces force you to act as an AI's micro-manager, holding the entire state of a feature in your head while you spoon-feed it instructions. Real engineering isn't linear. You write a feature, parallelize the documentation and unit tests, and—crucially—adapt your code when a third-party API abruptly changes its payload schema.

When you encode your SDLC into a deterministic workflow graph, you transition from "prompting" to "orchestrating." You can assign routine tasks to worker agents, run independent tasks concurrently, and build "adaptive loops" where an agent automatically rewrites its own integration scripts in response to runtime errors.

However, after auditing dozens of these dynamic agent workflows, a critical flaw emerges: executing LLM-generated code on the fly is a massive Remote Code Execution (RCE) vulnerability. Here is how to codify your engineering workflow into a safe, auditable state machine.

Why This Matters (The Audit Perspective)
If an agent writes a data mapper and your orchestrator immediately evaluates it using Python's built-in exec() against your live environment, you are one hallucination away from a wiped database.

By defining your workflow as a Directed Acyclic Graph (DAG), you create structural boundaries. You can isolate the drafting phase from the testing phase. More importantly, by enforcing strict Pydantic schemas on the agent's feedback loop and executing the proposed code in a segregated subprocess, you maintain the speed of AI automation without compromising your system's integrity.

How It Works: The Hardened DAG
Instead of one massive system prompt, we represent the workflow as a sequence of discrete nodes.

Routine Tasks: Sequential steps like pulling an OpenAPI spec and drafting an initial data mapper.

Parallelizable Chunks: Two separate agents concurrently write the Pytest suite and the Markdown documentation based on the draft.

Secure Adaptive Integration: The generated mapper is executed against a staging API inside a restricted subprocess. If the API returns a 400 Bad Request, the orchestrator catches the exception, sanitizes the stack trace (to prevent secret leakage), and asks the agent to rewrite the code based on a strict JSON schema.

The Code: Workflow Spec and Validated Orchestrator
Here is how you define this workflow in YAML and implement the secure, adaptive orchestrator in Python. Our scenario: an agent building a script that syncs internal SaaS users to a third-party CRM.

  1. The Workflow Specification (workflow.yaml) This defines the execution graph and the specific agent personas for each node. name: CRM_Integration_Builder version: 1.1

nodes:

  • id: analyze_docs
    type: routine
    agent: "Systems Analyst"
    action: "Read CRM OpenAPI spec and extract the User payload schema."

  • id: generate_mapper
    type: routine
    agent: "Backend Engineer"
    depends_on: [analyze_docs]
    action: "Write a Python function 'sync_to_crm(user_dict)'."

# The self-healing loop (Runs dynamically)

  • id: adaptive_test_loop type: adaptive agent: "Integration Engineer" depends_on: [generate_mapper] max_retries: 3 action: "Execute sync_to_crm against staging. If it fails, adapt the code."
    1. The Hardened Adaptive Orchestrator (orchestrator.py) This script focuses on the adaptive_test_loop. It replaces dangerous exec() calls with sandboxed subprocesses, uses Pydantic to validate the LLM's response, and explicitly sanitizes error outputs. import json import subprocess import tempfile import os from pydantic import BaseModel, ValidationError from typing import Dict, Any

1. THE AUDIT FIX: Strict schemas for LLM outputs

class AdaptationResponse(BaseModel):
rationale: str
code: str

Mock LLM Client (Replace with Anthropic/OpenAI SDK utilizing Structured Outputs)

def call_agent_structured(prompt: str) -> str:
"""Simulates an LLM call returning a JSON string matching AdaptationResponse."""
pass

class SecureAdaptiveLoop:
def init(self, initial_code: str, max_retries: int = 3):
self.current_code = initial_code
self.max_retries = max_retries
self.decision_log = []

def sanitize_error(self, error_text: str) -> str:
    """AUDIT FIX: Prevent leaking env paths or secrets in stack traces."""
    # Simple example: strip local absolute paths
    import re
    sanitized = re.sub(r'/Users/[^/]+/', '/app/', error_text)
    return sanitized[:1500] # Truncate to prevent context window exhaustion

def run_dynamic_code_safely(self, code: str) -> tuple[bool, str]:
    """
    AUDIT FIX: Never use exec(). Write to a temp file and run via subprocess 
    with strict timeouts. In production, wrap this in Docker/gVisor.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        file_path = os.path.join(temp_dir, "mapper.py")

        # Inject a mock execution block to test the function
        executable_code = code + "\n\n" + """
Enter fullscreen mode Exit fullscreen mode

if name == 'main':
test_user = {"email": "dev@example.com", "first": "Ada", "last": "Lovelace", "plan": "pro"}
payload = sync_to_crm(test_user)
if 'customer_tier' not in payload:
raise ValueError("HTTP 400: Missing required field 'customer_tier'.")
print("Success")
"""
with open(file_path, "w") as f:
f.write(executable_code)

        try:
            result = subprocess.run(
                ["python", file_path],
                capture_output=True,
                text=True,
                timeout=5 # Hard kill switch
            )
            if result.returncode == 0:
                return True, "Success"
            return False, result.stderr
        except subprocess.TimeoutExpired:
            return False, "Execution timed out. Infinite loop detected."

def execute(self):
    for attempt in range(1, self.max_retries + 1):
        print(f"--- Running Integration (Attempt {attempt}) ---")
        success, output = self.run_dynamic_code_safely(self.current_code)

        if success:
            print("✅ Integration successful!")
            return True

        safe_error = self.sanitize_error(output)
        print(f"❌ Integration failed. Adapting...")

        if attempt == self.max_retries:
            print("🚨 Max retries reached. Surfacing to human.")
            return False

        # The Adaptive Step
        adaptation_prompt = f"""
        Your Python function threw this error during integration testing:
        {safe_error}

        Current Code:
Enter fullscreen mode Exit fullscreen mode
        ```python
        {self.current_code}
        ```
Enter fullscreen mode Exit fullscreen mode
        Rewrite the function to fix this error. Output strictly valid JSON matching the schema.
"""
    raw_response = call_agent_structured(adaptation_prompt)

    try:
        # AUDIT FIX: Validate LLM output structure before trusting it
        adaptation_data = AdaptationResponse.parse_raw(raw_response)
        self.current_code = adaptation_data.code

        self.decision_log.append({
            "attempt": attempt,
            "error": safe_error,
            "rationale": adaptation_data.rationale
        })
    except ValidationError as e:
        print(f"⚠️ Agent returned invalid JSON format. Retrying... {e}")
        # In a real system, you would feed the validation error back to the agent here
Enter fullscreen mode Exit fullscreen mode
Enter fullscreen mode Exit fullscreen mode




--- Example Execution ---

if name == "main":
# Initial drafted code (missing the required 'customer_tier' field)
initial_mapper_code = """
def sync_to_crm(internal_user):
return {
"email": internal_user["email"],
"full_name": f"{internal_user['first']} {internal_user['last']}"
}
"""
workflow = SecureAdaptiveLoop(initial_code=initial_mapper_code)
workflow.execute()
Pitfalls and Gotchas
When building adaptive orchestration loops, watch out for these traps:

The exec() Vulnerability: As mentioned, evaluating LLM-generated code in your host process means the LLM has your system's exact IAM permissions and environment variables. Always shell out to an isolated subprocess, or better yet, a disposable Docker container with --network none.

The JSON Markdown Wrapper: LLMs notoriously wrap JSON outputs in Markdown backticks (e.g.,

json {...}

). If you pass this directly to json.loads() or Pydantic, it will crash. Use the official "Structured Outputs" features from OpenAI/Anthropic, or aggressively regex-strip the backticks before parsing.

Leaking Secrets in Stack Traces: If your subprocess fails because it couldn't connect to a database, the resulting stack trace might print the raw connection string (including passwords) to stderr. If you blindly feed stderr back to the LLM for the next attempt, you are sending your database credentials to a third-party AI provider. Always sanitize error logs.

Misclassifying Infrastructure Errors: If an external API returns a 503 Service Unavailable, the adaptive agent might try to rewrite perfectly good code to "fix" it. Implement an HTTP status code gate: only feed 400 (Bad Request) or 422 (Unprocessable Entity) errors back to the code-generation loop.

What to Try Next
True Container Sandboxing: Replace the subprocess.run call with the Docker SDK (docker.from_env().containers.run()). Mount the generated script into an Alpine Linux container, execute it, capture the logs, and destroy the container.

Async DAG Execution: Read your workflow.yaml using Python's asyncio. Use asyncio.gather() to spin up the write_tests and write_docs agents concurrently once the initial generate_mapper step successfully completes.

Synthetic Schema Fuzzing: Don't wait for a vendor's API to break in production. Use a separate "Chaos Agent" to randomly mutate the expected payload schema of your mock CRM API during nightly CI runs, proving that your adaptive_test_loop can successfully detect and patch integration regressions automatically

Top comments (0)