/*This has been created by Mohammad Hammoud- Nov. 3, 2011. *CMUQ. * */ import java.util.*; import java.io.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount{ public static class Map extends MapReduceBase implements Mapper{ Text word = new Text(); IntWritable count = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector output, Reporter report) throws IOException{ String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while(tokenizer.hasMoreTokens()){ word.set(tokenizer.nextToken()); output.collect(word, count); } } } public static class Reduce extends MapReduceBase implements Reducer{ public void reduce(Text key, Iterator values, OutputCollector output, Reporter report) throws IOException{ int sum = 0; while(values.hasNext()){ sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String args[]) throws Exception{ JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); conf.setNumReduceTasks(1); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }