
Mario Carrion

Originally published at mariocarrion.com

Complex Pipelines in Go (Part 1): Introduction

Just recently I had the opportunity to work on a tool in charge of processing gigabytes of data over the wire. The end goal was to download that data, process the values and finally insert them into persistent storage in batches.

This is the first of a series of posts covering all the different pieces involved in building the final tool.

Big Picture

Succinctly, this solution consists of three processes:

  1. Data Producer Process: reads the input data and sends it to a destination for further processing,
  2. Data Consumer Process: receives the raw data, parses the values using the expected format and sends them to a different process,
  3. Persistent Storage Process: receives the parsed data and stores it in batches.

This is the classic problem solved using pipelines, described years ago in the Go blog's post on pipelines and cancellation. The biggest difference between that classic post and this new series is how cancellation comes into play when working with multiple goroutines. This means defining rules regarding the expected behavior when anything fails, all of it handled using two great Go packages: context and errgroup.
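Before diving into each process, here is a minimal sketch of how the three processes could be wired together using errgroup.WithContext. The produce, consume and store functions, as well as the channel element types, are placeholders standing in for the pieces covered in the upcoming posts; this is not the final code of the series.

package main

import (
    "context"
    "log"

    "golang.org/x/sync/errgroup"
)

// Placeholder stage signatures; the real implementations are covered later
// in the series, and the channel element types are simplified here.
func produce(ctx context.Context, out chan<- []byte) error                     { return nil }
func consume(ctx context.Context, in <-chan []byte, out chan<- []string) error { return nil }
func store(ctx context.Context, in <-chan []string) error                      { return nil }

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    raw := make(chan []byte)      // Data Producer -> Data Consumer
    parsed := make(chan []string) // Data Consumer -> Persistent Storage

    g.Go(func() error {
        defer close(raw)
        return produce(ctx, raw)
    })

    g.Go(func() error {
        defer close(parsed)
        return consume(ctx, raw, parsed)
    })

    g.Go(func() error {
        return store(ctx, parsed)
    })

    // Wait returns the first error returned by any stage; the shared ctx is
    // cancelled at that point so the remaining stages can stop early.
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}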

For our example we will be using a file that is part of IMDb's datasets. Those files are gzipped, tab-separated-values (TSV) formatted, in the UTF-8 character set. The specific file to use will be name.basics.tsv.gz, which defines the following fields:

|-------------------|-----------|---------------------------------------------------|
|       Field       | Data Type |                   Description                     |
|-------------------|-----------|---------------------------------------------------|
| nconst            | string    | alphanumeric unique identifier of the name/person |
| primaryName       | string    | name by which the person is most often credited   |
| birthYear         | string    | in YYYY format                                    |
| deathYear         | string    | in YYYY format if applicable, else '\N'           |
| primaryProfession | []string  | the top-3 professions of the person               |
| knownForTitles    | []string  | titles the person is known for                    |
|-------------------|-----------|---------------------------------------------------|

Data Producer Process: Input Data Format

Because of the location of this input file (an HTTP resource) and its data format (gzip), our Data Producer Process will request the file using net/http, uncompress the received values using compress/gzip and send them to the Data Consumer Process as raw []byte.
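As a rough sketch of what that step could look like (assuming the imports bufio, compress/gzip, context and net/http), the producer might be written like this; the dataset URL and the line-by-line chunking are assumptions for illustration:

// producer is a sketch of the Data Producer Process: it downloads the gzipped
// TSV file, decompresses it on the fly and sends each raw line downstream.
func producer(ctx context.Context, out chan<- []byte) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet,
        "https://datasets.imdbws.com/name.basics.tsv.gz", nil)
    if err != nil {
        return err
    }

    res, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    gz, err := gzip.NewReader(res.Body)
    if err != nil {
        return err
    }
    defer gz.Close()

    scanner := bufio.NewScanner(gz)
    for scanner.Scan() {
        // Copy the bytes: bufio.Scanner reuses its internal buffer between calls.
        line := append([]byte(nil), scanner.Bytes()...)

        select {
        case out <- line:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return scanner.Err()
}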

Data Consumer Process: Input Data Format

Those raw []byte values will be read into TSV records using encoding/csv, and from there they will be converted into values of a new struct type, Name, that the next step in our pipeline can understand.
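A sketch of that conversion could look like the following, assuming each incoming []byte is one TSV line, the imports bytes, context, encoding/csv and strings, and the Name struct defined in the next section:

// consumer is a sketch of the Data Consumer Process: each incoming []byte is
// treated as one TSV line, decoded with encoding/csv (Comma set to '\t') and
// converted into a Name value. Skipping the header line is omitted here.
func consumer(ctx context.Context, in <-chan []byte, out chan<- Name) error {
    for line := range in {
        r := csv.NewReader(bytes.NewReader(line))
        r.Comma = '\t'
        r.LazyQuotes = true // tolerate stray quotes in the data

        record, err := r.Read()
        if err != nil {
            return err
        }

        if len(record) != 6 {
            continue // malformed line, skipped in this sketch
        }

        name := Name{
            NConst:             record[0],
            PrimaryName:        record[1],
            BirthYear:          record[2],
            DeathYear:          record[3],
            PrimaryProfessions: strings.Split(record[4], ","),
            KnownForTitles:     strings.Split(record[5], ","),
        }

        select {
        case out <- name:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}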

Persistent Storage Process: Input Data Format

The following will be used as the type containing the values to be eventually persisted in a database:

type Name struct {
    NConst             string
    PrimaryName        string
    BirthYear          string
    DeathYear          string
    PrimaryProfessions []string
    KnownForTitles     []string
}

We will be using PostgreSQL as the relational database for this, and specifically github.com/jackc/pgx will be imported for storing the values in batches.
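To give an idea of where the series is heading, here is a minimal sketch of a batched insert using pgx.Batch, assuming pgx v4 imported as github.com/jackc/pgx/v4; the names table and its columns are hypothetical, and the real batcher is the subject of the next post.

// storeBatch is a sketch of how a slice of Name values could be persisted in
// a single round trip using pgx.Batch. The "names" table and its columns are
// made up, and the []string fields are assumed to map to text[] columns.
func storeBatch(ctx context.Context, conn *pgx.Conn, names []Name) error {
    batch := &pgx.Batch{}

    for _, n := range names {
        batch.Queue(
            `INSERT INTO names (nconst, primary_name, birth_year, death_year, primary_professions, known_for_titles)
             VALUES ($1, $2, $3, $4, $5, $6)`,
            n.NConst, n.PrimaryName, n.BirthYear, n.DeathYear, n.PrimaryProfessions, n.KnownForTitles,
        )
    }

    // SendBatch sends all queued statements at once; Close waits for the
    // results and returns the first error encountered, if any.
    return conn.SendBatch(ctx, batch).Close()
}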

What's next?

The next blog post will cover the implementation of the PostgreSQL Batcher. As we progress through the series we will continuously connect all the pieces together to eventually complete our final tool.
