DEV Community

Cover image for Building High-Performance File Processing Pipelines in Go: Concurrency Patterns That Scale
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

Building High-Performance File Processing Pipelines in Go: Concurrency Patterns That Scale

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

When working with large-scale file processing in Go, I've found that building an efficient concurrent pipeline is crucial for performance. Go's concurrency primitives make it possible to create powerful file processing systems that maximize throughput while maintaining control over resource usage.

The core of effective file processing lies in balancing concurrency levels with system resources. In my experience, simply spawning unlimited goroutines leads to diminishing returns and potential system overload. Instead, I prefer to implement controlled concurrency using worker pools.

Go's channel mechanism serves as the foundation for our pipeline design. We can create bounded channels that connect different stages of our processing workflow while providing natural backpressure. This prevents any single stage from overwhelming subsequent stages.

// CreateWorkerPool starts a pool of workers to process files
func CreateWorkerPool(ctx context.Context, numWorkers int, jobs <-chan string, results chan<- Result) {
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                select {
                case <-ctx.Done():
                    return
                default:
                    result := processFile(ctx, job, workerID)
                    results <- result
                }
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
}
Enter fullscreen mode Exit fullscreen mode

This worker pool implementation controls the number of concurrent workers while properly handling cleanup when all jobs are processed. The use of a WaitGroup ensures we don't close the results channel prematurely.

One of the key insights I've gained is that reading files line-by-line rather than loading them entirely into memory makes a significant difference for large files. Using bufio's Scanner or Reader provides an efficient way to process files incrementally:

func processFile(ctx context.Context, filePath string, workerID int) Result {
    file, err := os.Open(filePath)
    if err != nil {
        return Result{Path: filePath, Err: err}
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    lineCount := 0

    for scanner.Scan() {
        // Check for cancellation periodically
        if lineCount%100 == 0 {
            select {
            case <-ctx.Done():
                return Result{Path: filePath, Err: ctx.Err()}
            default:
            }
        }

        // Process the line
        processLine(scanner.Text())
        lineCount++
    }

    if err := scanner.Err(); err != nil {
        return Result{Path: filePath, Err: err}
    }

    return Result{Path: filePath, LineCount: lineCount}
}
Enter fullscreen mode Exit fullscreen mode

Another optimization I've implemented is batch processing. Rather than processing each line individually, gathering lines into batches and processing them together can significantly reduce overhead:

func processBatch(lines []string) error {
    // Process lines as a batch
    batch := make([]ProcessedData, 0, len(lines))

    for _, line := range lines {
        processed, err := parseLine(line)
        if err != nil {
            continue // Skip invalid lines or handle differently
        }
        batch = append(batch, processed)
    }

    // Save batch to database, write to output, etc.
    return saveBatch(batch)
}
Enter fullscreen mode Exit fullscreen mode

Error handling is critical in concurrent file processing. I've found that channeling errors alongside successful results allows for proper tracking without stopping the entire pipeline:

type Result struct {
    Path      string
    LineCount int
    BytesRead int64
    Err       error
}
Enter fullscreen mode Exit fullscreen mode

Context handling provides clean cancellation across the entire pipeline. I always propagate context through each stage of processing to ensure we can gracefully shut down operations:

func (p *Pipeline) Process(ctx context.Context, directoryPath string) error {
    // Create a child context that we can cancel if needed
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Set up channels
    jobs := make(chan string, p.bufferSize)
    results := make(chan Result, p.bufferSize)

    // Start the worker pool
    CreateWorkerPool(ctx, p.concurrency, jobs, results)

    // Start a goroutine to collect results
    go p.collectResults(results)

    // Walk the directory and send jobs
    err := filepath.Walk(directoryPath, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        if !info.IsDir() && p.matchesFilter(path) {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case jobs <- path:
                // Job sent successfully
            }
        }
        return nil
    })

    // Close the jobs channel after all files have been sent
    close(jobs)

    return err
}
Enter fullscreen mode Exit fullscreen mode

For resource efficiency, I've learned to be mindful of memory usage patterns. Pre-allocating buffers and reusing them where possible makes a significant difference:

// Reuse a buffer for line batching
batchLines := make([]string, 0, batchSize)

// Later in the code
for scanner.Scan() {
    batchLines = append(batchLines, scanner.Text())

    if len(batchLines) >= batchSize {
        processBatch(batchLines)
        // Clear slice but keep capacity
        batchLines = batchLines[:0]
    }
}
Enter fullscreen mode Exit fullscreen mode

File discovery is another important aspect. Using filepath.Walk provides a simple way to traverse directories, but for very large directories with millions of files, I've found that using a separate goroutine for discovery helps maintain steady flow:

