How a simple functional pipeline library evolved into an advanced data processing framework in Rust
From Simple Pipelines to GPU Magic
When I started building pipex, the goal was simple: bring functional pipeline elegance to Rust. What began as a weekend project became something far more ambitious — a library that makes sophisticated data processing feel effortless.
The journey started with this simple vision:
// Clean, readable data transformation
let result = pipex!(
    vec![1, 2, 3, 4, 5]
    => |x| x * 2
    => |x| x + 1
    => |x| Ok::<i32, String>(x)
);
But as I tackled real-world problems — handling errors, ensuring mathematical correctness, optimizing performance — the library grew exponentially:
// Mathematical expressions that run on your GPU, automatically
let gpu_result = pipex!(
    scientific_data
    => gpu ||| |x| x.sin() * x.cos() + x.sqrt() // Rust → WGSL → GPU!
);
Today, pipex delivers four novel capabilities that transform how we build processing pipelines in Rust:
- Declarative Error Handling — Turn error chaos into clean strategies
- Compile-time Purity Verification — Mathematical safety without runtime cost
- Automatic Memoization — Performance optimization as a simple attribute
- Automatic GPU Transpilation — Rust expressions running on silicon ✨
Foundation features like async/await and parallel processing come standard. Let's explore!
Foundation: Async and Parallel Pipelines
The magic starts with seamless concurrency. Whether you're calling APIs, crunching numbers, or mixing I/O with computation, pipex handles the complexity while you focus on the logic.
Async Pipelines: Concurrent by Default
Need to fetch data from multiple sources? Write it like a simple transformation, get concurrent execution for free:
let result = pipex!(
    vec!["https://api1.com", "https://api2.com", "https://api3.com"]
    => async |url| {
        let response = reqwest::get(url).await.map_err(|e| e.to_string())?;
        Ok::<String, String>(response.text().await.map_err(|e| e.to_string())?)
    }
    => |data| Ok::<usize, String>(data.len())
);
Behind the scenes, all HTTP requests fire simultaneously. No thread pools to configure, no futures to wrangle — just clean, readable code that scales, with all the Tokio magic abstracted away.
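For intuition, here is roughly the hand-written code that single async step replaces — a conceptual sketch built on futures::future::join_all, not the macro's actual expansion:

use futures::future::join_all;

// Conceptual equivalent of the async pipeline step above:
// start every request at once, then await them together.
async fn fetch_all(urls: Vec<&str>) -> Vec<Result<String, String>> {
    let requests = urls.into_iter().map(|url| async move {
        let response = reqwest::get(url).await.map_err(|e| e.to_string())?;
        response.text().await.map_err(|e| e.to_string())
    });
    join_all(requests).await
}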
Parallel Processing: All CPU Cores Working Effortlessly
Got a million data points to process? The ||| operator distributes work across every CPU core using intelligent work-stealing:
let result = pipex!(
    (0..1_000_000).collect::<Vec<i32>>()
    => ||| |x| Ok::<f64, String>((x as f64).sqrt())
    => |x| Ok::<f64, String>(x * 2.0)
);
Your 16-core machine? All cores engaged. Single-core development laptop? Still works perfectly. The parallelism adapts to your hardware automatically, with all the Rayon magic abstracted away.
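Conceptually, the ||| step above boils down to something like this hand-rolled Rayon version (again a sketch, not the actual expansion):

use rayon::prelude::*;

// Split the range across all cores with work-stealing; results stay in order.
let result: Vec<Result<f64, String>> = (0..1_000_000)
    .into_par_iter()
    .map(|x| Ok::<f64, String>((x as f64).sqrt()))
    .collect();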
The Best of All Worlds: Mixed Execution
Real applications aren't purely CPU-bound or I/O-bound. They're a mixture. pipex lets you compose different execution models naturally:
let result = pipex!(
    large_dataset
    => ||| |x| Ok::<f64, String>(expensive_cpu_work(x)) // Spread across cores
    => async |x| { network_call(x).await }              // Concurrent I/O
    => |x| Ok::<String, String>(format_result(x))       // Simple transformation
);
Each step runs with the optimal execution strategy. CPU-intensive work parallelizes, I/O operations run concurrently, and simple transformations stay synchronous. The pipeline adapts to what each step needs.
Novelty 1: Declarative Error Handling with #[error_strategy]
Every real-world pipeline faces a choice: what happens when something fails? Traditionally, this means scattering error-handling logic throughout your code. pipex flips this on its head — declare your error strategy once, let the library handle the complexity.
Consider a data processing pipeline where some operations might fail:
#[error_strategy(IgnoreHandler)]
fn risky_operation(x: i32) -> Result<i32, String> {
    if x % 2 == 0 { Ok(x * 2) } else { Err("Odd number".into()) }
}

#[error_strategy(CollectHandler)]
fn might_fail(x: i32) -> Result<String, String> {
    if x > 10 { Ok(format!("Big: {}", x)) } else { Err("Too small".into()) }
}
let result = pipex!(
    vec![1, 2, 3, 4, 5, 15, 20]
    => |x| risky_operation(x) // Drops odd numbers silently
    => |x| might_fail(x)      // Keeps both successes and failures
);
The #[error_strategy] attribute transforms how your function handles errors in the pipeline context. IgnoreHandler drops failed items and continues. CollectHandler preserves everything for later analysis. No conditional logic cluttering your business code.
Choose from the built-in strategies — IgnoreHandler, CollectHandler, FailFastHandler, LogAndIgnoreHandler, FirstErrorHandler — or build your own.
Custom Error Strategies: Observability Made Simple
Sometimes you need more than built-in strategies. Maybe you want metrics, custom logging, or retry logic. Creating your own strategy is straightforward — implement one trait, get pipeline-wide behavior:
struct MetricsHandler;

impl<T: Clone + 'static, E: std::fmt::Debug + Clone + 'static> ErrorHandler<T, E> for MetricsHandler {
    fn handle_results(results: Vec<Result<T, E>>) -> Vec<Result<T, E>> {
        let success_count = results.iter().filter(|r| r.is_ok()).count();
        let total = results.len();
        println!(
            "📊 Metrics: {}/{} success ({:.1}%)",
            success_count,
            total,
            (success_count as f64 / total as f64) * 100.0
        );
        results
    }
}
Now any function can get automatic observability:
#[error_strategy(MetricsHandler)]
fn monitored_operation(x: i32) -> Result<i32, String> {
    if x % 3 == 0 { Err("Divisible by 3".into()) } else { Ok(x * 2) }
}
let result = pipex!(
    vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    => |x| monitored_operation(x) // Metrics automatically collected and logged
);
// 📊 Metrics: 7/10 success (70.0%)
One strategy implementation, pipeline-wide observability. Error handling becomes a cross-cutting concern you design once and apply everywhere.
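Handlers can rewrite results as well as observe them. Here is another sketch — DefaultingHandler is hypothetical, not a pipex built-in — that turns every failure into a fallback value so downstream steps only ever see successes:

struct DefaultingHandler;

// Hypothetical strategy: replace failures with a default instead of dropping them.
impl<E: std::fmt::Debug + Clone + 'static> ErrorHandler<i32, E> for DefaultingHandler {
    fn handle_results(results: Vec<Result<i32, E>>) -> Vec<Result<i32, E>> {
        results
            .into_iter()
            .map(|r| r.or(Ok(0))) // every Err becomes Ok(0)
            .collect()
    }
}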
Novelty 2: Compile-time Purity with #[pure]
Mathematical functions should be predictable. Given the same inputs, they should always produce the same outputs. No side effects, no surprises. But Rust's type system doesn't enforce this mathematical purity — let's solve it.
The #[pure] attribute brings mathematical discipline to your functions:
#[pure]
fn safe_square(x: f64) -> f64 {
    x * x // Clean mathematical operation
}

#[pure]
fn physics_calc(velocity: f64, time: f64) -> f64 {
    safe_square(velocity) + 9.81 * time // Calling other pure functions is allowed
}
Try to add side effects and the compiler stops you:
#[pure]
fn impure_function(x: f64) -> f64 {
    println!("This won't compile!"); // ❌ Compile-time error!
    x * x
}
What #[pure] enforces:
- ✅ No side effects — no I/O, printing, or global state modification
- ✅ No unsafe code — no unsafe blocks or inline assembly
- ✅ Pure call chains — only calls to other #[pure] functions allowed
- ✅ Mathematical guarantee — same inputs always produce same outputs
This isn't just academic purity. Pure functions are perfect candidates for optimization, memoization, and parallel execution. The compiler now helps you maintain the mathematical properties your algorithms depend on.
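How might such checking work? One plausible approach — a sketch under my own assumptions, not necessarily pipex's real macro — is an attribute proc macro that parses the function with syn and walks its body, rejecting anything impure:

use syn::visit::{self, Visit};
use syn::{ExprUnsafe, ItemFn, Macro};

struct PurityCheck {
    violation: Option<String>,
}

impl<'ast> Visit<'ast> for PurityCheck {
    fn visit_macro(&mut self, m: &'ast Macro) {
        // I/O macros are side effects, so a pure function may not call them.
        for banned in ["println", "print", "eprintln", "dbg"] {
            if m.path.is_ident(banned) {
                self.violation = Some(format!("`{}!` in #[pure] function", banned));
            }
        }
        visit::visit_macro(self, m);
    }

    fn visit_expr_unsafe(&mut self, _e: &'ast ExprUnsafe) {
        self.violation = Some("unsafe block in #[pure] function".into());
    }
}

// The attribute macro would run this check and emit a compile error on violation.
fn check_purity(func: &ItemFn) -> Result<(), String> {
    let mut checker = PurityCheck { violation: None };
    checker.visit_item_fn(func);
    checker.violation.map_or(Ok(()), Err)
}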
Novelty 3: Automatic Memoization with #[memoized]
Pure functions have a beautiful property: same input, same output, every time. This makes them perfect for caching — if you've computed the result before, why compute it again?
Traditional memoization requires manual cache management, thread safety considerations, and careful memory handling. pipex reduces it to a single attribute:
#[pure]
#[memoized]
fn fibonacci(n: u64) -> u64 {
    if n <= 1 { n } else { fibonacci(n - 1) + fibonacci(n - 2) }
}
That's it. No cache data structures, no hash map management, no thread safety concerns. The first call computes and caches the result. Every subsequent call with the same input returns instantly.
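What might that expansion look like? A minimal sketch of the shape such generated code could take — assumed for illustration, not pipex's actual output — using a process-wide cache behind a Mutex:

use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

fn fibonacci(n: u64) -> u64 {
    // One lazily-initialized cache shared by every call.
    static CACHE: OnceLock<Mutex<HashMap<u64, u64>>> = OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));

    if let Some(&hit) = cache.lock().unwrap().get(&n) {
        return hit; // cache hit: no recomputation
    }
    // The lock is released before recursing, so nested calls also use the cache.
    let result = if n <= 1 { n } else { fibonacci(n - 1) + fibonacci(n - 2) };
    cache.lock().unwrap().insert(n, result);
    result
}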
For expensive computations, the speedup is dramatic:
#[pure]
#[memoized(capacity = 1000)] // Custom cache size for memory control
fn expensive_computation(x: f64, iterations: u32) -> f64 {
    (0..iterations).fold(x, |acc, i| acc + (i as f64).sin())
}

// First call: ~500ms computation time
// Subsequent calls: ~0.001ms cache retrieval (500,000x faster!)
let result = expensive_computation(3.14, 1_000_000);
Memoization shines in pipelines where the same expensive operations might be called repeatedly:
let result = pipex!(
    scientific_data
    => |x| expensive_computation(x, 1_000_000) // Each unique input cached automatically
    => |x| another_memoized_function(x)        // Cache hits propagate through the pipeline
);
Performance optimization becomes declarative rather than manual. Focus on your algorithms, let the library handle the caching strategy.
Novelty 4: Automatic GPU Transpilation — The Holy Grail
Here's where things get wild. What if you could write normal Rust mathematical expressions and have them automatically run on your GPU? No WGSL knowledge, no buffer management, no shader compilation — just Rust code that happens to execute on silicon designed for parallel computation.
This sounds impossible. It's not:
// Write normal Rust math expressions
let result = pipex!(
    vec![1.0, 2.0, 3.0, 4.0, 5.0]
    => gpu ||| |x| x.sin() * x.cos() + x.sqrt() // Automatically becomes WGSL!
    => |x| Ok::<f32, String>(x * 2.0)           // Back to CPU processing
);
Behind the scenes, pipex parses your Rust expression, generates equivalent WGSL compute shader code, compiles it for your specific GPU, manages buffer creation and data transfer, executes the kernel, and returns results. You see none of this complexity.
What the Transpiler Handles Today:
Basic arithmetic:
=> gpu ||| |x| x * x + 1.0
Some mathematical functions:
=> gpu ||| |x| x.sin() + x.cos() + x.sqrt()
Complex expressions with proper operator precedence:
=> gpu ||| |x| (x * x + 1.0) * x.sin() + x.cos() * (x + 3.14159)
Smart Hardware Detection
The library detects your GPU hardware and optimizes accordingly. On Apple Silicon:
🎮 GPU DEVICE DETECTED:
📱 Name: Apple M3
🖥️ Backend: Metal
🚀 APPLE SILICON DETECTED!
All Features Combined
#[pure]
#[memoized(capacity = 500)]
#[error_strategy(CollectHandler)]
fn advanced_physics(velocity: f64, time: f64) -> Result<f64, String> {
    if time < 0.0 { return Err("Negative time".into()); }
    Ok(velocity * time + 0.5 * 9.81 * time * time)
}
let result = pipex!(
    time_measurements
    => |t| Ok::<f64, String>(t / 1000.0)            // Convert units
    => |t| advanced_physics(10.0, t)                // Pure + Memoized + Errors (example velocity)
    => gpu ||| |distance| distance.sqrt()           // GPU acceleration
    => async |speed| { format_result(speed).await } // Async formatting
);
This pipeline: Converts units → Applies pure/memoized physics → GPU acceleration → Async I/O — all with graceful error handling.
Getting Started
pipex is fully feature-gated. These are the crate's feature flags, all enabled by default:
[features]
default = ["parallel", "async", "gpu", "memoization"]
parallel = []
async = []
gpu = []
memoization = []
Add the dependency to your Cargo.toml:
[dependencies]
pipex = { version = "0.1.20", features = ["full", "gpu"] }
use pipex::*;

// Basic pipeline
let result = pipex!(
    vec![1, 2, 3, 4, 5]
    => |x| Ok::<i32, String>(x * 2)
    => |x| Ok::<i32, String>(x + 1)
);
// With all features
#[pure]
#[memoized]
#[error_strategy(CollectHandler)]
fn process(x: f64) -> Result<f64, String> { /* work */ }

let advanced = pipex!(
    data
    => ||| |x| process(x)                // Parallel + Pure + Memoized + Errors
    => gpu ||| |x| x.sin() + x.cos()     // GPU acceleration
    => async |x| { save_to_db(x).await } // Async I/O
);
Conclusion
What started as simple functional pipelines evolved into one of the more ambitious data processing frameworks in the Rust ecosystem. pipex combines:
- Functional elegance of Haskell
- Memory safety of Rust
- GPU performance of CUDA
- Zero-config developer experience of TypeScript
Try pipex today and experience the future of Rust data processing.
Note that the library is still in early alpha and there may be rough edges in the implementation, so every contribution and issue report is welcome. Let's build the future of Rust processing pipelines together!
Repository: github.com/edransy/pipex
Crates.io: crates.io/crates/pipex