DEV Community

Sotirios Mantziaris

Parallelize work using parwork

To process a lot of work quickly, especially when it is CPU bound, we have to spread it across all available cores.
Go has goroutines, which can be used to parallelize the work, but running a large number of goroutines comes with the cost of context switching between them.
This context switching can be minimized by using a fork-join model when processing work.

Parwork solves this problem by using goroutines, channels and waitgroups. It creates workers (goroutines) that pull
work off a queue (channel), process it and report the finished work back to a queue (channel).
This is done in an abstracted way, so the user only has to provide implementations for:

  • a Work interface
  • a WorkGenerator function
  • a WorkCollector function
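The worker and queue flow described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not parwork's actual code: the `run` helper, the `squareWork` type and `sumSquares` are hypothetical names made up for this sketch, and the real library may wire its channels differently.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Work mirrors the interface described in this post.
type Work interface {
	Do()
	GetError() error
	Result() interface{}
}

// squareWork is a hypothetical work item used only for this sketch.
type squareWork struct{ n, out int }

func (w *squareWork) Do()                 { w.out = w.n * w.n }
func (w *squareWork) GetError() error     { return nil }
func (w *squareWork) Result() interface{} { return w.out }

// run is a hypothetical helper, not parwork's API. It forks the work out to
// a fixed number of workers and joins the results in a single collector.
func run(workers int, generate func() Work, collect func(Work)) {
	queue := make(chan Work) // work waiting to be processed
	done := make(chan Work)  // processed work waiting to be collected

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for w := range queue {
				w.Do()
				done <- w
			}
		}()
	}

	// Feed the queue until the generator signals the end with nil.
	go func() {
		for w := generate(); w != nil; w = generate() {
			queue <- w
		}
		close(queue)
	}()

	// Close the results channel once every worker has returned.
	go func() {
		wg.Wait()
		close(done)
	}()

	// Collect in the calling goroutine until all results have arrived.
	for w := range done {
		collect(w)
	}
}

// sumSquares adds up the squares of 1..n using the pool above.
func sumSquares(n int) int {
	next, total := 0, 0
	run(runtime.NumCPU(), func() Work {
		if next == n {
			return nil
		}
		next++
		return &squareWork{n: next}
	}, func(w Work) {
		total += w.Result().(int)
	})
	return total
}

func main() {
	fmt.Println(sumSquares(10)) // prints 385
}
```

In this sketch the generator runs in one goroutine and the collector in another, so neither needs locking. The number of goroutines stays fixed no matter how many work items are generated, which is the point of the fork-join approach.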

Work interface

type Work interface {
    Do()
    GetError() error
    Result() interface{}
}

The Work interface defines a Do() method, which contains all the processing logic of the work item. The GetError() error method can be used to flag the work item as failed and return an error. The Result() interface{} method returns the result of the work. Due to the lack of generics, the returned value has to be converted from interface{} to the actual result type with a type assertion before it can be used in the WorkCollector.
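The conversion from interface{} to the concrete type might look like the sketch below. The `asBytes` helper is a hypothetical name used only for illustration. It uses the two-value form of the type assertion, so an unexpected type produces an error instead of a panic.

```go
package main

import "fmt"

// asBytes is a hypothetical helper that converts the value returned by
// Result() into a []byte, reporting an error for any other type.
func asBytes(result interface{}) ([]byte, error) {
	b, ok := result.([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected result type %T", result)
	}
	return b, nil
}

func main() {
	var result interface{} = []byte("abc")

	b, err := asBytes(result)
	fmt.Println(string(b), err) // prints: abc <nil>

	_, err = asBytes(42)
	fmt.Println(err) // prints: unexpected result type int
}
```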

The following example work implementation shows a work item that calculates an MD5 hash of a string:

import "crypto/md5"

// md5Work defines a structure that holds the value to be hashed and the result of the hashing
type md5Work struct {
    data   []byte
    hashed []byte
}

// Do calculates the MD5 hash of data and stores it in hashed
func (gw *md5Work) Do() {
    sum := md5.Sum(gw.data) // md5.Sum returns a [16]byte array
    gw.hashed = sum[:]
}

// GetError returns nil since the work does not fail
func (gw *md5Work) GetError() error {
    return nil
}

// Result returns the hashed result
func (gw *md5Work) Result() interface{} {
    return gw.hashed
}

Check out the examples folder of the GitHub repo for a complete example.

WorkGenerator function

type WorkGenerator func() Work

The WorkGenerator function allows the user to provide an implementation that returns a work item to be processed on each call. When the generator returns nil, the generation of work has finished.
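A generator is often easiest to write as a closure over its input. The sketch below is one possible shape, not the implementation from the repo: `newSliceGenerator` and `upperWork` are hypothetical names, and the closure assumes it is only ever called from a single goroutine, since its counter is not protected by a lock.

```go
package main

import (
	"fmt"
	"strings"
)

// Work and WorkGenerator mirror the definitions shown in this post.
type Work interface {
	Do()
	GetError() error
	Result() interface{}
}

type WorkGenerator func() Work

// upperWork is a hypothetical work item that upper-cases a string.
type upperWork struct{ in, out string }

func (w *upperWork) Do()                 { w.out = strings.ToUpper(w.in) }
func (w *upperWork) GetError() error     { return nil }
func (w *upperWork) Result() interface{} { return w.out }

// newSliceGenerator is a hypothetical constructor. The returned generator
// hands out one work item per value and returns nil once all are consumed.
func newSliceGenerator(values []string) WorkGenerator {
	i := 0
	return func() Work {
		if i >= len(values) {
			// Return a plain nil, not a nil *upperWork, so that the
			// caller's "w != nil" check on the interface succeeds.
			return nil
		}
		w := &upperWork{in: values[i]}
		i++
		return w
	}
}

func main() {
	gen := newSliceGenerator([]string{"a", "b"})
	for w := gen(); w != nil; w = gen() {
		w.Do()
		fmt.Println(w.Result()) // prints A, then B
	}
}
```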

Check out the examples folder of the GitHub repo for an implementation of the generator.

WorkCollector function

type WorkCollector func(Work)

The WorkCollector function takes a completed Work item as its argument. It can check for a failure by calling the GetError method, read the outcome through the Result method, and handle each case appropriately.
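One way to write a collector is as a method on a small struct that accumulates the outcomes. The sketch below checks GetError first and only then asserts the result type. The `summary` and `stubWork` types and the `errBoom` value are hypothetical and exist only for this illustration. The sketch also assumes the collector is called from a single goroutine, which is why it does no locking.

```go
package main

import (
	"errors"
	"fmt"
)

// Work and WorkCollector mirror the definitions shown in this post.
type Work interface {
	Do()
	GetError() error
	Result() interface{}
}

type WorkCollector func(Work)

// errBoom is a hypothetical error used to simulate a failed work item.
var errBoom = errors.New("boom")

// stubWork is a hypothetical, already completed work item.
type stubWork struct {
	res interface{}
	err error
}

func (w *stubWork) Do()                 {}
func (w *stubWork) GetError() error     { return w.err }
func (w *stubWork) Result() interface{} { return w.res }

// summary gathers successful results and counts failures.
type summary struct {
	results []string
	failed  int
}

// collect has the WorkCollector signature. A work item counts as failed
// if it reports an error or if its result is not a string.
func (s *summary) collect(w Work) {
	if err := w.GetError(); err != nil {
		s.failed++
		return
	}
	r, ok := w.Result().(string)
	if !ok {
		s.failed++
		return
	}
	s.results = append(s.results, r)
}

func main() {
	s := &summary{}
	var collector WorkCollector = s.collect

	collector(&stubWork{res: "ok"})
	collector(&stubWork{err: errBoom})

	fmt.Println(len(s.results), s.failed) // prints: 1 1
}
```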

Check out the examples folder of the GitHub repo for an implementation of the collector.

Check out

Head over to the GitHub repo to see the code along with a working example. Try it out, and if you find something, like a bug or an improvement, don't hesitate to open an issue or, better yet, create a PR.

Thanks and enjoy!