func (p *Pipeline) discoverFiles(ctx context.Context, rootPath string, jobs chan<- string) error {
    sem := make(chan struct{}, 20) // Limit concurrent directory reads

    var discover func(path string) error
    discover = func(path string) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case sem <- struct{}{}:
            defer func() { <-sem }()
        }

        entries, err := os.ReadDir(path)
        if err != nil {
            return err
        }

        for _, entry := range entries {
            fullPath := filepath.Join(path, entry.Name())

            if entry.IsDir() {
                if err := discover(fullPath); err != nil {
                    return err
                }
            } else if p.matchesFilter(fullPath) {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case jobs <- fullPath:
                    // Job sent successfully
                }
            }
        }

        return nil
    }

    return discover(rootPath)
}
Enter fullscreen mode Exit fullscreen mode

Progress monitoring is essential for long-running processing jobs. I implement this by tracking statistics in a thread-safe manner:

type Statistics struct {
    mu             sync.Mutex
    filesProcessed int
    linesProcessed int
    bytesProcessed int64
    errorCount     int
    startTime      time.Time
}

func (s *Statistics) Update(result Result) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if result.Err != nil {
        s.errorCount++
        return
    }

    s.filesProcessed++
    s.linesProcessed += result.LineCount
    s.bytesProcessed += result.BytesRead
}

func (s *Statistics) String() string {
    s.mu.Lock()
    defer s.mu.Unlock()

    duration := time.Since(s.startTime)
    return fmt.Sprintf(
        "Processed %d files (%d lines, %.2f MB) in %v (%.2f files/sec, %.2f MB/sec), %d errors",
        s.filesProcessed,
        s.linesProcessed,
        float64(s.bytesProcessed)/(1024*1024),
        duration,
        float64(s.filesProcessed)/duration.Seconds(),
        float64(s.bytesProcessed)/(1024*1024*duration.Seconds()),
        s.errorCount,
    )
}
Enter fullscreen mode Exit fullscreen mode

Rate limiting can be important when processing files that might impact external systems. I implement this using a simple time-based throttling mechanism:

