Monday, August 22, 2011

Map Reduce Job to Calculate Total Unique Lines in Your Dataset

Now that Pandora's box is open, I would like to unleash the power of this MR framework with an example of another very useful requirement: "Give me the total number of unique lines in the dataset." Jumbo kicks in as we fire the following job, which rolls across your data nodes to give you the answer:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
/**
 * MapReduce job (old {@code org.apache.hadoop.mapred} API) that counts the
 * total number of unique lines in a dataset.
 *
 * Pipeline:
 *   Map     - emits (line, 1) for every input line.
 *   Combine - local de-duplication: emits each distinct key once with count 1.
 *             NOTE: a combiner must never change the key. The original version
 *             rewrote every key to "UniqueUsers", which breaks per-key grouping
 *             in the shuffle and yields a split-dependent, incorrect count
 *             (combiners may run zero, one, or many times).
 *   Reduce  - runs as a single reduce task; reduce() is invoked exactly once
 *             per distinct line, so counting invocations gives the answer,
 *             emitted once from close().
 */
public class CalculateDistinct {

    /** Emits (line, 1) for every line of input. */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text("");

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            word.set(value.toString());
            output.collect(word, one);
        }
    }

    /**
     * Local de-duplication: collapses repeated occurrences of a line within one
     * map task's output into a single (line, 1) record. Preserves the key, as a
     * correct combiner must, and is safe whether the framework applies it zero,
     * one, or several times.
     */
    public static class Combine extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Drain the values; we only care that the key occurred at least once.
            while (values.hasNext()) {
                values.next();
            }
            output.collect(key, one);
        }
    }

    /**
     * Counts distinct lines. With a single reduce task, reduce() runs exactly
     * once per distinct key, so the number of invocations equals the number of
     * unique lines. The total is emitted once, from close().
     */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private int uniqueLines = 0;
        private OutputCollector<Text, IntWritable> collector;

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            collector = output;
            uniqueLines++;
            // Drain the iterator so the framework can advance to the next key.
            while (values.hasNext()) {
                values.next();
            }
        }

        @Override
        public void close() throws IOException {
            if (collector != null) {
                collector.collect(new Text("UniqueUsers"), new IntWritable(uniqueLines));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(CalculateDistinct.class);
        conf.set("fs.default.name", "hdfs://localhost:8020/home/hadoop/");
        conf.setJobName("Calculate Distinct");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);
        // A single reducer is required so the reduce-invocation count equals
        // the GLOBAL number of distinct lines.
        conf.setNumReduceTasks(1);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("file:///Users/jagarandas/Work-Assignment/Analytics/analytics-poc/sample-data/"));
        FileOutputFormat.setOutputPath(conf, new Path("/home/hadoop/sample-data2/"));
        JobClient.runJob(conf);
    }
}

1 comment:

Unknown said...

I think in reducer and combiner it must be something like this...


after while loop...


if(sum==1) {
output.collect(key, new IntWritable(sum));
}

/* That way you'll get only the unique lines.
As it stands, you'll be printing each key with a count of 1, which can be modified as required. */