Apache Spark groupByKey and reduceByKey

March 7, 2020

I was answering a question on Stack Overflow and came up with a pictorial representation of groupByKey and reduceByKey. I thought of sharing it here so that I do not lose it over time.
The Spark documentation itself is quite clear on this topic, so I am not adding a lot of text here.

Both of the examples below sum up the values for each key.
1. groupByKey – Shuffles the data based on the keys first, and only then gives us the opportunity to work with the values.

[Diagram: groupByKey]

2. reduceByKey – Runs the reduce function locally first, then shuffles the partial results and runs the reduce function once more to produce the final result.
Hence reduceByKey acts like a combiner in the Map-Reduce world. It helps reduce the amount of data shuffled during the process.

[Diagram: reduceByKey]

Viewing Log statements in Hadoop Map-reduce jobs

August 23, 2014

Hello,

Anyone new to the Hadoop world often ends up frantically searching for the debug log statements they added to their mapper or reducer functions. At least that was the case with me when I started working on Hadoop, so I thought it might be a good idea to write this particular entry.

The mapper and reducer are not executed locally, so you cannot find their logs on the local file system. The mapper/reducer run on the Hadoop cluster, and the cluster is where you should look for those logs. You do, however, need to know the “JobTracker” URL for your cluster.

  1. Access the “JobTracker”. The default URL for the JobTracker is “http://{hostname}:50030/jobtracker.jsp”.
  2. This simple UI lists the different Map-Reduce jobs and their states (namely “running”, “completed”, “failed” and “retired”).
  3. Locate the Map-Reduce job that you started (please refer to the attached “jobtracker” screenshot).
  4. There are various ways to identify your Map-reduce job.
    1. If you run a Pig script, the Pig client logs the job id, which you can then search for in the JobTracker. The screenshot shows the Map-Reduce job corresponding to a Pig script.
    2. If you are running a plain Map-Reduce application, then once you submit the job you can search for it either by the user id used to submit it or by the application name.
  5. Clicking on the job number reveals the job details, as shown in the “map reduce job details” screenshot.
  6. The following screenshot (“Map Reduce Log Statements”) shows how to navigate to the log statements; note that it comprises three steps. A small example of the kind of log statements you will find there follows below.
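For illustration, here is a minimal sketch of where such log statements typically come from, assuming the org.apache.hadoop.mapreduce API and Apache Commons Logging (the class name and messages are my own). Output written through the logger usually lands in the task's “syslog” log, while System.out/System.err go to “stdout”/“stderr”, all of which are reachable through the JobTracker UI shown above:

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LoggingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static final Log LOG = LogFactory.getLog(LoggingMapper.class);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// None of these statements show up on the machine that submitted the job;
		// they appear in the task logs on the cluster node that ran this mapper.
		LOG.info("Processing record at byte offset " + key.get());
		System.out.println("Raw input line: " + value);
		context.write(new Text(value.toString()), new IntWritable(1));
	}
}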

Hope you find this post useful while learning Hadoop.

Thanks,

Amit


Horizontal scaling using Hadoop

July 13, 2014

What is meant by horizontal scaling? And is there such a thing as vertical scaling? In simple terms, whenever we buy a new laptop or desktop, we make sure it has a more powerful processor than the one we had earlier. This is essentially vertical scaling: we keep increasing the processing power of a single machine by adding components or upgrading them. But there is a limit to how much you can add. For example, if there are two RAM slots which can each accommodate at most 2 GB, you can only ever have 4 GB of RAM.

The other way of increasing processing power is to add more machines. If you can use the processing power of two such machines (each with 4 GB of RAM), you have more capacity at your disposal. This act of adding more and more machines to increase processing power is horizontal scaling. Creating clusters of application and web servers is based on the same principle. This is all good from the hardware or infrastructure perspective; in this post, however, I will try to explain it using the standard Hadoop “Hello World” example (the word counter).

Let’s say we want to count the number of times each word occurs in the files located in a certain directory. We could write a single-threaded program to do this, but if we want better performance we could spawn multiple threads and then collate their outputs into a single result. The worker that counts how many times each word occurs in a file/document would look like the listing below (Listing 1).

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.commons.lang3.StringUtils; // Apache Commons Lang

class Worker implements Callable<Map<String, Integer>> {
	File processThisFile;

	public Worker(File f) {
		processThisFile = f;
	}

	@Override
	public Map<String, Integer> call() throws Exception {
		Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
		BufferedReader reader = null;
		try {
			System.out.println("Now processing " + processThisFile);
			reader = new BufferedReader(new FileReader(processThisFile));
			String str = null;
			while ((str = reader.readLine()) != null) {
				// Get rid of periods
				str = StringUtils.replace(str, ".", " ");
				// This maps to Mapper
				String[] words = StringUtils.split(str);
				for (String word : words) {
					if (wordCountMap.containsKey(word)) {
						int incrementedCount = wordCountMap.get(word) + 1;
						wordCountMap.put(word, incrementedCount);
					} else {
						wordCountMap.put(word, 1);
					}
				}
			}
		} catch (Exception e) {
			// Handle the exception as appropriate
			e.printStackTrace();
		} finally {
			try {
				if (reader != null) {
					reader.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return wordCountMap;
	}
}

The above Callable processes one file at a time. We can instantiate multiple threads to process multiple files at once, as shown below (Listing 2). This master also gathers the word counts from the various Worker threads it instantiated and outputs the summarized counts for all the documents.

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCounter {
	String inputPath = "";

	public static void main(String args[]) {
		WordCounter counter = new WordCounter();
		counter.inputPath = "res/";
		try {
			File f = new File(counter.inputPath);
			File[] files = f.listFiles();
			ExecutorService executorService = Executors.newFixedThreadPool(2);
			List<Future<Map<String, Integer>>> results = new ArrayList<Future<Map<String, Integer>>>();
			// Create and start the worker threads
			for (File fileToProcess : files) {
				Worker w = new Worker(fileToProcess);
				results.add(executorService.submit(w));
			}
			Map<String, Integer> wordCountForAllFiles = new HashMap<String, Integer>();
			// Wait until all the threads are finished and merge their outputs.
			// This is, in effect, the reducer implementation.
			for (Future<Map<String, Integer>> result : results) {
				Map<String, Integer> wordCountPerFile = result.get();
				System.out.println("wordCountPerFile ->" + wordCountPerFile);
				for (Map.Entry<String, Integer> entry : wordCountPerFile.entrySet()) {
					if (wordCountForAllFiles.containsKey(entry.getKey())) {
						int newCount = wordCountForAllFiles.get(entry.getKey())
								+ entry.getValue();
						wordCountForAllFiles.put(entry.getKey(), newCount);
					} else {
						wordCountForAllFiles.put(entry.getKey(), entry.getValue());
					}
				}
			}
			// Shut down the pool so the JVM can exit once all work is done
			executorService.shutdown();
			System.out.println(wordCountForAllFiles);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Note that I have assumed the folder “res” exists in the working directory and contains the text documents that need to be processed. Also, the number of threads this master can spawn is finite, the upper bound being Integer.MAX_VALUE, although I have not tried this code sample with anywhere near that many threads. Wouldn’t it be helpful if we could run these workers on multiple machines and then gather the output? We could even write an RMI-based version of this to handle such a use case.

Instead of inventing our own framework to handle all of this, the Hadoop Map-Reduce framework provides the same capability. We only need to define what the Map and Reduce tasks should do; everything else is taken care of by the framework.

The standard Map-Reduce based word counter is shown below (based on the example at http://wiki.apache.org/hadoop/WordCount). We will not delve into the Map-Reduce API at this point.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 }

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
 }
}

Comparing the Map-Reduce based example with our multi-threaded one shows that:

  1. The part of the code that reads and parses a file to count words (refer to Listing 1) is similar to the Mapper code.
  2. Similarly, we can relate the Reducer to the part of the code that collates the output from the various threads (refer to Listing 2).

Hadoop spawns multiple map tasks that process the different documents simultaneously, resulting in a horizontally scaled application, presuming we use HDFS to store the input documents. The number of map tasks is roughly equal to the number of input splits, which for a set of small files is approximately the number of files on HDFS. This helps us increase the throughput of the application compared to a multi-threaded application running on a single machine.

Please note that using HDFS is not mandatory, but without it the application cannot scale to the same degree. I hope this post helps in understanding the logical mapping between a multi-threaded application and a Map-Reduce based implementation from the horizontal vs. vertical scaling perspective.

Thanks, Amit


Hadoop Overview

June 1, 2014
[Diagram: Hadoop Overview]