anujd64

Word Count with Hadoop

Video version of this article: https://www.youtube.com/watch?v=wTkffAYsCBw
Credits: @UnboxingBigData

Code
Open Eclipse and create a new Java project.


Right-click on the project, go to Build Path, and select Configure Build Path...


Click on the Libraries tab and click Add External JARs...


Navigate to the Hadoop common folder (in my case, home/hadoop-3.3.6/share/hadoop/common) and select all the JAR files.


Click Add External JARs... again.


Navigate to the Hadoop mapreduce folder (in my case, home/hadoop-3.3.6/share/hadoop/mapreduce) and select all the JAR files.


The Libraries tab will now list all of the added JARs.


Click Apply and Close.

Now create the three classes; the code for them is given at the end of the article. I am simply copy-pasting the classes into the src folder of our Eclipse project.


Now we will export a JAR for our project: right-click on the project and select Export.


Select the export destination:


Select the main class for the project:


You will find the JAR at the selected location.


Create an input.txt file on the Desktop containing some sample text.

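For example (the exact text is up to you; this sample line is just for illustration):

```shell
# Create a small sample input file on the Desktop
# (the commands in the next step are run from there)
echo "hello world hello hadoop world hello" > ~/Desktop/input.txt
```

With this input, the job's output should contain hadoop 1, hello 3, and world 2.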

Now just execute the following commands one by one:

cd Desktop
start-all.sh
hadoop fs -mkdir /wc_input
hadoop fs -put input.txt /wc_input
hadoop jar WordCount.jar /wc_input/input.txt /wc_output/
hadoop fs -cat /wc_output/part-00000
stop-all.sh

What do these commands do?

  • -mkdir creates the folder wc_input in HDFS (Hadoop's Distributed File System).
  • -put copies the input.txt file into the folder we just created.
  • jar runs the code in the exported JAR file to perform MapReduce on input.txt and saves the output in the wc_output directory.
  • -cat prints the contents of the output produced by the MapReduce job.
  • To delete a directory in HDFS, use:
hadoop fs -rm -r <DIRECTORY_PATH>
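As a quick local sanity check (plain Unix tools, not Hadoop), you can compute the same counts directly on input.txt and compare them with the job's output:

```shell
# Split input.txt into one word per line, then count duplicates;
# the counts should match what the MapReduce job writes to part-00000
tr -s ' ' '\n' < input.txt | sort | uniq -c
```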

If everything worked, the -cat command prints each word alongside its count.



Code for the classes:

WC_Runner.java

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WC_Runner {
    public static void main(String[] args) throws IOException{
        // Configure the job: name, output types, and the mapper/reducer classes
        JobConf conf = new JobConf(WC_Runner.class);
        conf.setJobName("WordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WC_Mapper.class);
        // The reducer doubles as a combiner, pre-aggregating counts on each mapper node
        conf.setCombinerClass(WC_Reducer.class);
        conf.setReducerClass(WC_Reducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // Input and output paths come from the command-line arguments
        FileInputFormat.setInputPaths(conf,new Path(args[0]));
        FileOutputFormat.setOutputPath(conf,new Path(args[1]));
        JobClient.runJob(conf);
    }
}

WC_Reducer.java

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WC_Reducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {
    // Receives one word and all of its counts; emits the word with the total
    public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum=0;
        while (values.hasNext()) {
            sum+=values.next().get();
        }
        output.collect(key,new IntWritable(sum));
    }
}
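Conceptually, the mapper emits a (word, 1) pair for every token and the reducer sums those pairs per word. The combined effect can be sketched in plain Java without Hadoop (the class and method names here are illustrative, not part of the Hadoop API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class LocalWordCount {
    // Mimics map (emit (word, 1)) followed by reduce (sum per word), all in memory
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new HashMap<>();
        StringTokenizer tokenizer = new StringTokenizer(text);
        while (tokenizer.hasMoreTokens()) {
            // merge() adds 1 to the existing count, or inserts 1 for a new word
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("hello world hello"));
    }
}
```

The difference is that Hadoop shuffles the (word, 1) pairs across the cluster so each reducer sees all values for a given key, which is what lets the same logic scale past a single machine's memory.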

WC_Mapper.java

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class WC_Mapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // Splits each input line into words and emits (word, 1) for every token
    public void map(LongWritable key, Text value,OutputCollector<Text,IntWritable> output,
                    Reporter reporter) throws IOException{
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()){
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
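If you prefer the command line over Eclipse, the same JAR can be built directly, as a sketch (this assumes the hadoop command is on your PATH and the three .java files are in the current directory):

```shell
# Compile the three classes against the Hadoop JARs, then package them;
# `hadoop classpath` prints the classpath of the installed Hadoop distribution
mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes WC_Runner.java WC_Mapper.java WC_Reducer.java
# cfe = create archive, write it to a file, and set the entry point (main class)
jar cfe WordCount.jar WC_Runner -C classes .
```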
