DEV Community

Syukur


Making Sense of the Golang Worker Pattern

Motivation

Imagine we are asked to insert 10000 rows into a database by
calling the repository.InsertStudentData() function.
We could just iterate over the rows. That sounds fine at first, but how long
will it take to insert all of them? And what is likely to happen
to the machine's resources?

Fine, we can use goroutines to run the inserts concurrently.

Understanding Worker Pattern

Basically, the worker pattern consists of 3 steps:

  1. Spawn workers
  2. Publish task/job to workers
  3. Collecting results

  1. Spawn workers
    Workers in this pattern are functions that run
    concurrently as goroutines. Look at the snippet below.

{
    ...
    numWorkers := 3

    // Start each worker in its own goroutine; the loop does not wait for them.
    for w := 0; w < numWorkers; w++ {
        go InsertStudentWorker(ctx, tx, jobs, jobsErrCh)
    }
    ...
}

// InsertStudentWorker receives jobs from data and reports errors on errCh.
func InsertStudentWorker(ctx context.Context, tx *sql.Tx, data <-chan models.StudentItem, errCh chan<- error) {
    ...
}

This means the code spawns 3 workers, and
no worker needs to wait for another to finish its task.
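To see that the spawning loop itself does not wait for any worker, here is a minimal, self-contained sketch. The function name spawnWorkers and the use of sync.WaitGroup and sync.Mutex are assumptions made for illustration; they are not part of the snippet above, which hands results back through channels instead.

```go
package main

import (
	"fmt"
	"sync"
)

// spawnWorkers (hypothetical helper) starts numWorkers goroutines and
// returns the IDs of the workers that ran, in whatever order they finished.
func spawnWorkers(numWorkers int) []int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		ran []int
	)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		// The go statement returns immediately; the loop moves on to the
		// next worker without waiting for this one to finish.
		go func(id int) {
			defer wg.Done()
			mu.Lock()
			ran = append(ran, id)
			mu.Unlock()
		}(w)
	}

	// Wait here only so the function can return a complete result.
	wg.Wait()
	return ran
}

func main() {
	ids := spawnWorkers(3)
	fmt.Println(len(ids)) // prints 3; the order of the IDs may vary per run
}
```

The order of the returned IDs is not guaranteed, which is exactly the independence between workers described above.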


  2. Publish task/job to workers
    As mentioned earlier, workers run concurrently and are independent of each other,
    and goroutines cannot directly return values to their caller. Channels therefore
    serve as the coordination mechanism: workers pull jobs from one channel and
    publish results to another safely.

Take a look at this snippet, where the caller publishes
jobs to the workers.

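{
    ...
    numJobs := len(StudentItems)
    // Buffered to numJobs so publishing never blocks the caller.
    jobs := make(chan models.StudentItem, numJobs)

    for i := 0; i < numJobs; i++ {
        jobs <- StudentItems[i]
    }

    // No more jobs will be sent; workers exit once the channel is drained.
    close(jobs)
    ...
}

// InsertStudentWorker inserts every item it receives until data is closed.
func InsertStudentWorker(ctx context.Context, tx *sql.Tx, data <-chan models.StudentItem, errCh chan<- error) {
    for d := range data {
        err := repository.InsertStudentItemFromWorker(ctx, tx, d)
        ...
    }
}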

The worker accepts a receive-only channel argument (data <-chan models.StudentItem).
As soon as a task is available on the channel, whichever worker
is free receives it. Because the workers
run concurrently, we don't care which worker
pulls a given task from the caller.

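Finally, we close the jobs channel. Closing it ends each worker's for-range loop once the remaining jobs are drained, so the worker goroutines exit instead of blocking forever (a goroutine leak).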
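The effect of close(jobs) can be checked with a small sketch. Everything here is an assumption for illustration: jobs carry plain ints instead of models.StudentItem, the helper is named sumWithWorkers, and sync.WaitGroup is used to wait for the workers to exit.

```go
package main

import (
	"fmt"
	"sync"
)

// sumWithWorkers (hypothetical helper) publishes items to numWorkers
// goroutines and returns the sum of everything they received.
func sumWithWorkers(items []int, numWorkers int) int {
	jobs := make(chan int, len(items))

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// This loop ends only after jobs is closed and drained.
			for job := range jobs {
				mu.Lock()
				total += job
				mu.Unlock()
			}
		}()
	}

	for _, item := range items {
		jobs <- item
	}

	// Without this close, the workers never leave their range loops
	// and wg.Wait() below never returns.
	close(jobs)

	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumWithWorkers([]int{1, 2, 3, 4, 5}, 3)) // prints 15
}
```

Every job is received by exactly one worker, so the total is the same no matter how the jobs are distributed.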


  3. Collecting results
    Once a worker finishes a task, it sends the result to a send-only channel.
    Since the caller knows how many jobs were published, it reads that many results
    from the channel without caring which worker produced each one. The snippet below
    extends the one above: the worker publishes its result to the result channel,
    and the caller reads from that stream.

{
    ...
    numJobs := len(StudentItems)
    jobs := make(chan models.StudentItem, numJobs)

    for i := 0; i < numJobs; i++ {
        jobs <- StudentItems[i]
    }

    close(jobs)

    // Read exactly one result per published job.
    // Assumption: jobsErrCh is buffered with capacity numJobs, so workers
    // can still send after an early return here and do not leak.
    for j := 0; j < numJobs; j++ {
        err := <-jobsErrCh
        if err != nil {
            _ = tx.Rollback()
            return err
        }
    }
}

// InsertStudentWorker sends one result per job: nil on success, the error otherwise.
func InsertStudentWorker(ctx context.Context, tx *sql.Tx, data <-chan models.StudentItem, errCh chan<- error) {
    for d := range data {
        err := repository.InsertStudentItemFromWorker(ctx, tx, d)
        errCh <- err
    }
}
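Putting the three steps together, a runnable sketch might look like the following. It is not the original code: the database transaction is left out, StudentItem stands in for models.StudentItem, insertFunc stands in for the repository call, and the names worker, insertAll and errInsertFailed are hypothetical. The result channel is buffered to numJobs on the assumption that workers should be able to deliver their results even when the caller returns on the first error.

```go
package main

import (
	"errors"
	"fmt"
)

// StudentItem is a hypothetical stand-in for models.StudentItem.
type StudentItem struct {
	ID   int
	Name string
}

// insertFunc is a hypothetical stand-in for the repository insert call.
type insertFunc func(StudentItem) error

// errInsertFailed is a sample error used only by this sketch.
var errInsertFailed = errors.New("insert failed")

// worker pulls items from jobs until the channel is closed and sends one
// result (nil or an error) per item on errCh.
func worker(jobs <-chan StudentItem, errCh chan<- error, insert insertFunc) {
	for item := range jobs {
		errCh <- insert(item)
	}
}

// insertAll runs the three steps and returns the first error it receives.
func insertAll(items []StudentItem, numWorkers int, insert insertFunc) error {
	numJobs := len(items)
	jobs := make(chan StudentItem, numJobs)
	// Buffered so a worker never blocks on send after an early return.
	errCh := make(chan error, numJobs)

	// 1. Spawn workers.
	for w := 0; w < numWorkers; w++ {
		go worker(jobs, errCh, insert)
	}

	// 2. Publish jobs, then close the channel so workers can exit.
	for _, item := range items {
		jobs <- item
	}
	close(jobs)

	// 3. Collect exactly one result per job.
	for i := 0; i < numJobs; i++ {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	items := []StudentItem{{ID: 1, Name: "a"}, {ID: 2, Name: "b"}, {ID: 3, Name: "c"}}

	alwaysOK := func(StudentItem) error { return nil }
	fmt.Println(insertAll(items, 3, alwaysOK)) // prints <nil>

	failOnTwo := func(s StudentItem) error {
		if s.ID == 2 {
			return errInsertFailed
		}
		return nil
	}
	fmt.Println(insertAll(items, 3, failOnTwo)) // prints insert failed
}
```

In the real code, the early-return branch is where the transaction would be rolled back, as in the snippet above.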

