Monday, 8 December 2014

Remotely Execute Map Reduce Job having 3rd Party Jars & Files


There are scenarios where we want to create a MapReduce job in which:

  1. The Mapper or Reducer references code from 3rd-party jars
  2. These 3rd-party jars use a large amount of reference data
  3. These 3rd-party jars also execute native code such as C/C++ (.so) files

On top of that, the job is submitted from a remote server (not from the Hadoop NameNode).

So in the given scenario, it is important that:
  1. The Map/Reduce Job jar is available to all DataNodes
  2. The 3rd Party Jars & native files are also available at DataNodes
  3. The Reference Data used by 3rd Party jars is also made available to DataNodes

The overall design should be something like:




Note: the code applies to Hadoop 1.x (tested on Apache Hadoop 1.2.1); changes would be required for Hadoop 2.x.

The Maven POM should contain the following dependency:


<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
</dependency>

To achieve this, most of the setup is done in the Driver class itself:
  • Define the following Hadoop cluster parameters in the Hadoop job Configuration:
    • final Configuration conf = new Configuration();
    • conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
    • conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
    • conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
    • conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode
  • Create a fat jar that contains the 3rd-party jars, the Mapper code, the Reducer code (if applicable), the Driver code, etc., and point the job at it in the Driver class:
    • conf.set("mapred.jar", "<path of the fat jar file>");
  • Pass the 3rd-party native (.so) files as Distributed Cache files through the -files argument:
    • final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};
    • final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);

        Apart from that, the user needs to make sure that the DataNodes can access the 3rd-party reference data as local data (for example, store all the data on one machine and use NFS on Linux to mount it on the other machines).

        The complete Driver code looks like:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.security.UserGroupInformation;
        import org.apache.hadoop.util.ToolRunner;
        
        import java.security.PrivilegedExceptionAction;
        
        public class CustomDriver
        {
          /**
           * @param args
           * @throws Exception
           */
          public static void main(final String[] args) throws Exception
          {
         
            // Create a remote user instance; hduser is the Hadoop cluster user
            final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
            ugi.doAs(new PrivilegedExceptionAction<Boolean>()
            {
              @Override
              public Boolean run() throws Exception
              {
                final Configuration conf = new Configuration();

                // Set Hadoop 1.x cluster parameters
                conf.set("mapred.job.tracker", "<job tracker address>"); // x.x.x.x:9001
                conf.set("fs.default.name", "<file server name>"); // hdfs://x.x.x.x:9000
                conf.set("hadoop.tmp.dir", "<tmp file address>"); // /home/user/Data/tmp
                conf.set("dfs.datanode.data.dir", "<datanode directory>"); // file:/home/user/Data/datanode

                // Set the fat jar file path
                conf.set("mapred.jar", "<path of the fat jar file>"); // E:\jar\flatJar.jar

                // Pass the native file as a Distributed Cache file
                final String[] arguments = new String[]{"-files", "file:///E:/libctr.so"};

                // Initiate the ToolRunner
                final int res = ToolRunner.run(conf, new CustomTool(conf), arguments);
                System.exit(res);
                return true; // not reached at runtime; required by the compiler
              }
            });
          }
        }
        



        The CustomTool code is a plain Tool implementation that needs few changes:



        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.conf.Configured;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.util.Tool;
        
        public class CustomTool extends Configured implements Tool
        {
        
          private Configuration configuration;
        
          public CustomTool()
          {
        
          }
        
          public CustomTool(final Configuration conf)
          {
            setConf(conf);
          }
        
          @Override
          public Configuration getConf()
          {
            return this.configuration;
          }
        
          @Override
          public void setConf(final Configuration conf)
          {
            this.configuration = conf;
          }
        
          @Override
          public int run(final String[] arg0) throws Exception
          {
            final Job job = new Job(getConf());
            final String outputFolderPath = "/home/hduser/Custom-output";
        
            final FileSystem hdfs = FileSystem.get(getConf());
            if (hdfs.exists(new Path(outputFolderPath)))
            {
              hdfs.delete(new Path(outputFolderPath), true);
            }
        
            job.setJobName("Job Custom-" + System.currentTimeMillis());
            job.setNumReduceTasks(0);
           
            job.setJarByClass(CustomMapper.class);
            job.setMapperClass(CustomMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
        
            FileInputFormat.setInputPaths(job, new Path("/home/hduser/Custom/inputfile.txt"));
            FileOutputFormat.setOutputPath(job, new Path(outputFolderPath));
            // Report a failed job through the exit code instead of always returning 0
            return job.waitForCompletion(true) ? 0 : 1;
          }
        }
        




        The Mapper code is again simple: it lists the files shipped through the Distributed Cache (the native .so files) and invokes the 3rd-party library code.




        import com.Custom.javademo.CustomThirdPartyCode;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.filecache.DistributedCache;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        
        import java.io.IOException;
        import java.net.URI;
        
        public class CustomMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable>
        {
        
          @Override
          protected void setup(final Context context) throws IOException, InterruptedException
          {
            final Configuration conf = context.getConfiguration();
            // List the files shipped through the Distributed Cache (for example libctr.so)
            final URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            if (cacheFiles != null)
            {
              System.out.println("cacheFiles:" + cacheFiles.length);
              for (final URI cacheFile : cacheFiles)
              {
                System.out.println(cacheFile.toString());
              }
            }
          }
        
          @Override
          protected void map(final LongWritable key, final Text val, final Context context) throws java.io.IOException, InterruptedException
          {
            System.out.println("Invoked for record ==========>" + val.toString());
            // The reference data path works only if the data is accessible to the DataNode
            final String[] args = new String[] { "/home/hduser/Custom/data" };
            CustomThirdPartyCode.main(args, val.toString());
          }
        }
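
The Mapper above only lists the cached files. If the 3rd-party jar does not load its native library by itself, the task has to do it. The following is a minimal sketch under two assumptions: the file passed through -files should appear under its own name in the task's working directory, and NativeLibLoader and tryLoad are hypothetical names that are not part of the original code.

```java
import java.io.File;

// Hypothetical helper (not part of the original post): loads a native library
// that the -files option is assumed to have placed in the task's working directory.
class NativeLibLoader
{
  /**
   * Tries to load the named library file from the given directory.
   * Returns false when the file is missing or cannot be linked.
   */
  static boolean tryLoad(final File dir, final String fileName)
  {
    final File lib = new File(dir, fileName);
    if (!lib.isFile())
    {
      return false;
    }
    try
    {
      System.load(lib.getAbsolutePath()); // System.load needs an absolute path
      return true;
    }
    catch (final UnsatisfiedLinkError e)
    {
      System.err.println("Could not load " + lib + ": " + e.getMessage());
      return false;
    }
  }

  public static void main(final String[] args)
  {
    // "." stands for the task working directory when called from Mapper.setup()
    System.out.println("loaded: " + tryLoad(new File("."), "libctr.so"));
  }
}
```

Calling tryLoad(new File("."), "libctr.so") once from setup() would be enough, because a native library stays loaded for the lifetime of the task JVM.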
        

        Wednesday, 9 July 2014

        Multi-Tenancy


        What is Multi-Tenancy?
        • A single instance of the software runs on a server and serves multiple client organizations (tenants)
        • The software is designed to virtually partition its data and configuration per tenant
        • It is an essential attribute of Cloud Computing

        What's a Tenant?
        • A tenant is "my user who has her own users"
        • Multi-tenancy is not to the tenant's advantage; it benefits the multi-tenancy provider
        • A tenant would always prefer an environment that is isolated from other tenants


        Levels in Multi-tenancy
        Multi-tenancy can be introduced at either of two levels:
        1. Hypervisor level Isolation
        2. DB level Isolation

        Hypervisor level Isolation
        • Maps the physical machine to virtualized machines
        • The hypervisor allows the hardware to be partitioned at a finer granularity
        • Improves efficiency by running more tenants on the same physical machine
        • Provides the cleanest separation
        • Fewer security concerns
        • Easier cloud adoption
        • Virtualization introduces a certain percentage of overhead

        DB level Isolation
        • Requires re-architecting the underlying data layer
        • Computing resources and application code are shared between all the tenants on a server
        • Introduces a distributed and partitioned DB
        • The degree of isolation is only as good as the rewritten queries
        • No VM overhead
        • Approaches for DB level Isolation:
          • Separated Databases
          • Shared Database, Separate Schemas
          • Shared Database, Shared Schemas

        Types of DB level Isolation:
        1. Separated Databases
        2. Shared Database, Separate Schemas
        3. Shared Database, Shared Schemas

        Separated Databases




        • Each tenant has its own set of data, isolated from others
        • Metadata associates each database with the correct tenant 
        • Easy to extend the application's data model to meet tenants' individual needs 
        • Higher costs for hardware & maintaining equipment and backing up tenant data 


        Shared Database, Separate Schemas




        • Multiple tenants share the same database
        • Each tenant has its own database schema
        • Moderate degree of logical data isolation
        • Tenant data is harder to restore in the event of a failure
        • Appropriate for a small number of tables per database

        Shared Database, Shared Schemas




        • The same database and the same set of tables host multiple tenants' data
        • An extra attribute "tenantId" is introduced in every table
        • A "where tenantId = $thisTenantId" condition is appended to every query
        • Lowest hardware and backup costs 
        • Additional development effort in the area of security 
        • Small number of servers required
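
The tenantId filter described above can be sketched as a small query helper. This is only an illustration under assumptions: the class name TenantScopedQuery, the table name "orders" and the "status" column are hypothetical, and a real application would hand the resulting SQL and parameters to a JDBC PreparedStatement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: scopes every SELECT to one tenant in a shared schema.
class TenantScopedQuery
{
  final String sql;          // SQL text with '?' placeholders
  final List<Object> params; // values for the placeholders, in order

  private TenantScopedQuery(final String sql, final List<Object> params)
  {
    this.sql = sql;
    this.params = params;
  }

  /**
   * Builds a SELECT on one table that always carries the tenantId predicate.
   * The table name must come from trusted code, never from user input.
   * baseWhere may be null when the query has no other conditions.
   */
  static TenantScopedQuery select(final String table, final String baseWhere,
      final List<Object> baseParams, final long tenantId)
  {
    final StringBuilder sql = new StringBuilder("SELECT * FROM ")
        .append(table).append(" WHERE tenantId = ?");
    final List<Object> params = new ArrayList<Object>();
    params.add(tenantId); // bound as a parameter, not concatenated
    if (baseWhere != null && !baseWhere.isEmpty())
    {
      sql.append(" AND (").append(baseWhere).append(")");
      params.addAll(baseParams);
    }
    return new TenantScopedQuery(sql.toString(), params);
  }

  public static void main(final String[] args)
  {
    final List<Object> extra = new ArrayList<Object>();
    extra.add("OPEN");
    final TenantScopedQuery q = select("orders", "status = ?", extra, 42L);
    System.out.println(q.sql + " " + q.params);
  }
}
```

Routing every query through one helper like this is one way to keep the "additional development effort in the area of security" in a single place, since a forgotten tenantId predicate would expose other tenants' rows.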


        Virtualization vs Data Partitioning

         

        | | Virtualization | Data Partitioning |
        |---|---|---|
        | Type of Implementation | Simple | Complex |
        | Nature | Multiple instances of the application and database servers on the same hardware, as per the number of tenants | Single instance of the application for all the tenants with a shared database schema |
        | Architecture Changes | No | Yes |
        | Extension | Each tenant can have its own extension of the code and database schema. Difficult to maintain. | Handling custom extensions for each tenant can be harder to implement. Easy to maintain. |
        | H/W Requirement | Very High | Very Low |
        | Cost (Dev. + Service) | Very High | Low |
        | Multi-tenant | Not 100% | 100% |
        | Recommended | No | Yes |



        Hadoop - A Framework for Data Intensive Computing Applications



        What does it do?
        • Hadoop implements Google’s MapReduce model on top of HDFS
        • MapReduce divides applications into many small blocks of work
        • HDFS creates multiple replicas of data blocks for reliability and places them on compute nodes around the cluster
        • MapReduce can then process the data where it is located
        • Hadoop’s target is to run on clusters on the order of 10,000 nodes


        Hadoop: Assumptions
        Hadoop is written with large clusters of computers in mind and is built around the following assumptions:
        • Hardware will fail.
        • Processing will be run in batches, so the emphasis is on high throughput rather than low latency.
        • Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size.
        • It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.
        • Applications need a write-once-read-many access model.
        • Moving computation is cheaper than moving data.
        • Portability is important.

        Example Applications and Organizations using Hadoop
        • A9.com – Amazon: Builds Amazon's product search indices and processes millions of sessions daily for analytics, using both the Java and streaming APIs; clusters vary from 1 to 100 nodes.
        • Yahoo!: More than 100,000 CPUs in ~20,000 computers running Hadoop; biggest cluster: 2000 nodes (2*4cpu boxes with 4TB disk each); used to support research for Ad Systems and Web Search.
        • AOL: Used for a variety of things ranging from statistics generation to running advanced algorithms for behavioral analysis and targeting; the cluster has 50 machines (Intel Xeon, dual processors, dual core, each with 16 GB RAM and an 800 GB hard disk), giving a total of 37 TB HDFS capacity.
        • Facebook: Stores copies of internal log and dimension data sources and uses them as a source for reporting/analytics and machine learning; 320 machine cluster with 2,560 cores and about 1.3 PB raw storage.
        • FOX Interactive Media: 3 X 20 machine cluster (8 cores/machine, 2TB/machine storage); 10 machine cluster (8 cores/machine, 1TB/machine storage); used for log analysis, data mining and machine learning.
        • University of Nebraska Lincoln: One medium-sized Hadoop cluster (200TB) to store and serve physics data.

        MapReduce Paradigm
        • Programming model developed at Google
        • Sort/merge based distributed computing
        • Initially intended for Google's internal search/indexing application, but now used extensively by other organizations (e.g., Yahoo, Amazon.com, IBM)
        • It is a functional style of programming (as in LISP) that is naturally parallelizable across a large cluster of workstations or PCs
        • The underlying system takes care of partitioning the input data, scheduling the program’s execution across several machines, handling machine failures, and managing the required inter-machine communication (this is the key to Hadoop’s success)


        How does MapReduce work?
        • The runtime partitions the input and provides it to different Map instances
        • Map: (key, value) → (key’, value’)
        • The runtime collects the (key’, value’) pairs and distributes them to several Reduce functions so that each Reduce function gets all the pairs with the same key’
        • Each Reduce produces a single output file (or none)
        • Map and Reduce are user-written functions
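
The flow above can be illustrated with a tiny in-memory word count. This is a sketch that uses plain Java collections instead of the Hadoop API; MiniMapReduce and its method names are made up for the illustration, and the run method plays the role of the runtime.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of the map -> group by key -> reduce flow (no Hadoop involved).
class MiniMapReduce
{
  // Map: (offset, line) -> list of (word, 1)
  static List<Map.Entry<String, Integer>> map(final long offset, final String line)
  {
    final List<Map.Entry<String, Integer>> out = new ArrayList<Map.Entry<String, Integer>>();
    for (final String word : line.trim().split("\\s+"))
    {
      if (!word.isEmpty())
      {
        out.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
      }
    }
    return out;
  }

  // Reduce: (word, [1, 1, ...]) -> total count for that word
  static int reduce(final String key, final List<Integer> values)
  {
    int sum = 0;
    for (final int value : values)
    {
      sum += value;
    }
    return sum;
  }

  // Plays the role of the runtime: feeds records to map, groups pairs by key, calls reduce.
  static Map<String, Integer> run(final List<String> lines)
  {
    final Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
    long offset = 0;
    for (final String line : lines)
    {
      for (final Map.Entry<String, Integer> pair : map(offset, line))
      {
        List<Integer> values = grouped.get(pair.getKey());
        if (values == null)
        {
          values = new ArrayList<Integer>();
          grouped.put(pair.getKey(), values);
        }
        values.add(pair.getValue());
      }
      offset += line.length() + 1;
    }
    final Map<String, Integer> result = new TreeMap<String, Integer>();
    for (final Map.Entry<String, List<Integer>> entry : grouped.entrySet())
    {
      result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
    }
    return result;
  }

  public static void main(final String[] args)
  {
    // prints {be=2, not=1, or=1, to=2}
    System.out.println(run(Arrays.asList("to be or", "not to be")));
  }
}
```

In a real cluster the grouping step is distributed across machines, but the user still writes only the two functions map and reduce.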

        Hadoop Architecture




        MapReduce-Fault tolerance
        • Worker failure: The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
        • Master failure: It is easy to make the master write periodic checkpoints of its data structures. If the master task dies, a new copy can be started from the last checkpointed state. However, in most cases, the user restarts the job.
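
The worker-failure rule above can be sketched as follows. This is a simplified, single-threaded model, not Hadoop's actual implementation; the class WorkerMonitor, its method names and the timeout value are assumptions made for the illustration. Time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the master's worker-failure handling (illustrative only).
class WorkerMonitor
{
  enum State { IDLE, IN_PROGRESS, COMPLETED }

  static class Task
  {
    final boolean isMap;
    State state = State.IDLE;
    String worker; // worker the task is assigned to, or null when unassigned

    Task(final boolean isMap)
    {
      this.isMap = isMap;
    }

    void assign(final String workerId, final State newState)
    {
      this.worker = workerId;
      this.state = newState;
    }
  }

  private final long timeoutMillis;
  private final Map<String, Long> lastResponse = new HashMap<String, Long>();

  WorkerMonitor(final long timeoutMillis)
  {
    this.timeoutMillis = timeoutMillis;
  }

  // Called whenever a worker answers a ping.
  void recordResponse(final String workerId, final long nowMillis)
  {
    lastResponse.put(workerId, nowMillis);
  }

  // Marks silent workers as failed and makes their tasks eligible for rescheduling.
  List<String> sweep(final long nowMillis, final List<Task> tasks)
  {
    final List<String> failed = new ArrayList<String>();
    for (final Map.Entry<String, Long> entry : lastResponse.entrySet())
    {
      if (nowMillis - entry.getValue() > timeoutMillis)
      {
        failed.add(entry.getKey());
      }
    }
    for (final String workerId : failed)
    {
      lastResponse.remove(workerId);
    }
    for (final Task task : tasks)
    {
      if (task.worker == null || !failed.contains(task.worker))
      {
        continue;
      }
      // In-progress tasks of either kind are reset, and so are completed map tasks.
      // Completed reduce tasks are left untouched.
      final boolean reset = task.state == State.IN_PROGRESS
          || (task.isMap && task.state == State.COMPLETED);
      if (reset)
      {
        task.assign(null, State.IDLE);
      }
    }
    return failed;
  }
}
```

Completed map tasks are reset because their output sits on the local disk of the failed machine, whereas completed reduce output is already stored in the distributed file system.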

        Mapping workers to Processors
        • The input data (on HDFS) is stored on the local disks of the machines in the cluster. HDFS divides each file into 64 MB blocks, and stores several copies of each block (typically 3 copies) on different machines. 
        • The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data. When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
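
The block and replica figures above lead to some simple arithmetic. The sketch below assumes the 64 MB block size and 3 replicas mentioned in the text; BlockMath and its method names are made up for the illustration.

```java
// Back-of-the-envelope arithmetic for HDFS blocks and replicas (illustrative only).
class BlockMath
{
  static final long BLOCK_SIZE_BYTES = 64L * 1024 * 1024; // 64 MB, as in the text

  // Number of blocks needed for a file; the last block may be only partly filled.
  static long blockCount(final long fileSizeBytes)
  {
    return (fileSizeBytes + BLOCK_SIZE_BYTES - 1) / BLOCK_SIZE_BYTES;
  }

  // Total number of block replicas stored across the cluster.
  static long replicaCount(final long fileSizeBytes, final int replication)
  {
    return blockCount(fileSizeBytes) * replication;
  }

  public static void main(final String[] args)
  {
    final long oneGigabyte = 1024L * 1024 * 1024;
    System.out.println(blockCount(oneGigabyte));      // prints 16
    System.out.println(replicaCount(oneGigabyte, 3)); // prints 48
  }
}
```

A 1 GB input file therefore gives the scheduler 16 blocks, each available on 3 machines, which is what makes it likely that a map task can be placed on a machine holding its input.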


        Additional support functions
        • Partitioning function: The users of MapReduce specify the number of reduce tasks/output files that they desire (R). Data gets partitioned across these tasks using a partitioning function on the intermediate key. A default partitioning function is provided that uses hashing (e.g. hash(key) mod R). In some cases, it may be useful to partition data by some other function of the key, so the user of the MapReduce library can provide a special partitioning function.
        • Combiner function: The user can specify a Combiner function that does partial merging of the intermediate local disk data before it is sent over the network. The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions.
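
Both support functions can be sketched in a few lines of plain Java. The class PartitionAndCombine and its method names are hypothetical; the partition formula follows the hash(key) mod R idea from the text, with a bit mask so that negative hash codes do not produce a negative partition number, and the combiner is shown for a counting job only.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative versions of a hash partitioner and a counting combiner (no Hadoop involved).
class PartitionAndCombine
{
  // hash(key) mod R; the mask clears the sign bit so the result is always in [0, R).
  static int partition(final String key, final int numReduceTasks)
  {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  // Combiner for a counting job: merges the (key, 1) pairs of one map task locally,
  // so that only one (key, partialCount) pair per key crosses the network.
  static Map<String, Integer> combine(final String[] mapOutputKeys)
  {
    final Map<String, Integer> partial = new TreeMap<String, Integer>();
    for (final String key : mapOutputKeys)
    {
      final Integer current = partial.get(key);
      partial.put(key, current == null ? 1 : current + 1);
    }
    return partial;
  }

  public static void main(final String[] args)
  {
    final String[] keys = { "a", "b", "a" };
    System.out.println(combine(keys)); // prints {a=2, b=1}
    for (final String key : combine(keys).keySet())
    {
      System.out.println(key + " -> reduce task " + partition(key, 4));
    }
  }
}
```

Because the partition depends only on the key, every pair with the same key reaches the same reduce task, no matter which map task produced it.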


        Hadoop Distributed File System (HDFS)
        • The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems, but the differences from them are significant.
        • It is highly fault-tolerant and is designed to be deployed on low-cost hardware.
        • It provides high-throughput access to application data and is suitable for applications that have large data sets.
        • It relaxes a few POSIX requirements to enable streaming access to file system data.
        • It is part of the Apache Hadoop Core project. The project URL is http://hadoop.apache.org/core/.

        Hadoop Community
        • Hadoop Users
          1. Adobe
          2. Alibaba
          3. Amazon
          4. AOL
          5. Facebook
          6. Google
          7. IBM
        • Major Contributors
          1. Apache
          2. Cloudera
          3. Yahoo