MapReduce Composite Key Operation-Part2

Composite Key Operation In complex operations that involve more than one column, a basic key may not be enough. For example, to calculate population grouped by country and state, the choice of key matters, and a well-chosen composite key solves the problem easily. We can create a composite key by implementing the WritableComparable interface, after which it can be used like any other WritableComparable object. Below is a composite key with two fields, country and state. It overrides the compareTo() method to sort by country and then by state, and it provides the write() and readFields() methods to serialize and deserialize its attributes.

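private static class CompositeGroupKey implements
        WritableComparable<CompositeGroupKey> {
    String country;
    String state;
    // Hadoop needs a no-argument constructor to create the key before readFields()
    public CompositeGroupKey() { }
    public CompositeGroupKey(String country, String state) {
        this.country = country;
        this.state = state;
    }
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, country);
        WritableUtils.writeString(out, state);
    }
    public void readFields(DataInput in) throws IOException {
        this.country = WritableUtils.readString(in);
        this.state = WritableUtils.readString(in);
    }
    // Sort by country first, then by state
    public int compareTo(CompositeGroupKey pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        return intcnt == 0 ? state.compareTo(pop.state) : intcnt;
    }
    // Equal keys must hash alike so the default partitioner sends them to the same reducer
    @Override
    public int hashCode() {
        return 31 * country.hashCode() + state.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeGroupKey && compareTo((CompositeGroupKey) o) == 0;
    }
    @Override
    public String toString() {
        return country + ":" + state;
    }
}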
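To see what write() and readFields() do for the key, here is a minimal plain-Java sketch of the same round trip. It does not use Hadoop: GroupKey, serialize and deserialize are hypothetical names, and writeUTF/readUTF from java.io stand in for WritableUtils.writeString/readString. The point is only that the fields are written and read back in the same order, and that an empty key object has to exist before readFields() can fill it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CompositeKeyRoundTrip {

    // Plain-Java stand-in for the composite key; not a Hadoop class.
    static final class GroupKey {
        String country;
        String state;

        GroupKey() { }

        GroupKey(String country, String state) {
            this.country = country;
            this.state = state;
        }

        // Mirrors write(DataOutput): fields are written in a fixed order.
        void write(DataOutputStream out) throws IOException {
            out.writeUTF(country);
            out.writeUTF(state);
        }

        // Mirrors readFields(DataInput): fields are read back in the same order.
        void readFields(DataInputStream in) throws IOException {
            country = in.readUTF();
            state = in.readUTF();
        }
    }

    static byte[] serialize(GroupKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                key.write(out);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static GroupKey deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            GroupKey key = new GroupKey(); // needs the no-argument constructor
            key.readFields(in);
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GroupKey copy = deserialize(serialize(new GroupKey("USA", "CA")));
        System.out.println(copy.country + ":" + copy.state); // prints USA:CA
    }
}
```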

We use the composite key above in a MapReduce job that totals the population grouped by country and state.

Input

Country, State, City, Population (Mil)
USA, CA, Su, 12
USA, CA, SA, 42
USA, CA, Fr, 23
USA, MO, XY, 23
USA, MO, AB, 19
USA, MO, XY, 23
USA, MO, AB, 19
IND, TN, AT, 11
IND, TN, KL, 10

Output

Country State Total Population
IND TN 21
USA CA 77
USA MO 84
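The totals above can be checked without a cluster. The sketch below is plain Java, not the MapReduce job itself: GroupBySketch and totals are hypothetical names, and a TreeMap keyed on "country:state" only loosely imitates the sorted order in which a reducer sees the keys.

```java
import java.util.Map;
import java.util.TreeMap;

public class GroupBySketch {

    // Sums the population (fourth field) per "country:state" key.
    static Map<String, Integer> totals(String[] lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String key = fields[0].trim() + ":" + fields[1].trim();
            int population = Integer.parseInt(fields[3].trim());
            sums.merge(key, population, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] lines = {
            "USA, CA, Su, 12", "USA, CA, SA, 42", "USA, CA, Fr, 23",
            "USA, MO, XY, 23", "USA, MO, AB, 19", "USA, MO, XY, 23",
            "USA, MO, AB, 19", "IND, TN, AT, 11", "IND, TN, KL, 10"
        };
        // Prints:
        // IND:TN 21
        // USA:CA 77
        // USA:MO 84
        totals(lines).forEach((key, sum) -> System.out.println(key + " " + sum));
    }
}
```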

Mapper Program Once the composite key is defined, we create the mapper class, which consumes the input produced by the InputFormat. The InputFormat divides the file into input splits and passes each split to its own map task. A map task transforms the records of its input split into key-value pairs, where the key must implement the WritableComparable interface and the value must implement Writable. The Writable interface provides the ability to serialize the data to disk, and WritableComparable adds the comparison needed to sort keys. The number of map tasks is decided by the InputSplits defined in the InputFormat. A split is logical, not physical. For each input split, the framework first invokes the mapper's setup(Context) method, then invokes map(Object, Object, Context) once for every record in the split, and finally invokes cleanup(Context) for cleanup activity. We extend the generic Mapper<Key1, Value1, Key2, Value2> class, whose type parameters indicate the input and output key and value types. The mapper class overrides the map() method to process the input data as below

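// populat is an IntWritable field of the mapper class, reused across calls
@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    String[] keyvalue = line.split(",");
    // trim() removes the space that follows each comma in the input
    populat.set(Integer.parseInt(keyvalue[3].trim()));
    CompositeGroupKey cntry = new CompositeGroupKey(keyvalue[0].trim(), keyvalue[1].trim());
    context.write(cntry, populat);
}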

As the map method above shows, the key is passed as a LongWritable and the value as Text. With the default TextInputFormat, the key is the byte offset of the line and the value contains the line itself. Inside the map method we split the line, build the key-value pair, and pass it to the intermediate area through the context. The context fills an in-memory I/O buffer with the mapper's key-value output and later spills it to local disk. The map output is sorted and grouped by key and written to local disk as intermediate data. The partitioner decides how the map output is divided, identifying the reducer for each key. MapReduce can also run a local combiner, which combines intermediate map output before it is passed to the reducer, cutting down the amount of data transferred from the map to the reducer.

Reducer: The reducer copies the intermediate output of the map tasks from their local disks over HTTP. Before the reduce method is invoked, the framework shuffles, merges, and sorts the key-value pairs so that all values for a key are brought together. The reducer then processes the collection of values for each key and writes the result to disk. Below is the reduce method, which the framework calls once for each key, so every call receives a single key with its collection of values

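// The new-API Reducer passes the values as an Iterable; a signature taking an
// Iterator would not override reduce() and would never be called
@Override
public void reduce(CompositeGroupKey key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int cnt = 0;
    for (IntWritable value : values) {
        cnt = cnt + value.get();
    }
    context.write(key, new IntWritable(cnt));
}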
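How a key finds its reducer can be sketched in plain Java. The arithmetic in partitionFor below is the one used by Hadoop's default hash partitioner; everything else (PartitionSketch, Key, the choice of three reducers) is a hypothetical illustration and not part of the job. It shows why a composite key should define hashCode() from its field values: two key objects with the same country and state must land on the same reducer.

```java
import java.util.Objects;

public class PartitionSketch {

    // Hypothetical plain-Java key; equals() and hashCode() depend only on the field values.
    static final class Key {
        final String country;
        final String state;

        Key(String country, String state) {
            this.country = country;
            this.state = state;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return country.equals(other.country) && state.equals(other.state);
        }

        @Override
        public int hashCode() {
            return Objects.hash(country, state);
        }
    }

    // Clear the sign bit of the hash, then take the remainder
    // by the number of reduce tasks.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3; // assumed number of reduce tasks
        int first = partitionFor(new Key("USA", "CA"), reducers);
        int second = partitionFor(new Key("USA", "CA"), reducers);
        System.out.println(first == second); // prints true
    }
}
```

Without a field-based hashCode(), the two Key objects would fall back to identity hashes and could be sent to different reducers once the job runs with more than one reduce task.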

Job: To run the MapReduce program we have to set the mapper, the reducer, and other properties on the job. In the code below the Job class, which takes over the role JobConf plays in the older API, is the main configuration class. It configures MapReduce parameters such as the Mapper, Reducer, Combiner, InputFormat, OutputFormat, and Comparator. The code below shows how to create and run the job based on the map and reduce code above

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "GroupMR");
job.setJarByClass(GroupMR.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(CompositeGroupKey.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setMaxInputSplitSize(job, 10);
FileInputFormat.setMinInputSplitSize(job, 100);
FileInputFormat.addInputPath(job, new Path("/Local/data/Country.csv"));
FileOutputFormat.setOutputPath(job, new Path("/Local/data/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);

This main method runs the MapReduce job. Before submitting the job we need to set the MapperClass, ReducerClass, OutputKeyClass, and OutputValueClass. We can also configure the input and output paths through FileInputFormat and FileOutputFormat.

Running: Running the MapReduce program is quite straightforward. Package the Java application in a JAR, say hadoopessence.jar, and run it from the command line as below, passing the input folder and the output folder in HDFS

hadoop jar hadoopessence.jar org.hadoopessence.compositesorting <input folder> <output folder>

Reference Hadoop Essence: The Beginner’s Guide to Hadoop & Hive

Hadoop MapReduce Group By Operation – Part1

Introduction This article continues my previous articles on Hadoop MapReduce. In my previous article, taken from my book Hadoop Essence, I discussed the overall MapReduce architecture and how it works, plus a basic Hello World program. In this article I will discuss how to write a basic MapReduce program that calculates a sum based on a group-by operation.

Input and Output This program reads the file country.csv, which contains

Country CD, State, City, Population (million)
USA, CA, Sunnyvale, 12
USA, CA, SAN JOSE, 42
USA, MO, XY, 23
USA, MO, AB, 19
IND, TN, AT, 11
IND, TN, KL, 10

The MapReduce program will process the file and aggregate the total population grouped by country and state as below

Country CD, State, Total Population (million)
IND, TN, 21
USA, CA, 54
USA, MO, 42

Environment Setup Refer to the Setup link to install and configure Apache Hadoop.

Project Dependency We create a Maven-based Java project and add the Hadoop core dependency below to its pom.xml. In this application I am using the Hadoop 1.x version.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>

Mapper Program The framework splits the input file and spawns a map task for each split. A map task transforms the records of its input split into key-value pairs, where the key must implement the WritableComparable interface and the value must implement Writable. The Writable interface provides the ability to serialize the data to disk, and WritableComparable adds the comparison needed to sort keys. The number of map tasks is decided by the InputSplits defined in the InputFormat. A split is logical, not physical. For each input split, the framework first invokes the mapper's setup(Context) method, then invokes map(Object, Object, Context) once for every record in the split, and finally invokes cleanup(Context) for cleanup activity. We extend the generic Mapper<Key1, Value1, Key2, Value2> class, whose type parameters indicate the input and output key and value types. The mapper class overrides the map() method to process the input data as below

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] keyvalue = line.split(",");
    // trim() removes the space that follows each comma in the input
    cntText.set(keyvalue[0].trim());
    stateText.set(keyvalue[1].trim());
    populat.set(Integer.parseInt(keyvalue[3].trim()));
    Country cntry = new Country(cntText, stateText);
    context.write(cntry, populat);
}
        

As the map method above shows, the framework passes in the input key and the value as Text. The value contains one line of the file. Inside the map method we split the line, build the key-value pair, and pass it to the intermediate area through the context. The context fills an in-memory I/O buffer with the mapper's key-value output and later spills it to local disk. Below is the custom Country key, which implements the WritableComparable interface and which I used to sort the output by country and then by state

private static class Country implements WritableComparable<Country> {
    Text country;
    Text state;

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }

    // Hadoop needs a no-argument constructor to create the key before readFields()
    public Country() {
        this.country = new Text();
        this.state = new Text();
    }

    public void write(DataOutput out) throws IOException {
        this.country.write(out);
        this.state.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.country.readFields(in);
        this.state.readFields(in);
    }

    // Sort by country first, then by state
    public int compareTo(Country pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        if (intcnt != 0) {
            return intcnt;
        } else {
            return state.compareTo(pop.state);
        }
    }

    // Equal keys must hash alike so the default partitioner sends them to the same reducer
    @Override
    public int hashCode() {
        return 31 * country.hashCode() + state.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Country && compareTo((Country) o) == 0;
    }

    @Override
    public String toString() {
        return country.toString() + ":" + state.toString();
    }
}

In the key above I have implemented the compareTo method, which the MapReduce framework uses to sort the keys. Below is the full mapper code

public class GroupMapper extends Mapper<LongWritable, Text, Country, IntWritable> {
    // Writable objects are reused across map() calls
    Text cntText = new Text();
    Text stateText = new Text();
    IntWritable populat = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] keyvalue = line.split(",");
        // trim() removes the space that follows each comma in the input
        cntText.set(keyvalue[0].trim());
        stateText.set(keyvalue[1].trim());
        populat.set(Integer.parseInt(keyvalue[3].trim()));
        Country cntry = new Country(cntText, stateText);
        context.write(cntry, populat);
    }
}
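The ordering that compareTo gives the Country key can be tried out in plain Java. This is only a sketch with hypothetical names (KeyOrderSketch, Key, sorted) that uses String fields and Collections.sort in place of Hadoop's Text and its sort phase, but the comparison logic is the same: country first, state as the tie-breaker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class KeyOrderSketch {

    // Plain-Java stand-in for the Country key.
    static final class Key implements Comparable<Key> {
        final String country;
        final String state;

        Key(String country, String state) {
            this.country = country;
            this.state = state;
        }

        // Compare by country; fall back to state only when the countries are equal.
        @Override
        public int compareTo(Key other) {
            int byCountry = country.compareTo(other.country);
            return byCountry != 0 ? byCountry : state.compareTo(other.state);
        }

        @Override
        public String toString() {
            return country + ":" + state;
        }
    }

    // Builds keys from {country, state} pairs and returns them in sorted order.
    static List<String> sorted(String[][] pairs) {
        List<Key> keys = new ArrayList<>();
        for (String[] pair : pairs) {
            keys.add(new Key(pair[0], pair[1]));
        }
        Collections.sort(keys);
        List<String> result = new ArrayList<>();
        for (Key key : keys) {
            result.add(key.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] pairs = {{"USA", "MO"}, {"IND", "TN"}, {"USA", "CA"}};
        System.out.println(sorted(pairs)); // prints [IND:TN, USA:CA, USA:MO]
    }
}
```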

The map output is sorted and grouped by key and written to local disk as intermediate data. The partitioner decides how the map output is divided, identifying the reducer for each key. MapReduce can also run a local combiner, which combines intermediate map output before it is passed to the reducer. This helps cut down the amount of data transferred from the map to the reducer. Here the key is built from org.apache.hadoop.io.Text, which provides methods to serialize text and compare it at the byte level. The value, IntWritable, can likewise be serialized because it implements the Writable interface, and it supports sorting because it implements WritableComparable.

Reducer: The reducer copies the intermediate output of the map tasks from their local disks over HTTP. Before the reduce method is invoked, the framework shuffles, merges, and sorts the key-value pairs so that all values for a key are brought together. The reducer then processes the collection of values for each key and writes the result to disk. The reducer class extends the generic Reducer class as below

public static class GroupReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

The type parameters of the generic Reducer class define the input and output key-value types: here we receive the input as <Country, IntWritable> and emit the output as <Country, IntWritable>. Below is the reduce method, which the framework calls once for each key, so every call receives a single key with its collection of values.

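// The new-API Reducer passes the values as an Iterable; a signature taking an
// Iterator would not override reduce() and would never be called
@Override
public void reduce(Country key, Iterable<IntWritable> values, Context context) throws IOException,
        InterruptedException {
    int cnt = 0;
    for (IntWritable value : values) {
        cnt = cnt + value.get();
    }
    context.write(key, new IntWritable(cnt));
}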
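The effect of the local combiner mentioned earlier can be pictured with a small plain-Java sketch. It is not Hadoop code: CombinerSketch and combine are hypothetical names, and the keys are plain strings. It sums the values that share a key within one map task's output, so fewer records have to travel to the reducer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CombinerSketch {

    // Sums the values that share a key, as a summing combiner would do
    // on the output of a single map task before the shuffle.
    static Map<String, Integer> combine(String[] keys, int[] values) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            combined.merge(keys[i], values[i], Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        String[] keys = {"USA:CA", "USA:CA", "USA:MO", "USA:MO"};
        int[] values = {12, 42, 23, 19};
        Map<String, Integer> combined = combine(keys, values);
        // prints 4 records in, 2 records out
        System.out.println(keys.length + " records in, " + combined.size() + " records out");
        // prints {USA:CA=54, USA:MO=42}
        System.out.println(combined);
    }
}
```

Because summing gives the same result however the values are grouped, a reducer like the one above, whose input and output types match, could presumably be registered as the combiner through job.setCombinerClass; the original job does not do this, so treat it as an optional extension.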
        

Job: To run the MapReduce program we have to set the mapper, the reducer, and other properties on the job. In the code below the Job class, which takes over the role JobConf plays in the older API, is the main configuration class. It configures MapReduce parameters such as the Mapper, Reducer, Combiner, InputFormat, OutputFormat, and Comparator. The code below shows how to create and run the job based on the map and reduce code above

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    FileUtils.deleteDirectory(new File("/Local/data/output"));
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "GroupMR");
    job.setJarByClass(GroupMR.class);
    job.setMapperClass(GroupMapper.class);
    job.setReducerClass(GroupReducer.class);
    job.setOutputKeyClass(Country.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setMaxInputSplitSize(job, 10);
    FileInputFormat.setMinInputSplitSize(job, 100);
    FileInputFormat.addInputPath(job, new Path("/Local/data/Country.csv"));
    FileOutputFormat.setOutputPath(job, new Path("/Local/data/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

This main method runs the MapReduce job. Before submitting the job we need to set the MapperClass, ReducerClass, OutputKeyClass, and OutputValueClass. We can also configure the input and output paths through FileInputFormat and FileOutputFormat. FileInputFormat is also used to decide the number of map tasks. We can set the input split size, which takes part in deciding the number of map tasks. The split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)). With the code above, the framework executes the MapReduce job as described in the Job.
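The split size rule is easy to try with the numbers from the job above. The sketch below is plain Java with hypothetical names (SplitSizeSketch, computeSplitSize) and an assumed block size of 64 MB; it only reproduces the arithmetic and does not call Hadoop.

```java
public class SplitSizeSketch {

    // maxSize caps the block size first, then minSize acts as a floor,
    // so minSize wins whenever it is larger than maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024; // assumed 64 MB block size

        // Values from the job above: max split size 10, min split size 100.
        System.out.println(computeSplitSize(blockSize, 100, 10)); // prints 100

        // With a small minimum, the maximum decides the split size.
        System.out.println(computeSplitSize(blockSize, 1, 10)); // prints 10
    }
}
```

So with a minimum of 100 bytes and a maximum of 10 bytes, the minimum takes effect and each split covers about 100 bytes of the input file.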

Running Running the MapReduce program is quite straightforward. Package the Java application in a JAR, say hadoopessence.jar, and run it from the command line as below, passing the input folder and the output folder in HDFS

hadoop jar hadoopessence.jar org.techmytalk.hadoopessence.GroupMR <input folder> <output folder>

For testing, we can run this application directly with Run As in Eclipse.

Summary: In this article I explained a basic MapReduce program that computes a group-by operation.

Reference Hadoop Essence: The Beginner’s Guide to Hadoop & Hive