There are scenarios where we want to create a MapReduce job that uses:
- 3rd party jar code referenced in the Mapper or Reducer
- 3rd party jars that rely on large reference data
- 3rd party jars that also execute native C/C++ code (.so files)
On top of that, the job is submitted from a remote server (not from the Hadoop NameNode).
So in this scenario, it is important that:
- The MapReduce job jar is available to all DataNodes
- The 3rd party jars and native files are also available on the DataNodes
- The reference data used by the 3rd party jars is likewise made available to the DataNodes
The overall design should be something like:
***The code applies to Hadoop 1.x (tested on Apache Hadoop 1.2.1); for Hadoop 2.x, changes would be required.
The Maven POM should contain the following dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
To achieve this, most of the setup is done in the Driver class itself:
- Define the following Hadoop cluster parameters in the Hadoop job Configuration:
  final Configuration conf = new Configuration();
  conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
  conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
  conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
  conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
- Create a fat jar that contains the 3rd party jar, the Mapper code, the Reducer code (if applicable), the Driver code, etc., and point the job at it in the Driver class:
  conf.set("mapred.jar", "<path of the fat jar file>");
- Pass the 3rd party native (.so) files as Distributed Cache files via the -files argument (a programmatic alternative is sketched after this list):
  final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
  final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
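If the .so file already resides on HDFS, the Distributed Cache entry can also be registered programmatically instead of via the -files argument. The following is a minimal sketch under that assumption; the helper class name and the HDFS location in the usage example are illustrative, not part of the original code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;

import java.net.URI;
import java.net.URISyntaxException;

public final class CacheFileSetup {

    private CacheFileSetup() {
    }

    /**
     * Registers a file that already sits on HDFS in the Distributed Cache so that
     * every task gets a local symlink to it in its working directory.
     */
    public static void addNativeLib(final Configuration conf, final String hdfsUri)
            throws URISyntaxException {
        // The URI fragment (the part after '#') becomes the symlink name on the task node
        DistributedCache.addCacheFile(new URI(hdfsUri), conf);
        DistributedCache.createSymlink(conf);
    }
}

It would be called from the Driver before ToolRunner.run, for example: CacheFileSetup.addNativeLib(conf, "hdfs://x.x.x.x:9000/lib/libctr.so#libctr.so"); (the HDFS path here is hypothetical).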
Apart from that, the user needs to make sure that the DataNodes can access the 3rd party reference data as local data (for example, store all the data on one machine and then use NFS on Linux to mount it on the other machines).
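Since the Mapper simply hands a local path to the 3rd party code, it can help to fail fast on nodes where that mount is missing. Below is a minimal sketch, assuming the reference data is expected at a fixed local path; the class name is illustrative.

import java.io.File;
import java.io.IOException;

public final class ReferenceDataCheck {

    private ReferenceDataCheck() {
    }

    /** Throws immediately if the reference data is not readable on this node. */
    public static void verify(final String localDataPath) throws IOException {
        final File dataDir = new File(localDataPath);
        if (!dataDir.exists() || !dataDir.canRead()) {
            throw new IOException("Reference data not accessible on this node: "
                    + localDataPath + " - check the NFS mount");
        }
    }
}

Calling ReferenceDataCheck.verify("/home/hduser/Custom/data") from the Mapper's setup() would surface a missing mount as a clear task failure instead of an obscure error inside the 3rd party code.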
The exact Driver code looks like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;

import java.security.PrivilegedExceptionAction;

public class CustomDriver {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception {
        // Create a remote user instance; hduser is the Hadoop cluster user
        final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
        ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                final Configuration conf = new Configuration();
                // Set Hadoop 1.x cluster parameters
                conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
                conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
                conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
                conf.set("dfs.datanode.data.dir", "<datanode directory>");
                // Set the fat jar file path
                conf.set("mapred.jar", "<path of the fat jar file>"); // E:\jar\flatJar.jar
                // Set the native file as a Distributed Cache file
                final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
                // Initiate the ToolRunner
                final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
                System.exit(res);
                return true;
            }
        });
    }
}
The CustomTool code is a straightforward Tool implementation, with not many changes:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class CustomTool extends Configured implements Tool {

    private Configuration configuration;

    public CustomTool() {
    }

    public CustomTool(final Configuration conf) {
        setConf(conf);
    }

    @Override
    public Configuration getConf() {
        return this.configuration;
    }

    @Override
    public void setConf(final Configuration conf) {
        this.configuration = conf;
    }

    @Override
    public int run(final String[] arg0) throws Exception {
        final Job job = new Job(getConf());
        final String outputFolderPath = "/home/hduser/Custom-output";
        final FileSystem hdfs = FileSystem.get(getConf());
        // Delete the output folder if it already exists
        if (hdfs.exists(new Path(outputFolderPath))) {
            hdfs.delete(new Path(outputFolderPath), true);
        }
        job.setJobName("Job Custom-" + System.currentTimeMillis());
        job.setNumReduceTasks(0); // map-only job
        job.setJarByClass(CustomMapper.class);
        job.setMapperClass(CustomMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/home/hduser/Custom/inputfile.txt"));
        FileOutputFormat.setOutputPath(job, new Path(outputFolderPath));
        job.waitForCompletion(true);
        return 0;
    }
}
The Mapper code is again simple; it is used to invoke the 3rd party library code and pick up the native (.so) files.
import com.Custom.javademo.CustomThirdPartyCode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.net.URI;

public class CustomMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        final Configuration conf = context.getConfiguration();
        // Log the files available in the Distributed Cache
        final URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
        System.out.println("cacheFiles:" + cacheFiles);
        if (cacheFiles != null) {
            System.out.println("cacheFiles:" + cacheFiles.length);
            for (final URI cacheFile : cacheFiles) {
                System.out.println(cacheFile.toString());
            }
        }
    }

    @Override
    protected void map(final LongWritable key, final Text val, final Context context)
            throws IOException, InterruptedException {
        System.out.println("Invoked for record ==========>" + val.toString());
        final String[] args = new String[1];
        // Works as long as the data is accessible to the DataNode as a local path
        args[0] = "/home/hduser/Custom/data";
        CustomThirdPartyCode.main(args, val.toString());
    }
}
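If the 3rd party code expects libctr.so to be loaded into the JVM (for example via System.load) rather than resolved from java.library.path, the localized copy shipped with -files can be located and loaded in setup(). This is a minimal sketch under that assumption; the helper class name is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public final class NativeLibLoader {

    private NativeLibLoader() {
    }

    /** Finds the localized copy of the given file in the Distributed Cache and loads it. */
    public static void loadFromDistributedCache(final Configuration conf,
                                                final String libFileName) throws IOException {
        final Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        if (localFiles != null) {
            for (final Path localFile : localFiles) {
                if (localFile.getName().equals(libFileName)) {
                    // getLocalCacheFiles returns paths on the local disk of the task node
                    System.load(localFile.toUri().getPath());
                    return;
                }
            }
        }
        throw new IOException(libFileName + " was not found in the Distributed Cache");
    }
}

With this, the Mapper's setup() would call NativeLibLoader.loadFromDistributedCache(conf, "libctr.so") before the first map() invocation.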