Monday 19 August 2013

Calculate Average in Map Reduce using Combiners

A program that calculates the average of a set of numbers using the Map Reduce combiner technique:

The idea, in short: the mappers emit the raw numbers under a single key, the combiner collapses each map task's output into a partial average plus the count it was computed from, and a single reducer merges those (average, count) pairs into the overall average.

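For example, if one combiner sees the values 2 and 4, it emits the pair (3, 2); if another sees only 6, it emits (6, 1). The reducer then computes (3 * 2 + 6 * 1) / (2 + 1) = 4, which is exactly the mean of {2, 4, 6}. Naively averaging the two partial averages would give (3 + 6) / 2 = 4.5, which is why each partial average must travel with its count.
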
The code includes:
1. AverageDriver.java - Driver class
package com.self.average.driver;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.tools.ant.util.FileUtils;

import com.self.average.mapreduce.AverageMRC;

public class AverageDriver extends Configured implements Tool {

  public static void main(final String[] args) throws Exception {
    final int res = ToolRunner.run(new Configuration(),
        new AverageDriver(), args);
    System.exit(res);
  }

  private Configuration conf;

  private String inputFile;
  private String outputName;

  private final String inputDirectoryPrefix = "Average";

  private final String averagefilename = "average.txt";

  // Copies the job's result file to the user's location using Ant's
  // FileUtils (note: this operates on the local filesystem).
  public boolean cpFilestoUserLocation(final String useroutputfile,
      final String systemfile) throws IOException {
    System.out.println("Copy file to user location");
    System.out.println("useroutputfile::" + useroutputfile);
    System.out.println("systemfile::" + systemfile);
    final FileUtils fu = FileUtils.getFileUtils();
    fu.copyFile(systemfile, useroutputfile, null, true);

    return true;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  public final String getDateTime() {
    final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_hh-mm-ss");
    return df.format(new Date());
  }

  @SuppressWarnings("deprecation")
  @Override
  public int run(final String[] args) throws Exception {
    final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    final ArgumentBuilder abuilder = new ArgumentBuilder();
    final GroupBuilder gbuilder = new GroupBuilder();

    final Option inputFileOpt = obuilder
        .withLongName("input")
        .withShortName("input")
        .withRequired(false)
        .withArgument(
            abuilder.withName("input").withMinimum(1)
                .withMaximum(1).create())
        .withDescription("input file").create();

    final Option outputDirOpt = obuilder
        .withLongName("o")
        .withShortName("o")
        .withRequired(false)
        .withArgument(
            abuilder.withName("o").withMinimum(1).withMaximum(1)
                .create())
        .withDescription("output directory option").create();

    final Group group = gbuilder.withName("Options")
        .withOption(inputFileOpt).withOption(outputDirOpt).create();

    System.out.println(group);

    try {
      final Parser parser = new Parser();
      parser.setGroup(group);
      final CommandLine cmdLine = parser.parse(args);

      if (cmdLine.hasOption("help")) {
        CommandLineUtil.printHelp(group);
        return -1;
      }

      this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim();
      this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();

      System.out.println("input file path::" + this.inputFile);
      System.out.println("output Directory::" + this.outputName);

    } catch (final OptionException e) {
      System.err.println("Exception : " + e);
      CommandLineUtil.printHelp(group);
      return -1;
    }

    final String parentDirectory = "Average";
    final String baseDirectory = this.outputName + "/" + parentDirectory;
    if (!FileSystem.get(new Configuration()).isDirectory(
        new Path(baseDirectory))) {
      FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
    }

    final Job job = new Job(this.getConf());
    job.setJobName("AverageProcess");
    job.setJarByClass(AverageMRC.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // A single reducer collects every (average, count) pair and
    // produces the one global average.
    job.setNumReduceTasks(1);

    job.setMapperClass(AverageMRC.AverageMap.class);
    job.setReducerClass(AverageMRC.Reduce.class);
    job.setCombinerClass(AverageMRC.Combiner.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    DistributedCache.addCacheFile(new Path(this.inputFile).toUri(),
        job.getConfiguration());
    final String averageoutputPath = baseDirectory + "/"
        + this.inputDirectoryPrefix;

    // Clear the output directory of a previous run, or the job will
    // fail because the path already exists.
    if (FileSystem.get(new Configuration()).isDirectory(
        new Path(averageoutputPath))) {
      FileSystem.get(new Configuration()).delete(
          new Path(averageoutputPath), true);
    }

    FileInputFormat.setInputPaths(job, new Path(this.inputFile));
    FileOutputFormat.setOutputPath(job, new Path(averageoutputPath));

    final boolean status = job.waitForCompletion(true);

    if (!status) {
      System.out.println("Could not create the average file");
      return -1;
    }

    if (this.cpFilestoUserLocation(baseDirectory + "/"
        + this.averagefilename, averageoutputPath + "/"
        + "part-r-00000")) {
      System.out.println("Average File Copied at " + baseDirectory + "/"
          + this.averagefilename);
    }

    return 0;
  }

  @Override
  public void setConf(final Configuration conf) {
    this.conf = conf;
  }

}
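
A note on running the driver, since this comes up in the comments below: given the option definitions above ("input" and "o"), an invocation along these lines should work (the jar name here is only a placeholder):

 hadoop jar average-job.jar com.self.average.driver.AverageDriver --input <input-file> --o <output-directory>

The job output lands under <output-directory>/Average/Average, and the driver then copies the result to <output-directory>/Average/average.txt.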


2. AverageMRC.java - Mapper, Combiner & Reducer classes
package com.self.average.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageMRC {

  // Emit every input line under a single key so that one reducer
  // sees all of the numbers. The classes below must be static, or
  // Hadoop cannot instantiate them by reflection.
  public static class AverageMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(final LongWritable key, final Text value,
        final Context context) throws IOException, InterruptedException {
      context.write(new Text("MAPPER"), value);
    }
  }

  // Pre-aggregate on the map side: emit a partial average together with
  // the count it was computed from, encoded as "average_count".
  // Note: the framework may apply a combiner zero, one, or several times;
  // this implementation assumes exactly one pass over raw map output.
  public static class Combiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(final Text key, final Iterable<Text> values,
        final Context context) throws IOException, InterruptedException {
      int count = 0;
      double sum = 0D;
      final Iterator<Text> itr = values.iterator();
      while (itr.hasNext()) {
        final String text = itr.next().toString();
        final double value = Double.parseDouble(text);
        count++;
        sum += value;
      }

      final double average = sum / count;

      context.write(new Text("A_C"), new Text(average + "_" + count));
    }
  }

  // Merge the partial results: weight each partial average by its count,
  // then divide by the total count.
  public static class Reduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(final Text key, final Iterable<Text> values,
        final Context context) throws IOException, InterruptedException {
      double sum = 0D;
      int totalcount = 0;
      final Iterator<Text> itr = values.iterator();
      while (itr.hasNext()) {
        final String text = itr.next().toString();
        final String[] tokens = text.split("_");
        final double average = Double.parseDouble(tokens[0]);
        final int count = Integer.parseInt(tokens[1]);
        sum += (average * count);
        totalcount += count;
      }

      final double average = sum / totalcount;

      context.write(new Text("AVERAGE"), new Text(Double.toString(average)));
    }
  }
}
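
To see the same arithmetic outside Hadoop, here is a minimal, self-contained sketch (the class and method names are mine, not part of the job above) that simulates what the Combiner and Reduce classes do for two map partitions:

import java.util.Arrays;
import java.util.List;

public class AverageSimulation {

  // Mimics Combiner.reduce: collapse one partition to "average_count".
  static String combine(final List<Double> values) {
    double sum = 0D;
    for (final double v : values) {
      sum += v;
    }
    return (sum / values.size()) + "_" + values.size();
  }

  // Mimics Reduce.reduce: weight each partial average by its count.
  static double reduce(final List<String> partials) {
    double sum = 0D;
    int totalcount = 0;
    for (final String p : partials) {
      final String[] tokens = p.split("_");
      final int count = Integer.parseInt(tokens[1]);
      sum += Double.parseDouble(tokens[0]) * count;
      totalcount += count;
    }
    return sum / totalcount;
  }

  public static void main(final String[] args) {
    final String c1 = combine(Arrays.asList(2D, 4D)); // "3.0_2"
    final String c2 = combine(Arrays.asList(6D));     // "6.0_1"
    System.out.println(reduce(Arrays.asList(c1, c2))); // prints 4.0
  }
}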

The Maven pom.xml file should contain the following dependencies:

  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>0.20.2</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.5</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.apache.mahout.commons</groupId>
   <artifactId>commons-cli</artifactId>
   <version>2.0-mahout</version>
  </dependency>
  <dependency>
   <groupId>org.apache.mahout</groupId>
   <artifactId>mahout-core</artifactId>
   <version>0.4</version>
  </dependency>
  <dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.1</version>
  </dependency>
  <dependency>
   <!-- needed for org.apache.tools.ant.util.FileUtils used by the
        driver; the version here is an assumption -->
   <groupId>org.apache.ant</groupId>
   <artifactId>ant</artifactId>
   <version>1.8.2</version>
  </dependency>

9 comments:

  1. Hi, I am getting an error at this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim();
    Can you please help me solve the error?

    Reply: Can you please share how you are executing this, and with what parameters?
  2. What is the use of the pom.xml file?

  3. Why are we computing the average twice here? Couldn't we compute the average in the reducer class itself?

  4. Why are we repeating the average calculation in both the combiner and the reducer? Please explain.

  5. Does anybody have simple code for finding the average? Please attach it here.

  6. import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Map-only alternative: each mapper keeps a running sum and count and
    // writes one average from cleanup(). Note that this emits one average
    // per mapper, so the input must fit in a single split for a truly
    // global average.
    public class GlobalNumberAverage {

      public static class GlobalNumberAverageMapper
          extends Mapper<Object, Text, Text, DoubleWritable> {

        int sum = 0;
        int count = 0;

        @Override
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            sum = sum + Integer.parseInt(str);
            count += 1;
          }
        }

        @Override
        public void cleanup(Context context)
            throws IOException, InterruptedException {
          // Cast to double so integer division does not truncate the result.
          context.write(new Text("Average of numbers is"),
              new DoubleWritable((double) sum / count));
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Global Average of Numbers");
        job.setJarByClass(GlobalNumberAverage.class);
        job.setMapperClass(GlobalNumberAverageMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

  7. This comment has been removed by the author.

  8. I have just started to work on a big data project and came across map-reduce, so I found this article while searching for the average of numbers using combiners with map-reduce.

    Thanks for the post; I was searching the internet for similar logic.
