DEV Community

Brian Michalski
Brian Michalski

Posted on • Updated on

Go + BigQuery : Beam for Beginners

Curious how to get started reading from BigQuery using Apache Beam in Go? In this post, we'll walk through the basics of Apache Beam and write a basic Go pipeline that reads from BigQuery and computes some ngrams.

Beam Intro

Apache Beam is an open-source SDK for writing "big data" processing pipelines complete with Python, Java, and Go implementations. If all goes well, you can write a pipeline using Apache Beam and then run it locally or deploy it on The Cloud using GCP Dataflow, Apache Flink, Spark, etc where it can magically scale up to handle a large amount of data.

Since Beam pipelines can magically scale you have to write your pipeline using Beam's special methods and types. This allows your pipeline to be sharded across dozens/hundreds of workers behind the scenes. For beginners there are two key constructs to know:

  • PCollection: A PCollection is a fancy array. Most steps of your pipeline will take a PCollection as input, run a function on each element, and put that output in another PCollection.
  • ParDo: A ParDo is a function that runs on each PCollection element. When it runs, it can append one or more elements to the resulting PCollection.

Note: This is an oversimplified introduction to Apache Beam. Fancier operations like group/combine/join require more functions you can learn about in the docs.

Step 1: Boring Boilerplate

There is a bunch of boilerplate code that will import beam, construct a new pipeline (beam.NewPipeline), and execute it (beamx.Run). In the middle, we've left space to build all the pieces of the pipeline.

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    p := beam.NewPipeline()
    s := p.Root()

    // Build the pipeline here.

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Step 2: Reading from BigQuery

Pipelines written in Go read from BigQuery just like most other Go programs, running a SQL query and decoding the results into structs that match the returned fields.

For our example, we're going to be reading HackerNews comments from the BigQuery public dataset so we'll need to add a struct which models that result and then a SQL query to query the data. For testing purposes, consider adding a LIMIT clause to your SQL query to reduce how much data it pulls.

Wiring this up as the first step of our pipeline is as easy as adding the bigqueryio.Query function which returns one of those PCollections we talked about earlier.

import (
    ...
    "reflect"

    "github.com/apache/beam/sdks/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    ...
)

type CommentRow struct {
    Text   string `bigquery:"text"`
}

