When building data-intensive applications, we usually start with the most obvious approach: loading data into a slice or array, iterating over it to process the data, and returning the result. This batch-processing mindset works great—until the data never stops coming.
Whether you are dealing with live IoT telemetry, continuous log tailing, or real-time financial market feeds, you quickly run into the problem of "infinite" data. If you try to append an endless stream of stock ticks to a []float64, your application will inevitably consume all available memory and crash.
To handle infinite data gracefully, you need to shift your architecture from batch processing to stream processing. In Go, we have the perfect built-in primitive for this: Channels.
The Power of Channels as Data Pipes
Go channels are often taught primarily as a way to synchronize goroutines, but they are also incredibly powerful as sequential data pipes. By treating channels as standard inputs and outputs, you can build decoupled, memory-efficient pipelines where data flows through a series of transformations continuously.
When redesigning my open-source technical analysis library, cinar/indicator, for its v2 release, I faced exactly this challenge. In algorithmic trading, systems need to react instantly to live market feeds without accumulating massive memory overhead. Transitioning the library's core architecture from slice-based arrays to stream-based Go channels solved this elegantly.
Let's look at how to build a continuous data pipe, and some of the tricky edge cases you'll encounter along the way.
Building a Pipeline Stage
Imagine we want to calculate a Simple Moving Average (SMA) over a live stream of data. Instead of taking a slice, our function will accept a read-only channel as its input and return a read-only channel as its output.
```go
package main

// SimpleMovingAverage acts as a pipe: it reads from 'input', processes,
// and writes to 'output'.
func SimpleMovingAverage(input <-chan float64, period int) <-chan float64 {
	output := make(chan float64)

	go func() {
		// Ensure the output channel is closed when the input stream ends.
		defer close(output)

		window := make([]float64, 0, period)
		sum := 0.0

		for val := range input {
			window = append(window, val)
			sum += val

			// Keep the window size fixed.
			if len(window) > period {
				sum -= window[0]
				window = window[1:]
			}

			// Only emit a value once we have enough data points.
			if len(window) == period {
				output <- sum / float64(period)
			}
		}
	}()

	return output
}
```
Because the processing happens inside its own goroutine, the function returns the output channel immediately. The goroutine stays alive, eagerly waiting for new data to arrive on the input channel.
Handling Stream Complexities with Helpers
Once you start relying heavily on channels, you run into a few structural challenges. To make working with channels just as easy as working with slices, cinar/indicator includes a robust helper package.
If you are building your own stream-based application, you can leverage these helpers directly from the library rather than reinventing the wheel.
1. The Branching Problem
A major gotcha with Go channels: once a value is read from a channel, it's gone. What if you want to calculate an SMA and a Relative Strength Index (RSI) from the exact same live price ticker? You can't have two consumers read from one channel without them stealing data from each other.
To solve this, the library provides helper.Duplicate. This function takes one input channel and "fans it out" into multiple identical output channels. This allows you to safely branch your data stream to multiple independent technical indicators simultaneously without race conditions or data loss.
```go
// Branching one price stream into three identical streams
priceStreams := helper.Duplicate(livePrices, 3)

smaStream := indicator.SMA(priceStreams[0], 14)
rsiStream := indicator.RSI(priceStreams[1], 14)
macdStream := indicator.MACD(priceStreams[2], 12, 26, 9)
```
2. Lookbacks and Sliding Windows
Many data processing algorithms require looking back at the last N periods. Instead of managing a sliding window manually inside every single function (like we did in the basic SMA example above), the library uses helper.Buffered. This provides a clean abstraction to maintain a rolling state over a continuous channel, vastly simplifying the development of complex logic.
3. Bridging the Gap: Slices vs. Streams
The rest of the world often still speaks in batches. You might be downloading historical CSV data for backtesting, or you might need to output an array for a charting UI. To bridge this gap, the helper package includes utilities to fluidly move between paradigms:
- `helper.SliceToChan`: Converts a static historical array into a simulated live data stream. It spins up a goroutine, pushes every element from the slice into a channel, and closes it. It's perfect for feeding historical backtests into a live-stream architecture.
- `helper.ChanToSlice`: The inverse operation. It drains a stream back into an array, which is incredibly useful for writing unit tests or rendering charts.
Chaining the Pipes Together
Because all the indicators and helpers in cinar/indicator take channels and return channels, they are highly composable. We can chain them together like Unix command-line pipes (|).
Here is what it looks like to wire up an application using these concepts:
```go
package main

import (
	"fmt"

	"github.com/cinar/indicator/v2/helper"
	"github.com/cinar/indicator/v2/trend"
)

func main() {
	// 1. We start with a static slice of historical data.
	historicalPrices := []float64{10.0, 12.0, 14.0, 13.0, 15.0, 18.0, 19.0, 17.0}

	// 2. Bridge the gap: convert the slice to a live stream.
	marketTicks := helper.SliceToChan(historicalPrices)

	// 3. Pipe the ticks into a 3-period SMA processor from the library.
	smaStream := trend.Sma(marketTicks, 3)

	// 4. Drain the output stream.
	for avg := range smaStream {
		fmt.Printf("New SMA tick processed: %.2f\n", avg)
	}
}
```
Why This Architecture Wins
- Memory Efficiency: We only store the exact amount of data needed at any given moment. The Go garbage collector easily cleans up the rest, meaning we can process a continuous websocket stream for months without memory leaks.
- Backpressure Handling: Go channels are blocking by nature. If a complex compound strategy at the end of the pipeline is too slow, the channels will naturally fill up, pausing the producers further up the chain until it catches up.
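How much slack a stage has before it blocks is controlled by the channel's buffer: an unbuffered channel blocks on every send until a receiver is ready, while a buffered one lets the producer run ahead by up to its capacity. A tiny illustration of the semantics:

```go
package main

import "fmt"

func main() {
	// A buffer of 2 lets the producer get two values ahead of the consumer.
	ticks := make(chan float64, 2)

	ticks <- 10.0 // does not block: buffer has room
	ticks <- 12.0 // does not block: buffer is now full
	// A third send here would block until a receive frees a slot,
	// which is exactly the backpressure behavior described above.

	fmt.Println(len(ticks), cap(ticks)) // prints 2 2
}
```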
- Decoupling: Each pipeline stage is completely isolated. The indicator doesn't know if the data is coming from a historical Tiingo repository, an Alpaca websocket, or a mock unit test. It just reads from a `<-chan T` and writes to a `chan<- T`.
Try It Out
By treating channels as native data streams and relying on robust helper utilities, you can build highly resilient, concurrent pipelines capable of processing truly infinite data sets.
If you are building financial tools, real-time dashboards, or are just looking to explore a Go codebase that relies heavily on generics and channel-based streaming, I highly recommend leveraging the cinar/indicator library. It comes batteries-included with all the helpers and technical indicators you need to get started with stream processing in Go.
How are you handling continuous data streams in your applications? Let me know in the comments!