In my recent exploration of Apache Beam during a day off, I dived into creating simple example programs, and I'm excited to share them with you. If you're a beginner venturing into Apache Beam, I hope these programs will prove valuable in easing your initiation. Let's make your Apache Beam journey smoother!
!pip install apache-beam
import apache_beam as beam
The Simplest Program: Displaying Data
This basic program reads a local variable and prints each row. First, the program defines the local variable data, which contains three elements: "Hello", "Apache", and "Beam". The pipeline instance is created using a with statement. The beam.Create transform reads the local variable data and creates a PCollection, a fundamental concept in Apache Beam representing a collection of input/output data. Then the beam.Map transform applies the print function to each row, printing the three elements of data on stdout.
data = ['Hello', 'Apache', 'Beam']
with beam.Pipeline() as p:
    (p
     | "Create Data" >> beam.Create(data)
     | "Print" >> beam.Map(print)
    )
Hello
Apache
Beam
Add Columns
In this example, data is read from the local variable data, structured like a table of fruit prices. Each row consists of the fruit's name and its price. The goal is to calculate the price of each fruit including a 10% tax and add it as a new column.
class AddTaxColumn(beam.DoFn):
    def process(self, element):
        name, value = element
        new_value = value * 1.1
        yield (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'AddColumn' >> beam.ParDo(AddTaxColumn())
     | 'Print' >> beam.Map(print)
    )
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
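The trailing digits in values such as 110.00000000000001 come from binary floating-point arithmetic, not from Beam. As a minimal sketch, assuming the taxed price should be rounded to two decimal places (an assumption; the example above does not specify a precision), the per-row function could round its result. The name add_tax_column_rounded and its tax_rate and digits parameters are hypothetical.

```python
def add_tax_column_rounded(element, tax_rate=0.1, digits=2):
    """Return (name, price, price_with_tax) with the taxed price rounded.

    tax_rate and digits are illustrative parameters, not part of the
    original example.
    """
    name, value = element
    new_value = round(value * (1 + tax_rate), digits)
    return (name, value, new_value)


rows = [("apple", 100), ("banana", 200), ("orange", 150)]
rounded = [add_tax_column_rounded(row) for row in rows]
print(rounded)
```

Because the function returns a single row per input, it could be passed to beam.Map in a pipeline.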
In Apache Beam, two major options, beam.Map and beam.ParDo, are available to apply functions to each row. The key difference is that beam.Map generates exactly one output row per input row, whereas beam.ParDo can produce zero, one, or multiple output rows per input row.
You can pass an ordinary Python function or an instance of a class inheriting from beam.DoFn to beam.ParDo. However, a beam.DoFn instance cannot be passed to beam.Map. With beam.Map, the function must return the output row for its input element. With beam.ParDo, the function must yield each output row for its input element. If you yield multiple elements, you get multiple output rows per input row.
Here are examples to perform the same transformation:
def add_tax_column_map(element):
    name, value = element
    new_value = value * 1.1
    return (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'AddColumn' >> beam.Map(add_tax_column_map)
     | 'Print' >> beam.Map(print)
    )
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
def add_tax_column_pardo(element):
    name, value = element
    new_value = value * 1.1
    yield (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'AddColumn' >> beam.ParDo(add_tax_column_pardo)
     | 'Print' >> beam.Map(print)
    )
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
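To make the return-versus-yield distinction concrete, here is a plain-Python sketch of the two behaviours. It only models the row counts described above and is not how Beam executes these transforms; model_map, model_pardo, and with_and_without_tax are hypothetical names.

```python
def model_map(fn, elements):
    # One output per input: whatever fn returns becomes a single row.
    return [fn(element) for element in elements]


def model_pardo(fn, elements):
    # Zero or more outputs per input: every value fn yields becomes its own row.
    return [output for element in elements for output in fn(element)]


def with_and_without_tax(element):
    # Yields two rows per fruit: the original price and the taxed price.
    name, value = element
    yield (name, "net", value)
    yield (name, "gross", round(value * 1.1, 2))


data = [("apple", 100), ("banana", 200)]
mapped = model_map(lambda e: (e[0], e[1] * 2), data)
expanded = model_pardo(with_and_without_tax, data)
print(mapped)
print(expanded)
```

Two input rows produce two rows in the Map model and four rows in the ParDo model, which is the case a function passed to beam.Map cannot express.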
Parsing JSON Strings
Parsing JSON is straightforward with beam.Map (or beam.ParDo) in Apache Beam: map the json.loads function over the rows to get a collection of dict objects.
import json
data = [
    '{"name": "John", "age": 30, "city": "New York"}',
    '{"name": "Alice", "age": 25, "city": "San Francisco"}',
    '{"name": "Bob", "age": 35, "city": "Los Angeles"}'
]
with beam.Pipeline() as p:
    (p
     | "Create Data" >> beam.Create(data)
     | "Parse JSON" >> beam.Map(json.loads)
     | "Print" >> beam.Map(print)
    )
{'name': 'John', 'age': 30, 'city': 'New York'}
{'name': 'Alice', 'age': 25, 'city': 'San Francisco'}
{'name': 'Bob', 'age': 35, 'city': 'Los Angeles'}
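Real input often contains lines that are not valid JSON, and json.loads raises an exception on them. As a minimal sketch, assuming malformed lines should simply be dropped (an assumption; you may prefer to log or collect them), a generator function can yield the parsed row or nothing at all. The name parse_json_or_skip is hypothetical.

```python
import json


def parse_json_or_skip(line):
    """Yield the parsed object, or nothing if the line is not valid JSON."""
    try:
        yield json.loads(line)
    except json.JSONDecodeError:
        # Malformed line: emit no output row instead of raising.
        return


lines = ['{"name": "John", "age": 30}', 'not json', '{"name": "Bob", "age": 35}']
parsed = [row for line in lines for row in parse_json_or_skip(line)]
print(parsed)
```

Because the function yields zero or one row per input, it fits beam.ParDo (or beam.FlatMap) rather than beam.Map.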
Read or Write Data in CSV, JSON, or Parquet Format
So far, we've used simple input and output operations with local variables and the print function to display results on stdout. However, Apache Beam supports reading and writing various data formats, such as CSV, JSON, and Parquet. Let's explore examples of reading and writing files in these formats.
To read CSV and Parquet data, we can use beam.io.ReadFromCsv and beam.io.ReadFromParquet, respectively. For JSON data, we first read it as text using beam.io.ReadFromText, which treats each line as a separate row, and then parse it using beam.Map(json.loads).
# make sample csv, json, and parquet data.
import pandas as pd
data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 22],
    'City': ['New York', 'San Francisco', 'Los Angeles']
}
pd.DataFrame(data).to_csv("input.csv", index=False)
pd.DataFrame(data).to_json("input.json", orient='records', lines=True)
pd.DataFrame(data).to_parquet("input.parquet")
Now we have data in the following formats:
input.csv
Name,Age,City
Alice,25,New York
Bob,30,San Francisco
Charlie,22,Los Angeles
input.json
{"Name":"Alice","Age":25,"City":"New York"}
{"Name":"Bob","Age":30,"City":"San Francisco"}
{"Name":"Charlie","Age":22,"City":"Los Angeles"}
We can read this data with the following code:
print("1. read csv")
with beam.Pipeline() as p:
    (p
     | 'Read CSV' >> beam.io.ReadFromCsv('input.csv')
     | 'Print' >> beam.Map(print)
    )

print("2. read json")
with beam.Pipeline() as p:
    (p
     | 'Read JSON' >> beam.io.ReadFromText('input.json')
     | "Parse JSON" >> beam.Map(json.loads)
     | 'Print' >> beam.Map(print)
    )

print("3. read parquet")
with beam.Pipeline() as p:
    (p
     | 'Read Parquet' >> beam.io.ReadFromParquet('input.parquet')
     | 'Print' >> beam.Map(print)
    )
1. read csv
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Alice', Age=25, City='New York')
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Bob', Age=30, City='San Francisco')
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Charlie', Age=22, City='Los Angeles')
2. read json
{'Name': 'Alice', 'Age': 25, 'City': 'New York'}
{'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'}
{'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}
3. read parquet
{'Name': 'Alice', 'Age': 25, 'City': 'New York'}
{'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'}
{'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}
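Note that the CSV reader prints rows as BeamSchema_... objects, while the JSON and Parquet readers print dictionaries. The printed form suggests the CSV rows behave like named tuples; assuming that holds (this is an assumption, not something verified here), a small helper could convert them to plain dict rows so that all three readers produce the same shape. The stand-in Row type and the helper name row_to_dict are hypothetical.

```python
from collections import namedtuple

# Stand-in for the schema row type produced by the CSV reader (assumption:
# the real rows expose the named-tuple _asdict() method).
Row = namedtuple("Row", ["Name", "Age", "City"])


def row_to_dict(row):
    """Convert a named-tuple-like row into a plain dict."""
    return dict(row._asdict())


rows = [Row("Alice", 25, "New York"), Row("Bob", 30, "San Francisco")]
as_dicts = [row_to_dict(row) for row in rows]
print(as_dicts)
```

If the assumption holds, a step such as beam.Map(row_to_dict) after the CSV read would give the same output shape as the other two readers.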
Writing is also straightforward using beam.io.WriteToCsv, beam.io.WriteToJson, and beam.io.WriteToParquet. However, there are some points to consider.
Firstly, you need to provide a schema for the data. One approach is to define a class inheriting from typing.NamedTuple and declare it as the element type. In this example, we use this approach to output CSV and JSON. When outputting a Parquet file, however, you need to pass a pyarrow schema to beam.io.WriteToParquet through its schema argument.
Secondly, the output is sharded. If you set the output filename to output.csv, the actual output filename will be something like output.csv-00000-of-00001. The same holds for JSON and Parquet files.
data = [
    {'Name': 'Alice', 'Age': 25, 'City': 'New York'},
    {'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'},
    {'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}
]

import typing

class PersonalInformation(typing.NamedTuple):
    Name: str
    Age: int
    City: str

# Write CSV
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data).with_output_types(PersonalInformation)
     | 'Write CSV' >> beam.io.WriteToCsv('output.csv')
    )

# Write JSON
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data).with_output_types(PersonalInformation)
     | 'Write JSON' >> beam.io.WriteToJson('output.json')
    )

import pyarrow

schema = pyarrow.schema(
    [('Name', pyarrow.string()), ('Age', pyarrow.int64()), ('City', pyarrow.string())]
)

# Write Parquet
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'Write Parquet' >> beam.io.WriteToParquet("output.parquet", schema)
    )
assert pd.read_csv("output.csv-00000-of-00001").equals(pd.DataFrame(data))
assert pd.read_json("output.json-00000-of-00001", orient='records', lines=True).equals(pd.DataFrame(data))
assert pd.read_parquet("output.parquet-00000-of-00001").equals(pd.DataFrame(data))
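The assertions above hard-code the single shard name. The suffix follows a visible pattern: a zero-padded shard index and the total shard count. As a minimal sketch, assuming that pattern and that larger jobs may produce more than one shard, output files can be located by pattern instead of by exact name. The helpers shard_name and list_shards are hypothetical, and the temporary files only stand in for real pipeline output.

```python
import glob
import os
import tempfile


def shard_name(prefix, index, total):
    # Mirrors the "-00000-of-00001" suffix seen in the example output.
    return f"{prefix}-{index:05d}-of-{total:05d}"


def list_shards(prefix):
    # Sorted so that shards are returned in index order.
    return sorted(glob.glob(f"{prefix}-*-of-*"))


with tempfile.TemporaryDirectory() as tmp:
    prefix = os.path.join(tmp, "output.csv")
    # Create two fake shards to show the lookup.
    for i in range(2):
        with open(shard_name(prefix, i, 2), "w") as f:
            f.write(f"shard {i}\n")
    found = [os.path.basename(path) for path in list_shards(prefix)]

print(found)
```

Matching by pattern keeps downstream code working when the number of shards changes.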
Filtering Data Based on Conditions
Data filtering is a common task in data processing, and Apache Beam provides the beam.Filter transform for it. We simply supply a Python function that returns a boolean value for each row; rows for which it returns True are kept.
data = [
    ('Alice', 25, 'New York'),
    ('Bob', 30, 'San Francisco'),
    ('Charlie', 22, 'Los Angeles')
]
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'Filter Age' >> beam.Filter(lambda x: x[1] > 23)
     | 'Print' >> beam.Map(print)
    )
('Alice', 25, 'New York')
('Bob', 30, 'San Francisco')
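Indexing with x[1] hides which field is being compared. A minimal sketch of the same condition as a named predicate follows; the name is_older_than and its min_age parameter are hypothetical, and Python's built-in filter stands in for the pipeline here.

```python
def is_older_than(row, min_age):
    """Return True when the row's age field exceeds min_age."""
    _name, age, _city = row
    return age > min_age


data = [
    ("Alice", 25, "New York"),
    ("Bob", 30, "San Francisco"),
    ("Charlie", 22, "Los Angeles"),
]
kept = list(filter(lambda row: is_older_than(row, 23), data))
print(kept)
```

In a pipeline, beam.Filter forwards extra arguments to the predicate, so the step could be written as beam.Filter(is_older_than, 23).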
Aggregation
For operations similar to GROUP BY in SQL, Apache Beam provides the beam.GroupByKey transform. Before applying this transform, you need to convert each row into a tuple whose first element is the key. After beam.GroupByKey is applied, each resulting row is a tuple consisting of the key and a list of the associated values.
data = [
    {"Name": 'Alice', "Price": 100},
    {"Name": 'Alice', "Price": 200},
    {"Name": 'Alice', "Price": 100},
    {"Name": 'Bob', "Price": 300},
    {"Name": 'Bob', "Price": 150},
    {"Name": 'Charlie', "Price": 220}
]
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'Transform' >> beam.Map(lambda x: (x["Name"], x["Price"]))
     | 'Group By Name' >> beam.GroupByKey()
     | 'Print' >> beam.Map(print)
    )
('Alice', [100, 200, 100])
('Bob', [300, 150])
('Charlie', [220])
To calculate the sum for each Name, you can use beam.Map (or beam.ParDo) to perform the summation on the grouped values.
with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'Transform' >> beam.Map(lambda x: (x["Name"], x["Price"]))
     | 'Group By Name' >> beam.GroupByKey()
     | 'Sum' >> beam.Map(lambda x: (x[0], sum(x[1])))
     | 'Print' >> beam.Map(print)
    )
('Alice', 400)
('Bob', 450)
('Charlie', 220)
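Grouping and then summing builds the full list of prices for each key before reducing it. Beam also provides beam.CombinePerKey, which takes a combining function such as sum and performs the reduction per key in one step. The plain-Python sketch below models only the resulting values, not how Beam executes either approach; group_by_key and combine_per_key are hypothetical helper names.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect values per key, like the (key, [values]) rows shown above."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def combine_per_key(pairs, combine_fn):
    """Reduce each key's values with combine_fn (for example, sum)."""
    return {key: combine_fn(values) for key, values in group_by_key(pairs).items()}


pairs = [
    ("Alice", 100), ("Alice", 200), ("Alice", 100),
    ("Bob", 300), ("Bob", 150),
    ("Charlie", 220),
]
totals = combine_per_key(pairs, sum)
print(totals)
```

In the pipeline, the GroupByKey step and the Sum step could be replaced by a single beam.CombinePerKey(sum) step.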
Joining Data
Data can be joined using beam.CoGroupByKey(), which groups each input by key and then joins the groups. Multiple inputs can be created by applying p | 'Create Data' >> beam.Create(data) more than once, each time with a unique label. The following code results in a somewhat nested output structure.
data_age = [
    ('Alice', 25),
    ('Bob', 30),
    ('Charlie', 22)
]
data_city = [
    ('Alice', 'New York'),
    ('Bob', 'San Francisco'),
    ('Charlie', 'Los Angeles')
]
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
('Alice', ([25], ['New York']))
('Bob', ([30], ['San Francisco']))
('Charlie', ([22], ['Los Angeles']))
For a dictionary-structured output, you can pass a dictionary of inputs to the beam.CoGroupByKey() operation.
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        {"Age": age, "City": city}
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
('Alice', {'Age': [25], 'City': ['New York']})
('Bob', {'Age': [30], 'City': ['San Francisco']})
('Charlie', {'Age': [22], 'City': ['Los Angeles']})
Since this performs the GroupBy operation first, the output is aggregated when the input data has several rows per key.
data_age = [
    ('Alice', 25),
    ('Alice', 30),
    ('Charlie', 22)
]
data_city = [
    ('Alice', 'New York'),
    ('Alice', 'San Francisco'),
    ('Charlie', 'Los Angeles')
]
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
('Alice', ([25, 30], ['New York', 'San Francisco']))
('Charlie', ([22], ['Los Angeles']))
The CoGroupByKey merge strategy is equivalent to a full outer join in SQL: keys present in only the left or only the right input still appear in the output, with an empty list for the missing side.
data_age = [
    ('Alice', 25),
    ('Charlie', 22)
]
data_city = [
    ('Alice', 'New York'),
    ('Bob', 'San Francisco'),
]
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
('Alice', ([25], ['New York']))
('Charlie', ([22], []))
('Bob', ([], ['San Francisco']))
Additionally, left join or inner join operations in SQL can be emulated by applying beam.Filter after the merge to drop keys whose left or right side is empty.
# left outer join
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Left' >> beam.Filter(lambda x: len(x[1][0]) > 0)
        | 'Print' >> beam.Map(print)
    )
('Alice', ([25], ['New York']))
('Charlie', ([22], []))
# inner join
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Inner' >> beam.Filter(lambda x: len(x[1][0]) > 0 and len(x[1][1]) > 0)
        | 'Print' >> beam.Map(print)
    )
('Alice', ([25], ['New York']))
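The filtered output is still grouped: one row per key with two lists, whereas a SQL join returns one row per matching pair. As a minimal sketch, assuming the grouped values can be turned into lists as in the printed output and that None should mark a missing side (both assumptions), a generator can expand each grouped row into SQL-like rows. The name to_join_rows and its how parameter are hypothetical.

```python
from itertools import product


def to_join_rows(element, how="inner"):
    """Expand (key, (left_values, right_values)) into (key, left, right) rows."""
    key, (left, right) = element
    left, right = list(left), list(right)
    # Pad the missing side with None where the join type keeps unmatched keys.
    if how in ("left", "full") and not right:
        right = [None]
    if how in ("right", "full") and not left:
        left = [None]
    # An empty side that was not padded yields no rows for this key.
    for left_value, right_value in product(left, right):
        yield (key, left_value, right_value)


grouped = [
    ("Alice", ([25, 30], ["New York", "San Francisco"])),
    ("Charlie", ([22], [])),
    ("Bob", ([], ["San Francisco"])),
]
inner_rows = [row for g in grouped for row in to_join_rows(g, "inner")]
left_rows = [row for g in grouped for row in to_join_rows(g, "left")]
full_rows = [row for g in grouped for row in to_join_rows(g, "full")]
print(left_rows)
```

Because it yields zero or more rows per key, a function like this would fit beam.FlatMap or beam.ParDo directly after the merge step, without a separate filter.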
Windowing Data
In Apache Beam, we can perform transformations, aggregations, or other complex operations while taking the timestamp of each row into account. This crucial concept allows us to use the same code and pipeline structure for both batch processing and streaming, though streaming is beyond the scope of this article. Each row can carry its own timestamp; we can split the rows into windows by timestamp and apply the same operations to every window. Here is an example:
The input data consists of logs from some program. This example first extracts the time from each log line and attaches it as a timestamp. Then, it counts occurrences of each type of log (INFO, WARNING, and ERROR) within each window. Window information, including start and end times, can be obtained by accessing beam.DoFn.WindowParam, as shown in the example.
from datetime import datetime, timezone

data = [
    '2022-03-08 12:00:00 INFO Something ...',
    '2022-03-08 12:01:00 WARNING ...',
    '2022-03-08 12:02:00 ERROR ...',
    '2022-03-08 12:03:00 INFO ...',
    '2022-03-08 12:04:00 WARNING ...',
    '2022-03-08 12:05:00 ERROR ...',
    '2022-03-08 12:06:00 ERROR ...',
    '2022-03-08 12:08:00 WARNING ...',
    '2022-03-08 12:08:30 ERROR ...'
]

def extract_timestamp_and_log(line):
    parts = line.split()
    timestamp_str = parts[0] + ' ' + parts[1]
    # Treat the log time as UTC so the result does not depend on the machine's local time zone.
    timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    unix_time = int(timestamp.timestamp())
    return unix_time, parts[2]

def add_start_end_info(row, window=beam.DoFn.WindowParam):
    # window.start and window.end are instances of the `apache_beam.utils.timestamp.Timestamp` class.
    start = datetime.fromtimestamp(window.start.seconds(), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    end = datetime.fromtimestamp(window.end.seconds(), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return row + (start, end)

with beam.Pipeline() as p:
    (p
     | 'Create Data' >> beam.Create(data)
     | 'Extract Time' >> beam.Map(extract_timestamp_and_log)
     | 'Assign Timestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(x[1], x[0]))
     | 'Fixed Windows' >> beam.WindowInto(beam.window.FixedWindows(300))  # 5-minute windows
     | 'Make Key and Count' >> beam.Map(lambda x: (x, 1))
     | 'Group By Type' >> beam.GroupByKey()
     | 'Count' >> beam.Map(lambda x: (x[0], sum(x[1])))
     | 'Add Window Start and End' >> beam.Map(add_start_end_info)
     | 'Print' >> beam.Map(print)
    )
('INFO', 2, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('WARNING', 2, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('WARNING', 1, '2022-03-08 12:05:00', '2022-03-08 12:10:00')
('ERROR', 1, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('ERROR', 3, '2022-03-08 12:05:00', '2022-03-08 12:10:00')
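The window boundaries in this output can be reproduced with simple arithmetic. The sketch below is a plain-Python model, assuming fixed windows aligned to the Unix epoch with an inclusive start and an exclusive end, which matches the output above; it is not Beam's implementation, and fixed_window_bounds and fmt are hypothetical helpers.

```python
from datetime import datetime, timezone


def fixed_window_bounds(unix_time, size_seconds=300):
    """Return (start, end) of the fixed window containing unix_time; end is exclusive."""
    start = unix_time - unix_time % size_seconds
    return start, start + size_seconds


def fmt(unix_time):
    """Format a Unix time as a UTC string in the style of the output above."""
    return datetime.fromtimestamp(unix_time, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# The last log line, at 12:08:30, falls into the 12:05-12:10 window.
ts = int(datetime(2022, 3, 8, 12, 8, 30, tzinfo=timezone.utc).timestamp())
start, end = fixed_window_bounds(ts)
window = (fmt(start), fmt(end))

# A row exactly on a boundary belongs to the window that starts there.
boundary = int(datetime(2022, 3, 8, 12, 5, 0, tzinfo=timezone.utc).timestamp())
boundary_window = tuple(fmt(t) for t in fixed_window_bounds(boundary))
print(window, boundary_window)
```

This is why the ERROR logged at exactly 12:05:00 is counted in the second window rather than the first.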
Displaying a Pipeline DAG
To visualize the Directed Acyclic Graph (DAG) structure of a sample pipeline, we can use the apache_beam.runners.interactive.display.pipeline_graph.PipelineGraph.get_dot() method. The following code defines a pipeline, generates a .dot file illustrating the pipeline diagram, and converts it into a .png image. Note that we do not use the with statement to define the pipeline, because we are not executing it; our goal is solely to define its structure.
from apache_beam.runners.interactive.display.pipeline_graph import PipelineGraph
data = ['Hello', 'Apache', 'Beam']
p = beam.Pipeline()
(p
 | "Create Data" >> beam.Create(data)
 | "Print" >> beam.Map(print)
)
dag_dot = PipelineGraph(p).get_dot()
%store PipelineGraph(p).get_dot() >dag.dot
!apt -qqq install graphviz
!dot -Tpng dag.dot > dag.png
from IPython.display import Image
Image('dag.png')
Writing 'PipelineGraph(p).get_dot()' (str) to file 'dag.dot'.
Further Studies
There are plenty of documents about Apache Beam, and the following two sites are good introductions: