edransy
Pipex: The Rust Pipeline Revolution — From Pure Functions to GPU Acceleration

How a simple functional pipeline library evolved into an advanced data processing framework for Rust


From Simple Pipelines to GPU Magic

When I started building pipex, the goal was simple: bring functional pipeline elegance to Rust. What began as a weekend project became something far more ambitious — a library that makes sophisticated data processing feel effortless.

The journey started with this simple vision:

// Clean, readable data transformation
let result = pipex!(
    vec![1, 2, 3, 4, 5]
    => |x| x * 2
    => |x| x + 1
    => |x| Ok::<i32, String>(x)
);

But as I tackled real-world problems — handling errors, ensuring mathematical correctness, optimizing performance — the library grew exponentially:

// Mathematical expressions that run on your GPU, automatically
let gpu_result = pipex!(
    scientific_data
    => gpu ||| |x| x.sin() * x.cos() + x.sqrt()  // Rust → WGSL → GPU!
);

Today, pipex delivers four novel capabilities that transform how we build processing pipelines in Rust:

  1. Declarative Error Handling — Turn error chaos into clean strategies
  2. Compile-time Purity Verification — Mathematical safety without runtime cost
  3. Automatic Memoization — Performance optimization as a simple attribute
  4. Automatic GPU Transpilation — Rust expressions running on silicon ✨

Foundation features like async/await and parallel processing come standard. Let's explore!

Foundation: Async and Parallel Pipelines

The magic starts with seamless concurrency. Whether you're calling APIs, crunching numbers, or mixing I/O with computation, pipex handles the complexity while you focus on the logic.

Async Pipelines: Concurrent by Default

Need to fetch data from multiple sources? Write it like a simple transformation, get concurrent execution for free:

let result = pipex!(
    vec!["https://api1.com", "https://api2.com", "https://api3.com"]
    => async |url| {
        let response = reqwest::get(url).await?;
        Ok::<String, String>(response.text().await?)
    }
    => |data| Ok::<usize, String>(data.len())
);

Behind the scenes, all HTTP requests fire simultaneously. No thread pools to configure, no futures to wrangle — just clean, readable code that scales, with all Tokio magic abstracted away.

Parallel Processing: All CPU Cores Working, Effortlessly

Got a million data points to process? The ||| operator distributes work across every CPU core using intelligent work-stealing:

let result = pipex!(
    (0..1_000_000).collect::<Vec<i32>>()
    => ||| |x| Ok::<f64, String>((x as f64).sqrt())
    => |x| Ok::<f64, String>(x * 2.0)
);

Your 16-core machine? All cores engaged. Single-core development laptop? Still works perfectly. The parallelism adapts to your hardware automatically, all Rayon magic abstracted away.
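To make the idea concrete, here is a minimal sketch of what a parallel `|||` stage does conceptually: split the input into one chunk per available core and map each chunk on its own scoped thread. This is purely illustrative; pipex itself delegates this to Rayon's work-stealing pool, which rebalances uneven chunks far more intelligently.

```rust
use std::thread;

// Illustrative only: a chunk-per-core parallel map using std scoped threads.
fn parallel_map<T, U, F>(data: Vec<T>, f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let chunk_size = ((data.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        let f = &f;
        // Spawn one worker per chunk; scoped threads may borrow `data` and `f`.
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(f).collect::<Vec<U>>()))
            .collect();
        // Joining in spawn order preserves the input ordering.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    })
}

fn main() {
    let out = parallel_map((0..1_000).collect::<Vec<i32>>(), |x| (*x as f64).sqrt());
    assert_eq!(out.len(), 1_000);
    assert!((out[100] - 10.0).abs() < 1e-12);
}
```

A real work-stealing scheduler avoids the main weakness of this sketch: if one chunk is much more expensive than the others, idle cores steal work instead of waiting.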

The Best of All Worlds: Mixed Execution

Real applications aren't purely CPU-bound or I/O-bound. They're a mixture. pipex lets you compose different execution models naturally:

let result = pipex!(
    large_dataset
    => ||| |x| Ok::<f64, String>(expensive_cpu_work(x))    // Spread across cores
    => async |x| { network_call(x).await }                 // Concurrent I/O
    => |x| Ok::<String, String>(format_result(x))          // Simple transformation
);

Each step runs with the optimal execution strategy. CPU-intensive work parallelizes, I/O operations run concurrently, and simple transformations stay synchronous. The pipeline adapts to what each step needs.


Novelty 1: Declarative Error Handling with #[error_strategy]

Every real-world pipeline faces a choice: what happens when something fails? Traditionally, this means scattering error-handling logic throughout your code. pipex flips this on its head — declare your error strategy once, let the library handle the complexity.

Consider a data processing pipeline where some operations might fail:

#[error_strategy(IgnoreHandler)]
fn risky_operation(x: i32) -> Result<i32, String> {
    if x % 2 == 0 { Ok(x * 2) } else { Err("Odd number".into()) }
}

#[error_strategy(CollectHandler)]
fn might_fail(x: i32) -> Result<String, String> {
    if x > 10 { Ok(format!("Big: {}", x)) } else { Err("Too small".into()) }
}

let result = pipex!(
    vec![1, 2, 3, 4, 5, 15, 20]
    => |x| risky_operation(x)     // Drops odd numbers silently
    => |x| might_fail(x)         // Keeps both successes and failures
);

The #[error_strategy] attribute transforms how your function handles errors in the pipeline context. IgnoreHandler drops failed items and continues. CollectHandler preserves everything for later analysis. No conditional logic cluttering your business code.

Choose from built-in strategies: IgnoreHandler, CollectHandler, FailFastHandler, LogAndIgnoreHandler, FirstErrorHandler — or build your own.
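The core idea is small enough to sketch in plain Rust: a strategy is just a function from a batch of `Result`s to a filtered batch. The trait shape below follows the `ErrorHandler` signature shown later in this article; the real pipex trait may differ in bounds and details.

```rust
// Sketch of the strategy interface: one hook over the whole batch of results.
trait ErrorHandler<T, E> {
    fn handle_results(results: Vec<Result<T, E>>) -> Vec<Result<T, E>>;
}

// IgnoreHandler semantics: drop failed items, keep successes.
struct IgnoreHandler;
impl<T, E> ErrorHandler<T, E> for IgnoreHandler {
    fn handle_results(results: Vec<Result<T, E>>) -> Vec<Result<T, E>> {
        results.into_iter().filter(|r| r.is_ok()).collect()
    }
}

// CollectHandler semantics: preserve everything, errors included.
struct CollectHandler;
impl<T, E> ErrorHandler<T, E> for CollectHandler {
    fn handle_results(results: Vec<Result<T, E>>) -> Vec<Result<T, E>> {
        results
    }
}

fn main() {
    let step: Vec<Result<i32, String>> = vec![Ok(2), Err("odd".into()), Ok(4)];
    assert_eq!(IgnoreHandler::handle_results(step.clone()), vec![Ok(2), Ok(4)]);
    assert_eq!(CollectHandler::handle_results(step).len(), 3);
}
```

Because the hook sees the whole batch rather than one item at a time, strategies like FirstErrorHandler or a retry policy can reason about the batch as a unit.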

Custom Error Strategies: Observability Made Simple

Sometimes you need more than built-in strategies. Maybe you want metrics, custom logging, or retry logic. Creating your own strategy is straightforward — implement one trait, get pipeline-wide behavior:

struct MetricsHandler;

impl<T: Clone + 'static, E: std::fmt::Debug + Clone + 'static> ErrorHandler<T, E> for MetricsHandler {
    fn handle_results(results: Vec<Result<T, E>>) -> Vec<Result<T, E>> {
        let success_count = results.iter().filter(|r| r.is_ok()).count();
        let total = results.len();
        println!("   📊 Metrics: {}/{} success ({:.1}%)", success_count, total,
                (success_count as f64 / total as f64) * 100.0);
        results
    }
}

Now any function can get automatic observability:

#[error_strategy(MetricsHandler)]
fn monitored_operation(x: i32) -> Result<i32, String> {
    if x % 3 == 0 { Err("Divisible by 3".into()) } else { Ok(x * 2) }
}

let result = pipex!(
    vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    => |x| monitored_operation(x)  // Metrics automatically collected and logged
);
// 📊 Metrics: 7/10 success (70.0%)

One strategy implementation gives you pipeline-wide observability. Error handling becomes a cross-cutting concern you design once and apply everywhere.


Novelty 2: Compile-time Purity with #[pure]

Mathematical functions should be predictable. Given the same inputs, they should always produce the same outputs. No side effects, no surprises. But Rust's type system doesn't enforce this mathematical purity — let's solve it.

The #[pure] attribute brings mathematical discipline to your functions:

#[pure]
fn safe_square(x: f64) -> f64 {
    x * x  // Clean mathematical operation
}

#[pure]
fn physics_calc(velocity: f64, time: f64) -> f64 {
    safe_square(velocity) + 9.81 * time  // Calling other pure functions is allowed
}

Try to add side effects and the compiler stops you:

#[pure]
fn impure_function(x: f64) -> f64 {
    println!("This won't compile!");  // ❌ Compile-time error!
    x * x
}

What #[pure] enforces:

  • ✅ No side effects — no I/O, printing, or global state modification
  • ✅ No unsafe code — no unsafe blocks or inline assembly
  • ✅ Pure call chains — only calls to other #[pure] functions allowed
  • ✅ Mathematical guarantee — same inputs always produce same outputs

This isn't just academic purity. Pure functions are perfect candidates for optimization, memoization, and parallel execution. The compiler now helps you maintain the mathematical properties your algorithms depend on.
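A tiny example of why the guarantee matters in practice. The impure function below (deliberately written without `#[pure]`) depends on hidden state, so caching its result would silently return stale answers; the pure one is trivially safe to cache or parallelize. `impure_next` and `pure_double` are hypothetical names for illustration only.

```rust
use std::cell::Cell;

// Hidden mutable state: same input, *different* outputs over time.
// Exactly the kind of function `#[pure]` rejects at compile time.
fn impure_next(x: i32, counter: &Cell<i32>) -> i32 {
    counter.set(counter.get() + 1); // side effect
    x + counter.get()
}

// Referentially transparent: safe to memoize, reorder, or run in parallel.
fn pure_double(x: i32) -> i32 {
    x * 2
}

fn main() {
    let c = Cell::new(0);
    assert_ne!(impure_next(5, &c), impure_next(5, &c)); // 6, then 7
    assert_eq!(pure_double(5), pure_double(5));         // always 10
}
```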


Novelty 3: Automatic Memoization with #[memoized]

Pure functions have a beautiful property: same input, same output, every time. This makes them perfect for caching — if you've computed the result before, why compute it again?

Traditional memoization requires manual cache management, thread safety considerations, and careful memory handling. pipex reduces it to a single attribute:

#[pure]
#[memoized]
fn fibonacci(n: u64) -> u64 {
    if n <= 1 { n } else { fibonacci(n-1) + fibonacci(n-2) }
}

That's it. No cache data structures, no hash map management, no thread safety concerns. The first call computes and caches the result. Every subsequent call with the same input returns instantly.
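For intuition, here is roughly what a memoizing attribute has to generate by hand: a process-wide, thread-safe cache consulted before the real body runs. This is a sketch of the technique, not pipex's actual expansion.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// Hand-rolled memoization: what `#[memoized]` spares you from writing.
fn fibonacci(n: u64) -> u64 {
    static CACHE: OnceLock<Mutex<HashMap<u64, u64>>> = OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));

    // Fast path: return a previously computed result.
    if let Some(&hit) = cache.lock().unwrap().get(&n) {
        return hit;
    }

    // Slow path: compute, then cache. The lock is released before
    // recursing, so the recursive calls can hit the cache themselves.
    let value = if n <= 1 { n } else { fibonacci(n - 1) + fibonacci(n - 2) };
    cache.lock().unwrap().insert(n, value);
    value
}

fn main() {
    // Naive recursion would take exponential time here; with the cache
    // each of the 51 subproblems is computed exactly once.
    assert_eq!(fibonacci(50), 12_586_269_025);
}
```

Note the bookkeeping a single attribute replaces: the static cache, the lazy initialization, the locking discipline. The purity requirement is what makes this rewrite semantically safe.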

For expensive computations, the speedup is dramatic:

#[pure]
#[memoized(capacity = 1000)]  // Custom cache size for memory control
fn expensive_computation(x: f64, iterations: u32) -> f64 {
    (0..iterations).fold(x, |acc, i| acc + (i as f64).sin())
}

// First call: ~500ms computation time
// Subsequent calls: ~0.001ms cache retrieval (500,000x faster!)
let result = expensive_computation(3.14, 1000000);

Memoization shines in pipelines where the same expensive operations might be called repeatedly:

let result = pipex!(
    scientific_data
    => |x| expensive_computation(x, 1_000_000)  // Each unique input cached automatically
    => |x| another_memoized_function(x)         // Cache hits propagate through the pipeline
);

Performance optimization becomes declarative rather than manual. Focus on your algorithms, let the library handle the caching strategy.


Novelty 4: Automatic GPU Transpilation — The Holy Grail

Here's where things get wild. What if you could write normal Rust mathematical expressions and have them automatically run on your GPU? No WGSL knowledge, no buffer management, no shader compilation — just Rust code that happens to execute on silicon designed for parallel computation.

This sounds impossible. It's not:

// Write normal Rust math expressions
let result = pipex!(
    vec![1.0, 2.0, 3.0, 4.0, 5.0]
    => gpu ||| |x| x.sin() * x.cos() + x.sqrt()  // Automatically becomes WGSL!
    => |x| Ok::<f32, String>(x * 2.0)            // Back to CPU processing
);

Behind the scenes, pipex parses your Rust expression, generates equivalent WGSL compute shader code, compiles it for your specific GPU, manages buffer creation and data transfer, executes the kernel, and returns results. You see none of this complexity.
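To demystify the "Rust → WGSL" step, here is a toy sketch of the final stage of such a transpiler: once the closure body has been rewritten into WGSL syntax (method calls like `x.sin()` become free functions like `sin(x)`), the expression is spliced into a compute-shader template that maps it over a storage buffer. This is an illustration of the idea, not pipex's real code generator.

```rust
// Toy transpiler back end: wrap a WGSL-syntax scalar expression in a
// one-buffer compute shader. Real codegen also handles type checking,
// workgroup sizing, and multi-argument kernels.
fn wgsl_kernel(expr: &str) -> String {
    format!(
"@group(0) @binding(0) var<storage, read_write> data: array<f32>;

@compute @workgroup_size(64)
fn main(@builtin(global_invocation_id) id: vec3<u32>) {{
    let x = data[id.x];
    data[id.x] = {expr};
}}
")
}

fn main() {
    // The Rust body `x.sin() * x.cos() + x.sqrt()` arrives here already
    // rewritten into WGSL's free-function call syntax.
    let shader = wgsl_kernel("sin(x) * cos(x) + sqrt(x)");
    assert!(shader.contains("data[id.x] = sin(x) * cos(x) + sqrt(x);"));
    println!("{shader}");
}
```

The generated shader is then compiled and dispatched (via wgpu on Metal, Vulkan, or DX12), with buffer upload and readback handled behind the scenes.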

What the Transpiler Handles Today:

Basic arithmetic:

=> gpu ||| |x| x * x + 1.0

Common mathematical functions:

=> gpu ||| |x| x.sin() + x.cos() + x.sqrt()

Complex expressions with proper operator precedence:

=> gpu ||| |x| (x * x + 1.0) * x.sin() + x.cos() * (x + 3.14159)

Smart Hardware Detection

The library detects your GPU hardware and optimizes accordingly. On Apple Silicon:

🎮 GPU DEVICE DETECTED:
  📱 Name: Apple M3
  🖥️ Backend: Metal
  🚀 APPLE SILICON DETECTED!

All Features Combined

#[pure]
#[memoized(capacity = 500)]
#[error_strategy(CollectHandler)]
fn advanced_physics(velocity: f64, time: f64) -> Result<f64, String> {
    if time < 0.0 { return Err("Negative time".into()); }
    Ok(velocity * time + 0.5 * 9.81 * time * time)
}

let result = pipex!(
    time_measurements
    => |t| Ok::<f64, String>(t / 1000.0)          // Convert units
    => |t| advanced_physics(initial_velocity, t)  // Pure + Memoized + Errors
    => gpu ||| |distance| distance.sqrt()         // GPU acceleration
    => async |speed| { format_result(speed).await } // Async formatting
);

This pipeline: Converts units → Applies pure/memoized physics → GPU acceleration → Async I/O — all with graceful error handling.


Getting Started

Add pipex to your Cargo.toml. The crate exposes `parallel`, `async`, `gpu`, and `memoization` feature flags, all enabled by default:

[dependencies]
pipex = { version = "0.1.20", features = ["full", "gpu"] }

use pipex::*;

// Basic pipeline
let result = pipex!(
    vec![1, 2, 3, 4, 5]
    => |x| Ok::<i32, String>(x * 2)
    => |x| Ok::<i32, String>(x + 1)
);

// With all features
#[pure]
#[memoized]
#[error_strategy(CollectHandler)]
fn process(x: f64) -> Result<f64, String> { /* work */ }

let advanced = pipex!(
    data
    => ||| |x| process(x)                 // Parallel + Pure + Memoized + Errors
    => gpu ||| |x| x.sin() + x.cos()      // GPU acceleration
    => async |x| { save_to_db(x).await }  // Async I/O
);

Conclusion

What started as simple functional pipelines evolved into an ambitious data processing framework for Rust. pipex combines:

  • Functional elegance of Haskell
  • Memory safety of Rust
  • GPU performance of CUDA
  • Zero-config developer experience of TypeScript

Try pipex today and experience the future of Rust data processing.

Note that the library is still in early alpha and the implementation may have rough edges, so every contribution and issue report is welcome. Let's build the future of Rust processing pipelines together!


Repository: github.com/edransy/pipex
Crates.io: crates.io/crates/pipex
