Monday 19 August 2013

Calculate Average in Map Reduce using Combiners

A program that calculates the average of a given set of numbers using the MapReduce combiner technique.
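The catch is that an average of averages is wrong unless every partial average is weighted by its count, so the combiner emits average_count pairs and the reducer recombines them as sum(average_i * count_i) / sum(count_i). A minimal standalone sketch of that recombination step (illustrative class and names, separate from the job code below):

public class WeightedAverage {

 // Each row is one combiner output: { partial average, count }.
 public static double recombine(final double[][] partials) {
  double sum = 0D;
  long total = 0L;
  for (final double[] p : partials) {
   sum += p[0] * p[1]; // undo the partial division
   total += (long) p[1];
  }
  return sum / total;
 }

 public static void main(final String[] args) {
  // Average 2.0 over 4 values and average 5.0 over 2 values:
  // (2.0 * 4 + 5.0 * 2) / (4 + 2) = 3.0
  System.out.println(recombine(new double[][] { { 2.0, 4 }, { 5.0, 2 } }));
 }
}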

The figure below explains the concept in detail:

[Figure: mapper, combiner, and reducer flow for the average computation]

The code includes:
1. AverageDriver.java - Driver class
package com.self.average.driver;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.tools.ant.util.FileUtils;

import com.self.average.mapreduce.AverageMRC;

public class AverageDriver extends Configured implements Tool {

  public static void main(final String[] args) throws Exception {

   final int res = ToolRunner.run(new Configuration(),
    new AverageDriver(), args);
  System.exit(res);
 }

  private Configuration conf;

  private String inputFile;
 private String outputName;

  private final String inputDirectoryPrefix = "Average";

  private final String averagefilename = "average.txt";

  public boolean cpFilestoUserLocation(final String useroutputfile,
   final String systemfile) throws IOException {
  System.out.println("Copy file to user location");
  System.out.println("useroutputfile::" + useroutputfile);
  System.out.println("systemfile::" + systemfile);
  final FileUtils fu = FileUtils.getFileUtils();
  fu.copyFile(systemfile, useroutputfile, null, true);

   return true;
 }

  @Override
 public Configuration getConf() {
  return this.conf;
 }

  public final String getDateTime() {
  final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_hh-mm-ss");
  return df.format(new Date());
 }

  @SuppressWarnings("deprecation")
 @Override
 public int run(final String[] args) throws Exception {
  final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
  final ArgumentBuilder abuilder = new ArgumentBuilder();
  final GroupBuilder gbuilder = new GroupBuilder();

   final Option inputFileOpt = obuilder
    .withLongName("input")
    .withShortName("input")
    .withRequired(false)
    .withArgument(
      abuilder.withName("input").withMinimum(1)
        .withMaximum(1).create())
    .withDescription("input file").create();

   final Option outputDirOpt = obuilder
    .withLongName("o")
    .withShortName("o")
    .withRequired(false)
    .withArgument(
      abuilder.withName("o").withMinimum(1).withMaximum(1)
        .create())
    .withDescription("output directory option").create();

   final Group group = gbuilder.withName("Options")
    .withOption(inputFileOpt).withOption(outputDirOpt).create();

   System.out.println(group);

   try {
   final Parser parser = new Parser();
   parser.setGroup(group);
   final CommandLine cmdLine = parser.parse(args);

    if (cmdLine.hasOption("help")) {
    CommandLineUtil.printHelp(group);
    return -1;
   }

    this.inputFile = cmdLine.getValue(inputFileOpt).toString().trim();
   this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();

    System.out.println("input file path::" + this.inputFile);
   System.out.println("output Directory::" + this.outputName);

   } catch (final OptionException e) {
   System.err.println("Exception : " + e);
   CommandLineUtil.printHelp(group);
   return -1;
  }

   final String parentDirectory = "Average";
  final String baseDirectory = this.outputName + "/" + parentDirectory;
  if (!FileSystem.get(new Configuration()).isDirectory(
    new Path(baseDirectory))) {
   FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
  }

   final Job job = new Job(this.getConf());
  job.setJobName("AverageProcess");
  job.setJarByClass(AverageMRC.class);

   job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

   job.setNumReduceTasks(1);

   job.setMapperClass(AverageMRC.AverageMap.class);
  job.setReducerClass(AverageMRC.Reduce.class);
  job.setCombinerClass(AverageMRC.Combiner.class);

   job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

   DistributedCache.addCacheFile(new Path(this.inputFile).toUri(),
    job.getConfiguration());
  final String averageoutputPath = baseDirectory + "/"
    + this.inputDirectoryPrefix;

   if (FileSystem.get(new Configuration()).isDirectory(
    new Path(averageoutputPath))) {
   FileSystem.get(new Configuration()).delete(new Path(baseDirectory),
     true);
  }

   FileInputFormat.setInputPaths(job, new Path(this.inputFile));
  FileOutputFormat.setOutputPath(job, new Path(averageoutputPath));

   final boolean status = job.waitForCompletion(true);

   if (!status) {
   System.out.println("Could Not create the dictionary file");
   return -1;
  }

   if (this.cpFilestoUserLocation(baseDirectory + "/"
    + this.averagefilename, averageoutputPath + "/"
    + "part-r-00000")) {
   System.out.println("Average File Copied at " + baseDirectory + "/"
     + this.averagefilename);
  }

   return 0;
 }

  @Override
 public void setConf(final Configuration conf) {
  this.conf = conf;
 }

}
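With the jar built, the driver is launched through ToolRunner; an invocation looks roughly like the following (option prefixes assume the commons-cli2 defaults; paths are illustrative). Note that although both options are declared with withRequired(false), the driver dereferences their values unconditionally, so both must be supplied:

hadoop jar average-job.jar com.self.average.driver.AverageDriver --input /data/numbers.txt --o /data/out

On success the reducer output part-r-00000 is copied to <output directory>/Average/average.txt.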


2. AverageMRC.java - Mapper, Combiner & Reducer classes
package com.self.average.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageMRC {

 // Nested Mapper/Reducer classes must be static so that Hadoop can
 // instantiate them by reflection; as non-static inner classes the job fails.
 public static class AverageMap extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(final LongWritable key, final Text value,
    final Context context) throws IOException, InterruptedException {
   // Funnel every number to one key so a single reduce group sees them all.
   context.write(new Text("MAPPER"), value);
  }
 }

 public static class Combiner extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(final Text key, final Iterable<Text> values,
    final Context context) throws IOException, InterruptedException {
   Integer count = 0;
   Double sum = 0D;
   final Iterator<Text> itr = values.iterator();
   while (itr.hasNext()) {
    final String text = itr.next().toString();
    final Double value = Double.parseDouble(text);
    count++;
    sum += value;
   }

   final Double average = sum / count;

   context.write(new Text("A_C"), new Text(average + "_" + count));
  }
 }

 public static class Reduce extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(final Text key, final Iterable<Text> values,
    final Context context) throws IOException, InterruptedException {
   Double sum = 0D;
   Integer totalcount = 0;
   final Iterator<Text> itr = values.iterator();
   while (itr.hasNext()) {
    final String text = itr.next().toString();
    final String[] tokens = text.split("_");
    final Double average = Double.parseDouble(tokens[0]);
    final Integer count = Integer.parseInt(tokens[1]);
    sum += (average * count);
    totalcount += count;
   }

   final Double average = sum / totalcount;

   context.write(new Text("AVERAGE"), new Text(average.toString()));
  }
 }
}
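A caveat on the combiner contract: Hadoop treats the combiner as an optional optimization and may run it zero, one, or several times, whereas the classes above only work if it runs exactly once (the reducer's split("_") expects combined records, and a second combiner pass would try to parse "average_count" strings as plain numbers). A more robust variant, sketched here on the assumption that the mapper is changed to emit value + "_1" and the reducer to finish with sum / count, keeps a single "sum_count" format end to end:

 public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(final Text key, final Iterable<Text> values,
    final Context context) throws IOException, InterruptedException {
   double sum = 0D;
   long count = 0L;
   for (final Text v : values) {
    final String[] tokens = v.toString().split("_");
    sum += Double.parseDouble(tokens[0]);
    count += Long.parseLong(tokens[1]);
   }
   // Input and output share the "sum_count" format, so Hadoop is free
   // to apply this combiner zero, one, or many times.
   context.write(key, new Text(sum + "_" + count));
  }
 }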





The Maven pom.xml file should contain the following dependencies:

  <dependency>
   <artifactId>hadoop-core</artifactId>
   <groupId>org.apache.hadoop</groupId>
   <version>0.20.2</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.5</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.apache.mahout.commons</groupId>
   <artifactId>commons-cli</artifactId>
   <version>2.0-mahout</version>
  </dependency>
  <dependency>
   <groupId>org.apache.mahout</groupId>
   <artifactId>mahout-core</artifactId>
   <version>0.4</version>
  </dependency>
  <dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.1</version>
  </dependency>
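
The drivers also import org.apache.tools.ant.util.FileUtils, so unless Ant arrives transitively through one of the dependencies above, it needs its own entry (the version here is illustrative):

  <dependency>
   <groupId>org.apache.ant</groupId>
   <artifactId>ant</artifactId>
   <version>1.8.2</version>
  </dependency>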

Word Chain in Map Reduce

A program that solves the word-chain problem using the MapReduce technique:


A word chain is a sequence of dictionary words in which each successive entry differs from the previous word by just one letter.
For example, you can get from "cat" to "dog" using the following chain:
cat -> cot -> cog -> dog
The challenge is to write a program that accepts the start and end words of the chain and, using the words from a dictionary, builds the shortest word chain between them.
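
As a standalone illustration of the rule (a hypothetical helper, separate from the MapReduce code below): two words chain together when they have equal length and differ at exactly one position.

public class OneLetterRule {

 static boolean differsByOneLetter(final String a, final String b) {
  if (a.length() != b.length()) {
   return false;
  }
  int diffs = 0;
  for (int i = 0; i < a.length(); i++) {
   if (a.charAt(i) != b.charAt(i)) {
    diffs++;
   }
  }
  return diffs == 1;
 }

 public static void main(final String[] args) {
  System.out.println(differsByOneLetter("cat", "cot")); // true
  System.out.println(differsByOneLetter("cat", "dog")); // false
 }
}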

The attached dictionary defines the universe of allowed words.
The code is written in Java using the Hadoop MapReduce library.

There are three files defined for this purpose:
1. Driver file - WordChainDriver.java
2. Two MapReduce files:
    i. DictMR: MapReduce job that processes the given dictionary file
   ii. WordChainMR: MapReduce job that builds the word chain iteratively
3. Constants file - WordChainConstant.java
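
In outline: the first job (DictMR) turns the dictionary into an adjacency list, emitting one word/neighbour pair for every two dictionary words that differ by a single letter. The second job (WordChainMR) is then run in a loop by the driver: each pass extends every partial chain that begins at the start word by one neighbouring word, and a FOUND counter signals when some chain reaches the end word. Because chains grow by exactly one word per pass, the first pass that trips the counter yields a shortest chain.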


The figure below illustrates the process:

[Figure: the two-job word-chain pipeline]

The code includes:

WordChainDriver.java

package com.self.wordchain.driver;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.tools.ant.util.FileUtils;

import com.self.wordchain.mapreduce.DictMR;
import com.self.wordchain.mapreduce.WordChainMR;
import com.self.wordchain.util.WordChainConstant;

public class WordChainDriver extends Configured implements Tool {

  public static void main(final String[] args) throws Exception {

   final int res = ToolRunner.run(new Configuration(),
    new WordChainDriver(), args);
  System.exit(res);
 }

  private Configuration conf;

  private String start;

  private String end;
 private String dictionaryFile;

  private String outputName;
 private final String dictDirectoryPrefix = "Dict";
 private final String wcDirectoryPrefix = "WordChain";

  public boolean cpFilestoUserLocation(final String useroutputfile,
   final String systemfile) throws IOException {
  System.out.println("Copy file to user location");
  System.out.println("useroutputfile::" + useroutputfile);
  System.out.println("systemfile::" + systemfile);
  final FileUtils fu = FileUtils.getFileUtils();
  fu.copyFile(systemfile, useroutputfile, null, true);

   return true;
 }

  @Override
 public Configuration getConf() {
  return this.conf;
 }

  public final String getDateTime() {
  final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_hh-mm-ss");
  return df.format(new Date());
 }

  @SuppressWarnings("deprecation")
 @Override
 public int run(final String[] args) throws Exception {
  final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
  final ArgumentBuilder abuilder = new ArgumentBuilder();
  final GroupBuilder gbuilder = new GroupBuilder();

   final Option startOpt = obuilder
    .withLongName("start")
    .withShortName("start")
    .withRequired(true)
    .withArgument(
      abuilder.withName("start").withMinimum(1)
        .withMaximum(1).create())
    .withDescription("starting word").create();

   final Option endOpt = obuilder
    .withLongName("end")
    .withShortName("end")
    .withRequired(true)
    .withArgument(
      abuilder.withName("end").withMinimum(1).withMaximum(1)
        .create()).withDescription("ending word")
    .create();

   final Option dictionaryFileOpt = obuilder
    .withLongName("dict")
    .withShortName("dict")
    .withRequired(false)
    .withArgument(
      abuilder.withName("dict").withMinimum(1).withMaximum(1)
        .create()).withDescription("dictionary file")
    .create();

   final Option outputDirOpt = obuilder
    .withLongName("o")
    .withShortName("o")
    .withRequired(false)
    .withArgument(
      abuilder.withName("o").withMinimum(1).withMaximum(1)
        .create())
    .withDescription("output directory option").create();

   final Group group = gbuilder.withName("Options").withOption(startOpt)
    .withOption(endOpt).withOption(dictionaryFileOpt)
    .withOption(outputDirOpt).create();

   System.out.println(group);

   try {
   final Parser parser = new Parser();
   parser.setGroup(group);
   final CommandLine cmdLine = parser.parse(args);

    if (cmdLine.hasOption("help")) {
    CommandLineUtil.printHelp(group);
    return -1;
   }

    this.start = cmdLine.getValue(startOpt).toString().trim();
   this.end = cmdLine.getValue(endOpt).toString().trim();
   this.dictionaryFile = cmdLine.getValue(dictionaryFileOpt)
     .toString().trim();
   this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();

    System.out.println("start word::" + this.start);
   System.out.println("end word::" + this.end);
   System.out.println("dictionary file path::" + this.dictionaryFile);
   System.out.println("output Directory::" + this.outputName);

    if (this.start.length() != this.end.length()) {
    System.out
      .println("Both Start & End Words need to be of same length");
    return -1;
   }

   } catch (final OptionException e) {
   System.err.println("Exception : " + e);
   CommandLineUtil.printHelp(group);
   return -1;
  }

   final String parentDirectory = "WORD_CHAIN";
  final String baseDirectory = this.outputName + "/" + parentDirectory;
  if (!FileSystem.get(new Configuration()).isDirectory(
    new Path(baseDirectory))) {
   FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
  }

   // JOB1 - Process the Dictionary File
  Job job = new Job(this.getConf());
  job.setJobName("DictionaryProcess");
  job.setJarByClass(DictMR.class);

   job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

   job.setNumReduceTasks(1);
  job.getConfiguration().set("START", this.start);
  job.getConfiguration().set("END", this.end);

   job.setMapperClass(DictMR.DictMap.class);
  job.setReducerClass(DictMR.Reduce.class);
  job.setCombinerClass(DictMR.Reduce.class);

   job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

   DistributedCache.addCacheFile(new Path(this.dictionaryFile).toUri(),
    job.getConfiguration());
  final String dictoutputPath = baseDirectory + "/"
    + this.dictDirectoryPrefix;

   if (FileSystem.get(new Configuration()).isDirectory(
    new Path(dictoutputPath))) {
   FileSystem.get(new Configuration()).delete(new Path(baseDirectory),
     true);
  }

   FileInputFormat.setInputPaths(job, new Path(this.dictionaryFile));
  FileOutputFormat.setOutputPath(job, new Path(dictoutputPath));

   boolean status = job.waitForCompletion(true);

   if (!status) {
   System.out.println("Could Not create the dictionary file");
   return -1;
  }

   // JOB2 - Process the Dictionary File in a loop to get the start to end
  // words
  boolean flag = true;
  int iterations = 0;
  // final String dictfile = dictoutputPath + "/"
  // + WordChainConstants.dictfilename;

   String inputfile = dictoutputPath;
  String wcoutputPath = null;

   while (flag) {
   iterations++;
   wcoutputPath = baseDirectory + "/" + this.wcDirectoryPrefix
     + this.getDateTime();

    job = new Job(this.getConf());
   job.setJobName("WordChain-" + iterations);
   // job.getConfiguration().set("mapred.job.queue.name", "score2");
   job.setJarByClass(WordChainMR.class);

    job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(1);

    job.setMapperClass(WordChainMR.WordMap.class);
   job.setReducerClass(WordChainMR.Reduce.class);
   job.setCombinerClass(WordChainMR.Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
   job.setOutputFormatClass(TextOutputFormat.class);

    job.getConfiguration().set("START", this.start);
   job.getConfiguration().set("END", this.end);

    // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
   // SequenceFileOutputFormat.setOutputCompressionType(job,
   // CompressionType.BLOCK);

    DistributedCache.addCacheFile(new Path(dictoutputPath
     + "/part-r-00000").toUri(), job.getConfiguration());
   FileInputFormat.setInputPaths(job, new Path(inputfile));
   FileOutputFormat.setOutputPath(job, new Path(wcoutputPath));

    status = job.waitForCompletion(true);
   final org.apache.hadoop.mapreduce.Counter counter = job
     .getCounters().findCounter(WordChainConstant.Status.FOUND);

    System.out.println("Found Counter:" + counter.getValue());

    if (counter.getValue() > 0) {
    flag = false;
   } else {
    // inputfile = wcoutputPath + "/" +
    // WordChainConstants.wcfilename;
    inputfile = wcoutputPath;
   }

    if (!job.isSuccessful()) {
    break;
   }
  }

   if (!flag
    && this.cpFilestoUserLocation(baseDirectory + "/"
      + WordChainConstant.wcfilename, wcoutputPath + "/"
      + "part-r-00000")) {
   System.out.println("Word Chain File Copied at " + baseDirectory
     + "/" + WordChainConstant.wcfilename);
  } else {
   System.out.println("No Word Chain found!!!!!");
  }

   return 0;
 }

  @Override
 public void setConf(final Configuration conf) {
  this.conf = conf;
 }

}
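With the jar built, an invocation looks roughly like the following (option prefixes assume the commons-cli2 defaults; the paths and words are illustrative):

hadoop jar wordchain-job.jar com.self.wordchain.driver.WordChainDriver --start cat --end dog --dict /data/dictionary.txt --o /data/out

On success the shortest chain is copied to <output directory>/WORD_CHAIN/wordChain.txt.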





DictMR.java

package com.self.wordchain.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import com.self.wordchain.util.WordChainConstant;

public class DictMR {

  public static class DictMap extends Mapper<LongWritable, Text, Text, Text> {

   private static final int VECTOR_SIZE = 10485760;
  private static final int NBHASH = 6;
  private static final int HASH_TYPE = Hash.MURMUR_HASH;
  private String start;

   private Set<String> dictionary = new HashSet<String>();
  private BloomFilter bloomFilter = new BloomFilter(DictMap.VECTOR_SIZE,
    DictMap.NBHASH, DictMap.HASH_TYPE);

   private void loadDictionary(final String file, final int len) {
   BufferedReader br = null;
   try {
    br = new BufferedReader(new FileReader(new File(file)));
    String strLine;
    while ((strLine = br.readLine()) != null) {
     if (strLine.trim().length() == len) {
      this.dictionary.add(strLine);
      this.bloomFilter.add(new Key(strLine.getBytes()));
     }
    }
   } catch (final Exception e) {
    e.printStackTrace();
   } finally {
    if (br != null) {
     try {
      br.close();
     } catch (final IOException e) {
      e.printStackTrace();
     }
    }
   }

   }

   @Override
  protected void map(final LongWritable key, final Text value,
    final Context context) throws IOException, InterruptedException {

    final String word = value.toString();

    // Only work on similar length words
   if (word.length() != this.getStart().length()) {
    return;
   }

    for (int i = 0; i < word.length(); i++) {
    for (int j = 0; j < WordChainConstant.alLetters.size(); j++) {
     final char[] newString = word.toCharArray();
     newString[i] = WordChainConstant.alLetters.get(j);
     final String newword = String.valueOf(newString);
     if (this.bloomFilter.membershipTest(new Key(newword
       .getBytes()))
       && this.dictionary.contains(newword)
       && !newword.equals(word)) {
      context.write(value, new Text(newword));
     }
    }
   }

  }

   public void setBloomFilter(final BloomFilter bloomFilter) {
   this.bloomFilter = bloomFilter;
  }

   public void setDictionary(final Set<String> dictionary) {
   this.dictionary = dictionary;
  }

   @Override
  public void setup(final Context context) {
   final Configuration config = context.getConfiguration();
   final Path[] cacheFiles;
   try {
    cacheFiles = DistributedCache.getLocalCacheFiles(config);
    if (cacheFiles == null) {
     System.out.println("Cache File should not be null");
     return;
    }
    final String dictfile = cacheFiles[0].toString();
    System.out.println("Dictfile:" + dictfile + ":"
      + config.get("START"));
    this.setStart(config.get("START"));
    this.loadDictionary(dictfile, config.get("START").length());
    System.out.println("Dict file Size::" + this.dictionary.size());
   } catch (final Exception e) {
    e.printStackTrace();
   }
  }

   public void setStart(String start) {
   this.start = start;
  }

   public String getStart() {
   return start;
  }
 }

  public static class Reduce extends Reducer<Text, Text, Text, Text> {

   @Override
  protected void reduce(final Text key,
    final java.lang.Iterable<Text> values, final Context context)
    throws IOException, InterruptedException {
   final Iterator<Text> itr = values.iterator();
   while (itr.hasNext()) {
    context.write(new Text(key + "\t" + itr.next()), new Text());
   }
  }
 }

}
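
Given the example above, this job's output pairs each word with one neighbour per line (lines of the form word\tneighbour, e.g. cat\tcot, cot\tcog and cog\tdog); WordChainMR later loads these tab-separated lines as its adjacency list mapping each word to its neighbours.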


WordChainMR.java
package com.self.wordchain.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.self.wordchain.util.WordChainConstant;

public class WordChainMR {

  public static class Reduce extends Reducer<Text, Text, Text, Text> {
  private boolean found = false;

   @Override
  protected void reduce(final Text key,
    final java.lang.Iterable<Text> values, final Context context)
    throws IOException, InterruptedException {
   if (this.found) {
    return;
   }

    final Iterator<Text> itr = values.iterator();

    if (key.equals(new Text("0"))) {
    while (itr.hasNext()) {
     context.write(itr.next(), new Text());
    }
    this.found = true;
   } else {
    while (itr.hasNext()) {
     context.write(new Text(key), new Text(itr.next()));
    }
   }
  }
 }

  public static class WordMap extends Mapper<LongWritable, Text, Text, Text> {
  private Map<String, List<String>> dictionary = new HashMap<String, List<String>>();
  private String start = null;

   private String end = null;

   private void loadDictionary(final String file) {
   BufferedReader br = null;
   try {
    br = new BufferedReader(new FileReader(new File(file)));
    String strLine;
    while ((strLine = br.readLine()) != null) {
     final String[] tokens = strLine.split("\t");
     final String word = tokens[0];

      // if (word.length() != Map.start.length()) {
     // continue;
     // }

      final String next = tokens[1];
     if (this.dictionary.containsKey(word)) {
      this.dictionary.get(word).add(next);
     } else {
      final List<String> alnext = new LinkedList<String>();
      alnext.add(next);
      // System.out.println("[filter]Word:" + word);
      this.dictionary.put(word, alnext);
     }
    }
   } catch (final Exception e) {
    e.printStackTrace();
   } finally {
    if (br != null) {
     try {
      br.close();
     } catch (final IOException e) {
      e.printStackTrace();
     }
    }
   }
  }

   @Override
  protected void map(final LongWritable key, final Text value,
    final Context context) throws IOException, InterruptedException {
   final String line = value.toString().trim();
   final String[] tokens = line.split("\t");
   final String word = tokens[0];
   final String lastword = tokens[tokens.length - 1];
   if ((word != null) && word.equalsIgnoreCase(this.start)) {
    final List<String> alnext = this.dictionary.get(lastword);
    if (alnext == null) {
     return;
    }
    for (final String next : alnext) {
     if (next.equalsIgnoreCase(this.end)) {
      System.out.println("Found");
      context.getCounter(WordChainConstant.Status.FOUND)
        .increment(1);
      context.write(new Text("0"), new Text(line + "\t"
        + next));
     } else {
      if (!line.contains(next)) {
       context.write(value, new Text(next));
      }
     }
    }
   }
  }

   /**
   * Added for JUnit
   * 
   * @param dictionary
   */
  public void setDictionary(final Map<String, List<String>> dictionary) {
   this.dictionary = dictionary;
  }

   @Override
  protected void setup(final Context context) throws IOException,
    InterruptedException {
   final Configuration config = context.getConfiguration();
   Path[] cacheFiles;
   try {
    this.start = config.get("START");
    this.end = config.get("END");
    cacheFiles = DistributedCache.getLocalCacheFiles(config);
    if (cacheFiles == null) {
     System.out.println("Cache File should not be null");
     return;
    }
    final String dictfile = cacheFiles[0].toString();
    this.loadDictionary(dictfile);

     System.out.println("----------Conf---------------");
    System.out.println("Start:" + this.start);
    System.out.println("End:" + this.end);
    System.out.println("dictfile:" + dictfile);
    System.out.println("Dict size:" + this.dictionary.size());
   } catch (final Exception e) {
    e.printStackTrace();
   }
  }
 }

}
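
Each pass of this job grows a candidate chain by one word: an input line "cat\tcot" whose last word cot has the neighbour cog is written out as key "cat\tcot" with value cog, so the next pass reads the line "cat\tcot\tcog". When the appended word equals the end word, the mapper increments the FOUND counter and emits the finished chain under the special key "0", which the reducer then writes out on its own; the driver sees the non-zero counter and stops iterating.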





WordChainConstant.java

package com.self.wordchain.util;

import java.util.ArrayList;
import java.util.Arrays;

public class WordChainConstant {

 public static enum Status {
  FOUND;
 }

 private final static Character[] letters = { 'a', 'b', 'c', 'd', 'e', 'f',
   'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
   't', 'u', 'v', 'w', 'x', 'y', 'z' };

 public static final ArrayList<Character> alLetters = new ArrayList<Character>(
   Arrays.asList(WordChainConstant.letters));

 public static String wcfilename = "wordChain.txt";

}