Horizontal scaling using Hadoop

July 13, 2014

What is meant by horizontal scaling? And is there such a thing as vertical scaling? In simple words, whenever we buy a new laptop or desktop, we make sure it has a more powerful processor than the one we had earlier. This is a kind of vertical scaling, where we keep increasing the capacity of a single machine by adding processors or memory, or by upgrading them. But there is a limit to how much hardware one machine can take. For example, if there are two RAM slots that can accommodate at most 2 GB each, you can only have 4 GB of RAM. The other way of increasing capacity is to add more machines. If you can combine two such machines (each with 4 GB of RAM), you have more capacity than either machine alone. This act of adding more and more machines to increase capacity is horizontal scaling. Clusters of application and web servers are based on the same principle. This is all good from the hardware or infrastructure perspective. In this post, however, I will try to explain the idea from the software side, using the standard Hadoop HelloWorld example (word counter).

Let’s say we want to count the number of times each word occurs in the files located in a certain directory. We could write a single-threaded program to do this. However, if we want better performance, we could spawn multiple threads and then collate their outputs into a single result. The worker thread that counts the number of times each word occurs in a file/document would look like the one below.

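import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
// StringUtils is assumed to come from Apache Commons Lang
// (lang3 shown here; older projects use org.apache.commons.lang)
import org.apache.commons.lang3.StringUtils;

class Worker implements Callable<Map<String, Integer>> {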
	File processThisFile;
	public Worker(File f) {
		processThisFile = f;
	}
	@Override
	public Map<String, Integer> call() throws Exception {
		Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
		BufferedReader reader = null;
		try {
			System.out.println("Now processing " + processThisFile);
			reader = new BufferedReader(new FileReader(processThisFile));
			String str = null;
			while ((str = reader.readLine()) != null) {
				// Get rid of periods
				str = StringUtils.replace(str, ".", " ");
				// This maps to Mapper
				String[] words = StringUtils.split(str);
				for (String word : words) {
					if (wordCountMap.containsKey(word)) {
						int incrementedCount = wordCountMap.get(word) + 1;
						wordCountMap.put(word, incrementedCount);
					} else {
						wordCountMap.put(word, 1);
					}
				}
			}
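		} catch (IOException e) {
			// Report the failure; the counts gathered so far are still returned
			e.printStackTrace();
		} finally {
			// reader is still null if the file could not be opened
			if (reader != null) {
				try {
					reader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}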
		return wordCountMap;
	}
}

The above Callable processes one file at a time. We can submit several of them to a thread pool to process multiple files at once, as shown below. This Master also gathers the word counts from the Worker threads it started and outputs the summarized counts for all the documents.

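import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCounter {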
	String inputPath = "";
	public static void main(String args[]) {
		WordCounter counter = new WordCounter();
		counter.inputPath = "res/";
		try {
			File f = new File(counter.inputPath);
			File[] files = f.listFiles();
			ExecutorService executorService = Executors.newFixedThreadPool(2);
			List<Future<Map<String, Integer>>> results = new ArrayList<Future<Map<String, Integer>>>();
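			// Submit one worker per file; the pool runs two of them at a time
			for (File fileToProcess : files) {
				Worker w = new Worker(fileToProcess);
				results.add(executorService.submit(w));
			}
			// No more tasks will be submitted. Without this call the pool's
			// non-daemon threads keep the JVM alive after main() returns.
			executorService.shutdown();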
			Map<String, Integer> wordCountForAllFiles = new HashMap<String, Integer>();
			// Wait until all the threads are finished
			// This is kind of reducer implementation.
			for (Future<Map<String, Integer>> result : results) {
				Map<String, Integer> wordCountPerFile = result.get();
				System.out.println("wordCountPerFile ->" + wordCountPerFile);
				for (Map.Entry<String, Integer> entry : wordCountPerFile
						.entrySet()) {
					if (wordCountForAllFiles.containsKey(entry.getKey())) {
						int newCount = wordCountForAllFiles.get(entry.getKey())
								+ entry.getValue();
						wordCountForAllFiles.put(entry.getKey(), newCount);
					} else {
						wordCountForAllFiles.put(entry.getKey(),
								entry.getValue());
					}
				}
			}
			System.out.println(wordCountForAllFiles);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Note that I have presumed that a folder named “res” exists in the working directory and contains the text documents that need to be processed. Also, the number of threads this Master can spawn is finite. The theoretical upper bound is Integer.MAX_VALUE, the largest pool size that Executors.newFixedThreadPool accepts; in practice, memory and the operating system impose a much lower limit. I have not tried this code sample with that many threads. Wouldn’t it be helpful if we could run these threads on multiple machines and then gather the output? We could even write an RMI-based version of this to handle such a use case.

Instead of inventing our own framework to handle all of this, we can use Hadoop Map-Reduce, which provides exactly this capability. We only need to define what the Map and Reduce tasks should do; everything else is taken care of by the framework.

The standard Map-Reduce based word counter is shown below (reproduced as is from http://wiki.apache.org/hadoop/WordCount). We will not delve into the Map-Reduce API at this point.

public class WordCount {
        
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
        
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
        
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
        
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
        
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
    job.waitForCompletion(true);
 }
}

Comparing the Map-Reduce based example with our multi-threaded one shows that:

  1. The part of the code that reads and parses the file to count words (the Worker class) is similar to the Mapper code.
  2. Similarly, the part of the code that collates the output from the various threads (the loop over the results in WordCounter) corresponds to the Reducer.
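
To make this mapping concrete, here is a minimal single-JVM sketch of the three phases (map, group by key, reduce) in plain Java. It is not Hadoop code: the class MapReduceSketch and its methods are names I made up for illustration, and the "shuffle" step is just an in-memory TreeMap standing in for what the framework does between the Mapper and the Reducer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: imitates the map -> shuffle -> reduce flow
// inside one JVM. None of these names come from the Hadoop API.
class MapReduceSketch {

	// "Map" phase: emit a (word, 1) pair for every token in one line.
	static List<Map.Entry<String, Integer>> map(String line) {
		List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
		for (String token : line.trim().split("\\s+")) {
			if (!token.isEmpty()) {
				pairs.add(new AbstractMap.SimpleEntry<String, Integer>(token, 1));
			}
		}
		return pairs;
	}

	// "Shuffle" phase: group all emitted values by their key.
	static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
		Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
		for (Map.Entry<String, Integer> pair : pairs) {
			List<Integer> values = grouped.get(pair.getKey());
			if (values == null) {
				values = new ArrayList<Integer>();
				grouped.put(pair.getKey(), values);
			}
			values.add(pair.getValue());
		}
		return grouped;
	}

	// "Reduce" phase: sum the values collected for one key.
	static int reduce(List<Integer> values) {
		int sum = 0;
		for (int value : values) {
			sum += value;
		}
		return sum;
	}

	// Runs the three phases in sequence over all input lines.
	static Map<String, Integer> wordCount(List<String> lines) {
		List<Map.Entry<String, Integer>> emitted = new ArrayList<Map.Entry<String, Integer>>();
		for (String line : lines) {
			emitted.addAll(map(line));
		}
		Map<String, Integer> counts = new TreeMap<String, Integer>();
		for (Map.Entry<String, List<Integer>> entry : shuffle(emitted).entrySet()) {
			counts.put(entry.getKey(), reduce(entry.getValue()));
		}
		return counts;
	}

	public static void main(String[] args) {
		// prints {be=2, not=1, or=1, to=2}
		System.out.println(wordCount(Arrays.asList("to be or not", "to be")));
	}
}
```

In the multi-threaded version the map and reduce steps are fused into HashMap updates; Hadoop keeps them separate so that each phase can run on a different machine.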

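Hadoop spawns multiple map tasks that process different documents simultaneously on different machines, resulting in a horizontally scaled application, presuming we use HDFS to store the input documents. The number of map tasks is “approximately” equal to the number of files on HDFS: Hadoop creates one map task per input split, which for small files means one per file, while a file larger than an HDFS block is divided into several splits. This helps us increase the “throughput” of the application beyond what a multi-threaded application on a single machine can achieve.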
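
As a rough illustration of where the “approximately” comes from, the sketch below estimates the number of map tasks for a set of files. It rests on simplifying assumptions of mine: one split per started block, splits never spanning files, empty files ignored, and a 128 MB block size chosen only for the example. Hadoop's real split calculation has additional rules and configuration settings, so treat this as back-of-the-envelope arithmetic; SplitEstimate is a made-up name.

```java
// Hypothetical helper for rough arithmetic only; not part of Hadoop.
class SplitEstimate {

	// Assumes one input split (and hence one map task) per started block,
	// and that a split never spans two files.
	static long estimateMapTasks(long[] fileSizesInBytes, long blockSizeInBytes) {
		long tasks = 0;
		for (long size : fileSizesInBytes) {
			if (size > 0) {
				// Integer ceiling of size / blockSize
				tasks += (size + blockSizeInBytes - 1) / blockSizeInBytes;
			}
		}
		return tasks;
	}

	public static void main(String[] args) {
		long mb = 1024L * 1024L;
		long[] sizes = { 10 * mb, 50 * mb, 300 * mb };
		// 1 + 1 + 3 splits with an assumed 128 MB block size
		System.out.println(estimateMapTasks(sizes, 128 * mb)); // prints 5
	}
}
```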

Please note that using HDFS is not mandatory, but without it the application cannot scale to the same extent. I hope this post helps you understand the logical mapping between a multi-threaded application and a Map-Reduce based implementation from the horizontal vs. vertical scaling perspective.

Thanks, Amit


Multithreading and Locking Strategies

July 30, 2010

Hello,
Java 5 introduced a new concurrency API (java.util.concurrent) that offers new ways of developing multi-threaded applications, alongside the older synchronized keyword.

The basic difference between these two approaches is the locking strategy used to achieve concurrency.

The Java 5 concurrency API, notably the atomic classes in java.util.concurrent.atomic, uses the Compare-And-Swap (CAS) mechanism. In CAS, the current value of the shared variable is compared with the value the thread read earlier. If they match, the thread atomically replaces it with the new value; if they do not, the update fails and the thread typically re-reads the value and retries. The thread is optimistic, hoping that no other thread has modified the value in the meantime. This is the optimistic locking strategy.
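
A minimal sketch of that read, compare and retry loop, using AtomicInteger.compareAndSet from java.util.concurrent.atomic. The CasCounter class and its runDemo helper are names made up for this illustration; in real code AtomicInteger.incrementAndGet() already does this for you.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: an optimistic (CAS-based) counter.
class CasCounter {
	private final AtomicInteger value = new AtomicInteger(0);

	int increment() {
		while (true) {
			int seen = value.get(); // the value this thread read
			int next = seen + 1;
			// Succeeds only if the variable still holds 'seen'
			if (value.compareAndSet(seen, next)) {
				return next;
			}
			// Another thread changed the value first; re-read and retry
		}
	}

	int get() {
		return value.get();
	}

	// Starts 'threads' threads that each increment 'perThread' times
	static int runDemo(int threads, final int perThread) {
		final CasCounter counter = new CasCounter();
		Thread[] workers = new Thread[threads];
		for (int i = 0; i < threads; i++) {
			workers[i] = new Thread(new Runnable() {
				public void run() {
					for (int j = 0; j < perThread; j++) {
						counter.increment();
					}
				}
			});
			workers[i].start();
		}
		try {
			for (Thread t : workers) {
				t.join();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		return counter.get();
	}

	public static void main(String[] args) {
		System.out.println(runDemo(4, 10000)); // prints 40000
	}
}
```

No thread ever blocks here; a thread that loses the race simply loops once more.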

Before Java 5 (J2SE 1.4 and earlier), we used the synchronized keyword to lock the resource. This is the pessimistic locking strategy, because we lock the shared resource exclusively for the current thread: until the lock is released, no other thread can enter a block synchronized on the same lock, and so no other thread can modify the variable.
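
For comparison, a minimal sketch of the pessimistic version of a shared counter. SynchronizedCounter and runDemo are illustrative names of my own; the point is only that every access goes through the object's monitor.

```java
// Illustrative sketch: a pessimistic (lock-based) counter.
class SynchronizedCounter {
	private int value = 0;

	// Only one thread at a time can hold this object's monitor,
	// so the read-modify-write below cannot be interleaved
	synchronized int increment() {
		return ++value;
	}

	synchronized int get() {
		return value;
	}

	// Starts 'threads' threads that each increment 'perThread' times
	static int runDemo(int threads, final int perThread) {
		final SynchronizedCounter counter = new SynchronizedCounter();
		Thread[] workers = new Thread[threads];
		for (int i = 0; i < threads; i++) {
			workers[i] = new Thread(new Runnable() {
				public void run() {
					for (int j = 0; j < perThread; j++) {
						counter.increment();
					}
				}
			});
			workers[i].start();
		}
		try {
			for (Thread t : workers) {
				t.join();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		return counter.get();
	}

	public static void main(String[] args) {
		System.out.println(runDemo(4, 10000)); // prints 40000
	}
}
```

A thread that finds the monitor taken is blocked until the owner leaves the synchronized method, whether or not the two threads would actually have conflicted.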

Both approaches have their own advantages and shortcomings. In this entry I just wanted to highlight the implementation difference that appealed to me while reading the article.

Cheers !!
Amit


Producer-Consumer using Java Threads – wait, notifyAll

April 27, 2010

Hello,

In Java, threads communicate with each other using the “wait, notify and notifyAll” methods of Object. Producer-Consumer is the classic example used to explain these APIs. The example below shows how we can use inter-thread communication to read shared data when it becomes available. I have also tried to demonstrate the use of thread interruption to cancel a running task, in this case by exiting the application.

import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * @author Amit
 * This Producer-Consumer example tries to read/write one shared String variable.
 * This example also uses Thread interrupt concept to stop the application.
 * The variables in main() are declared final so that we can use them in the
 * anonymous inner class implementation of the thread within the main() method.
 */
public class ConsumerProducerTest {

	//Shared data; the empty string means "nothing to read yet"
	static String data = "";

	public static void main(String args[]) {

		//Declaring these variables as final so that
		//I can access them inside the Anonymous inner class
		//for user thread.
		final Object lock = new Object();

		final Thread readerThread = new Thread(new Reader(lock));
		final Thread writerThread = new Thread(new Writer(lock));
		writerThread.start();
		readerThread.start();

		//User Input Thread
		final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
		new Thread(new Runnable() {
			public void run() {
				while(true) {
					try {
						String str = in.readLine();
						if(str == null) {
							//End of input; stop instead of spinning forever
							break;
						}
						if("quit".equalsIgnoreCase(str)) {
							readerThread.interrupt();
						}
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}

		}).start();
	}
}
class Reader implements Runnable {

	private Object lock;

	Reader(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Reader::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					System.out.println("Reader is going to wait");
					try {
						lock.wait();
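					} catch (InterruptedException e) {
						System.out.println("Reader thread interrupted !!");
						//Exit here too; otherwise an interrupt that arrives while
						//waiting is swallowed and the application keeps running
						System.exit(0);
					}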
					System.out.println("After call to Reader wait");
				} else {
					System.out.println("Reader got the data - " + ConsumerProducerTest.data);
					System.out.println("Reader is notifying");
					ConsumerProducerTest.data = ""; //Clear the data
					//Wait for 10 seconds before notifying other threads.
					try {
						Thread.sleep(10000);
					} catch(InterruptedException ie) {
						System.out.println("Reader is interrupted while sleeping!!");
						System.exit(0);
					}
					lock.notifyAll();
					System.out.println("Reader done notifying");
				}
			}
		}
	}
}

class Writer implements Runnable {

	private Object lock;

	Writer(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Writer::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					ConsumerProducerTest.data = "data-"+System.currentTimeMillis();
					System.out.println("Writer wrote the data - " + ConsumerProducerTest.data);
					System.out.println("Writer is notifying reader");
					lock.notifyAll();
					System.out.println("Writer done notifying");
				} else {
					try {
						System.out.println("Writer is going to wait");
						lock.wait();
						System.out.println("After call to Writer wait");
					} catch(InterruptedException ie) {
						System.out.println("Writer is interrupted !!");
					}
				}
			}
		}
	}
}
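
The same handshake can be condensed into a small one-slot "mailbox" that hides the lock inside the shared object. This is only a sketch under my own assumptions: OneSlotMailbox and runDemo are made-up names, and it uses the common idiom of calling wait() inside a while loop so that the condition is re-checked after every wake-up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-slot mailbox: put() waits while the slot is full,
// take() waits while it is empty.
class OneSlotMailbox {
	private String slot = null;

	synchronized void put(String message) throws InterruptedException {
		while (slot != null) {
			wait(); // releases the monitor until someone calls notifyAll()
		}
		slot = message;
		notifyAll(); // wake up a waiting consumer
	}

	synchronized String take() throws InterruptedException {
		while (slot == null) {
			wait();
		}
		String message = slot;
		slot = null;
		notifyAll(); // wake up a waiting producer
		return message;
	}

	// Sends 'count' messages from the calling thread to a consumer thread
	static List<String> runDemo(final int count) {
		final OneSlotMailbox box = new OneSlotMailbox();
		final List<String> received = new ArrayList<String>();
		Thread consumer = new Thread(new Runnable() {
			public void run() {
				try {
					for (int i = 0; i < count; i++) {
						received.add(box.take());
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
		consumer.start();
		try {
			for (int i = 0; i < count; i++) {
				box.put("data-" + i);
			}
			consumer.join(); // also makes 'received' safe to read below
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		return received;
	}

	public static void main(String[] args) {
		System.out.println(runDemo(3)); // prints [data-0, data-1, data-2]
	}
}
```

Because both methods throw InterruptedException instead of catching it, the caller decides what an interrupt means, for example shutting the application down as in the example above.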

Hope you find this useful.

Cheers !

Amit