DEV Community

Cover image for Thinking in Pipelines: A Better Way to Structure Go Systems
Francis Awuor
Francis Awuor

Posted on

Thinking in Pipelines: A Better Way to Structure Go Systems

I was working on a personal project recently.

A job scraper.

And in the process, I came across a pattern that’s genuinely changed how I think about structuring backend systems in Go.

It's called the Pipeline Pattern. And as it turns out, it actually shows up in a lot of places - payments, analytics, APIs, etc.

In this article, I’ll be walking you through it using my job scraper project. Which is actually a perfect use case for this pattern, as you’ll see.

The Mess We're Trying to Avoid

Now before I show you the pattern, let me show you what the code could look like without it.

Now essentially what my scraper does is this:

  • Scrape job listings from multiple sources
  • Normalize them (i.e., clean them up)
  • Score them based on keywords (The most relevant to my skillset get the highest scores)
  • Save them to a database.

So initially what I might have done, was something like this (simplified):

for _, raw := range rawJobs {
    // normalize
    raw.Title = strings.TrimSpace(raw.Title)
    raw.Location = strings.ReplaceAll(raw.Location, "NYC", "New York")

    // score
    score := 0
    for _, keyword := range keywords {
        if strings.Contains(raw.Title, keyword) {
            score++
        }
    }

    // save
    s.Repo.Create(raw.Title, raw.Location, score)
}
Enter fullscreen mode Exit fullscreen mode

Which technically works. But it creates a few problems:

  • No clear stages. Where does normalization end and scoring begin? You have to read everything to understand anything.
  • Hard to test. How do you test just the scoring logic? You can't. It's glued to everything else inside that loop.
  • Hard to change. Want a new scoring rule? You're digging through the loop, touching normalization, maybe breaking the save logic. Everything is coupled.
  • Hard to reuse. If you need that normalization logic somewhere else, you have to copy-paste it. And now you’re duplicating code.
  • Concurrency feels impossible. Now imagine you want to process jobs concurrently. Where do you even start? The loop? The scoring part? The saving part? Everything is tangled, so it’s hard to determine what to run concurrently.

The Realization

Now here’s the thing you might notice when you look at that loop.

It's not really one problem. It's the same set of steps, repeated for every job. Scrape a job, clean it up, evaluate it, save it. Every single time.

So the question becomes — what if we made those steps explicit?


The Pipeline

That's the core idea. Instead of one big loop doing everything, you break the work into stages:

Scrape → Normalize → Score → Store

Each stage does one thing. Data flows through, gets transformed, moves on.

Here's what that looks like in practice. This is the actual pipeline from my scraper:

type Pipeline struct {
    scorer         scoring.Scorer
    jobService     JobService
    companyService CompanyService
    logger         *slog.Logger
}

func NewPipeline(
    scorer scoring.Scorer,
    jobService JobService,
    companyService CompanyService,
    logger *slog.Logger,
) *Pipeline {
    return &Pipeline{
        scorer:         scorer,
        jobService:     jobService,
        companyService: companyService,
        logger:         logger,
    }
}
Enter fullscreen mode Exit fullscreen mode

Notice what's happening in NewPipeline. You're not hardcoding any specific scraper or store. You're passing them in. We'll come back to why that matters in a second.

Now here's the Run() method. This is where the actual pipeline executes:

