DEV Community

Cover image for **Build Parallel Data Processing Pipelines in Go for Memory-Efficient Large File Handling**
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

**Build Parallel Data Processing Pipelines in Go for Memory-Efficient Large File Handling**

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

When you have to work with large amounts of data, you can quickly run into problems. Your computer might run out of memory trying to load a huge file all at once. Or, the process might become so slow that it takes hours to finish. I've faced these issues many times, and the solution often involves breaking the work into smaller, parallel pieces. Let me show you a way to build a system in Go that handles this efficiently.

Think of a data pipeline like an assembly line in a factory. Raw materials come in one end, get worked on at several stations, and finished products come out the other end. Our goal is to make this line fast, reliable, and smart about how much space it uses.

The core idea is simple: don't try to eat the whole elephant in one bite. Instead, cut it into small pieces, have multiple chefs work on different pieces at the same time, and then combine the results. In technical terms, this means chunked reading, parallel processing with a worker pool, and coordinated writing.

Let's start by looking at the main structure that orchestrates everything. We create a DataPipeline that ties together the reader, processor, and writer.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "sync/atomic"
    "time"
)

// DataPipeline is the manager for the whole operation.
type DataPipeline struct {
    reader    *ChunkedReader
    processor *ParallelProcessor
    writer    *ResultWriter
    monitor   *PipelineMonitor
}
Enter fullscreen mode Exit fullscreen mode

This pipeline is the boss. It sets up the other parts and tells them when to start and stop. To create one, you give it an input file, an output file, and decide how many workers you want.

func NewDataPipeline(inputPath, outputPath string, workers int) *DataPipeline {
    inputFile, err := os.Open(inputPath)
    if err != nil {
        log.Fatal(err)
    }

    outputFile, err := os.Create(outputPath)
    if err != nil {
        log.Fatal(err)
    }

    return &DataPipeline{
        reader: NewChunkedReader(inputFile),
        processor: NewParallelProcessor(workers),
        writer: NewResultWriter(outputFile),
        monitor: &PipelineMonitor{metrics: make(map[string]uint64)},
    }
}
Enter fullscreen mode Exit fullscreen mode

The real work begins when we call Run. This method starts all the components as separate goroutines and waits for them to finish. Using a context.Context allows us to cancel everything gracefully if something goes wrong or if we run out of time.

