This post covers how to deploy a local Hadoop cluster with Docker and run custom Python mapper and reducer functions, using the classic word count example.
We will use the Docker image from the big-data-europe repository to set up Hadoop.
git clone git@github.com:big-data-europe/docker-hadoop.git
With the repository cloned to your local machine, we can use docker-compose to configure the local Hadoop cluster. Replace the docker-compose.yml file in the repository with the file from this GitHub Gist.
This docker-compose file configures a Hadoop cluster with a master node (namenode) and three worker nodes. It also configures the network ports that allow communication between the nodes. To start the cluster, run:
docker-compose up -d
Run docker ps to verify that the containers are up. You should see a container list similar to the following:
IMAGE                            PORTS                     NAMES
docker-hadoop_resourcemanager                              resourcemanager
docker-hadoop_nodemanager1       0.0.0.0:8042->8042/tcp    nodemanager1
docker-hadoop_historyserver      0.0.0.0:8188->8188/tcp    historyserver
docker-hadoop_datanode3          9864/tcp                  datanode3
docker-hadoop_datanode2          9864/tcp                  datanode2
docker-hadoop_datanode1          9864/tcp                  datanode1
docker-hadoop_namenode           0.0.0.0:9870->9870/tcp    namenode
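If you would rather check the listing programmatically than by eye, the following is a minimal sketch. It assumes you have captured the output of docker ps --format '{{.Names}}' as text; the helper name missing_containers and the sample string are hypothetical, and the expected names are simply the ones from the listing above.

```python
# Container names taken from the NAMES column of the listing above.
EXPECTED = {
    "namenode", "datanode1", "datanode2", "datanode3",
    "resourcemanager", "nodemanager1", "historyserver",
}


def missing_containers(names_output):
    """Return the expected container names that are absent from the output.

    names_output is the text printed by: docker ps --format '{{.Names}}'
    (one container name per line).
    """
    running = {line.strip() for line in names_output.splitlines() if line.strip()}
    return sorted(EXPECTED - running)


# Hypothetical sample in which three containers failed to start.
sample = "namenode\ndatanode1\ndatanode2\nresourcemanager\n"
print(missing_containers(sample))
```

An empty list means every container from the listing is running.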
The current status of the local Hadoop cluster is available at localhost:9870.
For this simple MapReduce program, we will use the classic word count example. The program reads text files and counts how often each word occurs.
The mapper function reads the text and emits one key-value pair per word, which in this case is <word, 1>. Copy the following code into mapper.py:
#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
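To see what the mapper emits without starting Hadoop, here is a small sketch that applies the same strip-and-split logic to a single line. The function name map_line and the sample sentence are illustrative assumptions, not part of mapper.py.

```python
def map_line(line):
    """Yield one (word, 1) pair per whitespace-separated token in the line."""
    for word in line.strip().split():
        yield word, 1


# A repeated word produces two separate pairs; the mapper never adds them up.
pairs = list(map_line("Hello World Hello\n"))
for word, count in pairs:
    print('%s\t%s' % (word, count))
```

Note that "Hello" appears twice in the output with a count of 1 each time. Summing the counts is left entirely to the reducer.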
The reducer function processes the output of the mapper and returns the count for each word. Copy the following code into reducer.py:
#!/usr/bin/env python
"""reducer.py"""

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word; checking for None avoids
# printing a bogus "None 0" line when the reducer receives no input,
# and still prints the last word if the final line was discarded
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
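The comment in the reducer about sorted input is worth a closer look. The sketch below expresses the same "sum each run of identical keys" idea with itertools.groupby, which, like the reducer's if-switch, only merges adjacent equal keys. The function name reduce_sorted and the sample pairs are hypothetical; this is an illustration, not a replacement for reducer.py.

```python
from itertools import groupby


def reduce_sorted(pairs):
    """Sum the counts of each run of consecutive pairs that share a word."""
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


# Sorted by word, as Hadoop delivers map output to the reducer.
sorted_pairs = [("Docker", 1), ("Hello", 1), ("Hello", 1), ("World", 1)]
# The same kind of data in arrival order, without the sort step.
unsorted_pairs = [("Hello", 1), ("World", 1), ("Hello", 1)]

sorted_result = list(reduce_sorted(sorted_pairs))
unsorted_result = list(reduce_sorted(unsorted_pairs))
print(sorted_result)
print(unsorted_result)
```

With unsorted input, "Hello" is reported twice with a count of 1 instead of once with a count of 2, which is exactly the failure the reducer would show if Hadoop did not sort by key first.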
Note that because Hadoop is written in Java, a MapReduce job is normally submitted as a Java JAR file. To execute Python in Hadoop, we need the Hadoop Streaming library, which launches the Python executables and passes data to and from them through standard input and output. This is why both scripts read their input from STDIN and write their results to STDOUT.
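The line-oriented contract described above can be tried out locally. The following is a rough simulation, assuming in-memory text streams in place of real STDIN/STDOUT and a plain sorted() call as a stand-in for Hadoop's shuffle-and-sort phase. The names run_mapper, run_reducer, and simulate are hypothetical, and the real framework additionally splits the input and distributes work across nodes.

```python
import io


def run_mapper(stdin, stdout):
    """Write one 'word<TAB>1' line to stdout per word read from stdin."""
    for line in stdin:
        for word in line.strip().split():
            stdout.write('%s\t%s\n' % (word, 1))


def run_reducer(stdin, stdout):
    """Sum the counts of consecutive lines that share the same word."""
    current_word, current_count = None, 0
    for line in stdin:
        word, count = line.rstrip('\n').split('\t', 1)
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                stdout.write('%s\t%s\n' % (current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        stdout.write('%s\t%s\n' % (current_word, current_count))


def simulate(text):
    """Run map, sort, and reduce over text and return the reducer output."""
    mapped = io.StringIO()
    run_mapper(io.StringIO(text), mapped)
    # Stand-in for the shuffle-and-sort step between map and reduce.
    shuffled = ''.join(sorted(mapped.getvalue().splitlines(keepends=True)))
    reduced = io.StringIO()
    run_reducer(io.StringIO(shuffled), reduced)
    return reduced.getvalue()


result = simulate("Hello World\nHello Docker\n")
print(result, end='')
```

Every stage only ever sees lines of text, which is why any executable that reads STDIN and writes STDOUT can serve as a mapper or reducer.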
Copy the local mapper.py and reducer.py to the namenode:
docker cp LOCAL_PATH/mapper.py namenode:mapper.py
docker cp LOCAL_PATH/reducer.py namenode:reducer.py
Enter the namenode container of the Hadoop cluster:
docker exec -it namenode bash
Run ls and you should find mapper.py and reducer.py in the namenode container.
Now let's prepare the input. For this simple example, we will use a set of text files, each containing a short string. For a more realistic example, you can download an e-book from Project Gutenberg in Plain Text UTF-8 encoding.
mkdir input
echo "Hello World" >input/f1.txt
echo "Hello Docker" >input/f2.txt
echo "Hello Hadoop" >input/f3.txt
echo "Hello MapReduce" >input/f4.txt
The MapReduce program accesses files from the Hadoop Distributed File System (HDFS). Run the following to transfer the input directory and files to HDFS:
hadoop fs -mkdir -p input
hdfs dfs -put ./input/* input
Run find / -name 'hadoop-streaming*.jar' to locate the Hadoop Streaming library JAR file. The path should look something like the one used in the next command.
Finally, we can execute the MapReduce program:
hadoop jar /opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
  -file mapper.py -mapper mapper.py \
  -file reducer.py -reducer reducer.py \
  -input input -output output
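When the job finishes, the results are written to the output directory in HDFS, and running hdfs dfs -cat output/* should print them. As a sanity check, the sketch below computes the counts you would expect for the four sample files with collections.Counter. The files dictionary is a hypothetical stand-in that mirrors the echo commands above, and the exact formatting of the job's output may differ.

```python
from collections import Counter

# Contents of the four sample input files created earlier.
files = {
    "f1.txt": "Hello World",
    "f2.txt": "Hello Docker",
    "f3.txt": "Hello Hadoop",
    "f4.txt": "Hello MapReduce",
}

# Count every whitespace-separated word across all files.
expected = Counter(word for text in files.values() for word in text.split())

# Print in key order, tab-delimited like the reducer output.
for word in sorted(expected):
    print('%s\t%s' % (word, expected[word]))
```

If the job's counts differ from these, check first that all four files were uploaded to the HDFS input directory.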
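To safely shut down the cluster and remove the containers, exit the namenode container and run the following from the docker-hadoop directory:
docker-compose down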
Yen V. (2019). How to set up a Hadoop cluster in Docker.
Noll M. Writing An Hadoop MapReduce Program In Python.