Monday 19 August 2013

Word Chain in Map Reduce

A program that solves the word-chain problem using the MapReduce technique:


A word chain is a sequence of dictionary words in which each successive entry differs from the previous word by exactly one letter.
For example, you can get from "cat" to "dog" using the following chain.
cat -> cot -> cog -> dog
The challenge is to write a program that accepts the start and end words of a chain and, using the words from a dictionary, builds the shortest word chain between them.

The attached dictionary file defines the universe of valid words.
The code is written in Java using the Hadoop MapReduce library.
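
Before looking at the MapReduce version, here is a minimal single-machine sketch of the same search as a breadth-first traversal (the SimpleWordChain class and its file handling are illustrative assumptions, not part of the attached code). Each MapReduce iteration below effectively computes one BFS level:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class SimpleWordChain {

 // BFS over one-letter variants; the first chain to reach the end word
 // is a shortest one.
 public static List<String> chain(final String start, final String end,
   final Set<String> dict) {
  final Queue<List<String>> queue = new LinkedList<List<String>>();
  final Set<String> visited = new HashSet<String>();
  queue.add(new ArrayList<String>(Collections.singletonList(start)));
  visited.add(start);
  while (!queue.isEmpty()) {
   final List<String> path = queue.poll();
   final String last = path.get(path.size() - 1);
   if (last.equals(end)) {
    return path;
   }
   // Substitute every letter at every position of the frontier word
   for (int i = 0; i < last.length(); i++) {
    final char[] chars = last.toCharArray();
    for (char c = 'a'; c <= 'z'; c++) {
     chars[i] = c;
     final String candidate = new String(chars);
     if (dict.contains(candidate) && visited.add(candidate)) {
      final List<String> next = new ArrayList<String>(path);
      next.add(candidate);
      queue.add(next);
     }
    }
   }
  }
  return null; // no chain exists
 }

 public static void main(final String[] args) throws IOException {
  final Set<String> dict = new HashSet<String>(Files.readAllLines(
    Paths.get(args[2]), StandardCharsets.UTF_8));
  System.out.println(chain(args[0], args[1], dict));
 }
}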

The solution consists of the following files:
1. Driver file - WordChainDriver.java
2. Two Mapper-Reducer files:
    i. DictMR : MapReduce job that processes the given dictionary file into (word, neighbour) pairs
   ii. WordChainMR : MapReduce job that extends the word chains, one word per iteration
3. WordChainConstant file


The process works in two stages. The first job (DictMR) reads the dictionary and, for every word of the same length as the start word, emits the dictionary words that differ from it by exactly one letter. The second job (WordChainMR) then runs in a loop: each iteration extends every partial chain that begins with the start word by one more neighbour, until some chain reaches the end word. Completion is signalled through a FOUND counter, after which the finished chain is copied to the user's output location.
The code includes:

WordChainDriver.java

package com.self.wordchain.driver;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.tools.ant.util.FileUtils;

import com.self.wordchain.mapreduce.DictMR;
import com.self.wordchain.mapreduce.WordChainMR;
import com.self.wordchain.util.WordChainConstant;

public class WordChainDriver extends Configured implements Tool {

  public static void main(final String[] args) throws Exception {

   final int res = ToolRunner.run(new Configuration(),
    new WordChainDriver(), args);
  System.exit(res);
 }

  private Configuration conf;

  private String start;

  private String end;
 private String dictionaryFile;

  private String outputName;
 private final String dictDirectoryPrefix = "Dict";
 private final String wcDirectoryPrefix = "WordChain";

  public boolean cpFilestoUserLocation(final String useroutputfile,
   final String systemfile) throws IOException {
  System.out.println("Copy file to user location");
  System.out.println("useroutputfile::" + useroutputfile);
  System.out.println("systemfile::" + systemfile);
  final FileUtils fu = FileUtils.getFileUtils();
  fu.copyFile(systemfile, useroutputfile, null, true);

   return true;
 }

  @Override
 public Configuration getConf() {
  return this.conf;
 }

  public final String getDateTime() {
   // 24-hour format (HH) keeps the per-iteration output directory names
   // unique; the 12-hour "hh" pattern could collide across AM/PM
   final DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
   return df.format(new Date());
  }

  @SuppressWarnings("deprecation")
 @Override
 public int run(final String[] args) throws Exception {
  final DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
  final ArgumentBuilder abuilder = new ArgumentBuilder();
  final GroupBuilder gbuilder = new GroupBuilder();

   final Option startOpt = obuilder
    .withLongName("start")
    .withShortName("start")
    .withRequired(true)
    .withArgument(
      abuilder.withName("start").withMinimum(1)
        .withMaximum(1).create())
    .withDescription("starting word").create();

   final Option endOpt = obuilder
    .withLongName("end")
    .withShortName("end")
    .withRequired(true)
    .withArgument(
      abuilder.withName("end").withMinimum(1).withMaximum(1)
        .create()).withDescription("ending word")
    .create();

   final Option dictionaryFileOpt = obuilder
    .withLongName("dict")
    .withShortName("dict")
     .withRequired(true) // the value is read unconditionally below, so make it required
    .withArgument(
      abuilder.withName("dict").withMinimum(1).withMaximum(1)
        .create()).withDescription("dictionary file")
    .create();

   final Option outputDirOpt = obuilder
    .withLongName("o")
    .withShortName("o")
     .withRequired(true) // the value is read unconditionally below, so make it required
    .withArgument(
      abuilder.withName("o").withMinimum(1).withMaximum(1)
        .create())
    .withDescription("output directory option").create();

   final Group group = gbuilder.withName("Options").withOption(startOpt)
    .withOption(endOpt).withOption(dictionaryFileOpt)
    .withOption(outputDirOpt).create();

   System.out.println(group);

   try {
   final Parser parser = new Parser();
   parser.setGroup(group);
   final CommandLine cmdLine = parser.parse(args);

    if (cmdLine.hasOption("help")) {
    CommandLineUtil.printHelp(group);
    return -1;
   }

    this.start = cmdLine.getValue(startOpt).toString().trim();
   this.end = cmdLine.getValue(endOpt).toString().trim();
   this.dictionaryFile = cmdLine.getValue(dictionaryFileOpt)
     .toString().trim();
   this.outputName = cmdLine.getValue(outputDirOpt).toString().trim();

    System.out.println("start word::" + this.start);
   System.out.println("end word::" + this.end);
   System.out.println("dictionary file path::" + this.dictionaryFile);
   System.out.println("output Directory::" + this.outputName);

    if (this.start.length() != this.end.length()) {
    System.out
      .println("Both Start & End Words need to be of same length");
    return -1;
   }

   } catch (final OptionException e) {
   System.err.println("Exception : " + e);
   CommandLineUtil.printHelp(group);
   return -1;
  }

   final String parentDirectory = "WORD_CHAIN";
  final String baseDirectory = this.outputName + "/" + parentDirectory;
  if (!FileSystem.get(new Configuration()).isDirectory(
    new Path(baseDirectory))) {
   FileSystem.get(new Configuration()).mkdirs(new Path(baseDirectory));
  }

   // JOB1 - Process the Dictionary File
  Job job = new Job(this.getConf());
  job.setJobName("DictionaryProcess");
  job.setJarByClass(DictMR.class);

   job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

   job.setNumReduceTasks(1);
  job.getConfiguration().set("START", this.start);
  job.getConfiguration().set("END", this.end);

   job.setMapperClass(DictMR.DictMap.class);
  job.setReducerClass(DictMR.Reduce.class);
  job.setCombinerClass(DictMR.Reduce.class);

   job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

   DistributedCache.addCacheFile(new Path(this.dictionaryFile).toUri(),
    job.getConfiguration());
  final String dictoutputPath = baseDirectory + "/"
    + this.dictDirectoryPrefix;

   // Remove stale output from a previous run so the job can write afresh
   if (FileSystem.get(new Configuration()).isDirectory(
     new Path(dictoutputPath))) {
    FileSystem.get(new Configuration()).delete(new Path(dictoutputPath),
      true);
   }

   FileInputFormat.setInputPaths(job, new Path(this.dictionaryFile));
  FileOutputFormat.setOutputPath(job, new Path(dictoutputPath));

   boolean status = job.waitForCompletion(true);

   if (!status) {
   System.out.println("Could Not create the dictionary file");
   return -1;
  }

   // JOB2 - Iteratively extend the chains, one word per pass, until the
   // end word is reached
  boolean flag = true;
  int iterations = 0;
  // final String dictfile = dictoutputPath + "/"
  // + WordChainConstants.dictfilename;

   String inputfile = dictoutputPath;
  String wcoutputPath = null;

   // Keep launching jobs until a chain reaching the end word is found
   while (flag) {
   iterations++;
   wcoutputPath = baseDirectory + "/" + this.wcDirectoryPrefix
     + this.getDateTime();

    job = new Job(this.getConf());
   job.setJobName("WordChain-" + iterations);
   // job.getConfiguration().set("mapred.job.queue.name", "score2");
   job.setJarByClass(WordChainMR.class);

    job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(1);

    job.setMapperClass(WordChainMR.WordMap.class);
   job.setReducerClass(WordChainMR.Reduce.class);
   job.setCombinerClass(WordChainMR.Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
   job.setOutputFormatClass(TextOutputFormat.class);

    job.getConfiguration().set("START", this.start);
   job.getConfiguration().set("END", this.end);

    // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
   // SequenceFileOutputFormat.setOutputCompressionType(job,
   // CompressionType.BLOCK);

    DistributedCache.addCacheFile(new Path(dictoutputPath
     + "/part-r-00000").toUri(), job.getConfiguration());
   FileInputFormat.setInputPaths(job, new Path(inputfile));
   FileOutputFormat.setOutputPath(job, new Path(wcoutputPath));

    status = job.waitForCompletion(true);
   final org.apache.hadoop.mapreduce.Counter counter = job
     .getCounters().findCounter(WordChainConstant.Status.FOUND);

    System.out.println("Found Counter:" + counter.getValue());

    if (counter.getValue() > 0) {
    flag = false;
   } else {
    // inputfile = wcoutputPath + "/" +
    // WordChainConstants.wcfilename;
    inputfile = wcoutputPath;
   }

    if (!job.isSuccessful()) {
    break;
   }
  }

   if (!flag
    && this.cpFilestoUserLocation(baseDirectory + "/"
      + WordChainConstant.wcfilename, wcoutputPath + "/"
      + "part-r-00000")) {
   System.out.println("Word Chain File Copied at " + baseDirectory
     + "/" + WordChainConstant.wcfilename);
  } else {
   System.out.println("No Word Chain found!!!!!");
  }

   return 0;
 }

  @Override
 public void setConf(final Configuration conf) {
  this.conf = conf;
 }

}
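
The driver runs as a standard Hadoop tool. An invocation would look roughly like this (the jar name and paths are illustrative; the option names come from the builders above):

hadoop jar wordchain.jar com.self.wordchain.driver.WordChainDriver \
 --start cat --end dog --dict /user/hduser/dict.txt --o /user/hduser/out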





DictMR.java

package com.self.wordchain.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import com.self.wordchain.util.WordChainConstant;

public class DictMR {

  public static class DictMap extends Mapper<LongWritable, Text, Text, Text> {

   private static final int VECTOR_SIZE = 10485760;
  private static final int NBHASH = 6;
  private static final int HASH_TYPE = Hash.MURMUR_HASH;
  private String start;

   private Set<String> dictionary = new HashSet<String>();
  private BloomFilter bloomFilter = new BloomFilter(DictMap.VECTOR_SIZE,
    DictMap.NBHASH, DictMap.HASH_TYPE);

   private void loadDictionary(final String file, final int len) {
   BufferedReader br = null;
   try {
    br = new BufferedReader(new FileReader(new File(file)));
    String strLine;
    while ((strLine = br.readLine()) != null) {
     if (strLine.trim().length() == len) {
      this.dictionary.add(strLine);
      this.bloomFilter.add(new Key(strLine.getBytes()));
     }
    }
   } catch (final Exception e) {
    e.printStackTrace();
   } finally {
    if (br != null) {
     try {
      br.close();
     } catch (final IOException e) {
      e.printStackTrace();
     }
    }
   }

   }

   @Override
  protected void map(final LongWritable key, final Text value,
    final Context context) throws IOException, InterruptedException {

    final String word = value.toString();

    // Only work on similar length words
   if (word.length() != this.getStart().length()) {
    return;
   }

     // Generate every one-letter variant of the word; the Bloom filter
     // gives a cheap negative test before the exact HashSet lookup
    for (int i = 0; i < word.length(); i++) {
    for (int j = 0; j < WordChainConstant.alLetters.size(); j++) {
     final char[] newString = word.toCharArray();
     newString[i] = WordChainConstant.alLetters.get(j);
     final String newword = String.valueOf(newString);
     if (this.bloomFilter.membershipTest(new Key(newword
       .getBytes()))
       && this.dictionary.contains(newword)
       && !newword.equals(word)) {
      context.write(value, new Text(newword));
     }
    }
   }

   }

   public void setBloomFilter(final BloomFilter bloomFilter) {
   this.bloomFilter = bloomFilter;
  }

   public void setDictionary(final Set<String> dictionary) {
   this.dictionary = dictionary;
  }

   @Override
  public void setup(final Context context) {
   final Configuration config = context.getConfiguration();
   final Path[] cacheFiles;
   try {
    cacheFiles = DistributedCache.getLocalCacheFiles(config);
    if (cacheFiles == null) {
     System.out.println("Cache File should not be null");
     return;
    }
    final String dictfile = cacheFiles[0].toString();
    System.out.println("Dictfile:" + dictfile + ":"
      + config.get("START"));
    this.setStart(config.get("START"));
    this.loadDictionary(dictfile, config.get("START").length());
    System.out.println("Dict file Size::" + this.dictionary.size());
   } catch (final Exception e) {
    e.printStackTrace();
   }
  }

   public void setStart(String start) {
   this.start = start;
  }

   public String getStart() {
   return start;
  }
 }

  public static class Reduce extends Reducer<Text, Text, Text, Text> {

   // Flatten each (word, neighbour) pair onto its own tab-separated line
  @Override
  protected void reduce(final Text key,
    final java.lang.Iterable<Text> values, final Context context)
    throws IOException, InterruptedException {
   final Iterator<Text> itr = values.iterator();
   while (itr.hasNext()) {
    context.write(new Text(key + "\t" + itr.next()), new Text());
   }
  }
 }

}
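
For the cat -> dog example, the Dict job's output (part-r-00000) would contain one tab-separated (word, neighbour) pair per line, roughly like this illustrative excerpt (assuming all four words appear in the dictionary):

cat	cot
cot	cat
cot	cog
cog	cot
cog	dog
dog	cog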


WordChainMR.java
package com.self.wordchain.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.self.wordchain.util.WordChainConstant;

public class WordChainMR {

  public static class Reduce extends Reducer<Text, Text, Text, Text> {
  private boolean found = false;

   @Override
  protected void reduce(final Text key,
    final java.lang.Iterable<Text> values, final Context context)
    throws IOException, InterruptedException {
   // Key "0" marks a completed chain emitted by the mapper; once one has
   // been written, suppress all remaining partial chains
   if (this.found) {
    return;
   }

   final Iterator<Text> itr = values.iterator();

   if (key.equals(new Text("0"))) {
    while (itr.hasNext()) {
     context.write(itr.next(), new Text());
    }
    this.found = true;
   } else {
    // Pass partial chains through unchanged for the next iteration
    while (itr.hasNext()) {
     context.write(new Text(key), new Text(itr.next()));
    }
   }
  }
 }

  public static class WordMap extends Mapper<LongWritable, Text, Text, Text> {
  private Map<String, List<String>> dictionary = new HashMap<String, List<String>>();
  private String start = null;

   private String end = null;

   private void loadDictionary(final String file) {
   BufferedReader br = null;
   try {
    br = new BufferedReader(new FileReader(new File(file)));
    String strLine;
    while ((strLine = br.readLine()) != null) {
     final String[] tokens = strLine.split("\t");
     final String word = tokens[0];

      // if (word.length() != Map.start.length()) {
     // continue;
     // }

      final String next = tokens[1];
     if (this.dictionary.containsKey(word)) {
      this.dictionary.get(word).add(next);
     } else {
      final List<String> alnext = new LinkedList<String>();
      alnext.add(next);
      // System.out.println("[filter]Word:" + word);
      this.dictionary.put(word, alnext);
     }
    }
   } catch (final Exception e) {
    e.printStackTrace();
   } finally {
    if (br != null) {
     try {
      br.close();
     } catch (final IOException e) {
      e.printStackTrace();
     }
    }
   }
  }

   @Override
  protected void map(final LongWritable key, final Text value,
    final Context context) throws IOException, InterruptedException {
   // Each input line is a tab-separated partial chain; the last token is
   // the frontier word to extend
   final String line = value.toString().trim();
   final String[] tokens = line.split("\t");
   final String word = tokens[0];
   final String lastword = tokens[tokens.length - 1];
   // Only chains that begin with the start word are extended
   if ((word != null) && word.equalsIgnoreCase(this.start)) {
    final List<String> alnext = this.dictionary.get(lastword);
    if (alnext == null) {
     return;
    }
    for (final String next : alnext) {
     if (next.equalsIgnoreCase(this.end)) {
      // Chain complete: emit it under the sentinel key "0" and
      // increment the FOUND counter so the driver stops iterating
      System.out.println("Found");
      context.getCounter(WordChainConstant.Status.FOUND)
        .increment(1);
      context.write(new Text("0"), new Text(line + "\t" + next));
     } else if (!line.contains(next)) {
      // Extend the chain, skipping words already on it (note: this is
      // a substring test, so it may also skip embedded matches)
      context.write(value, new Text(next));
     }
    }
   }
  }

   /**
   * Added for JUnit
   * 
   * @param dictionary
   */
  public void setDictionary(final Map<String, List<String>> dictionary) {
   this.dictionary = dictionary;
  }

   @Override
  protected void setup(final Context context) throws IOException,
    InterruptedException {
   final Configuration config = context.getConfiguration();
   Path[] cacheFiles;
   try {
    this.start = config.get("START");
    this.end = config.get("END");
    cacheFiles = DistributedCache.getLocalCacheFiles(config);
    if (cacheFiles == null) {
     System.out.println("Cache File should not be null");
     return;
    }
    final String dictfile = cacheFiles[0].toString();
    this.loadDictionary(dictfile);

     System.out.println("----------Conf---------------");
    System.out.println("Start:" + this.start);
    System.out.println("End:" + this.end);
    System.out.println("dictfile:" + dictfile);
    System.out.println("Dict size:" + this.dictionary.size());
   } catch (final Exception e) {
    e.printStackTrace();
   }
  }
 }

}
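
Putting the two classes together on the cat -> dog example, the loop behaves roughly as follows (an illustrative trace, not verbatim job output):

Iteration 1 input (Dict output line):  cat	cot
Iteration 1 mapper emits:              key: cat	cot      value: cog
Iteration 2 input line:                cat	cot	cog
Iteration 2 mapper emits:              key: 0            value: cat	cot	cog	dog
Iteration 2 reducer writes:            cat	cot	cog	dog

In iteration 2 the FOUND counter is incremented, so the driver exits the loop and copies part-r-00000 to the wordChain.txt output file.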





WordChainConstant.java

package com.self.wordchain.util;

import java.util.ArrayList;
import java.util.Arrays;

public class WordChainConstant {

 public static enum Status {
  FOUND;
 }

 private final static Character[] letters = { 'a', 'b', 'c', 'd', 'e', 'f',
   'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
   't', 'u', 'v', 'w', 'x', 'y', 'z' };

 public static final ArrayList<Character> alLetters = new ArrayList<Character>(
   Arrays.asList(WordChainConstant.letters));

 public static final String wcfilename = "wordChain.txt";

}
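
The alLetters list drives the neighbour enumeration in DictMap. As a quick sanity check of that logic, here is a minimal standalone sketch (the NeighbourDemo class is illustrative, not part of the project) that mirrors the loop in DictMap.map against a small in-memory dictionary:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import com.self.wordchain.util.WordChainConstant;

public class NeighbourDemo {

 public static void main(final String[] args) {
  final Set<String> dict = new HashSet<String>(
    Arrays.asList("cat", "cot", "cog", "dog"));
  final String word = "cot";
  // Same enumeration as DictMap.map: substitute every letter at every
  // position and keep real dictionary words (excluding the word itself)
  for (int i = 0; i < word.length(); i++) {
   for (final char c : WordChainConstant.alLetters) {
    final char[] chars = word.toCharArray();
    chars[i] = c;
    final String candidate = String.valueOf(chars);
    if (dict.contains(candidate) && !candidate.equals(word)) {
     // Prints "cot -> cat" and "cot -> cog"
     System.out.println(word + " -> " + candidate);
    }
   }
  }
 }
}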


