When you have to work with large amounts of data, you can quickly run into problems. Your computer might run out of memory trying to load a huge file all at once. Or, the process might become so slow that it takes hours to finish. I've faced these issues many times, and the solution often involves breaking the work into smaller, parallel pieces. Let me show you a way to build a system in Go that handles this efficiently.
Think of a data pipeline like an assembly line in a factory. Raw materials come in one end, get worked on at several stations, and finished products come out the other end. Our goal is to make this line fast, reliable, and smart about how much space it uses.
The core idea is simple: don't try to eat the whole elephant in one bite. Instead, cut it into small pieces, have multiple chefs work on different pieces at the same time, and then combine the results. In technical terms, this means chunked reading, parallel processing with a worker pool, and coordinated writing.
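Before building the real thing, here is a minimal sketch of that idea on a toy problem: summing a slice of integers. The names splitIntoChunks and sumInParallel are made up for this illustration and are not part of the pipeline built below.

```go
package main

import (
	"fmt"
	"sync"
)

// splitIntoChunks cuts items into consecutive slices of at most size elements.
// It assumes size > 0.
func splitIntoChunks(items []int, size int) [][]int {
	var chunks [][]int
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		chunks = append(chunks, items[start:end])
	}
	return chunks
}

// sumInParallel feeds chunks to a fixed number of workers and combines
// their partial sums into one total.
func sumInParallel(items []int, chunkSize, workers int) int {
	chunks := make(chan []int)
	partials := make(chan int)

	// Stage 2: a fixed pool of workers, each summing one chunk at a time.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for chunk := range chunks {
				sum := 0
				for _, v := range chunk {
					sum += v
				}
				partials <- sum
			}
		}()
	}

	// Stage 1: produce the chunks, then signal that no more are coming.
	go func() {
		for _, c := range splitIntoChunks(items, chunkSize) {
			chunks <- c
		}
		close(chunks)
	}()

	// Close partials once every worker has finished.
	go func() {
		wg.Wait()
		close(partials)
	}()

	// Stage 3: combine the results.
	total := 0
	for p := range partials {
		total += p
	}
	return total
}

func main() {
	items := make([]int, 100)
	for i := range items {
		items[i] = i + 1
	}
	fmt.Println(sumInParallel(items, 7, 4)) // prints 5050
}
```

The three stages only talk to each other through channels, which is the same shape the full pipeline uses.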
Let's start by looking at the main structure that orchestrates everything. We create a DataPipeline that ties together the reader, processor, and writer.
package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"
)
// DataPipeline is the manager for the whole operation.
type DataPipeline struct {
reader *ChunkedReader
processor *ParallelProcessor
writer *ResultWriter
monitor *PipelineMonitor
}
This pipeline is the boss. It sets up the other parts and tells them when to start and stop. To create one, you give it an input file, an output file, and decide how many workers you want.
func NewDataPipeline(inputPath, outputPath string, workers int) *DataPipeline {
inputFile, err := os.Open(inputPath)
if err != nil {
log.Fatal(err)
}
outputFile, err := os.Create(outputPath)
if err != nil {
log.Fatal(err)
}
return &DataPipeline{
reader: NewChunkedReader(inputFile),
processor: NewParallelProcessor(workers),
writer: NewResultWriter(outputFile),
monitor: &PipelineMonitor{metrics: make(map[string]uint64)},
}
}
The real work begins when we call Run. This method starts all the components as separate goroutines and waits for them to finish. Using a context.Context allows us to cancel everything gracefully if something goes wrong or if we run out of time.
func (dp *DataPipeline) Run(ctx context.Context) error {
	startTime := time.Now()
	var wg sync.WaitGroup
	wg.Add(3)
	// Start the reader, processor, and writer in their own goroutines.
	go func() { defer wg.Done(); dp.reader.Start(ctx) }()
	go func() { defer wg.Done(); dp.processor.Start(ctx, dp.reader.Chunks()) }()
	go func() { defer wg.Done(); dp.writer.Start(ctx, dp.processor.Results()) }()
	// Start a monitor to watch progress, and stop it as soon as Run returns.
	monitorCtx, stopMonitor := context.WithCancel(ctx)
	defer stopMonitor()
	go dp.monitor.Start(monitorCtx, dp.reader, dp.processor, dp.writer)
	wg.Wait()
	dp.writer.Flush() // Ensure all data is written.
	duration := time.Since(startTime)
	dp.monitor.Report(duration)
	// A non-nil error here means the run was cancelled or timed out.
	return ctx.Err()
}
Now, let's talk about the first stage: reading. The ChunkedReader is responsible for taking a large file and breaking it into smaller, manageable pieces called chunks. This is crucial because it prevents us from loading gigabytes of data into memory at once.
Imagine you have a book too heavy to lift. Instead of carrying the whole book, you photocopy one chapter at a time. The ChunkedReader does the digital equivalent.
type ChunkedReader struct {
file *os.File
scanner *bufio.Scanner
chunkSize int
chunks chan *DataChunk
bufferPool *sync.Pool
}
type DataChunk struct {
ID int
Records [][]byte
}
We configure a chunkSize, say 10,000 lines. The reader will read up to that many lines from the file, package them into a DataChunk object, and send that chunk into a channel. A channel in Go is like a conveyor belt connecting one goroutine to another.
func (cr *ChunkedReader) Start(ctx context.Context) {
	defer close(cr.chunks) // Close the conveyor belt when we're done.
	defer cr.file.Close()
	chunkID := 0
	var currentChunkRecords [][]byte
	// send delivers a chunk downstream, or gives up if we are told to cancel.
	// Without the select, a full channel would block the reader forever.
	send := func(records [][]byte) bool {
		select {
		case <-ctx.Done():
			return false
		case cr.chunks <- &DataChunk{ID: chunkID, Records: records}:
			chunkID++
			return true
		}
	}
	for cr.scanner.Scan() {
		line := cr.scanner.Bytes()
		// We need to copy the line because the scanner reuses its buffer.
		lineCopy := make([]byte, len(line))
		copy(lineCopy, line)
		currentChunkRecords = append(currentChunkRecords, lineCopy)
		// If we've hit the chunk size, send it for processing.
		if len(currentChunkRecords) >= cr.chunkSize {
			if !send(currentChunkRecords) {
				return // Stop if we are told to cancel.
			}
			currentChunkRecords = nil // Reset for the next chunk.
		}
	}
	// Scan returns false on both end-of-file and read errors, so check which.
	if err := cr.scanner.Err(); err != nil {
		log.Printf("reader stopped early: %v", err)
	}
	// Don't forget the last chunk which might be smaller.
	if len(currentChunkRecords) > 0 {
		send(currentChunkRecords)
	}
}
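You'll notice a bufferPool in the struct. This is an optimization that the Start method above does not use yet: it allocates a fresh lineCopy for every line. Creating and discarding that many byte slices can put pressure on Go's garbage collector. A sync.Pool lets us hand slices back once a chunk has been fully processed and reuse them for later lines, which cuts down on allocations.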
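To make the pooling idea concrete, here is a small sketch of how line buffers could be borrowed from and returned to a sync.Pool. The names linePool, copyLine, and releaseLine are my own for this example, and the 256-byte starting capacity is an arbitrary guess at a typical line length.

```go
package main

import (
	"fmt"
	"sync"
)

// linePool hands out reusable byte slices. Storing *[]byte instead of []byte
// avoids an extra allocation each time a slice is put back.
var linePool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 0, 256)
		return &buf
	},
}

// copyLine copies line into a buffer taken from the pool.
func copyLine(line []byte) *[]byte {
	bufPtr := linePool.Get().(*[]byte)
	// Reset the length to zero but keep the capacity, then copy the data in.
	*bufPtr = append((*bufPtr)[:0], line...)
	return bufPtr
}

// releaseLine returns a buffer to the pool. The caller must not touch the
// buffer afterwards, because another goroutine may already be reusing it.
func releaseLine(bufPtr *[]byte) {
	linePool.Put(bufPtr)
}

func main() {
	scannerBuf := []byte("id,name")
	line := copyLine(scannerBuf)
	copy(scannerBuf, "XXXXXXX") // Simulates the scanner overwriting its buffer.
	fmt.Println(string(*line))  // prints "id,name"
	releaseLine(line)
}
```

The catch is ownership: a buffer can only go back to the pool after the last stage that reads it is finished, so in this pipeline the release would belong at the end of the worker or the writer, not in the reader.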
The chunks flow out of the reader via the cr.chunks channel. This is the input to our next stage: the ParallelProcessor. This is where the speed magic happens.
The ParallelProcessor manages a pool of workers. Think of workers as chefs in a kitchen. Instead of having one chef prepare every single meal, we have several. When an order (a data chunk) comes in, an available chef takes it and starts cooking.
type ParallelProcessor struct {
workerPool chan *Worker
inputQueue chan *DataChunk
resultQueue chan *ProcessedChunk
workers int
}
type Worker struct {
ID int
transformers []Transformer
}
type ProcessedChunk struct {
ID int
Records []Record
Metadata map[string]interface{}
Errors []error
}
The processor starts by creating a number of idle workers and putting them in the workerPool channel, which acts as a waiting room for available chefs.
func (pp *ParallelProcessor) Start(ctx context.Context, inputQueue <-chan *DataChunk) {
	var wg sync.WaitGroup
	// Deferred calls run last-in, first-out: wait for in-flight workers first,
	// then close resultQueue, so no worker ever sends on a closed channel.
	defer close(pp.resultQueue)
	defer wg.Wait()
	// Create workers and put them in the pool.
	// workerPool must be buffered with capacity >= pp.workers.
	for i := 0; i < pp.workers; i++ {
		worker := &Worker{ID: i, transformers: pp.getTransformers()}
		pp.workerPool <- worker
	}
	// Listen for incoming chunks on the inputQueue.
	for chunk := range inputQueue {
		select {
		case <-ctx.Done():
			return
		case worker := <-pp.workerPool: // Take an available worker.
			wg.Add(1)
			// Assign the chunk to the worker in a new goroutine.
			go func(w *Worker, c *DataChunk) {
				defer wg.Done()
				defer func() { pp.workerPool <- w }() // Put the worker back in the pool when done.
				processed, err := w.Process(c)
				if err != nil {
					log.Printf("Worker %d had an error: %v", w.ID, err)
					return
				}
				// Send the result forward, giving up if the pipeline is cancelled.
				select {
				case pp.resultQueue <- processed:
				case <-ctx.Done():
				}
			}(worker, chunk)
		}
	}
}
This pattern is powerful. It limits concurrency to the exact number of workers we defined. If we have 8 workers and 100 chunks, only 8 will be processed at any given moment. This prevents us from overloading the CPU or memory. The worker, after finishing its task, goes back into the pool, ready for the next chunk.
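If you want to convince yourself that the limit really holds, a small experiment helps. The sketch below uses a buffered channel of empty structs as the pool instead of Worker values, which is a simplification of the pattern above, and records the highest number of tasks that were ever running at once. peakConcurrency is a name I made up for this test.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs the given number of tasks with at most `workers`
// running at once and returns the highest concurrency it observed.
func peakConcurrency(tasks, workers int) int64 {
	slots := make(chan struct{}, workers) // One slot per worker.
	var active, peak int64
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		slots <- struct{}{} // Blocks while every worker is busy.
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }() // Free the slot when the task ends.

			n := atomic.AddInt64(&active, 1)
			// Raise the recorded peak if this is a new maximum.
			for {
				old := atomic.LoadInt64(&peak)
				if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // Stand-in for real work.
			atomic.AddInt64(&active, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := peakConcurrency(100, 8)
	fmt.Println(peak <= 8) // prints true
}
```

The blocking send on slots plays the same role as taking a worker from workerPool: the dispatch loop itself stalls until capacity frees up, so chunks never pile up in goroutines.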
What does a worker actually do? It applies a series of transformations. Each transformation is a small, focused operation. For example, the first might parse a raw byte slice into a structured CSV record. The next might validate that record, and a third might clean or map certain fields.
func (w *Worker) Process(chunk *DataChunk) (*ProcessedChunk, error) {
result := &ProcessedChunk{
ID: chunk.ID,
Records: make([]Record, 0, len(chunk.Records)),
Metadata: make(map[string]interface{}),
}
for _, rawRecord := range chunk.Records {
var currentRecord Record = rawRecord
var transformErr error
// Apply each transformer in sequence.
for _, transformer := range w.transformers {
currentRecord, transformErr = transformer.Transform(currentRecord)
if transformErr != nil {
result.Errors = append(result.Errors, transformErr)
break // Skip this record if a transformer fails.
}
}
if transformErr == nil && currentRecord != nil {
result.Records = append(result.Records, currentRecord)
}
}
result.Metadata["worker_id"] = w.ID
return result, nil
}
Using a slice of Transformer interfaces makes the system very flexible. You can plug in any logic you need without changing the worker's core code.
type Transformer interface {
Transform(Record) (Record, error)
}
// Example: A transformer that parses a CSV line.
type CSVParser struct{}
func (c *CSVParser) Transform(r Record) (Record, error) {
rawBytes, ok := r.([]byte)
if !ok {
return nil, fmt.Errorf("expected []byte")
}
// Use a CSV reader to parse the line.
reader := csv.NewReader(bytes.NewReader(rawBytes))
fields, err := reader.Read()
if err != nil {
return nil, err
}
return CSVRecord{Fields: fields}, nil
}
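The snippets above use Record and CSVRecord without showing their definitions. The sketch below fills that gap with the simplest definitions that fit the code (an empty interface and a struct holding the fields); treat them as my assumption. It also adds two more transformers, FieldCountValidator and TrimSpace, plus an applyAll helper, all invented for this example, to show how a chain behaves.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is assumed to be an empty interface so that a raw []byte line and a
// parsed CSVRecord can both flow through the same chain.
type Record interface{}

// CSVRecord is assumed to hold the parsed fields of one line.
type CSVRecord struct {
	Fields []string
}

type Transformer interface {
	Transform(Record) (Record, error)
}

// FieldCountValidator rejects records that do not have exactly Want fields.
type FieldCountValidator struct{ Want int }

func (v FieldCountValidator) Transform(r Record) (Record, error) {
	rec, ok := r.(CSVRecord)
	if !ok {
		return nil, fmt.Errorf("expected CSVRecord, got %T", r)
	}
	if len(rec.Fields) != v.Want {
		return nil, fmt.Errorf("expected %d fields, got %d", v.Want, len(rec.Fields))
	}
	return rec, nil
}

// TrimSpace returns a copy of the record with whitespace trimmed from every field.
type TrimSpace struct{}

func (TrimSpace) Transform(r Record) (Record, error) {
	rec, ok := r.(CSVRecord)
	if !ok {
		return nil, fmt.Errorf("expected CSVRecord, got %T", r)
	}
	cleaned := make([]string, len(rec.Fields))
	for i, f := range rec.Fields {
		cleaned[i] = strings.TrimSpace(f)
	}
	return CSVRecord{Fields: cleaned}, nil
}

// applyAll runs the record through each transformer and stops at the first error.
func applyAll(r Record, chain []Transformer) (Record, error) {
	for _, t := range chain {
		var err error
		if r, err = t.Transform(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	chain := []Transformer{FieldCountValidator{Want: 2}, TrimSpace{}}

	out, err := applyAll(CSVRecord{Fields: []string{" 42 ", "alice "}}, chain)
	fmt.Println(out, err) // prints "{[42 alice]} <nil>"

	_, err = applyAll(CSVRecord{Fields: []string{"only-one"}}, chain)
	fmt.Println(err) // prints "expected 2 fields, got 1"
}
```

Because each transformer checks the type it receives, the order of the chain matters: a parser has to come before anything that expects a CSVRecord.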
Once a worker finishes processing a chunk, it sends the ProcessedChunk into the resultQueue. This is the conveyor belt leading to the final stage: the ResultWriter.
The writer's job is to collect all the processed chunks and write them to the output destination, often a file. There's a catch: the chunks might finish processing out of order. Worker 3 might finish its chunk before Worker 2. We need to handle this if maintaining the original order is important. For this example, let's assume order doesn't matter, which is common for many aggregation or transformation tasks.
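If you do need the original order, the chunk IDs make it possible to restore it. One approach, sketched here with a hypothetical orderedBuffer type and plain strings standing in for processed chunks, is to hold back any chunk that arrives early and release chunks only when the next expected ID shows up.

```go
package main

import "fmt"

// orderedBuffer holds chunks that arrived early and releases them in ID order.
// IDs are assumed to start at 0 and increase by 1, as in the reader.
type orderedBuffer struct {
	next    int            // ID of the next chunk that may be released.
	pending map[int]string // Chunks that arrived before their turn.
}

func newOrderedBuffer() *orderedBuffer {
	return &orderedBuffer{pending: make(map[int]string)}
}

// Add stores the payload for the given chunk ID and returns every payload
// that can now be emitted in order. The result is empty if the chunk
// arrived ahead of an earlier one.
func (b *orderedBuffer) Add(id int, payload string) []string {
	b.pending[id] = payload
	var ready []string
	for {
		p, ok := b.pending[b.next]
		if !ok {
			return ready
		}
		ready = append(ready, p)
		delete(b.pending, b.next)
		b.next++
	}
}

func main() {
	buf := newOrderedBuffer()
	fmt.Println(buf.Add(1, "b")) // prints "[]" because chunk 0 is still missing
	fmt.Println(buf.Add(0, "a")) // prints "[a b]"
	fmt.Println(buf.Add(2, "c")) // prints "[c]"
}
```

The cost is memory: if one early chunk is slow, every later chunk waits in the map until it arrives. The writer in this article skips this step and writes chunks as they come.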
type ResultWriter struct {
writer io.Writer
inputQueue <-chan *ProcessedChunk
aggregator *ResultAggregator
flushSize int
}
func (rw *ResultWriter) Start(ctx context.Context, inputQueue <-chan *ProcessedChunk) {
	csvWriter := csv.NewWriter(rw.writer)
	defer func() {
		csvWriter.Flush() // Final flush, also on cancellation.
		// Write and Flush errors are sticky, so one check at the end catches them.
		if err := csvWriter.Error(); err != nil {
			log.Printf("writer error: %v", err)
		}
	}()
	recordCount := 0
	for chunk := range inputQueue {
		select {
		case <-ctx.Done():
			return
		default:
		}
		for _, record := range chunk.Records {
			csvRec, ok := record.(CSVRecord)
			if !ok {
				continue // Skip anything that is not a parsed CSV record.
			}
			csvWriter.Write(csvRec.Fields)
			recordCount++
			// Flush every 'flushSize' written records (flushSize must be > 0).
			if recordCount%rw.flushSize == 0 {
				csvWriter.Flush()
			}
		}
		// Let the aggregator know about this chunk's results.
		rw.aggregator.Aggregate(chunk)
	}
}
Flushing periodically (pushing buffered records from memory to the underlying file) is a trade-off. Flushing after every record would be very slow because each flush is a separate write call. Flushing only at the end risks losing a lot of data if the program crashes. A periodic flush is a good balance.
The ResultAggregator is a useful sidekick for the writer. It doesn't write data but keeps track of global statistics: how many records were processed, how many errors occurred, and the resulting success rate.
type ResultAggregator struct {
totalRecords uint64 // Use atomic for safe concurrent access.
errors []error
mu sync.RWMutex // Use a mutex to protect the errors slice.
}
func (ra *ResultAggregator) Aggregate(chunk *ProcessedChunk) {
atomic.AddUint64(&ra.totalRecords, uint64(len(chunk.Records)))
ra.mu.Lock()
ra.errors = append(ra.errors, chunk.Errors...)
ra.mu.Unlock()
}
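func (ra *ResultAggregator) SuccessRate() float64 {
	// totalRecords only counts records that survived every transformer,
	// so failed records are added back to get the number attempted.
	succeeded := atomic.LoadUint64(&ra.totalRecords)
	ra.mu.RLock()
	failed := uint64(len(ra.errors))
	ra.mu.RUnlock()
	attempted := succeeded + failed
	if attempted == 0 {
		return 0.0
	}
	return float64(succeeded) / float64(attempted)
}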
Finally, a PipelineMonitor can watch over the whole operation. It can periodically check how many chunks are waiting, how busy the workers are, and report on speed. This is invaluable for understanding performance and debugging.
type PipelineMonitor struct {
metrics map[string]uint64
mu sync.RWMutex
}
func (pm *PipelineMonitor) Start(ctx context.Context, components ...interface{}) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop() // Release the ticker when the monitor exits.
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Sample queue lengths, worker activity, etc.
			pm.mu.Lock()
			// ... collect data from components ...
			pm.mu.Unlock()
			pm.printStatus()
		}
	}
}
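What you sample is up to you. One signal I find useful, shown here as a standalone sketch rather than as the contents of the elided step above, is how full each buffered channel is. The chunk type and queueFill function are hypothetical names for this example.

```go
package main

import "fmt"

// chunk stands in for whatever type flows through the queue being sampled.
type chunk struct{ ID int }

// queueFill reports how full a buffered channel is, from 0.0 (empty) to 1.0 (full).
// For an unbuffered channel it returns 0.
func queueFill(q chan *chunk) float64 {
	if cap(q) == 0 {
		return 0
	}
	return float64(len(q)) / float64(cap(q))
}

func main() {
	q := make(chan *chunk, 4)
	q <- &chunk{ID: 0}
	q <- &chunk{ID: 1}
	fmt.Println(queueFill(q)) // prints 0.5
}
```

A queue that stays close to full suggests the stage reading from it is the bottleneck, while a queue that stays close to empty suggests the stage feeding it is. The value is only a snapshot, since other goroutines keep sending and receiving while you measure.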
Bringing it all together in main is straightforward.
func main() {
// Configure and create the pipeline.
pipeline := NewDataPipeline("large_data.csv", "processed_data.csv", 8)
// Give it a 10-minute timeout.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
// Run it.
if err := pipeline.Run(ctx); err != nil {
log.Fatalf("Pipeline failed: %v", err)
}
// Print final summary.
agg := pipeline.writer.aggregator
fmt.Printf("All done!\n")
fmt.Printf("Total Records: %d\n", atomic.LoadUint64(&agg.totalRecords))
fmt.Printf("Success Rate: %.2f%%\n", agg.SuccessRate()*100)
}
This design gives you a robust foundation. You can adjust the chunk size based on your available memory. You can tune the number of workers to match your CPU cores. You can add new transformers for different data cleaning rules. The pipeline remains memory-efficient because only chunks of data are in memory at any time, not the entire dataset.
In my experience, this pattern can cut processing time substantially compared to a simple, sequential read-process-write loop, especially when the transformations are CPU-bound and there are idle cores to put to work. It does this while keeping memory use flat, so your computer stays responsive. The key is in the balance: chunks small enough to be memory-friendly, but large enough to keep the workers busy and minimize overhead. It's a practical, powerful way to handle big data jobs, even on a single machine.