A program that calculates the average of a set of numbers using the MapReduce combiner technique:
The figure below explains the concept in detail:
The code includes:
1. AverageDriver.java - Driver class
package com.self.average.driver;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.tools.ant.util.FileUtils;

import com.self.average.mapreduce.AverageMRC;

public class AverageDriver extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(), new AverageDriver(), args);
        System.exit(res);
    }

    private Configuration conf;
    private String inputFile;
    private String outputName;
    private final String inputDirectoryPrefix = "Average";
    private final String averagefilename = "average.txt";

    // Copies the job's part file to a user-friendly location.
    public boolean cpFilestoUserLocation(final String useroutputfile,
            final String systemfile) throws IOException {
        System.out.println("Copy file to user location");
        System.out.println("useroutputfile::" + useroutputfile);
        System.out.println("systemfile::" + systemfile);
        final FileUtils fu = FileUtils.getFileUtils();
        fu.copyFile(systemfile, useroutputfile, null, true);
        return true;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public final String getDateTime() {
        final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_hh-mm-ss");
        return df.format(new Date());
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(final String[] args) throws Exception {
        // Define the --input and -o command-line options using commons-cli2.
        final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
        final ArgumentBuilder abuilder = new ArgumentBuilder();
        final GroupBuilder gbuilder = new GroupBuilder();
        final Option inputFileOpt = obuilder
                .withLongName("input")
                .withShortName("input")
                .withRequired(false)
                .withArgument(abuilder.withName("input").withMinimum(1)
                        .withMaximum(1).create())
                .withDescription("input file").create();
        final Option outputDirOpt = obuilder
                .withLongName("o")
                .withShortName("o")
                .withRequired(false)
                .withArgument(abuilder.withName("o").withMinimum(1)
                        .withMaximum(1).create())
                .withDescription("output directory option").create();
        final Group group = gbuilder.withName("Options")
                .withOption(inputFileOpt).withOption(outputDirOpt).create();
        System.out.println(group);
        try {
            final Parser parser = new Parser();
            parser.setGroup(group);
            final CommandLine cmdLine = parser.parse(args);
            if (cmdLine.hasOption("help")) {
                CommandLineUtil.printHelp(group);
                return -1;
            }
            this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim();
            this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();
            System.out.println("input file path::" + this.inputFile);
            System.out.println("output Directory::" + this.outputName);
        } catch (final OptionException e) {
            System.err.println("Exception : " + e);
            CommandLineUtil.printHelp(group);
            return -1;
        }

        final String parentDirectory = "Average";
        final String baseDirectory = this.outputName + "/" + parentDirectory;
        if (!FileSystem.get(new Configuration()).isDirectory(new Path(baseDirectory))) {
            FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
        }

        final Job job = new Job(this.getConf());
        job.setJobName("AverageProcess");
        job.setJarByClass(AverageMRC.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // A single reducer produces one global average.
        job.setNumReduceTasks(1);
        job.setMapperClass(AverageMRC.AverageMap.class);
        job.setReducerClass(AverageMRC.Reduce.class);
        job.setCombinerClass(AverageMRC.Combiner.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        DistributedCache.addCacheFile(new Path(this.inputFile).toUri(),
                job.getConfiguration());

        final String averageoutputPath = baseDirectory + "/" + this.inputDirectoryPrefix;
        if (FileSystem.get(new Configuration()).isDirectory(new Path(averageoutputPath))) {
            // Delete a stale output directory, or the job will fail on an existing path.
            FileSystem.get(new Configuration()).delete(new Path(averageoutputPath), true);
        }
        FileInputFormat.setInputPaths(job, new Path(this.inputFile));
        FileOutputFormat.setOutputPath(job, new Path(averageoutputPath));

        final boolean status = job.waitForCompletion(true);
        if (!status) {
            System.out.println("Could not create the average output file");
            return -1;
        }
        if (this.cpFilestoUserLocation(baseDirectory + "/" + this.averagefilename,
                averageoutputPath + "/" + "part-r-00000")) {
            System.out.println("Average File Copied at " + baseDirectory + "/"
                    + this.averagefilename);
        }
        return 0;
    }

    @Override
    public void setConf(final Configuration conf) {
        this.conf = conf;
    }
}
2. AverageMRC.java - Mapper, Combiner, and Reducer classes
package com.self.average.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageMRC {

    // The nested classes must be static so that Hadoop can instantiate them by reflection.
    public static class AverageMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(final LongWritable key, final Text value, final Context context)
                throws IOException, InterruptedException {
            // Emit every number under one constant key so they all reach the same reducer.
            context.write(new Text("MAPPER"), value);
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(final Text key, final Iterable<Text> values, final Context context)
                throws IOException, InterruptedException {
            int count = 0;
            double sum = 0D;
            final Iterator<Text> itr = values.iterator();
            while (itr.hasNext()) {
                sum += Double.parseDouble(itr.next().toString());
                count++;
            }
            // Emit the partial average together with its count so the reducer
            // can weight each partial result correctly.
            final double average = sum / count;
            context.write(new Text("A_C"), new Text(average + "_" + count));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(final Text key, final Iterable<Text> values, final Context context)
                throws IOException, InterruptedException {
            double sum = 0D;
            int totalcount = 0;
            final Iterator<Text> itr = values.iterator();
            while (itr.hasNext()) {
                final String[] tokens = itr.next().toString().split("_");
                final double average = Double.parseDouble(tokens[0]);
                final int count = Integer.parseInt(tokens[1]);
                // Undo each partial average back into a sum before combining.
                sum += average * count;
                totalcount += count;
            }
            final double average = sum / totalcount;
            context.write(new Text("AVERAGE"), new Text(Double.toString(average)));
        }
    }
}
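A note on the combiner logic above, since it is the heart of the technique: an average of averages is only correct when each partial average is weighted by the number of elements it summarizes, which is why the combiner emits average_count pairs rather than bare averages. The following standalone sketch (plain Java, no Hadoop, with made-up partial results) shows the merge step the reducer performs:

public class WeightedAverageDemo {
    public static void main(String[] args) {
        // Hypothetical combiner outputs: one split averaging 4.0 over 3 numbers,
        // another averaging 10.0 over 2 numbers.
        String[] partials = { "4.0_3", "10.0_2" };
        double sum = 0D;
        int totalCount = 0;
        for (String partial : partials) {
            String[] tokens = partial.split("_");
            double average = Double.parseDouble(tokens[0]);
            int count = Integer.parseInt(tokens[1]);
            sum += average * count; // recover each split's original sum
            totalCount += count;
        }
        // Weighted result: (4.0 * 3 + 10.0 * 2) / 5 = 6.4, whereas the naive
        // average of averages, (4.0 + 10.0) / 2 = 7.0, would be wrong.
        System.out.println("Global average: " + sum / totalCount);
    }
}

One caveat: Hadoop treats the combiner as an optional optimization that may run zero, one, or several times, including over its own output during merge passes. This pipeline assumes the combiner runs exactly once over each mapper's raw output; if it were skipped or re-applied, the average_count parsing would fail. That trade-off is behind the questions below about computing the average twice.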
Hi, I am getting an error at this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim(); can you please help me solve it?
Can you please share how you are executing this, and with what parameters?
What is the use of the pom.xml file?
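pom.xml is the Maven build descriptor: it declares the project's dependencies (here, the Hadoop, Mahout, commons-cli2, and Ant artifacts imported by the driver) so that Maven can compile the code and package the job jar.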
Why are we computing the average twice here? Couldn't we compute the average in the reducer class alone?
Why are we repeating the average calculation in both the combiner and the reducer? Please explain.
Does anybody have simpler code for finding the average? Please attach it here.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GlobalNumberAverage {
public static class GlobalNumberAverageMapper
        extends Mapper<Object, Text, Text, DoubleWritable> {

    // Instance fields accumulate across every map() call made by this task.
    int sum = 0;
    int count = 0;

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            sum += Integer.parseInt(itr.nextToken());
            count += 1;
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Cast to double so the division does not truncate to an integer.
        context.write(new Text("Average of numbers is"),
                new DoubleWritable((double) sum / count));
    }
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Global Average of Numbers");
job.setJarByClass(GlobalNumberAverage.class);
job.setMapperClass(GlobalNumberAverageMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
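Note that with job.setNumReduceTasks(0), each map task writes its output directly, so each mapper's cleanup() emits the average of only its own split. This gives a single global average only when the input is small enough to be handled by one mapper; the combiner/reducer version above funnels all partial results through a single reducer and avoids that limitation.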
I have just started working on a big data project and came across MapReduce, so I found this article while searching for how to compute the average of numbers using combiners with MapReduce.
Thanks for the post. I was searching the internet for similar averaging logic.