DEV Community

Cover image for Getting started with Apache Beam for distributed data processing
Kimmo Sääskilahti
Kimmo Sääskilahti

Posted on

Getting started with Apache Beam for distributed data processing

MapReduce was revolutionary when it was first published in 2004. It provided a programming model for batch processing datasets with terabytes of data. MapReduce was built on three seemingly simple phases: map, sort, and reduce. It used the general-purpose HDFS (Hadoop Distributed File System) file system for I/O and was therefore capable of processing almost any kind of data.

MapReduce jobs were notoriously tedious to write. High-level APIs such as Hive and Pig provided higher-level APIs wrapping MapReduce and made it a lot easier to get stuff done with MapReduce.

However, the MapReduce model had other shortcomings. For example, the stiff map-sort-reduce flow isn't optimal for every kind of job: the sort phase is often unnecessary and sometimes it would be much more useful to chain reducers directly without a new map-sort phase. MapReduce was also built for the kind of fault-tolerance that isn't really required outside Google scale: it insists on writing all intermediate state on HDFS, which makes processing slower and takes a lot of storage space.

New frameworks and programming models such as Apache Spark, Apache Tez and Apache Flink emerged to address these short-comings. Spark does its best to keep data close to the executors or even in memory between tasks, which can speed things up a lot. Its dataset abstractions such as resilient distributed dataset (RDD) and Dataset also made it a lot easier to reason about and write programs running in distributed setting.

Apache Beam is yet another abstraction for massively parallel processing jobs. Beam allows declaring both batch and streaming jobs in unified fashion and they can run in any execution engine such as Spark, Flink, Google Cloud Dataflow, Apache Samza, or Twister 2.

In this article, we'll declare a processing pipeline with Apache Beam Python SDK and execute the pipeline in Dataflow. Alternatively, you can execute the pipeline in your local machine.

Setup

The code for the example can be found in the wordcount/ folder of this repository. To get started, move to the folder and install the requirements with

$ pip install -r requirements.txt
Enter fullscreen mode Exit fullscreen mode

You'll probably want to create a virtual environment before running the command. You can also install Apache Beam Python SDK directly with pip install apache-beam[gcp], where the gcp bundle includes everything required to use the Google Cloud Dataflow runner as execution engine.

Google Cloud Setup

Note: Running the pipeline will incur charges on your Google Cloud account. See Dataflow pricing. For me, running this pipeline multiple times incurred charges of $0.01 (covered by the free-tier credits).

If you want to execute the pipeline in Dataflow, you'll need to execute the following steps:

  1. Create a Google Cloud account and a project within the account in the console.
  2. Install and setup Google Cloud CLI if you want to manage resources from the command-line.
  3. Enable Dataflow API as instructed here. You may also need to enable other APIs: you'll get detailed instructions when your pipeline fails.
  4. Create a GCP bucket for writing output, preferably in the same region where you run Dataflow. To create bucket from the command line, run: gsutil mb -p PROJECT_ID -c STANDARD -l REGION -b on gs://BUCKET_NAME. Fill in project ID, region and bucket name here.
  5. Create a service account, download the JSON key and store it somewhere in your machine.

Running the pipeline

The pipeline definition is located in main.py. The file is almost identical to the wordcount_minimal.py from Beam examples. We'll go through the pipeline code soon, but if you're in hurry, here are the details how to execute the pipeline first locally with the local execution engine (DirectRunner) without Google Cloud account:

python main.py --runner DirectRunner --input gs://dataflow-samples/shakespeare/kinglear.txt --output output/counts
Enter fullscreen mode Exit fullscreen mode

It should only take a few seconds to execute the pipeline. Check the output in output/counts folder:

$ cat output/counts-00000-of-00001 | head -n 5
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
Enter fullscreen mode Exit fullscreen mode

To run the same script with DataflowRunner, writing output to the GCS bucket:

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
$ export GCP_PROJECT=your-gcp-project-11111111
$ export GCP_BUCKET=your-gcp-bucket
$ export GCP_REGION=europe-west-1
$ python main.py --runner DataflowRunner --project ${GCP_PROJECT} --region=${GCP_REGION} --staging_location=gs://${GCP_BUCKET}/staging --temp_location gs://${GCP_BUCKET}/temp --job_name wordcount-job --input gs://dataflow-samples/shakespeare/kinglear.txt --output gs://${GCP_BUCKET}/output/counts
Enter fullscreen mode Exit fullscreen mode

Running the pipeline in the cloud can take up to three minutes to finish.

Understand the pipeline

The complete pipeline definition is here (without parsing arguments etc.):

import re
from past.builtins import unicode

import apache_beam as beam

from apache_beam.options.pipeline_options import (
    PipelineOptions,
)

from apache_beam.io import ReadFromText, WriteToText

with beam.Pipeline(options=options) as p:

    lines = p | ReadFromText(input_file)

    counts = (
        lines
        | "Split"
        >> (
            beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(
                unicode
            )
        )
        | "PairWithOne" >> beam.Map(lambda x: (x, 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
    )

    def format_result(word_count):
        (word, count) = word_count
        return "%s: %s" % (word, count)

    output = counts | "Format" >> beam.Map(format_result)
    output | WriteToText(output_file)
Enter fullscreen mode Exit fullscreen mode

Let's start from the pipeline creation. The pipeline object is created with options specifying, for example, the configuration for the execution engine. The pipeline is used as a context manager with beam.Pipeline(options=options) as p so that it can be executed at __exit__.

We start the pipeline with ReadFromText from the apache_beam.io package:

lines = p | ReadFromText(input_file)
Enter fullscreen mode Exit fullscreen mode

ReadFromText returns a PCollection, which is Beam's term for the dataset:

A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline. A PCollection can hold a dataset of a fixed size or an unbounded dataset from a continuously updating data source.

ReadFromText itself is an PTransform:

A transform represents a processing operation that transforms data. A transform takes one or more PCollections as input, performs an operation that you specify on each element in that collection, and produces one or more PCollections as output. A transform can perform nearly any kind of processing operation, including performing mathematical computations on data, converting data from one format to another, grouping data together, reading and writing data, filtering data to output only the elements you want, or combining data elements into single values.

In the Python SDK, transforms are chained with the vertical bar |. See here for the definition of __or__.

So now lines is a PCollection containing all lines in the input text file. Here are the next steps:

counts = (
    lines
    | "Split"
    >> (
        beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(
            unicode
        )
    )
    | "PairWithOne" >> beam.Map(lambda x: (x, 1))
    | "GroupAndSum" >> beam.CombinePerKey(sum)
)
Enter fullscreen mode Exit fullscreen mode

The bit-shift operator is overridden by defining __rrshift__ for PTransform to allow naming it. In the "Split" transform, each line is split into words. This collection of collections is flattened to a collection with beam.FlatMap. The "PairWithOne" transform maps every word to a tuple (x, 1). The first item is the key and the second item is the value. The key-value pairs are then fed to the "GroupAndSum" transform, where all values are summed up by key. This is parallelized word count!

Finally, the output is formatted and written:

def format_result(word_count):
    (word, count) = word_count
    return "%s: %s" % (word, count)

output = counts | "Format" >> beam.Map(format_result)
output | WriteToText(output_file)
Enter fullscreen mode Exit fullscreen mode

The "Format" transform maps every (word, count) pair with the format_result function. The output is written to output_file with the WriteToText transform.

Conclusion

That concludes my quick start for Apache Beam. It's a very promising project and there's a lot to learn. Please leave comments or questions if you have any, thanks a lot for reading!

Latest comments (0)