Friday, 4 January 2019

Apache PIG as a Client running in remote Hadoop Cluster


The intent is to use the Apache PIG APIs to execute a PIG script from a client application against a remote Hadoop cluster.

PIG Version : pig-0.12.0
Apache Hadoop Version : Hadoop 1.2.1

import java.io.IOException;
import java.util.Properties;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;

public class PigClient {

    public static void main(final String[] args) {
        PigServer pig;
        try {
            // Point the client at the remote cluster's HDFS.
            final Properties properties = new Properties();
            properties.put("fs.default.name", "hdfs://x.x.x.x:9000");
            // For MAPREDUCE mode the job tracker address is typically also required,
            // e.g. properties.put("mapred.job.tracker", "x.x.x.x:9001");
            pig = new PigServer(ExecType.MAPREDUCE, properties);
            System.out.println("-----------Connected to Hadoop Server-------");
            // Run a Pig script stored on the local (client) machine.
            pig.registerScript("E:\\wordcount.pig");
        } catch (final ExecException e) {
            e.printStackTrace();
        } catch (final IOException e) {
            e.printStackTrace();
        }
    }
}
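
As a variant, the same PigServer instance can also run Pig Latin statements registered directly from Java instead of a script file. The sketch below is illustrative only; the HDFS paths and the word-count logic are assumptions, not the contents of the original wordcount.pig:

// Inside the try block above, instead of registerScript:
pig.registerQuery("lines = LOAD '/user/hduser/input.txt' AS (line:chararray);");
pig.registerQuery("words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;");
pig.registerQuery("grouped = GROUP words BY word;");
pig.registerQuery("counts = FOREACH grouped GENERATE group, COUNT(words);");
pig.store("counts", "/user/hduser/wordcount-output");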

Wednesday, 7 January 2015

Gmail Actions - Schema.org

I came across something interesting today in one of the emails in my Gmail account, and I thought it was worth sharing.
Below you can see a mail from ‘Meru Cabs’ with the subject ‘Meru Booking Status: CONFIRMED’, and beside it a button that says ‘Modify reservation’ (indicating an external link).


(You may already know why and how it is there, but in case you don’t, read on.)




The button struck me as unusual because it does not appear in other mails, yet it clearly makes this mail stand out and feel ‘clickable’.
On clicking it, I was redirected to the Meru Cabs site.

What made me curious was how such emails are created, so I googled it and landed on this developer page: https://developers.google.com/gmail/markup/overview.
The answer is that Meru embeds Schema.org metadata in its emails, which providers such as Google and Yahoo support and render in order to offer such actions.

On viewing the mail's source I found the following script:

<script type="application/ld+json">
{
  "@context": "http://schema.org",
  "@type": "EventReservation",
  "reservationNumber": "33***2628",
  "reservationStatus": "http://schema.org/Confirmed",
  "underName": {
    "@type": "Person",
    "name": "Pankaj Khattar"
  },
  "reservationFor": {
    "@type": "Event",
    "name": "My Meru Cab Booking",
    "startDate": "2014-01-06T21:00:00+05:30",
    "location": { 
      "@type": "Place",
      "name": "NOIDA SECTOR",
      "address": {
        "@type": "PostalAddress",
        "streetAddress": "Sector, Noida, UP",
        "addressLocality": "Delhi",
        "addressRegion": "Delhi",
        "postalCode": "110001",
        "addressCountry": "India"
      }
    }
  },
  "modifiedTime": "2014-01-06T21:00:00+05:30",
  "modifyReservationUrl": "http://www.merucabs.com" 
}
</script>


The above script contains all the event information; Gmail reads it and creates the action button.
The developer page explains how to create such emails and add these scripts to them.
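As a rough sketch of how one might embed such markup when sending mail from Java (assuming the JavaMail/javax.mail API is available; the SMTP host, addresses and the abbreviated JSON-LD are placeholders):

import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

public class SchemaOrgMailSender {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("mail.smtp.host", "smtp.example.com"); // placeholder SMTP server
        final Session session = Session.getInstance(props);

        // The schema.org JSON-LD block, as shown above (abbreviated here).
        final String jsonLd = "<script type=\"application/ld+json\">"
                + "{ \"@context\": \"http://schema.org\", \"@type\": \"EventReservation\" }"
                + "</script>";

        final MimeMessage message = new MimeMessage(session);
        message.setFrom(new InternetAddress("bookings@example.com"));
        message.setRecipients(Message.RecipientType.TO, InternetAddress.parse("user@example.com"));
        message.setSubject("Booking Status: CONFIRMED");
        // The markup goes inside the HTML body of the mail.
        message.setContent("<html><head>" + jsonLd + "</head><body>Your booking is confirmed.</body></html>",
                "text/html; charset=utf-8");
        Transport.send(message);
    }
}

(Note that Gmail only renders these actions for senders it trusts; Google's documentation describes a registration process, and self-addressed test mails can be used during development.)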
Similar to the mail above, Gmail supports many other actions, such as:
  • RSVP Action for events
  • Review Action for restaurants, movies, products and services
  • One-click Action for just about anything that can be performed with a single click
  • Go-to Action for more complex interactions
  • Flight interactive cards
Another important point is that the event was also added to Google Calendar (there is another notification mail above it in the screenshot) and to Google Now.
These actions genuinely improve the user experience and make it easy for the recipient to take quick action.

Monday, 8 December 2014

Remotely Execute Map Reduce Job having 3rd Party Jars & Files


There are scenarios where we want to create a Map Reduce job that:

  1. References 3rd party jar code in the Mapper or Reducer
  2. Uses large reference data through those 3rd party jars
  3. Executes native code, such as C/C++ (.so) files, through those 3rd party jars
On top of all this, the job is submitted from a remote server (not from the Hadoop NameNode).

So in this scenario, it is important that:
  1. The Map/Reduce job jar is available to all DataNodes
  2. The 3rd party jars and native files are also available on the DataNodes
  3. The reference data used by the 3rd party jars is also made available to the DataNodes

The overall design should be something like:




***The code applies to Hadoop 1.x (tested on Apache Hadoop 1.2.1); for Hadoop 2.x, changes would be required.

The Maven POM should contain the following dependency:


<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
</dependency>
In order to achieve this, most of the configuration is done in the Driver class itself:
  • Define the following Hadoop cluster parameters in the Hadoop job Configuration:
    • final Configuration conf = new Configuration();
    • conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
    • conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
    • conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
    • conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
  • Create a fat jar file containing the 3rd party jar file, the Mapper code, the Reducer code (if applicable), the Driver code etc., and point to it in the Driver class:
    • conf.set("mapred.jar", "<path of the fat jar file>");
  • Pass the 3rd party native (.so) files as Distributed Cache files through the ToolRunner arguments:
    • final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
    • final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);

        Apart from that, the user needs to make sure that the DataNodes have access to the 3rd party reference data as local data (for example, store all the data on one machine and use NFS on Linux to mount it on the other machines).

        The exact Driver code looks like this:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.security.UserGroupInformation;
        import org.apache.hadoop.util.ToolRunner;
        
        import java.security.PrivilegedExceptionAction;
        
        public class CustomDriver
        {
          /**
           * @param args
           * @throws Exception
           */
          public static void main(final String[] args) throws Exception
          {
            // Create a remote user instance (hduser is the Hadoop cluster user)
            final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
            ugi.doAs(new PrivilegedExceptionAction<Boolean>()
            {
              @Override
              public Boolean run() throws Exception
              {
                final Configuration conf = new Configuration();
                // Set Hadoop 1.x cluster parameters
                conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
                conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
                conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
                conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
        
                // Set the fat jar file path
                conf.set("mapred.jar", "<path of the fat jar file>"); // E:\jar\flatJar.jar
        
                // Set the native file as a Distributed Cache file
                final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
        
                // Initiate the ToolRunner
                final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
                System.exit(res);
                return true;
              }
            });
          }
        }
        



        The CustomTool code is straightforward, with few changes from a standard Tool implementation:



        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.conf.Configured;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.util.Tool;
        
        public class CustomTool extends Configured implements Tool
        {
        
          private Configuration configuration;
        
          public CustomTool()
          {
        
          }
        
          public CustomTool(final Configuration conf)
          {
            setConf(conf);
          }
        
          @Override
          public Configuration getConf()
          {
            return this.configuration;
          }
        
          @Override
          public void setConf(final Configuration conf)
          {
            this.configuration = conf;
          }
        
          @Override
          public int run(final String[] arg0) throws Exception
          {
            final Job job = new Job(getConf());
            final String outputFolderPath = "/home/hduser/Custom-output";
        
            final FileSystem hdfs = FileSystem.get(getConf());
            if (hdfs.exists(new Path(outputFolderPath)))
            {
              hdfs.delete(new Path(outputFolderPath), true);
            }
        
            job.setJobName("Job Custom-" + System.currentTimeMillis());
            job.setNumReduceTasks(0);
           
            job.setJarByClass(CustomMapper.class);
            job.setMapperClass(CustomMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
        
            FileInputFormat.setInputPaths(job, new Path("/home/hduser/Custom/inputfile.txt"));
            FileOutputFormat.setOutputPath(job, new Path(outputFolderPath));
            job.waitForCompletion(true);
            return 0;
          }
        }
        




        The Mapper code is again simple; it invokes the 3rd party library code and makes use of the native (.so) files.




        import com.Custom.javademo.CustomThirdPartyCode;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.filecache.DistributedCache;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        
        import java.io.IOException;
        import java.net.URI;
        
        public class CustomMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable>
        {
        
          @Override
          protected void setup(final Context context) throws IOException, InterruptedException
          {
            final Configuration conf = context.getConfiguration();
            URI[] cacheFiles = null;
            cacheFiles = DistributedCache.getCacheFiles(conf);
            System.out.println("cacheFiles:" + cacheFiles);
            if (cacheFiles != null)
            {
              System.out.println("cacheFiles:" + cacheFiles.length);
              for (final URI cacheFile : cacheFiles)
              {
                System.out.println(cacheFile.toString());
              }
            }
           
          };
        
          @Override
          protected void map(final LongWritable key, final Text val, final Context context) throws java.io.IOException, InterruptedException
          {
            System.out.println("Invoked for record ==========>" + val.toString());
            final String[] args = new String[1];
            args[0] = "/home/hduser/Custom/data"; //works, if the data is accessible to the datanode
            CustomThirdPartyCode.main(args,val.toString());
          };
        }
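
        For completeness, below is a minimal, hypothetical sketch of what the CustomThirdPartyCode entry point invoked from the Mapper might look like, assuming it loads the native libctr.so shipped via the -files option and reads the local reference data directory (none of this is from the original post; the native method name and signature are illustrative):

        public class CustomThirdPartyCode
        {
          static
          {
            // The -files option places libctr.so in the task's working directory,
            // so it can be loaded from there by absolute path (assumption).
            System.load(new java.io.File("libctr.so").getAbsolutePath());
          }
        
          // Native method implemented in libctr.so (assumed signature).
          private static native String process(String dataDir, String record);
        
          public static void main(final String[] args, final String record)
          {
            final String dataDir = args[0]; // local reference data, e.g. mounted via NFS
            System.out.println(process(dataDir, record));
          }
        }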
        

        Wednesday, 9 July 2014

        Multi-Tenancy


        What is Multi-Tenancy ?
        • A single instance of the software runs on a server, serving multiple client  organizations (tenants) 
        • Designed to virtually partition its data and configuration 
        • Essential attribute of Cloud Computing 
        What's a Tenant ?
        • Tenant is "my user who has her own users"
        • Multi-tenancy is not primarily to the tenant's advantage; it mainly benefits the multi-tenancy provider
        • A tenant would always prefer an environment that is isolated from other tenants


        Levels in Multi-tenancy
        Multi-tenancy can be introduced at either of two levels:
        1. Hypervisor level Isolation
        2. DB level Isolation

        Hypervisor level Isolation
        • Maps the physical machine to a virtualized machine
        • The hypervisor allows the hardware to be partitioned at a finer granularity
        • Improves efficiency by running more tenants on the same physical machine
        • Provides cleanest separation 
        • Less security concerns 
        • Easier cloud adoption
        • Virtualization introduces a certain % of overhead

        DB level Isolation
        • Re-architect the underlying data layer  
        • Computing resources and application code shared between all the tenants on a server 
        • Introduce distributed and partitioned DB 
        • The degree of isolation is only as good as the rewritten queries
        • Approaches for DB level isolation:
          • Separated Databases
          • Shared Database, Separate Schemas
          • Shared Database, Shared Schema
        • No VM overhead 

        Types of DB level Isolation:
        1. Separated Databases
        2. Shared Database, Separate Schemas
        3. Shared Database, Shared Schemas

        Separated Databases




        • Each tenant has its own set of data, isolated from others
        • Metadata associates each database with the correct tenant 
        • Easy to extend the application's data model to meet tenants' individual needs 
        • Higher costs for hardware & maintaining equipment and backing up tenant data 


        Shared Database, Separate Schemas




        • Multiple tenants in the same database 
        • Each tenant has its own database schema
        • Moderate degree of logical data isolation
        • Tenant data is harder to restore in the event of a failure 
        • Appropriate for small number of tables per database

        Shared Database, Shared Schemas




        • Same database & the same set of tables to host multiple tenants' data
        • Introduce an extra attribute "tenantId" in every table 
        • Append a "where tenantId = $thisTenantId" in every query 
        • Lowest hardware and backup costs 
        • Additional development effort in the area of security 
        • Small number of servers required
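
        As a rough illustration of the "shared database, shared schema" approach, the sketch below shows a tenant-scoped data-access class that appends the tenantId predicate to every query (the orders table and its columns are hypothetical):

        import java.sql.Connection;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        import java.sql.SQLException;
        
        public class TenantScopedDao
        {
          private final Connection connection;
          private final long tenantId; // identifies the current tenant
        
          public TenantScopedDao(final Connection connection, final long tenantId)
          {
            this.connection = connection;
            this.tenantId = tenantId;
          }
        
          // Every query is constrained by tenantId so one tenant never sees another tenant's rows.
          public ResultSet findOrdersByStatus(final String status) throws SQLException
          {
            final PreparedStatement ps = connection.prepareStatement(
                "SELECT * FROM orders WHERE status = ? AND tenantId = ?");
            ps.setString(1, status);
            ps.setLong(2, this.tenantId);
            return ps.executeQuery();
          }
        }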


        Virtualization vs Data Partitioning

         

        • Type of Implementation: Virtualization - Simple; Data Partitioning - Complex
        • Nature: Virtualization - Multiple instances of the application and database servers on the same hardware, one set per tenant; Data Partitioning - Single instance of the application for all tenants with a shared database schema
        • Architecture Changes: Virtualization - No; Data Partitioning - Yes
        • Extension: Virtualization - Each tenant can have its own extension of the code and database schema, but this is difficult to maintain; Data Partitioning - Handling custom extensions for each tenant is harder to implement, but the shared code base is easier to maintain
        • H/W Requirement: Virtualization - Very high; Data Partitioning - Very low
        • Cost (Dev. + Service): Virtualization - Very high; Data Partitioning - Less
        • Multi-tenant: Virtualization - Not 100%; Data Partitioning - 100%
        • Recommended: Virtualization - No; Data Partitioning - Yes



        Hadoop - A Framework for Data Intensive Computing Applications



        What does it do?
        • Hadoop implements Google’s MapReduce, using HDFS
        • MapReduce divides applications into many small blocks of work. 
        • HDFS creates multiple replicas of data blocks for reliability, placing them on compute nodes around the cluster. 
        • MapReduce can then process the data where it is located. 
        • Hadoop's target is to run on clusters on the order of 10,000 nodes.


        Hadoop: Assumptions
        • It is written with large clusters of computers in mind and is built around the following assumptions:
        • Hardware will fail.
        •  Processing will be run in batches. Thus there is an emphasis on high throughput as opposed to low latency.
        • Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. 
        • It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.
        • Applications need a write-once-read-many access model. 
        • Moving Computation is Cheaper than Moving Data. 
        •  Portability is important.

        Example Applications and Organizations using Hadoop
        • A9.com – Amazon: To build Amazon's product search indices; process millions of sessions daily for analytics, using both the Java and streaming APIs; clusters vary from 1 to 100 nodes. 
        • Yahoo! : More than 100,000 CPUs in ~20,000 computers running Hadoop; biggest cluster: 2000 nodes (2*4cpu boxes with 4TB disk each); used to support research for Ad Systems and Web Search 
        • AOL : Used for a variety of things ranging from statistics generation to running advanced algorithms for doing behavioral analysis and targeting; cluster size is 50 machines, Intel Xeon, dual processors, dual core, each with 16GB Ram and 800 GB hard-disk giving us a total of 37 TB HDFS capacity. 
        • Facebook: To store copies of internal log and dimension data sources and use it as a source for reporting/analytics and machine learning; 320 machine cluster with 2,560 cores and about 1.3 PB raw storage; 
        • FOX Interactive Media : 3 X 20 machine cluster (8 cores/machine, 2TB/machine storage) ; 10 machine cluster (8 cores/machine, 1TB/machine storage); Used for log analysis, data mining and machine learning 
        • University of Nebraska Lincoln: one medium-sized Hadoop cluster (200TB) to store and serve physics data;

        MapReduce Paradigm
        • Programming  model developed at Google
        • Sort/merge based distributed computing
        • Initially, it was intended for their internal search/indexing application, but now used extensively by more organizations (e.g., Yahoo, Amazon.com, IBM, etc.)
        • It is functional-style programming (e.g., LISP) that is naturally parallelizable across a large cluster of workstations or PCs.
        •  The underlying system takes care of the partitioning of the input data, scheduling the program’s execution across several machines, handling machine failures, and managing required inter-machine communication. (This is the key for Hadoop’s success)


        How does MapReduce work?
        • The run time partitions the input and provides it to different Map instances;
        • Map (key, value) → (key’, value’)
        • The run time collects the (key’, value’) pairs and distributes them to several Reduce functions so that each Reduce function gets the pairs with the same key’. 
        • Each Reduce produces a single (or zero) file output.
        • Map and Reduce are user-written functions.
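
        As a concrete illustration (not part of the original post), a minimal word-count Mapper/Reducer pair in the Hadoop Java API looks like this:

        import java.io.IOException;
        import java.util.StringTokenizer;
        
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.Reducer;
        
        // Map(key, value) -> (key', value'): emit (word, 1) for every word in the input line.
        public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
        {
          private static final IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();
        
          @Override
          protected void map(final LongWritable key, final Text value, final Context context)
              throws IOException, InterruptedException
          {
            final StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
              word.set(itr.nextToken());
              context.write(word, ONE);
            }
          }
        }
        
        // The Reducer receives all the 1s emitted for the same word (key') and sums them.
        class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
        {
          @Override
          protected void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
              throws IOException, InterruptedException
          {
            int sum = 0;
            for (final IntWritable v : values)
            {
              sum += v.get();
            }
            context.write(key, new IntWritable(sum));
          }
        }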

        Hadoop Architecture




        MapReduce-Fault tolerance
        • Worker failure: The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
        • Master Failure: It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, in most cases, the user restarts the job.

        Mapping workers to Processors
        • The input data (on HDFS) is stored on the local disks of the machines in the cluster. HDFS divides each file into 64 MB blocks, and stores several copies of each block (typically 3 copies) on different machines. 
        • The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data. When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.


        Additional support functions
        • Partitioning function: The users of MapReduce specify the number of reduce tasks/output files that they desire (R). Data gets partitioned across these tasks using a partitioning function on the intermediate key. A default partitioning function is provided that uses hashing (e.g. hash(key) mod R). In some cases, it may be useful to partition data by some other function of the key. The user of the MapReduce library can provide a special partitioning function. 
        • Combiner function: User can specify a Combiner function that does partial merging of  the intermediate local disk data before it is sent over the network. The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions.
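
        As a hypothetical example of a custom partitioning function (the key type and the routing rule are assumptions chosen only for illustration):

        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Partitioner;
        
        // Instead of the default hash(key) mod R, route keys by their first character.
        public class FirstLetterPartitioner extends Partitioner<Text, IntWritable>
        {
          @Override
          public int getPartition(final Text key, final IntWritable value, final int numReduceTasks)
          {
            if (numReduceTasks == 0)
            {
              return 0;
            }
            final String k = key.toString();
            final char first = k.isEmpty() ? 'a' : Character.toLowerCase(k.charAt(0));
            return first % numReduceTasks;
          }
        }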


        Hadoop Distributed File System (HDFS)
        • The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. 
        • highly fault-tolerant and is designed to be deployed on low-cost hardware. 
        • provides high throughput access to application data and is suitable for applications that have large data sets. 
        • relaxes a few POSIX requirements to enable streaming access to file system data. 
        • part of the Apache Hadoop Core project. The project URL is http://hadoop.apache.org/core/. 

        Hadoop Community
        • Hadoop Users
        1. Adobe
        2. Alibaba
        3. Amazon
        4. AOL
        5. Facebook
        6. Google
        7. IBM
        • Major Contributor
        1. Apache
        2. Cloudera
        3. Yahoo




        Monday, 19 August 2013

        Calculate Average in Map Reduce using Combiners

        A program that calculates the average of a set of numbers using the Map Reduce Combiner technique.

        The figure below explains the concept in detail. The key idea is that each Combiner emits a partial (average, count) pair, and the Reducer merges these pairs as a weighted average:
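
        For intuition, here is a small stand-alone sketch of the merge the Reducer performs; the input values are purely illustrative and not from the original post:

        public class WeightedAverageDemo
        {
          public static void main(final String[] args)
          {
            // Two Combiners, say one over {2, 4, 6} and one over {8, 10},
            // emit (average, count) pairs: (4.0, 3) and (9.0, 2).
            final double[][] partials = { { 4.0, 3 }, { 9.0, 2 } };
        
            double sum = 0D;
            int totalCount = 0;
            for (final double[] p : partials)
            {
              sum += p[0] * p[1];      // re-weight each partial average by its count
              totalCount += (int) p[1];
            }
            // (4*3 + 9*2) / 5 = 30 / 5 = 6.0, the true average of all five numbers.
            System.out.println("AVERAGE = " + (sum / totalCount));
          }
        }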



        The code includes:
        1. AverageDriver.java - Driver class
        package com.self.average.driver;
        
        import java.io.IOException;
        import java.text.DateFormat;
        import java.text.SimpleDateFormat;
        import java.util.Date;
        
        import org.apache.commons.cli2.CommandLine;
        import org.apache.commons.cli2.Group;
        import org.apache.commons.cli2.Option;
        import org.apache.commons.cli2.OptionException;
        import org.apache.commons.cli2.builder.ArgumentBuilder;
        import org.apache.commons.cli2.builder.DefaultOptionBuilder;
        import org.apache.commons.cli2.builder.GroupBuilder;
        import org.apache.commons.cli2.commandline.Parser;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.conf.Configured;
        import org.apache.hadoop.filecache.DistributedCache;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        import org.apache.hadoop.util.Tool;
        import org.apache.hadoop.util.ToolRunner;
        import org.apache.mahout.common.CommandLineUtil;
        import org.apache.tools.ant.util.FileUtils;
        
        import com.self.average.mapreduce.AverageMRC;
        
        public class AverageDriver extends Configured implements Tool {
        
          public static void main(final String[] args) throws Exception {
        
           final int res = ToolRunner.run(new Configuration(),
            new AverageDriver(), args);
          System.exit(res);
         }
        
          private Configuration conf;
        
          private String inputFile;
         private String outputName;
        
          private final String inputDirectoryPrefix = "Average";
        
          private final String averagefilename = "average.txt";
        
          public boolean cpFilestoUserLocation(final String useroutputfile,
           final String systemfile) throws IOException {
          System.out.println("Copy file to user location");
          System.out.println("useroutputfile::" + useroutputfile);
          System.out.println("systemfile::" + systemfile);
          final FileUtils fu = FileUtils.getFileUtils();
          fu.copyFile(systemfile, useroutputfile, null, true);
        
           return true;
         }
        
          @Override
         public Configuration getConf() {
          return this.conf;
         }
        
          public final String getDateTime() {
          final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_hh-mm-ss");
          return df.format(new Date());
         }
        
          @SuppressWarnings("deprecation")
         @Override
         public int run(final String[] args) throws Exception {
          final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
          final ArgumentBuilder abuilder = new ArgumentBuilder();
          final GroupBuilder gbuilder = new GroupBuilder();
        
           final Option inputFileOpt = obuilder
            .withLongName("input")
            .withShortName("input")
            .withRequired(false)
            .withArgument(
              abuilder.withName("input").withMinimum(1)
                .withMaximum(1).create())
            .withDescription("input file").create();
        
           final Option outputDirOpt = obuilder
            .withLongName("o")
            .withShortName("o")
            .withRequired(false)
            .withArgument(
              abuilder.withName("o").withMinimum(1).withMaximum(1)
                .create())
            .withDescription("output directory option").create();
        
           final Group group = gbuilder.withName("Options")
            .withOption(inputFileOpt).withOption(outputDirOpt).create();
        
           System.out.println(group);
        
           try {
           final Parser parser = new Parser();
           parser.setGroup(group);
           final CommandLine cmdLine = parser.parse(args);
        
            if (cmdLine.hasOption("help")) {
            CommandLineUtil.printHelp(group);
            return -1;
           }
        
            this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim();
           this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();
        
            System.out.println("input file path::" + this.inputFile);
           System.out.println("output Directory::" + this.outputName);
        
           } catch (final OptionException e) {
           System.err.println("Exception : " + e);
           CommandLineUtil.printHelp(group);
           return -1;
          }
        
           final String parentDirectory = "Average";
          final String baseDirectory = this.outputName + "/" + parentDirectory;
          if (!FileSystem.get(new Configuration()).isDirectory(
            new Path(baseDirectory))) {
           FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
          }
        
           final Job job = new Job(this.getConf());
          job.setJobName("AverageProcess");
          job.setJarByClass(AverageMRC.class);
        
           job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
        
           job.setNumReduceTasks(1);
        
           job.setMapperClass(AverageMRC.AverageMap.class);
          job.setReducerClass(AverageMRC.Reduce.class);
          job.setCombinerClass(AverageMRC.Combiner.class);
        
           job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);
        
           DistributedCache.addCacheFile(new Path(this.inputFile).toUri(),
            job.getConfiguration());
          final String averageoutputPath = baseDirectory + "/"
            + this.inputDirectoryPrefix;
        
           if (FileSystem.get(new Configuration()).isDirectory(
            new Path(averageoutputPath))) {
           FileSystem.get(new Configuration()).delete(new Path(baseDirectory),
             true);
          }
        
           FileInputFormat.setInputPaths(job, new Path(this.inputFile));
          FileOutputFormat.setOutputPath(job, new Path(averageoutputPath));
        
           final boolean status = job.waitForCompletion(true);
        
           if (!status) {
           System.out.println("Could Not create the dictionary file");
           return -1;
          }
        
           if (this.cpFilestoUserLocation(baseDirectory + "/"
            + this.averagefilename, averageoutputPath + "/"
            + "part-r-00000")) {
           System.out.println("Average File Copied at " + baseDirectory + "/"
             + this.averagefilename);
          }
        
           return 0;
         }
        
          @Override
         public void setConf(final Configuration conf) {
          this.conf = conf;
         }
        
        }
        


        2. AverageMRC.java - Map Reduce & Combiner class
        package com.self.average.mapreduce;
        
        import java.io.IOException;
        import java.util.Iterator;
        
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.Reducer;
        
        public class AverageMRC {
        
          // Nested Mapper/Reducer classes must be static so Hadoop can instantiate them via reflection.
          public static class AverageMap extends Mapper<LongWritable, Text, Text, Text> {
          @Override
          protected void map(final LongWritable key, final Text value,
            final Context context) throws IOException, InterruptedException {
           context.write(new Text("MAPPER"), value);
          };
         }
        
          public static class Combiner extends Reducer<Text, Text, Text, Text> {
          @Override
          protected void reduce(final Text key, final Iterable<Text> values,
            final Context context) throws IOException, InterruptedException {
           Integer count = 0;
           Double sum = 0D;
           final Iterator<Text> itr = values.iterator();
           while (itr.hasNext()) {
            final String text = itr.next().toString();
            final Double value = Double.parseDouble(text);
            count++;
            sum += value;
           }
        
           final Double average = sum / count;
        
           context.write(new Text("A_C"), new Text(average + "_" + count));
          };
         }
        
          public static class Reduce extends Reducer<Text, Text, Text, Text> {
          @Override
          protected void reduce(final Text key, final Iterable<Text> values,
            final Context context) throws IOException, InterruptedException {
           Double sum = 0D;
           Integer totalcount = 0;
           final Iterator<Text> itr = values.iterator();
           while (itr.hasNext()) {
            final String text = itr.next().toString();
            final String[] tokens = text.split("_");
            final Double average = Double.parseDouble(tokens[0]);
            final Integer count = Integer.parseInt(tokens[1]);
            sum += (average * count);
            totalcount += count;
           }
        
           final Double average = sum / totalcount;
        
           context.write(new Text("AVERAGE"), new Text(average.toString()));
          };
         }
        }
        





        The Maven pom.xml file should contain the following dependencies:

          <dependency>
           <artifactId>hadoop-core</artifactId>
           <groupId>org.apache.hadoop</groupId>
           <version>0.20.2</version>
           <scope>provided</scope>
          </dependency>
          <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
           <version>1.7.5</version>
           <scope>provided</scope>
          </dependency>
          <dependency>
           <groupId>org.apache.mahout.commons</groupId>
           <artifactId>commons-cli</artifactId>
           <version>2.0-mahout</version>
          </dependency>
          <dependency>
           <groupId>org.apache.mahout</groupId>
           <artifactId>mahout-core</artifactId>
           <version>0.4</version>
          </dependency>
          <dependency>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
           <version>3.1</version>
          </dependency>
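
          Note: AverageDriver also imports org.apache.tools.ant.util.FileUtils, so unless it is pulled in transitively, the Ant dependency may be needed as well (the version below is an assumption):

          <dependency>
           <groupId>org.apache.ant</groupId>
           <artifactId>ant</artifactId>
           <version>1.8.2</version> <!-- assumed version -->
          </dependency>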