Monday 8 December 2014

Remotely Execute Map Reduce Job having 3rd Party Jars & Files


There are scenarios when we want to create a Map Reduce Job which uses:

  1. 3rd party jars that are referenced from the Mapper or Reducer code
  2. Large reference data that these 3rd party jars read
  3. Native code, such as C/C++ (.so) files, that these 3rd party jars execute
On top of that, the job is submitted from a remote server (not from the Hadoop NameNode).

So in the given scenario, it is important that:
  1. The Map/Reduce Job jar is available to all DataNodes
  2. The 3rd Party Jars & native files are also available at DataNodes
  3. The Reference Data used by 3rd Party jars is also made available to DataNodes

The overall design should be something like:




***The code applies to Hadoop 1.x (tested on Apache Hadoop 1.2.1); changes would be required for Hadoop 2.x.

The Maven POM should contain the following dependency:


<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
</dependency>
To achieve this, most of the setup is done in the Driver class itself:
  • Define the following Hadoop Cluster parameters in the Hadoop Job Configuration
    • final Configuration conf = new Configuration();
    • conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
    • conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
    • conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
    • conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
  • Create a Fat Jar file which would contain the 3rd Party Jar file, Mapper Code, Reducer Code (if applicable), Driver Code etc. Define the following component in the Driver class:
          • conf.set("mapred.jar", "<path of the fat jar file>");
        • Pass the 3rd party native (.so) files to the job as Distributed Cache files, using the "-files" argument
          • final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
          • final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);

        Apart from that, the user needs to make sure that the DataNodes can access the 3rd party data as local data (for example, store all the data on one machine and use NFS on Linux to mount it on the other machines).

        The Exact Driver code looks like:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.security.UserGroupInformation;
        import org.apache.hadoop.util.ToolRunner;
        
        import java.io.File;
        import java.security.PrivilegedExceptionAction;
        
        /**
         * Submits the custom Map/Reduce job to a Hadoop 1.x cluster from a remote machine.
         *
         * <p>The job is configured and launched on behalf of the cluster user {@code hduser}; the
         * process then terminates with the status code reported by {@link ToolRunner}.
         */
        public class CustomDriver
        {
          /**
           * Configures the cluster connection, the fat jar and the native library, runs
           * {@link CustomTool} as the remote user and exits with the resulting status code.
           *
           * @param args command line arguments (not used)
           * @throws Exception if the job cannot be configured or submitted
           */
          public static void main(final String[] args) throws Exception
          {
            // Create a remote user instance; hduser is the hadoop cluster user
            final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");

            // The action hands the tool's exit status back instead of terminating the JVM
            // from inside doAs, so the process exits in exactly one place.
            final int exitCode = ugi.doAs(new PrivilegedExceptionAction<Integer>()
            {
              @Override
              public Integer run() throws Exception
              {
                final Configuration conf = new Configuration();

                // Set Hadoop 1.x cluster parameters
                conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
                conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
                conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
                conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode

                // Set the fat jar file path
                conf.set("mapred.jar", "<path of the fat jar file>"); // E:\jar\flatJar.jar

                // Ship the native file as a Distributed Cache file
                final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};

                // Initiate the Tool Runner and report its status to the caller
                return ToolRunner.run(conf, new CustomTool(conf), arguments);
              }
            });

            System.exit(exitCode);
          }
        }
        



        The CustomTool code is a simple Tool implementation that needs few changes:



        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.conf.Configured;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.util.Tool;
        
        /**
         * Map-only job definition that runs {@link CustomMapper} over a fixed HDFS input file.
         *
         * <p>The configuration is held by the {@link Configured} base class, so
         * {@code getConf()} and {@code setConf(Configuration)} are inherited rather than
         * re-implemented here.
         */
        public class CustomTool extends Configured implements Tool
        {

          /** HDFS file that is fed to the mapper. */
          private static final String INPUT_FILE_PATH = "/home/hduser/Custom/inputfile.txt";

          /** HDFS folder that receives the job output; it is removed before every run. */
          private static final String OUTPUT_FOLDER_PATH = "/home/hduser/Custom-output";

          /** Creates a tool without a configuration; one must be supplied through {@code setConf}. */
          public CustomTool()
          {
            super();
          }

          /**
           * Creates a tool bound to the given configuration.
           *
           * @param conf the Hadoop configuration used to build and submit the job
           */
          public CustomTool(final Configuration conf)
          {
            super(conf);
          }

          /**
           * Builds the map-only job, clears any previous output folder, submits the job and waits
           * for it to finish.
           *
           * @param arg0 remaining command line arguments (not used)
           * @return {@code 0} when the job completed successfully, {@code 1} otherwise
           * @throws Exception if the job cannot be set up or submitted
           */
          @Override
          public int run(final String[] arg0) throws Exception
          {
            final Job job = new Job(getConf());
            final Path outputFolder = new Path(OUTPUT_FOLDER_PATH);

            // The job would refuse to start if the output folder already existed.
            final FileSystem hdfs = FileSystem.get(getConf());
            if (hdfs.exists(outputFolder))
            {
              hdfs.delete(outputFolder, true);
            }

            job.setJobName("Job Custom-" + System.currentTimeMillis());
            job.setNumReduceTasks(0);

            job.setJarByClass(CustomMapper.class);
            job.setMapperClass(CustomMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job, new Path(INPUT_FILE_PATH));
            FileOutputFormat.setOutputPath(job, outputFolder);

            // Propagate the job outcome so a failed job is not reported as success.
            return job.waitForCompletion(true) ? 0 : 1;
          }
        }
        




        The Mapper code is again simple: it invokes the 3rd party library code and lists the native (.so) files passed through the Distributed Cache.




        import com.Custom.javademo.CustomThirdPartyCode;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.filecache.DistributedCache;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        
        import java.io.IOException;
        import java.net.URI;
        
        /**
         * Mapper that hands every input record to the 3rd party library.
         *
         * <p>It emits no output of its own; both output types are {@link NullWritable}.
         */
        public class CustomMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable>
        {

          /** Reference data folder passed to the 3rd party code. */
          private static final String REFERENCE_DATA_PATH = "/home/hduser/Custom/data";

          /**
           * Prints the Distributed Cache files registered for the job, for diagnostics.
           *
           * @param context the task context providing the job configuration
           * @throws IOException if the cache file list cannot be read from the configuration
           * @throws InterruptedException declared by the overridden method
           */
          @Override
          protected void setup(final Context context) throws IOException, InterruptedException
          {
            final Configuration conf = context.getConfiguration();
            final URI[] cacheFiles = DistributedCache.getCacheFiles(conf);

            if (cacheFiles == null)
            {
              // Concatenating the array itself would only print its identity string,
              // so the "no cache files" case is reported explicitly instead.
              System.out.println("cacheFiles:null");
              return;
            }

            System.out.println("cacheFiles:" + cacheFiles.length);
            for (final URI cacheFile : cacheFiles)
            {
              System.out.println(cacheFile.toString());
            }
          }

          /**
           * Invokes the 3rd party code for one input record.
           *
           * @param key the key of the input record (not used)
           * @param val the record text passed on to the 3rd party code
           * @param context the task context (not used; nothing is emitted)
           * @throws IOException declared by the overridden method
           * @throws InterruptedException declared by the overridden method
           */
          @Override
          protected void map(final LongWritable key, final Text val, final Context context) throws IOException, InterruptedException
          {
            final String record = val.toString();
            System.out.println("Invoked for record ==========>" + record);

            // Works if the data is accessible to the datanode. A fresh array is built per
            // record because the 3rd party code receives it as a mutable argument.
            final String[] args = new String[]{REFERENCE_DATA_PATH};
            CustomThirdPartyCode.main(args, record);
          }
        }