func rateLimitedWorker(ctx context.Context, jobs <-chan string, results chan<- Result, rateLimit time.Duration) {
    limiter := time.NewTicker(rateLimit)
    defer limiter.Stop()

    for job := range jobs {
        select {
        case <-ctx.Done():
            return
        case <-limiter.C:
            result := processFile(ctx, job, 0)
            results <- result
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

For processing files with different formats, I've found that implementing a strategy pattern works well:

type Processor interface {
    Process(ctx context.Context, filePath string) (Result, error)
}

type CSVProcessor struct {
    // Configuration for CSV processing
}

type JSONProcessor struct {
    // Configuration for JSON processing
}

func (p *CSVProcessor) Process(ctx context.Context, filePath string) (Result, error) {
    // CSV-specific processing
}

func (p *JSONProcessor) Process(ctx context.Context, filePath string) (Result, error) {
    // JSON-specific processing
}

// Factory function to get appropriate processor
func GetProcessor(filePath string) Processor {
    ext := filepath.Ext(filePath)
    switch ext {
    case ".csv":
        return &CSVProcessor{}
    case ".json":
        return &JSONProcessor{}
    default:
        return &DefaultProcessor{}
    }
}
Enter fullscreen mode Exit fullscreen mode

When dealing with very large files that exceed available memory, streaming processing becomes essential. I implement this using io.Reader interfaces and incremental processing:

func processLargeFile(ctx context.Context, filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)
    buffer := make([]byte, 64*1024) // 64KB chunks

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        n, err := reader.Read(buffer)
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }

        // Process chunk of data
        processChunk(buffer[:n])
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

I've found that proper connection management is essential when processing files that involve database operations:

func processBatchWithDB(ctx context.Context, db *sql.DB, batch []string) error {
    // Begin transaction
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    // Prepare statement
    stmt, err := tx.PrepareContext(ctx, "INSERT INTO records (field1, field2) VALUES (?, ?)")
    if err != nil {
        tx.Rollback()
        return err
    }
    defer stmt.Close()

    // Insert batch data
    for _, line := range batch {
        fields := parseLine(line)
        _, err := stmt.ExecContext(ctx, fields[0], fields[1])
        if err != nil {
            tx.Rollback()
            return err
        }
    }

    return tx.Commit()
}
Enter fullscreen mode Exit fullscreen mode

Connection pooling further improves performance when dealing with database operations:

func setupDBPool() *sql.DB {
    db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database")
    if err != nil {
        panic(err)
    }

    // Set connection pool parameters
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)

    return db
}
Enter fullscreen mode Exit fullscreen mode

When implementing multi-stage pipelines, I connect stages using channels:

func buildPipeline(ctx context.Context, inputDir string, outputDir string, concurrency int) {
    // Stage 1: Find files
    filesChan := make(chan string, 100)
    go findFiles(ctx, inputDir, filesChan)

    // Stage 2: Read and parse files
    parsedDataChan := make(chan ParsedData, 100)
    for i := 0; i < concurrency; i++ {
        go parseFiles(ctx, filesChan, parsedDataChan)
    }

    // Stage 3: Transform data
    transformedChan := make(chan TransformedData, 100)
    for i := 0; i < concurrency; i++ {
        go transformData(ctx, parsedDataChan, transformedChan)
    }

    // Stage 4: Write results
    for i := 0; i < concurrency; i++ {
        go writeResults(ctx, transformedChan, outputDir)
    }
}
Enter fullscreen mode Exit fullscreen mode

For complex transformations, using workers specialized for different tasks improves efficiency:

func setupSpecializedWorkers(ctx context.Context, inputChan <-chan Data) <-chan Result {
    resultChan := make(chan Result, 100)

    // Text processing workers
    for i := 0; i < 5; i++ {
        go textProcessor(ctx, inputChan, resultChan)
    }

    // Image processing workers
    for i := 0; i < 3; i++ {
        go imageProcessor(ctx, inputChan, resultChan)
    }

    // Numeric processing workers
    for i := 0; i < 7; i++ {
        go numericProcessor(ctx, inputChan, resultChan)
    }

    return resultChan
}
Enter fullscreen mode Exit fullscreen mode

Timeouts are crucial for preventing stuck operations. I implement them at various levels:

func processWithTimeout(filePath string, timeout time.Duration) (Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    resultChan := make(chan Result, 1)
    errChan := make(chan error, 1)

    go func() {
        result, err := processFile(ctx, filePath)
        if err != nil {
            errChan <- err
            return
        }
        resultChan <- result
    }()

    select {
    case <-ctx.Done():
        return Result{}, ctx.Err()
    case err := <-errChan:
        return Result{}, err
    case result := <-resultChan:
        return result, nil
    }
}
Enter fullscreen mode Exit fullscreen mode

I've found that monitoring and observability are essential for production systems. Adding logging and metrics to the pipeline helps identify bottlenecks:

func monitoredWorker(ctx context.Context, id int, jobs <-chan string, results chan<- Result, logger *log.Logger, metrics *Metrics) {
    for job := range jobs {
        startTime := time.Now()

        logger.Printf("Worker %d starting job: %s", id, job)
        result := processFile(ctx, job, id)

        duration := time.Since(startTime)
        metrics.RecordJobDuration(duration)
        metrics.IncrementJobsProcessed()

        if result.Err != nil {
            logger.Printf("Worker %d error processing %s: %v", id, job, result.Err)
            metrics.IncrementErrors()
        } else {
            logger.Printf("Worker %d completed %s: processed %d lines", id, job, result.LineCount)
        }

        results <- result
    }
}
Enter fullscreen mode Exit fullscreen mode

In my production systems, I also implement circuit breakers to handle failure scenarios gracefully:

type CircuitBreaker struct {
    mu            sync.Mutex
    failureCount  int
    threshold     int
    resetTimeout  time.Duration
    lastFailure   time.Time
    isOpen        bool
}

func (cb *CircuitBreaker) Execute(job func() error) error {
    cb.mu.Lock()
    if cb.isOpen {
        if time.Since(cb.lastFailure) > cb.resetTimeout {
            // Half-open state: allow one request through
            cb.mu.Unlock()
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker open")
        }
    } else {
        cb.mu.Unlock()
    }

    err := job()

    if err != nil {
        cb.mu.Lock()
        cb.failureCount++
        cb.lastFailure = time.Now()

        if cb.failureCount >= cb.threshold {
            cb.isOpen = true
        }
        cb.mu.Unlock()
    } else {
        cb.mu.Lock()
        cb.failureCount = 0
        cb.isOpen = false
        cb.mu.Unlock()
    }

    return err
}
Enter fullscreen mode Exit fullscreen mode

For applications requiring exact-once processing guarantees, I implement checkpointing:

type Checkpoint struct {
    mu           sync.Mutex
    processedIDs map[string]bool
    checkpointFn func(map[string]bool) error
}

func (cp *Checkpoint) MarkProcessed(id string) error {
    cp.mu.Lock()
    defer cp.mu.Unlock()

    cp.processedIDs[id] = true
    return cp.checkpointFn(cp.processedIDs)
}

func (cp *Checkpoint) IsProcessed(id string) bool {
    cp.mu.Lock()
    defer cp.mu.Unlock()

    return cp.processedIDs[id]
}
Enter fullscreen mode Exit fullscreen mode

In summary, building a high-performance concurrent file processing pipeline in Go requires careful consideration of resource management, error handling, and workflow design. By leveraging Go's concurrency primitives and following the patterns I've outlined above, you can create efficient pipelines that handle large volumes of files with controlled resource usage. The key is finding the right balance between parallelism and system constraints while maintaining clean error propagation and graceful cancellation throughout your processing flow.


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)