
Glen Baker

Posted on • Originally published at entropicdrift.com

Three Patterns That Made Prodigy's Functional Migration Worth It



The Migration Story

Over the past few weeks, I've been migrating Prodigy—my Rust-based AI workflow orchestration tool—to use functional programming patterns from Stillwater, a library I built for applicative validation and effect handling in Rust.

The migration touched variable aggregation, environment access, and workflow execution. Not every change was revolutionary, but three patterns produced outsized benefits in testability, safety, and code clarity. This post breaks down each one with concrete before/after comparisons.

Pattern 1: Semigroup-Based Variable Aggregation

The Problem: Prodigy's MapReduce workflows aggregate results from parallel AI agents. Before the migration, aggregation logic was scattered across custom merge implementations—each aggregate type (count, sum, average, etc.) had its own ad-hoc combination logic with no consistency guarantees.

The Solution: Implement the Semigroup trait from Stillwater, which provides a single combine operation with a mathematical guarantee: associativity.

(a.combine(b)).combine(c) == a.combine(b.combine(c))

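This property means partial results can be grouped in any way, as long as their left-to-right order is preserved, without changing the final value. That is the guarantee a parallel reduction needs. Note that associativity is not commutativity: operands cannot be swapped freely.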
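To make the grouping-versus-order distinction concrete, here is a minimal sketch. The `Semigroup` trait below is a local stand-in written for this example (an assumption, not Stillwater's actual definition), and `FirstWins` and `single` are hypothetical names. The merge keeps the left value on key conflicts, which is associative but not commutative.

```rust
use std::collections::HashMap;

/// Local stand-in for a semigroup trait (assumption: not Stillwater's definition).
trait Semigroup {
    fn combine(self, other: Self) -> Self;
}

/// Hypothetical newtype: a map merge where the left operand wins on key conflicts.
#[derive(Clone, Debug, PartialEq)]
struct FirstWins(HashMap<String, i64>);

impl Semigroup for FirstWins {
    fn combine(mut self, other: Self) -> Self {
        for (k, v) in other.0 {
            // Existing keys are kept; only missing keys are filled in.
            self.0.entry(k).or_insert(v);
        }
        self
    }
}

/// Helper that builds a one-entry map for the examples.
fn single(key: &str, value: i64) -> FirstWins {
    FirstWins(HashMap::from([(key.to_string(), value)]))
}
```

With `a = single("k", 1)` and `b = single("k", 2)`, regrouping a three-way combine leaves the result unchanged, but `a.combine(b)` and `b.combine(a)` disagree on the value for `"k"`. A parallel reducer may regroup, so it must still keep operands in their original order.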

Before: Ad-Hoc Merge Logic

// Each aggregate type had custom merge logic
fn merge_counts(a: usize, b: usize) -> usize {
    a + b
}

fn merge_averages(sum_a: f64, count_a: usize, sum_b: f64, count_b: usize) -> (f64, usize) {
    (sum_a + sum_b, count_a + count_b)
}

fn merge_maps(mut a: HashMap<String, Value>, b: HashMap<String, Value>) -> HashMap<String, Value> {
    for (k, v) in b {
        a.entry(k).or_insert(v);
    }
    a
}

// No guarantee these operations are associative
// No property tests verifying correctness
// 15 different aggregate types, 15 different implementations

After: Unified Semigroup Implementation

impl Semigroup for AggregateResult {
    fn combine(self, other: Self) -> Self {
        use AggregateResult::*;

        match (self, other) {
            // All 15 types follow the same pattern
            (Count(a), Count(b)) => Count(a.saturating_add(b)),
            (Sum(a), Sum(b)) => Sum(a + b),
            (Average(sum_a, count_a), Average(sum_b, count_b)) => {
                Average(sum_a + sum_b, count_a + count_b)
            }
            (Merge(mut a), Merge(b)) => {
                for (k, v) in b {
                    a.entry(k).or_insert(v);
                }
                Merge(a)
            }
            // ... other types follow same structure

            // Type mismatches handled at validation boundary
            _ => unreachable!("Use combine_homogeneous for type safety")
        }
    }
}

The Key Insight: Validation at Boundaries

The Semigroup combine only handles matching types. Type validation happens at the boundary using Stillwater's homogeneous validation:

pub fn aggregate_map_results(
    results: Vec<AggregateResult>,
) -> Validation<AggregateResult, Vec<TypeMismatchError>> {
    if results.is_empty() {
        return Validation::success(AggregateResult::Count(0));
    }

    combine_homogeneous(results, std::mem::discriminant, TypeMismatchError::new)
}

This accumulates all type mismatches rather than failing on the first one—a core principle of applicative validation.
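The boundary check can be sketched with the standard library alone. This is an illustration under stated assumptions, not Stillwater's `combine_homogeneous`: it returns a plain `Result` instead of `Validation`, and `Agg`, `Mismatch`, and `combine_checked` are hypothetical names. The point it shows is that every mismatched element is reported, not only the first.

```rust
use std::mem::{discriminant, Discriminant};

/// Hypothetical two-variant aggregate, standing in for the real enum.
#[derive(Clone, Debug, PartialEq)]
enum Agg {
    Count(usize),
    Sum(i64),
}

/// Hypothetical error: records where a mismatched variant appeared.
#[derive(Debug, PartialEq)]
struct Mismatch {
    index: usize,
}

fn combine(a: Agg, b: Agg) -> Agg {
    match (a, b) {
        (Agg::Count(x), Agg::Count(y)) => Agg::Count(x.saturating_add(y)),
        (Agg::Sum(x), Agg::Sum(y)) => Agg::Sum(x.saturating_add(y)),
        // combine_checked validates variants first, so this arm is not reached from it.
        _ => unreachable!("variants are validated before combining"),
    }
}

/// Returns the combined value, or every mismatch against the first element's variant.
/// Empty input yields Ok(None).
fn combine_checked(items: Vec<Agg>) -> Result<Option<Agg>, Vec<Mismatch>> {
    let Some(first) = items.first() else { return Ok(None) };
    let expected: Discriminant<Agg> = discriminant(first);

    let errors: Vec<Mismatch> = items
        .iter()
        .enumerate()
        .filter(|(_, item)| discriminant(*item) != expected)
        .map(|(index, _)| Mismatch { index })
        .collect();

    if errors.is_empty() {
        Ok(items.into_iter().reduce(combine))
    } else {
        Err(errors)
    }
}
```

Because all variants are checked before any `combine` call, the `unreachable!` arm stays unreachable for callers that go through the checked entry point.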

Property Tests Verify Associativity

proptest! {
    #[test]
    fn prop_count_associativity(a in 0usize..1000, b in 0usize..1000, c in 0usize..1000) {
        let x = AggregateResult::Count(a);
        let y = AggregateResult::Count(b);
        let z = AggregateResult::Count(c);

        // (x combine y) combine z == x combine (y combine z)
        let left = x.clone().combine(y.clone()).combine(z.clone());
        let right = x.combine(y.combine(z));

        prop_assert_eq!(left, right);
    }
}

All 15 aggregate types have property tests that check associativity. This isn't ceremony: the tests are the evidence that regrouping partial results during parallel aggregation doesn't change the outcome.
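One caveat applies to any variant that wraps a float (the "Before" code uses `f64` sums for averages): IEEE 754 addition is only approximately associative, so an exact-equality property test can fail on rounding alone. A minimal sketch of the issue and one possible workaround, a relative-tolerance comparison; `approx_eq`, `grouped_left`, and `grouped_right` are hypothetical helpers, not part of Prodigy:

```rust
/// Compare two floats with a relative tolerance (hypothetical helper).
fn approx_eq(a: f64, b: f64, rel_tol: f64) -> bool {
    // Scale the tolerance by the larger magnitude, with a floor of 1.0.
    let scale = a.abs().max(b.abs()).max(1.0);
    (a - b).abs() <= rel_tol * scale
}

/// (a + b) + c
fn grouped_left(a: f64, b: f64, c: f64) -> f64 {
    (a + b) + c
}

/// a + (b + c)
fn grouped_right(a: f64, b: f64, c: f64) -> f64 {
    a + (b + c)
}
```

For the inputs `0.1, 0.2, 0.3` the two groupings differ in the last bit, so `==` reports them as unequal while `approx_eq` with a tolerance such as `1e-12` accepts them. Integer-backed variants like `Count` don't have this problem.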

Parallel Aggregation: Now Trivial

With associativity guaranteed, parallel aggregation becomes a one-liner:

pub fn parallel_aggregate(results: Vec<AggregateResult>) -> Option<AggregateResult> {
    results.into_par_iter().reduce_with(|a, b| a.combine(b))
}

Rayon's reduce_with is free to split the input and combine partial results in whatever grouping the scheduler produces. Associativity is what guarantees that every such grouping gives the same result as a sequential fold.
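The same idea can be sketched without Rayon, using scoped threads from the standard library. This is not how Rayon schedules work; `chunked_reduce` is a hypothetical function that folds fixed-size chunks on separate threads and then folds the partial results in chunk order. For an associative operation, every chunk size should match the sequential fold.

```rust
use std::thread;

/// Reduce `items` by folding fixed-size chunks on separate threads,
/// then folding the partial results in their original chunk order.
fn chunked_reduce<T, F>(items: Vec<T>, chunk_size: usize, combine: F) -> Option<T>
where
    T: Clone + Send + Sync,
    F: Fn(T, T) -> T + Sync,
{
    let combine = &combine;
    let partials: Vec<T> = thread::scope(|scope| {
        // Spawn one worker per chunk; a chunk size of 0 is treated as 1.
        let handles: Vec<_> = items
            .chunks(chunk_size.max(1))
            .map(|chunk| scope.spawn(move || chunk.iter().cloned().reduce(combine)))
            .collect();
        // Join in spawn order so partial results keep their left-to-right order.
        handles
            .into_iter()
            .filter_map(|handle| handle.join().expect("worker panicked"))
            .collect()
    });
    partials.into_iter().reduce(combine)
}
```

String concatenation makes a convenient check here because it is associative but not commutative: if the chunks were combined out of order, the output would visibly change.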

The Gains

Metric | Before | After
Lines of code | ~450 | ~200
Custom merge functions | 15 | 1
Property tests | 0 | 33
Panics in production | Possible | None (validation at boundaries)
Parallel-safe | Unknown | Proven via associativity

Pattern 2: Reader Pattern for Environment Access

The Problem: MapReduce workflow functions needed access to configuration, storage, executors, and other dependencies. The traditional approach threads these through every function signature:

async fn execute_agent(
    item: &Value,
    config: &MapConfig,
    worktree_manager: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<AgentResult> { ... }

Six parameters just for dependencies. Adding a new dependency meant updating every function in the call chain.

The Solution: The Reader pattern via Stillwater's Effect::asks, which extracts dependencies from an environment at runtime.

Before: Parameter Threading

async fn execute_map_phase(
    items: Vec<Value>,
    config: &MapConfig,
    worktree_mgr: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<Vec<AgentResult>> {
    let mut results = Vec::new();
    for item in items {
        let result = execute_agent(
            &item, config, worktree_mgr, executor, storage, job_id, max_parallel
        ).await?;
        results.push(result);
    }
    Ok(results)
}

Every function explicitly passes dependencies to its callees. Tests require constructing all dependencies even when testing logic that doesn't use them.

After: Reader Pattern with Effect::asks

/// Get the worktree manager from the environment
pub fn get_worktree_manager() -> Effect<Arc<WorktreeManager>, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.worktree_manager.clone())
}

/// Get max parallel setting
pub fn get_max_parallel() -> Effect<usize, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.max_parallel)
}

/// Workflow code extracts what it needs
fn execute_agent(item: Value) -> Effect<AgentResult, MapReduceError, MapEnv> {
    get_worktree_manager()
        .and_then(|wt_mgr| create_worktree_effect(&item.id))
        .and_then(|worktree| execute_commands_effect(&item, &worktree))
}

Dependencies are extracted from the environment when needed, not threaded through signatures.
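For readers new to the Reader pattern, the mechanics fit in a few lines. The sketch below is a synchronous, error-free stand-in written for this example; `Reader`, `Env`, and `describe` are hypothetical, and Stillwater's `Effect` additionally models errors and async execution.

```rust
use std::sync::Arc;

/// A computation that needs an environment `E` to produce a `T`.
/// Local stand-in, not Stillwater's `Effect`.
struct Reader<E, T> {
    run: Box<dyn Fn(&E) -> T>,
}

impl<E: 'static, T: 'static> Reader<E, T> {
    /// Build a reader that projects a value out of the environment.
    fn asks(f: impl Fn(&E) -> T + 'static) -> Self {
        Reader { run: Box::new(f) }
    }

    /// Feed this reader's output into the next step; both steps see the same environment.
    fn and_then<U: 'static>(self, next: impl Fn(T) -> Reader<E, U> + 'static) -> Reader<E, U> {
        Reader {
            run: Box::new(move |env: &E| (next((self.run)(env)).run)(env)),
        }
    }

    /// Supply the environment and execute.
    fn run(&self, env: &E) -> T {
        (self.run)(env)
    }
}

/// Hypothetical environment with two dependencies.
struct Env {
    job_id: Arc<str>,
    max_parallel: usize,
}

fn job_id() -> Reader<Env, Arc<str>> {
    Reader::asks(|env: &Env| env.job_id.clone())
}

fn max_parallel() -> Reader<Env, usize> {
    Reader::asks(|env: &Env| env.max_parallel)
}

/// Neither dependency appears in this signature; both are read at run time.
fn describe() -> Reader<Env, String> {
    job_id().and_then(|id| {
        Reader::asks(move |env: &Env| format!("{}: up to {} agents", id, env.max_parallel))
    })
}
```

Nothing executes until `run` is called with an environment, which is why adding a field to `Env` only requires one new getter and leaves existing signatures alone.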

Local Overrides: Temporarily Modify Context

The Reader pattern also enables local modifications via Effect::local:

/// Run an effect with reduced concurrency
pub fn with_max_parallel<T: Send + 'static>(
    max_parallel: usize,
    effect: Effect<T, MapReduceError, MapEnv>,
) -> Effect<T, MapReduceError, MapEnv> {
    Effect::local(
        move |env: &MapEnv| MapEnv {
            max_parallel,
            ..env.clone()
        },
        effect,
    )
}

// Usage: run risky operations with limited concurrency
let effect = with_max_parallel(2, execute_agents(work_items));

The override only affects the wrapped effect—outer code sees the original environment.
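Why the override cannot leak is easiest to see with plain closures. In this sketch (hypothetical `Env`, `local`, and `read_max_parallel`; not Stillwater's implementation), the wrapped computation receives a modified copy while the caller's environment is only ever borrowed immutably.

```rust
/// Hypothetical environment for the example.
#[derive(Clone, Debug, PartialEq)]
struct Env {
    max_parallel: usize,
    job_id: String,
}

/// Wrap `inner` so it runs against a modified copy of the environment.
/// The caller's environment is only borrowed, so it cannot be changed.
fn local<T>(
    modify: impl Fn(&Env) -> Env,
    inner: impl Fn(&Env) -> T,
) -> impl Fn(&Env) -> T {
    move |env: &Env| inner(&modify(env))
}

/// A computation that reads one setting.
fn read_max_parallel(env: &Env) -> usize {
    env.max_parallel
}

/// Run `inner` with `max_parallel` replaced by `limit`; other fields are copied as-is.
fn with_max_parallel<T>(limit: usize, inner: impl Fn(&Env) -> T) -> impl Fn(&Env) -> T {
    local(
        move |env: &Env| Env { max_parallel: limit, ..env.clone() },
        inner,
    )
}
```

The cost of this design is one clone of the environment per override, which is usually cheap when the fields are `Arc`-wrapped handles.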

Testing with Mock Environments

#[tokio::test]
async fn test_get_max_parallel() {
    let env = MockMapEnvBuilder::new()
        .with_max_parallel(10)
        .build();

    let effect = get_max_parallel();
    let result = effect.run(&env).await;

    assert_eq!(result.unwrap(), 10);
}

#[tokio::test]
async fn test_local_changes_dont_leak() {
    let env = MockMapEnvBuilder::new().with_max_parallel(5).build();

    // Override inside effect
    let inner = with_max_parallel(100, get_max_parallel());
    assert_eq!(inner.run(&env).await.unwrap(), 100);

    // Original environment unchanged
    let outer = get_max_parallel();
    assert_eq!(outer.run(&env).await.unwrap(), 5);
}

Tests use MockMapEnvBuilder to construct exactly the environment they need. No complex setup, no unused dependencies.
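The post doesn't show `MockMapEnvBuilder` itself, so the following is only a guess at the general shape such a builder might take. `TestEnv`, its fields, and the default values are all hypothetical. The idea is that the builder starts from safe defaults and each test overrides only the fields it asserts on.

```rust
/// Hypothetical environment; the real `MapEnv` fields are not shown in the post.
#[derive(Clone, Debug, PartialEq)]
struct TestEnv {
    max_parallel: usize,
    job_id: String,
    dry_run: bool,
}

/// Builder that starts from defaults chosen to be harmless in tests.
struct TestEnvBuilder {
    env: TestEnv,
}

impl TestEnvBuilder {
    fn new() -> Self {
        TestEnvBuilder {
            env: TestEnv {
                max_parallel: 1,
                job_id: "test-job".to_string(),
                dry_run: true,
            },
        }
    }

    /// Override the concurrency limit.
    fn with_max_parallel(mut self, max_parallel: usize) -> Self {
        self.env.max_parallel = max_parallel;
        self
    }

    /// Override the job identifier.
    fn with_job_id(mut self, job_id: &str) -> Self {
        self.env.job_id = job_id.to_string();
        self
    }

    /// Finish and hand back the environment.
    fn build(self) -> TestEnv {
        self.env
    }
}
```

A test that only cares about concurrency then reads `TestEnvBuilder::new().with_max_parallel(10).build()` and never mentions the other fields.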

The Gains

Metric | Before | After
Average function parameters | 6-8 | 1-2
Test setup boilerplate | High | Minimal (builder pattern)
Adding new dependencies | Change N signatures | Add one getter
Local overrides | Manual save/restore | Effect::local
Composition | Manual threading | and_then/map

Pattern 3: Pure Core, Imperative Shell

The Problem: Workflow execution mixed pure logic (variable expansion, command building, output parsing) with I/O operations (file access, shell commands, API calls). Testing required mocking I/O even for purely computational tests.

The Solution: Separate pure transformations into a pure/ module and I/O operations into an effects/ module. Pure functions are trivially testable; effects are testable with mock environments.

The Architecture

src/cook/workflow/
├── pure/                           # No I/O, no side effects
│   ├── command_builder.rs          # Build commands from templates
│   ├── output_parser.rs            # Parse command output
│   └── variable_expansion.rs       # Expand ${VAR} patterns
└── effects/                        # I/O encapsulated in Effects
    ├── claude.rs                   # Claude API interactions
    ├── shell.rs                    # Shell command execution
    ├── handler.rs                  # Custom handler execution
    └── environment.rs              # Effect environment

Pure Module: No Mocks Needed

//! Pure workflow transformations module
//!
//! All functions in this module are:
//! - Pure: No I/O operations, no side effects
//! - Deterministic: Same inputs always produce same outputs
//! - Testable: No mocking required for unit tests

/// Expand variables in template string
pub fn expand_variables(template: &str, variables: &HashMap<String, String>) -> String {
    let mut result = template.to_string();

    // Expand ${VAR} first (more specific pattern)
    for (key, value) in variables {
        let placeholder = format!("${{{}}}", key);
        result = result.replace(&placeholder, value);
    }

    // Expand $VAR with word boundaries
    for (key, value) in variables {
        result = expand_simple_var(&result, key, value);
    }

    result
}

Testing is straightforward:

#[test]
fn test_expand_variables_mixed() {
    let template = "echo ${name} $value";
    let vars = HashMap::from([
        ("name".to_string(), "test".to_string()),
        ("value".to_string(), "123".to_string()),
    ]);

    let result = expand_variables(template, &vars);
    assert_eq!(result, "echo test 123");
}

No mock setup, no async runtime, no I/O stubs. Just input → output.
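The `$VAR` branch of `expand_variables` calls `expand_simple_var`, which the post doesn't show. Here is one way such a helper could work, written as a guess from the "word boundaries" comment and not Prodigy's actual code: a match only counts when the name is not followed by another identifier character, so `$val` is left alone inside `$value`.

```rust
/// Replace `$name` with `value`, but only where the name is not followed by
/// another identifier character (letter, digit, or underscore).
/// Hypothetical reconstruction; the original helper is not shown in the post.
fn expand_simple_var(input: &str, name: &str, value: &str) -> String {
    let needle = format!("${}", name);
    let mut out = String::with_capacity(input.len());
    let mut rest = input;

    while let Some(pos) = rest.find(&needle) {
        let after = &rest[pos + needle.len()..];
        let at_boundary = !after
            .chars()
            .next()
            .map_or(false, |c| c.is_alphanumeric() || c == '_');

        out.push_str(&rest[..pos]);
        // Keep the original text when this is only a prefix of a longer name.
        out.push_str(if at_boundary { value } else { needle.as_str() });
        // Continue after the match; inserted values are never rescanned.
        rest = after;
    }
    out.push_str(rest);
    out
}
```

Like the function it supports, this is pure: it takes strings in, returns a string, and can be tested with plain assertions.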

Property Tests for Pure Functions

proptest! {
    #[test]
    fn prop_variable_expansion_is_deterministic(
        template in ".*",
        vars in prop::collection::hash_map(valid_var_name(), safe_value(), 0..5),
    ) {
        let result1 = expand_variables(&template, &vars);
        let result2 = expand_variables(&template, &vars);

        prop_assert_eq!(result1, result2);
    }

    #[test]
    fn prop_variable_expansion_idempotent_for_safe_values(
        template in r"[a-zA-Z0-9 ${}_.,-]*",
        vars in prop::collection::hash_map(valid_var_name(), r"[a-zA-Z0-9 _-]*", 0..3),
    ) {
        let safe_vars: HashMap<String, String> = vars
            .into_iter()
            .filter(|(_, v)| !v.contains('$'))
            .collect();

        let result1 = expand_variables(&template, &safe_vars);
        let result2 = expand_variables(&result1, &safe_vars);

        // Idempotent when values don't contain variable references
        prop_assert_eq!(result1, result2);
    }
}

Property tests verify behavioral guarantees that unit tests might miss.

Effects Module: I/O in Composable Units

//! Effect-based I/O operations for workflow execution
//!
//! The effects module separates concerns:
//! - **Pure logic** lives in `pure/` module
//! - **I/O effects** live here
//! - **Environment** provides dependencies via injection

pub fn execute_shell_command_effect(
    command: &str,
    vars: &HashMap<String, String>,
) -> Effect<CommandOutput, CommandError, WorkflowEnv> {
    // 1. Pure: expand variables in command
    let expanded = expand_variables(command, vars);

    // 2. Effect: execute shell command
    Effect::from_async(move |env: &WorkflowEnv| {
        let shell = env.shell_executor.clone();
        async move {
            shell.execute(&expanded).await
                .map_err(|e| CommandError::ExecutionFailed {
                    message: e.to_string(),
                    exit_code: None,
                })
        }
    })
}

Effects compose via and_then:

let workflow_effect = execute_shell_command_effect("cargo build", &vars)
    .and_then(|_| execute_shell_command_effect("cargo test", &vars))
    .map(|result| process_output(result));

// Execute with environment
let output = workflow_effect.run(&env).await?;

The Gains

Metric | Before | After
Pure functions | Mixed with I/O | Isolated in pure/
Unit tests requiring mocks | Most | Only effects/
Property tests | Difficult | Natural
Code reuse | Limited | Pure functions composable
Testability | Mock-heavy | Mock-minimal

Lessons Learned

1. Mathematical Guarantees Pay Off

The Semigroup trait's associativity guarantee isn't academic—it's what makes parallel aggregation provably correct. Property tests verify the guarantee; production code depends on it.

2. Reader Pattern Scales

Threading 6 parameters through 10 functions is manageable. Threading 12 parameters through 50 functions isn't. The Reader pattern's overhead is constant; parameter threading's overhead scales with codebase size.

3. Pure Functions Are Easier to Test, Period

Every function in pure/ can be tested with zero setup. No mocks, no async runtime, no environment construction. This isn't ideology—it's pragmatic reduction of test complexity.

4. Separation Creates Options

With pure logic separated from effects, I can:

  • Test pure logic exhaustively with property tests
  • Test effects with minimal mock environments
  • Swap effect implementations (real vs mock) without touching business logic
  • Reason about pure and effectful code independently
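The "swap effect implementations" point can be sketched with a trait at the effect boundary. This is a synchronous, std-only illustration under stated assumptions: `Shell`, `RecordingShell`, `build_command`, and `run_step` are hypothetical names, and Prodigy's real effects are async and built on Stillwater's `Effect`.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Pure: build the command line from a template (only `${VAR}` is handled here).
fn build_command(template: &str, vars: &HashMap<String, String>) -> String {
    vars.iter().fold(template.to_string(), |acc, (key, value)| {
        acc.replace(&format!("${{{}}}", key), value)
    })
}

/// The effect boundary: anything that can run a command.
trait Shell {
    fn execute(&self, command: &str) -> Result<String, String>;
}

/// Test double that records commands instead of running them.
#[derive(Default)]
struct RecordingShell {
    seen: RefCell<Vec<String>>,
}

impl Shell for RecordingShell {
    fn execute(&self, command: &str) -> Result<String, String> {
        self.seen.borrow_mut().push(command.to_string());
        Ok(String::new())
    }
}

/// Shell layer: call the pure step, then hand the result to the effect.
fn run_step(
    shell: &dyn Shell,
    template: &str,
    vars: &HashMap<String, String>,
) -> Result<String, String> {
    shell.execute(&build_command(template, vars))
}
```

A production implementation of `Shell` would spawn a process; `run_step` and `build_command` stay the same either way, which is the option the separation buys.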

5. The Migration Was Incremental

None of this happened in one big refactor. Each spec (171, 174a-h, 175) was a focused change:

  • Spec 171: Semigroup for aggregation
  • Spec 174b: Pure workflow transformations
  • Spec 174d: Effect modules
  • Spec 175: Reader pattern helpers

Small, reviewable changes that compiled and passed tests at each step.

Conclusion

Migrating Prodigy to Stillwater wasn't about adopting functional programming for its own sake. It was about solving real problems:

  1. Parallel aggregation needed to be provably correct → Semigroup's associativity
  2. Dependency threading was becoming unmanageable → Reader pattern
  3. Testing required too much mock infrastructure → Pure core, imperative shell

Each pattern addressed a specific pain point with a proven solution. The result is code that's more testable, more composable, and easier to reason about.

If you're building Rust applications that deal with aggregation, dependency injection, or I/O-heavy workflows, consider whether these patterns might help. They're not silver bullets, but they're sharp tools for the right problems.


Resources

Projects:

  • Prodigy - AI workflow orchestration
  • Stillwater - Applicative validation and effects for Rust

Want more content like this? Follow me on Dev.to or subscribe to Entropic Drift for posts on AI-powered development workflows, Rust tooling, and technical debt management.

Check out my open-source projects:

  • Debtmap - Technical debt analyzer
  • Prodigy - AI workflow orchestration