func (dp *DataPipeline) Run(ctx context.Context) error {
    startTime := time.Now()
    var wg sync.WaitGroup
    wg.Add(3)

    // Start the reader, processor, and writer in their own goroutines.
    go func() { defer wg.Done(); dp.reader.Start(ctx) }()
    go func() { defer wg.Done(); dp.processor.Start(ctx, dp.reader.Chunks()) }()
    go func() { defer wg.Done(); dp.writer.Start(ctx, dp.processor.Results()) }()

    // Start a monitor to watch progress.
    go dp.monitor.Start(ctx, dp.reader, dp.processor, dp.writer)

    wg.Wait()

    dp.writer.Flush() // Ensure all data is written.
    duration := time.Since(startTime)
    dp.monitor.Report(duration)
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Now, let's talk about the first stage: reading. The ChunkedReader is responsible for taking a large file and breaking it into smaller, manageable pieces called chunks. This is crucial because it prevents us from loading gigabytes of data into memory at once.

Imagine you have a book too heavy to lift. Instead of carrying the whole book, you photocopy one chapter at a time. The ChunkedReader does the digital equivalent.

type ChunkedReader struct {
    file      *os.File
    scanner   *bufio.Scanner
    chunkSize int
    chunks    chan *DataChunk
    bufferPool *sync.Pool
}

type DataChunk struct {
    ID      int
    Records [][]byte
}
Enter fullscreen mode Exit fullscreen mode

We configure a chunkSize, say 10,000 lines. The reader will read up to that many lines from the file, package them into a DataChunk object, and send that chunk into a channel. A channel in Go is like a conveyor belt connecting one goroutine to another.

func (cr *ChunkedReader) Start(ctx context.Context) {
    defer close(cr.chunks) // Close the conveyor belt when we're done.
    defer cr.file.Close()

    chunkID := 0
    var currentChunkRecords [][]byte

    for cr.scanner.Scan() {
        select {
        case <-ctx.Done():
            return // Stop if we are told to cancel.
        default:
            line := cr.scanner.Bytes()
            // We need to copy the line because the scanner reuses its buffer.
            lineCopy := make([]byte, len(line))
            copy(lineCopy, line)

            currentChunkRecords = append(currentChunkRecords, lineCopy)

            // If we've hit the chunk size, send it for processing.
            if len(currentChunkRecords) >= cr.chunkSize {
                chunk := &DataChunk{ID: chunkID, Records: currentChunkRecords}
                cr.chunks <- chunk
                chunkID++
                currentChunkRecords = nil // Reset for the next chunk.
            }
        }
    }

    // Don't forget the last chunk which might be smaller.
    if len(currentChunkRecords) > 0 {
        chunk := &DataChunk{ID: chunkID, Records: currentChunkRecords}
        cr.chunks <- chunk
    }
}
Enter fullscreen mode Exit fullscreen mode

You'll notice a bufferPool in the struct. This is an optimization. Creating and destroying many byte slices (the lineCopy) can put pressure on Go's garbage collector. A sync.Pool lets us reuse old slices, which is much faster.

The chunks flow out of the reader via the cr.chunks channel. This is the input to our next stage: the ParallelProcessor. This is where the speed magic happens.

The ParallelProcessor manages a pool of workers. Think of workers as chefs in a kitchen. Instead of having one chef prepare every single meal, we have several. When an order (a data chunk) comes in, an available chef takes it and starts cooking.

type ParallelProcessor struct {
    workerPool  chan *Worker
    inputQueue  chan *DataChunk
    resultQueue chan *ProcessedChunk
    workers     int
}

type Worker struct {
    ID          int
    transformers []Transformer
}

type ProcessedChunk struct {
    ID       int
    Records  []Record
    Metadata map[string]interface{}
    Errors   []error
}
Enter fullscreen mode Exit fullscreen mode

The processor starts by creating a number of idle workers and putting them in the workerPool channel, which acts as a waiting room for available chefs.

func (pp *ParallelProcessor) Start(ctx context.Context, inputQueue <-chan *DataChunk) {
    defer close(pp.resultQueue)

    // Create workers and put them in the pool.
    for i := 0; i < pp.workers; i++ {
        worker := &Worker{ID: i, transformers: pp.getTransformers()}
        pp.workerPool <- worker
    }

    var wg sync.WaitGroup

    // Listen for incoming chunks on the inputQueue.
    for chunk := range inputQueue {
        select {
        case <-ctx.Done():
            return
        case worker := <-pp.workerPool: // Take an available worker.
            wg.Add(1)
            // Assign the chunk to the worker in a new goroutine.
            go func(w *Worker, c *DataChunk) {
                defer wg.Done()
                defer func() { pp.workerPool <- w }() // Put the worker back in the pool when done.

                processed, err := w.Process(c)
                if err != nil {
                    log.Printf("Worker %d had an error: %v", w.ID, err)
                    return
                }
                // Send the result forward.
                pp.resultQueue <- processed
            }(worker, chunk)
        }
    }
    wg.Wait() // Wait for all assigned work to be finished.
}
Enter fullscreen mode Exit fullscreen mode

This pattern is powerful. It limits concurrency to the exact number of workers we defined. If we have 8 workers and 100 chunks, only 8 will be processed at any given moment. This prevents us from overloading the CPU or memory. The worker, after finishing its task, goes back into the pool, ready for the next chunk.

What does a worker actually do? It applies a series of transformations. Each transformation is a small, focused operation. For example, the first might parse a raw byte slice into a structured CSV record. The next might validate that record, and a third might clean or map certain fields.

func (w *Worker) Process(chunk *DataChunk) (*ProcessedChunk, error) {
    result := &ProcessedChunk{
        ID:       chunk.ID,
        Records:  make([]Record, 0, len(chunk.Records)),
        Metadata: make(map[string]interface{}),
    }

    for _, rawRecord := range chunk.Records {
        var currentRecord Record = rawRecord
        var transformErr error

        // Apply each transformer in sequence.
        for _, transformer := range w.transformers {
            currentRecord, transformErr = transformer.Transform(currentRecord)
            if transformErr != nil {
                result.Errors = append(result.Errors, transformErr)
                break // Skip this record if a transformer fails.
            }
        }
        if transformErr == nil && currentRecord != nil {
            result.Records = append(result.Records, currentRecord)
        }
    }
    result.Metadata["worker_id"] = w.ID
    return result, nil
}
Enter fullscreen mode Exit fullscreen mode

Using a slice of Transformer interfaces makes the system very flexible. You can plug in any logic you need without changing the worker's core code.

type Transformer interface {
    Transform(Record) (Record, error)
}

// Example: A transformer that parses a CSV line.
type CSVParser struct{}

func (c *CSVParser) Transform(r Record) (Record, error) {
    rawBytes, ok := r.([]byte)
    if !ok {
        return nil, fmt.Errorf("expected []byte")
    }
    // Use a CSV reader to parse the line.
    reader := csv.NewReader(bytes.NewReader(rawBytes))
    fields, err := reader.Read()
    if err != nil {
        return nil, err
    }
    return CSVRecord{Fields: fields}, nil
}
Enter fullscreen mode Exit fullscreen mode

Once a worker finishes processing a chunk, it sends the ProcessedChunk into the resultQueue. This is the conveyor belt leading to the final stage: the ResultWriter.

The writer's job is to collect all the processed chunks and write them to the output destination, often a file. There's a catch: the chunks might finish processing out of order. Worker 3 might finish its chunk before Worker 2. We need to handle this if maintaining the original order is important. For this example, let's assume order doesn't matter, which is common for many aggregation or transformation tasks.

type ResultWriter struct {
    writer      io.Writer
    inputQueue  <-chan *ProcessedChunk
    aggregator  *ResultAggregator
    flushSize   int
}

func (rw *ResultWriter) Start(ctx context.Context, inputQueue <-chan *ProcessedChunk) {
    csvWriter := csv.NewWriter(rw.writer)
    recordCount := 0

    for chunk := range inputQueue {
        select {
        case <-ctx.Done():
            return
        default:
            for _, record := range chunk.Records {
                if csvRec, ok := record.(CSVRecord); ok {
                    csvWriter.Write(csvRec.Fields)
                    recordCount++
                }
                // Flush to disk every 'flushSize' records.
                if recordCount%rw.flushSize == 0 {
                    csvWriter.Flush()
                }
            }
            // Let the aggregator know about this chunk's results.
            rw.aggregator.Aggregate(chunk)
        }
    }
    csvWriter.Flush() // Final flush.
}
Enter fullscreen mode Exit fullscreen mode

Flushing periodically (writing from memory to disk) is important for efficiency. Writing each record individually would be very slow. Writing everything at the end risks losing a lot of data if the program crashes. A periodic flush is a good balance.

The ResultAggregator is a useful sidekick for the writer. It doesn't write data but keeps track of global statistics: how many records were processed, how many errors occurred, and perhaps calculating a success rate.

type ResultAggregator struct {
    totalRecords uint64 // Use atomic for safe concurrent access.
    errors       []error
    mu           sync.RWMutex // Use a mutex to protect the errors slice.
}

func (ra *ResultAggregator) Aggregate(chunk *ProcessedChunk) {
    atomic.AddUint64(&ra.totalRecords, uint64(len(chunk.Records)))
    ra.mu.Lock()
    ra.errors = append(ra.errors, chunk.Errors...)
    ra.mu.Unlock()
}

func (ra *ResultAggregator) SuccessRate() float64 {
    total := atomic.LoadUint64(&ra.totalRecords)
    if total == 0 {
        return 0.0
    }
    ra.mu.RLock()
    errorCount := len(ra.errors)
    ra.mu.RUnlock()
    return float64(total-uint64(errorCount)) / float64(total)
}
Enter fullscreen mode Exit fullscreen mode

Finally, a PipelineMonitor can watch over the whole operation. It can periodically check how many chunks are waiting, how busy the workers are, and report on speed. This is invaluable for understanding performance and debugging.

type PipelineMonitor struct {
    metrics map[string]uint64
    mu      sync.RWMutex
}

func (pm *PipelineMonitor) Start(ctx context.Context, components ...interface{}) {
    ticker := time.NewTicker(2 * time.Second)
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Sample queue lengths, worker activity, etc.
            pm.mu.Lock()
            // ... collect data from components ...
            pm.mu.Unlock()
            pm.printStatus()
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Bringing it all together in main is straightforward.

func main() {
    // Configure and create the pipeline.
    pipeline := NewDataPipeline("large_data.csv", "processed_data.csv", 8)

    // Give it a 10-minute timeout.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
    defer cancel()

    // Run it.
    if err := pipeline.Run(ctx); err != nil {
        log.Fatal("Pipeline failed:", err)
    }

    // Print final summary.
    agg := pipeline.writer.aggregator
    fmt.Printf("All done!\n")
    fmt.Printf("Total Records: %d\n", atomic.LoadUint64(&agg.totalRecords))
    fmt.Printf("Success Rate: %.2f%%\n", agg.SuccessRate()*100)
}
Enter fullscreen mode Exit fullscreen mode

This design gives you a robust foundation. You can adjust the chunk size based on your available memory. You can tune the number of workers to match your CPU cores. You can add new transformers for different data cleaning rules. The pipeline remains memory-efficient because only chunks of data are in memory at any time, not the entire dataset.

From my experience, this pattern can improve processing times dramatically compared to a simple, sequential read-process-write loop. It turns a task that might take hours into one that takes minutes, all while keeping your computer responsive. The key is in the balance: chunks small enough to be memory-friendly, but large enough to keep the workers busy and minimize overhead. It’s a practical, powerful way to handle big data jobs, even on a single machine.

📘 Checkout my latest ebook for free on my channel!

Be sure to like, share, comment, and subscribe to the channel!


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)