func (p *Pipeline) Run(ctx context.Context, scraper Scraper) error {
    // 1. Scrape
    rawJobs, err := scraper.Scrape(ctx)
    if err != nil {
        return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
    }

    for _, rawJob := range rawJobs {
        // 2. Normalize
        normalizedJob, err := normalize.Normalize(rawJob)
        if err != nil {
            failed++
            continue
        }

        // 3. Score
        job.Score = p.scorer.Score(job)

        // 4. Save
        if err := p.jobService.Save(ctx, job); err != nil {
            failed++
            continue
        }
        saved++
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Same logic as the messy version. But now every stage is cleanly separated into it’s own separate function.

You can read this top to bottom as it is now and immediately understand what the system does. Scrape, normalize, score, save. No digging or guessing. The structure tells you the story.

And that's before we even get to the best part.


Swappability

Here's where it gets interesting.

Because each stage is separate and wired through interfaces, you can swap any part of the pipeline without touching the rest. Think about what that means in practice.

Swap the scraper. During testing, I use a FakeScraper that returns hardcoded jobs. In production, that becomes a real scraper hitting actual job sites.

// testing
scraper := &FakeScraper{}

// production
scraper := &RemotiveScraper{}
Enter fullscreen mode Exit fullscreen mode

The pipeline doesn't change. Not a single line.

Swap the store. In development, an in-memory store is fast and easy to work with. In production, that's a real database.

// development
store := NewInMemoryStore()

// production
store := NewPostgresStore()
Enter fullscreen mode Exit fullscreen mode

Same pipeline. Different backend.

Swap the scorer. Right now I'm using a simple keyword-based scorer. Later, that could become something smarter. Maybe ML-based, maybe something else entirely.

scorer := &KeywordScorer{keywords: []string{"Go", "backend", "remote"}}

// later...
scorer := &MLScorer{}
Enter fullscreen mode Exit fullscreen mode

Still the same pipeline.

This really made it click for me: You don’t rewrite the system. You just swap parts. Go back to that messy loop for a second. Try swapping the scraper there. You can't. The logic is baked in, everything depends on everything, there's no clean boundary to swap across.

The pipeline gives you those boundaries.


Concurrency - The Worker Pool

So now the system is clean and flexible. But what about performance?

Right now, everything runs one job at a time. Scrape one, normalize it, score it, save it. Then move to the next. For small datasets that's fine. For a thousand jobs, that's slow.

So let's level it up.

The idea is simple: instead of processing one job at a time, spin up a pool of workers (goroutines) and let them process jobs concurrently. If you haven't used goroutines before, they're basically lightweight threads in Go. You can spin up many of them without much overhead. The key word there is many, not unlimited. More on that in a second.

Here's the shape of it:

func (p *Pipeline) Run(ctx context.Context, scraper Scraper, numWorkers int) error {
    rawJobs, err := scraper.Scrape(ctx)
    if err != nil {
        return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
    }

    jobs := make(chan RawJob)
    var wg sync.WaitGroup

    // spin up workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for raw := range jobs {
                // normalize, score, save
            }
        }()
    }

    // feed jobs into the channel
    for _, raw := range rawJobs {
        jobs <- raw
    }
    close(jobs)

    wg.Wait()
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Think of it like a queue and a team. The channel is the queue. Raw jobs go in one end, workers pull from the other. Each worker runs a job through the pipeline stages independently. sync.WaitGroup just makes sure we don't return until every worker is done.

The numWorkers parameter is the important part. You decide how many workers run. Not Go, not the runtime. You. That matters because unbounded concurrency has a real cost. A thousand goroutines all trying to write to your database at the same time will hurt more than help. Three to ten workers, controlled, is usually the right call.

The pipeline didn't change conceptually. It just runs in parallel now.


Pipelines Are Everywhere

Now what’s interesting… once you internalize this pattern, you start seeing it everywhere.

Payments: validate the card → charge it → save the transaction. Analytics: collect the event → clean it → store it. APIs: receive the request → process it → send the response.

Different domains, same shape. Data comes in, moves through stages, comes out the other side transformed. That's the pipeline pattern. And the reason it keeps showing up is because it maps well onto how a lot of real work actually flows, i.e., step by step, with clear handoffs between stages.

Which means if you learn to recognize it, you can apply it. Not just in job scrapers, but anywhere you find yourself writing code that takes something, does a series of things to it, and produces a result.


So, Why Bother?

You could write a system without any of this. Plenty of working software is just one big loop doing everything. And honestly, for something small and throwaway, that's fine.

But the moment your system needs to grow, that big loop starts fighting you. You want to add a new data source, but the scraping logic is tangled with the normalization. You want to test scoring in isolation, but it's buried three levels deep. You want to swap your in-memory store for a real database, but there's no clean seam to grab onto.

I’ve run into all this while working on my scraper and other recent builds. And the pipeline pattern has made those problems manageable.

That's really what this is about. Not clean code for the sake of clean code. But about building something you can actually come back to, change, and grow without it fighting you the whole way.

The four ideas worth keeping:

  • Break into stages. Each stage does one thing.
  • Keep stages focused. If a stage is hard to name, it's probably doing too much.
  • Make parts swappable. Wire through interfaces, not concrete types.
  • Control concurrency. A worker pool, not unlimited goroutines.

Thinking in pipelines, I feel, makes a system easier to reason about. Which matters a lot when you’re in the middle of building something or making updates.


That's the pattern. Hope it's useful.

Top comments (0)