There are scenarios where we want to create a MapReduce job that:
- references 3rd party jar code in the Mapper or Reducer
- uses 3rd party jars that rely on large reference data
- uses 3rd party jars that also execute native code, such as C/C++ (.so) files
On top of all this, the job is submitted from a remote server (not from the Hadoop NameNode).
In this scenario it is important that:
- the MapReduce job jar is available to all DataNodes
- the 3rd party jars and native files are also available on the DataNodes
- the reference data used by the 3rd party jars is also made available to the DataNodes
The overall design is as follows.
*** The code applies to Hadoop 1.x (tested on Apache Hadoop 1.2.1); for Hadoop 2.x, changes would be required.
The Maven POM should contain the following dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
To achieve this, most of the setup is configured in the Driver class itself:
- Define the following Hadoop Cluster parameters in the Hadoop Job Configuration
- final Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
- conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
- conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
- conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
- Create a fat jar that contains the 3rd party jar, the Mapper code, the Reducer code (if applicable), the Driver code, etc., and point to it in the Driver class:
- conf.set("mapred.jar", "<path of the fat jar file>");
- Pass the 3rd party native (.so) files to the Distributed Cache via the command-line arguments:
- final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
- final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
Apart from that, the user needs to make sure that the DataNodes have access to the 3rd party reference data as local data (for example, store all the data on one machine and then use NFS on Linux to mount it on the other machines); a sanity check for this is sketched below.
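A missing mount usually shows up as an obscure failure deep inside the 3rd party library, so it can help to fail fast from the Mapper's setup(). Here is a minimal sketch of such a check (the helper class ReferenceDataCheck is purely illustrative; the path matches the one handed to the 3rd party code in the Mapper below):
import java.io.File;
import java.io.IOException;
public final class ReferenceDataCheck
{
    // Assumed NFS mount point, matching the path passed to the 3rd party code in the Mapper
    private static final String REFERENCE_DATA_DIR = "/home/hduser/Custom/data";
    private ReferenceDataCheck()
    {
    }
    public static void assertReferenceDataAvailable() throws IOException
    {
        final File dataDir = new File(REFERENCE_DATA_DIR);
        if (!dataDir.isDirectory() || !dataDir.canRead())
        {
            // Failing here gives a clear per-task error instead of an obscure
            // failure inside the 3rd party library.
            throw new IOException("Reference data not mounted on this node: " + REFERENCE_DATA_DIR);
        }
    }
}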
The exact Driver code looks like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.security.PrivilegedExceptionAction;
public class CustomDriver
{
    /**
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception
    {
        // Create a remote user instance ("hduser" is the Hadoop cluster user)
        final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
        ugi.doAs(new PrivilegedExceptionAction<Boolean>()
        {
            @Override
            public Boolean run() throws Exception
            {
                final Configuration conf = new Configuration();
                // Set Hadoop 1.x cluster parameters
                conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
                conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
                conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
                conf.set("dfs.datanode.data.dir", "<datanode directory>");
                // Set the fat jar file path
                conf.set("mapred.jar", "<path of the fat jar file>"); // E:\jar\flatJar.jar
                // Set the native file as a Distributed Cache file
                final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
                // Initiate the Tool Runner
                final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
                System.exit(res);
                return true;
            }
        });
    }
}
The CustomTool code is simple, with few changes from a standard Tool implementation:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class CustomTool extends Configured implements Tool
{
    private Configuration configuration;

    public CustomTool()
    {
    }

    public CustomTool(final Configuration conf)
    {
        setConf(conf);
    }

    @Override
    public Configuration getConf()
    {
        return this.configuration;
    }

    @Override
    public void setConf(final Configuration conf)
    {
        this.configuration = conf;
    }

    @Override
    public int run(final String[] arg0) throws Exception
    {
        final Job job = new Job(getConf());
        // Clean up the output folder if it already exists
        final String outputFolderPath = "/home/hduser/Custom-output";
        final FileSystem hdfs = FileSystem.get(getConf());
        if (hdfs.exists(new Path(outputFolderPath)))
        {
            hdfs.delete(new Path(outputFolderPath), true);
        }
        // Map-only job: no reducers
        job.setJobName("Job Custom-" + System.currentTimeMillis());
        job.setNumReduceTasks(0);
        job.setJarByClass(CustomMapper.class);
        job.setMapperClass(CustomMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/home/hduser/Custom/inputfile.txt"));
        FileOutputFormat.setOutputPath(job, new Path(outputFolderPath));
        // Propagate the job status to the caller
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
The Mapper code is again simple; it is used to invoke the 3rd party library code and to pick up the native (.so) files (see the sketch after the Mapper code for loading the library explicitly).
import com.Custom.javademo.CustomThirdPartyCode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.net.URI;
public class CustomMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable>
{
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException
    {
        final Configuration conf = context.getConfiguration();
        final URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
        System.out.println("cacheFiles:" + cacheFiles);
        if (cacheFiles != null)
        {
            System.out.println("cacheFiles:" + cacheFiles.length);
            for (final URI cacheFile : cacheFiles)
            {
                System.out.println(cacheFile.toString());
            }
        }
    }

    @Override
    protected void map(final LongWritable key, final Text val, final Context context) throws IOException, InterruptedException
    {
        System.out.println("Invoked for record ==========>" + val.toString());
        final String[] args = new String[1];
        args[0] = "/home/hduser/Custom/data"; // works if the data is accessible to the DataNode
        CustomThirdPartyCode.main(args, val.toString());
    }
}
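If the 3rd party jar does not load its native library on its own, the .so shipped via "-files" can also be loaded explicitly in setup(): Hadoop symlinks Distributed Cache files into the task's working directory, so the library is reachable by its base name. Below is a minimal sketch of that idea (the class name NativeLoadingMapper and the assumption that libctr.so has to be loaded manually are illustrative, not part of the original code):
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NativeLoadingMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable>
{
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException
    {
        // Files passed with "-files" are symlinked into the task's current
        // working directory, so the library is visible by its base name.
        final File nativeLib = new File("libctr.so");
        if (!nativeLib.exists())
        {
            throw new IOException("libctr.so not found in the task working directory");
        }
        // System.load() requires an absolute path to the shared object.
        System.load(nativeLib.getAbsolutePath());
    }
}
Loading the library once in setup() keeps the per-record map() path clean and fails the task early if the file was not shipped.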