const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
LIMIT 1000
`

func main() {
    ...
    p := beam.NewPipeline()
    s := p.Root()
    project := gcpopts.GetProject(ctx)

    rows := bigqueryio.Query(s, project, query,
        reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())

    ...
}

In the code above, rows will be a PCollection<CommentRow>, effectively a big array where each element is one comment from HackerNews. We're also using the gcpopts utility which helps grab the GCP Project which BigQuery should bill against; you could just hardcode a string too.

StandardSQL vs LegacySQL

Currently, BigQueryIO defaults to LegacySQL in Apache Beam. I do not like using anything "legacy" so the SQL query above uses Standard SQL (note the unusual backtick escaping around the table name) and sets bigqueryio.UseStandardSQL to enable this. This is a new feature for the Go SDK in Apache Beam 2.23, if your environment is using something older you'll need to upgrade to escape out of LegacySQL mode.

Step 3: Ngram Extraction

Ngrams (n-grams) extract phrases from blocks of text. The text "quick brown fox" contains 3 unigrams: ["quick", "brown", "fox"], 2 bigrams: ["quick brown", "brown fox"] and 1 trigram: ["quick brown fox"].

We can extract them using a function like this:

func ngram(str string, ns ...int) []string {
    split := strings.Split(str, " ")
    results := []string{}

    for _, n := range ns {
        i := 0
        for i < len(split)-n+1 {
            results = append(results, strings.Join(split[i:i+n], " "))
            i++
        }
    }

    return results
}

Since we're dealing with PCollections here, the implementation isn't as easy as calling that function in a for-loop over rows. Instead, we have to us the ParDo function, which can distribute this operation across many machines and later aggregate the results. Imagine if this ngram() function was very expensive or slow, it might make sense to have lots of workers extracting ngrams in parallel to get the job done faster. Beam will figure that out for us.

ngrams := beam.ParDo(s, func(row CommentRow, emit func(string)) {
    for _, gram := range ngram(row.Text, 1, 2, 3) {
        emit(gram)
    }
}, rows)

This snippet takes the rows input (specified last) and runs the inline-function on each element (aka each CommentRow). The function also has a reference to an emitter function emit which is used to collect the results and package them back up into a PCollection. A loop calls emit() on each value returned from the ngram() function above. The result is ngrams which is an array of each ngram extracted, a PCollection<string>.

Step 4: Analytics

Now that we have all the ngrams extracted and floating around in this big magical array it's time to count how many times each value is found in the array. Remember, the previous step just extracted the raw words -- it did not attempt to count/dedup/aggregate them.

Counting occurrences of things is a pretty common problem with special ways of distributing across multiple workers. Luckily we don't need to deal with any of that, just use the stats.Count function.

counts := stats.Count(s, ngrams)

This function returns yet another PCollection, this time containing a key-value pair (aka KV in Beam-lingo) where the key is the ngram string, and the value is an int count... essentially a PCollection<{string, int}>.

Step 5: Outputting

Now that we have the data we want it's time to output it somewhere. For this example, we'll dump it to a text file. To do this we need to make a collection of strings (PCollection<string>). Another ParDo to the rescue!

formatted := beam.ParDo(s, func(gram string, c int) string {
    return fmt.Sprintf("%s,%v", gram, c)
}, counts)

This ParDo uses a slightly different syntax than the previous one. Instead of exposing an emit() function the inline function just returns a string. We can use this simpler(?) syntax since each element of the input collection counts only converts to one output element; in our previous use building the ngrams each input string had multiple output values.

The result here is that formatted contains a list of strings suitable for printing, PCollection<string>.

We then call the textio.Write function and point it to an output path:

textio.Write(s, "/tmp/output.txt", formatted)

Final Code

package main

import (
    "context"
    "flag"
    "fmt"
    "reflect"
    "strings"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// CommentRow models 1 row of HackerNews comments.
type CommentRow struct {
    Text string `bigquery:"text"`
}

const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
LIMIT 1000
`

// ngram extracts variable sizes of ngrams from a string.
func ngram(str string, ns ...int) []string {
    split := strings.Split(str, " ")
    results := []string{}

    for _, n := range ns {
        i := 0
        for i < len(split)-n+1 {
            results = append(results, strings.Join(split[i:i+n], " "))
            i++
        }
    }

    return results
}

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    p := beam.NewPipeline()
    s := p.Root()
    project := gcpopts.GetProject(ctx)

    // Build a PCollection<CommentRow> by querying BigQuery.
    rows := bigqueryio.Query(s, project, query,
        reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())

    // Extract unigrams, bigrams, and trigrams from each comment.
    // Builds a PCollection<string> where each string is a single
    // occurrence of an ngram.
    ngrams := beam.ParDo(s, func(row CommentRow, emit func(string)) {
        for _, gram := range ngram(row.Text, 1, 2, 3) {
            emit(gram)
        }
    }, rows)

    // Count the occurrence of each ngram.
    // Returns a PCollection<{string, count: int}>.
    counts := stats.Count(s, ngrams)

    // Convert each count row into a string suitable for printing.
    // Returns a PCollection<string>.
    formatted := beam.ParDo(s, func(gram string, c int) string {
        return fmt.Sprintf("%s,%v", gram, c)
    }, counts)

    // Write each string to a line in the output file.
    textio.Write(s, "/tmp/output.txt", formatted)

    // Now that the pipeline is fully constructed, we execute it.
    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Running this with a command like $ go run main.go --project=<gcp-project-id> will kick off a local pipeline and output some ngrams to /tmp/output.txt. To run a more scalable version, perhaps without the LIMIT SQL clause, check out the "Dataflow" Tab on the Go Quickstart with the right flags to run on GCP Dataflow.

dataflow section with flags highlighted

Happy analyzing!

Top comments (1)

Collapse
 
temphiza profile image
temphiza

Hi Brian,

Great post!, I'm wondering if u have any idea on deploying the final pipeline to gcp?, in go there is no clue of --template_location (used on java/python to create classic templates)

best,

Faustino