Apache Spark – aggregateByKey

January 18, 2021

Let us say we have an input RDD[(K,V)] and we want to group the values for each key K, process them, and output an RDD[(K,U)].

In an earlier post we saw how reduceByKey is a more efficient operation than grouping with groupByKey and then reducing the result.

We cannot use reduceByKey here because reduceByKey does not allow you to change the type of the value you output: the result must have the same type V as the input values. In such cases "aggregateByKey" comes in handy.

"aggregateByKey" gives us the ability to process the values for a key within each partition and produce a result of a different type. In a second step, Spark merges these per-partition results across partitions to produce the output RDD whose values have the new data type.

There are multiple variants of "aggregateByKey"; I will demonstrate the use of

aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Suppose we have tweets from different users and we want to bring together all the tweets for each user. We also need to make sure each tweet occurs only once in the output.

val part1 = List((1, "Tweet-1"), (2, "Tweet-2"),(1, "Tweet-3"), (1, "Tweet-1") ,(2, "Tweet-4") ,(1, "Tweet-5"),(3,"Tweet-6"))
val part2 = List((1, "Tweet-4"), (2, "Tweet-5"),(1, "Tweet-6"),(2, "Tweet-7") ,(3,"Tweet-8") ,(4,"Tweet-9"))
val part3 = List((3, "Tweet-6") ,(4, "Tweet-7") ,(4,"Tweet-1") ,(2,"Tweet-10"))

val inputdata = part1 ::: part2 ::: part3
val input = sc.parallelize(inputdata,3)
//input RDD is of type RDD[(Int,String)]

In this case our input RDD is of type (Int, String), representing (userid, tweet). Based on the scenario above, the output RDD will be of type (Int, Set[String]).

We could compute this with groupByKey() followed by a map operation over the grouped data to produce the Set of unique tweets per user. However, that approach requires all the tweets for a user to be brought to a single partition, so the data is shuffled before we even begin merging tweets into a Set. Wouldn't it be nicer to have duplicates removed while working within each partition? Building a Set for each key inside a partition does exactly that (this is the seqOp, explained further below), and it reduces the amount of data that has to be shuffled at the end.

"aggregateByKey", as shown below, helps us combine the values for a key within a given partition into an interim Set[String]. This is the seqOp (sequence operation). We need to provide a neutral zero value to start the seqOp; in our case it is an empty Set[String].

Our seqOp therefore appends the tweet to the Set accumulated so far for that key in that partition. seqOp takes two arguments: the first is the existing Set[String] for the key under process, the second is the tweet. If a key is seen for the first time within a partition, the neutral zero value, the empty Set, is used; if Spark has already seen that key while working on this partition, the existing Set is used instead of the empty one.

(setOfTweets,tweet)=> setOfTweets + tweet

When all the partitions have been processed, the values produced by the seqOp step are merged together to compute the final output for each key. Refer to the definition of combOp in the signature above: it takes two values of type U (in our case Set[String]) and outputs a U. Our combOp simply merges the two Sets.

(setOfTweets1,setOfTweets2) => setOfTweets1 ++ setOfTweets2

Putting all of this together, it looks like this:

val output = input.aggregateByKey(Set.empty[String])((setOfTweets,tweet)=> setOfTweets + tweet ,
(setOfTweets1,setOfTweets2) => setOfTweets1 ++ setOfTweets2)

A diagrammatic representation of the transformation is included to help you visualize how Spark carries it out.

The complete Spark application illustrating "aggregateByKey" is included below for your reference.

val part1 = List((1, "Tweet-1"), (2, "Tweet-2"),(1, "Tweet-3"), (1, "Tweet-1") ,(2, "Tweet-4") ,(1, "Tweet-5"),(3,"Tweet-6"))
val part2 = List((1, "Tweet-4"), (2, "Tweet-5"),(1, "Tweet-6"),(2, "Tweet-7") ,(3,"Tweet-8") ,(4,"Tweet-9"))
val part3 = List((3, "Tweet-6") ,(4, "Tweet-7") ,(4,"Tweet-1") ,(2,"Tweet-10"))

val inputdata = part1 ::: part2 ::: part3

val input = sc.parallelize(inputdata,3)

val output = input.aggregateByKey(Set.empty[String])((setOfTweets,tweet)=> setOfTweets + tweet , (setOfTweets1,setOfTweets2) => setOfTweets1 ++ setOfTweets2)

output.collect.foreach(println)

Output :
(3,Set(Tweet-6, Tweet-8))
(4,Set(Tweet-9, Tweet-7, Tweet-1))
(1,Set(Tweet-5, Tweet-6, Tweet-4, Tweet-1, Tweet-3))
(2,Set(Tweet-5, Tweet-2, Tweet-10, Tweet-7, Tweet-4))

Big O notation

May 14, 2016

Hello,

Came across a very nice introductory article on Big O notation.

https://rob-bell.net/2009/06/a-beginners-guide-to-big-o-notation/


Viewing Log statements in Hadoop Map-reduce jobs

August 23, 2014

Hello,

Anyone who is new to the Hadoop world often ends up frantically searching for the debug log statements they added to their mapper or reducer functions. At least this was the case with me when I started working with Hadoop, so I thought it would be a good idea to write this entry.

The mapper and reducer are not executed locally, so you cannot find their logs on the local file system. They run on the Hadoop cluster, and the cluster is where you should look for them. You do, however, need to know the "JobTracker" URL for your cluster.
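
For context, the log statements in question are just ordinary logging calls inside your mapper or reducer code, along the lines of the minimal sketch below (WordCountMapper is an illustrative name, not something from this post). Whatever such statements write ends up in the task's stdout/stderr/syslog files on the cluster nodes, which the steps below show how to reach through the JobTracker UI.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final Log LOG = LogFactory.getLog(WordCountMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Goes to the task's "syslog" log via commons-logging/log4j.
        LOG.info("Processing line at offset " + key.get());
        // Goes to the task's "stdout" log.
        System.out.println("Raw line: " + value);
        for (String word : value.toString().split("\\s+")) {
            if (!word.isEmpty()) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
}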

  1. Access the "JobTracker". The default URL for the JobTracker is "http://{hostname}:50030/jobtracker.jsp".
  2. This simple UI lists the Map-Reduce jobs and their states (namely "running", "completed", "failed", and "retired").
  3. Locate the Map-Reduce job that you started (refer to the attached screenshot).
  4. There are various ways to identify your Map-Reduce job.
    1. If you ran a Pig script, the Pig client logs the job id, which you can search for in the JobTracker. The screenshot shows the Map-Reduce job corresponding to a Pig script.
    2. If you are running a plain Map-Reduce application, then once you submit the job you can search for it either by the userid used to submit the job or by the application name.
  5. Clicking on the job number reveals the job details, as shown in the second screenshot.
  6. The third screenshot shows how to navigate to the log statements; note that it comprises three steps.

Hope you find this post useful while learning Hadoop.

Thanks,

Amit


Horizontal scaling using Hadoop

July 13, 2014

What is meant by horizontal scaling? And is there such a thing as vertical scaling? In simple terms, whenever we buy a new laptop or desktop, we make sure it has a more powerful processor than the one we had earlier. This is vertical scaling: we keep increasing the processing power of a single machine by adding processors or upgrading them. But there is a limit to how much you can add. For example, if there are two RAM slots that can accommodate at most 2 GB each, you can only have 4 GB of RAM. The other way of increasing processing power is to add more machines: if you can use the processing power of two such machines (each with 4 GB RAM), you have twice the capacity. This act of adding more and more machines to increase processing power is horizontal scaling. Creating clusters of application and web servers is based on the same principle. All of this is the hardware or infrastructure perspective; in this post I will try to explain it using the standard Hadoop "Hello World" example, the word counter.

Let's say we want to count the number of times each word occurs in the files located in a certain directory. We could write a single-threaded program to do this, but for better performance we could spawn multiple threads and then collate their outputs into a single result. A worker thread that counts the number of times each word occurs in a file/document would look like this:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Apache Commons Lang StringUtils
import org.apache.commons.lang.StringUtils;

class Worker implements Callable<Map<String, Integer>> {
	File processThisFile;

	public Worker(File f) {
		processThisFile = f;
	}

	@Override
	public Map<String, Integer> call() throws Exception {
		Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
		BufferedReader reader = null;
		try {
			System.out.println("Now processing " + processThisFile);
			reader = new BufferedReader(new FileReader(processThisFile));
			String str = null;
			while ((str = reader.readLine()) != null) {
				// Get rid of periods
				str = StringUtils.replace(str, ".", " ");
				// This corresponds to the Mapper
				String[] words = StringUtils.split(str);
				for (String word : words) {
					if (wordCountMap.containsKey(word)) {
						int incrementedCount = wordCountMap.get(word) + 1;
						wordCountMap.put(word, incrementedCount);
					} else {
						wordCountMap.put(word, 1);
					}
				}
			}
		} catch (Exception e) {
			// Handle/log the exception as appropriate
			e.printStackTrace();
		} finally {
			if (reader != null) {
				try {
					reader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		return wordCountMap;
	}
}

The above Callable processes one file at a time. We can spawn multiple threads to process multiple files at once, as shown below. This master also gathers the word counts from the Worker threads it instantiated and outputs the summarized counts for all the documents.

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCounter {
	String inputPath = "";

	public static void main(String args[]) {
		WordCounter counter = new WordCounter();
		counter.inputPath = "res/";
		try {
			File f = new File(counter.inputPath);
			File[] files = f.listFiles();
			ExecutorService executorService = Executors.newFixedThreadPool(2);
			List<Future<Map<String, Integer>>> results = new ArrayList<Future<Map<String, Integer>>>();
			// Create and start the worker threads
			for (File fileToProcess : files) {
				Worker w = new Worker(fileToProcess);
				results.add(executorService.submit(w));
			}
			// No new tasks; the pool shuts down once the submitted work completes.
			executorService.shutdown();
			Map<String, Integer> wordCountForAllFiles = new HashMap<String, Integer>();
			// Wait until all the threads are finished and merge their outputs.
			// This corresponds to the reducer.
			for (Future<Map<String, Integer>> result : results) {
				Map<String, Integer> wordCountPerFile = result.get();
				System.out.println("wordCountPerFile ->" + wordCountPerFile);
				for (Map.Entry<String, Integer> entry : wordCountPerFile.entrySet()) {
					if (wordCountForAllFiles.containsKey(entry.getKey())) {
						int newCount = wordCountForAllFiles.get(entry.getKey()) + entry.getValue();
						wordCountForAllFiles.put(entry.getKey(), newCount);
					} else {
						wordCountForAllFiles.put(entry.getKey(), entry.getValue());
					}
				}
			}
			System.out.println(wordCountForAllFiles);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Note that I have presumed the folder "res" is available relative to the working directory and that it contains the text documents to be processed. Also, the number of threads this master can spawn is finite, the upper bound being Integer.MAX_VALUE; however, I have not tried this code sample with anywhere near that many threads. Wouldn't it be helpful if we could run these workers on multiple machines and then gather their output? We could even write an RMI-based version of this to handle such a use case.

Instead of inventing our own framework to handle all of this, we can use Hadoop's Map-Reduce model, which provides exactly this capability. We only need to define what the Map and Reduce tasks should do; everything else is taken care of by the framework.

The standard Map-Reduce based word counter is shown below (taken as-is from http://wiki.apache.org/hadoop/WordCount). We will not delve into the Map-Reduce API at this point.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
        
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
        
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
        
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
        
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
        
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
    job.waitForCompletion(true);
 }
}

Comparing the Map-Reduce based example with our multi-threaded one shows that:

  1. The part of the code that reads and parses a file to count words (refer to Listing 1, the Worker) is similar to the Mapper code.
  2. Similarly, we can relate the Reducer to the part of the code that collates the output from the various threads (refer to Listing 2, the WordCounter).

Hadoop spawns multiple map tasks that process the different documents simultaneously, resulting in a horizontally scaled application, presuming we use HDFS to store the input documents. The number of map tasks is approximately equal to the number of input files on HDFS (more precisely, to the number of input splits, roughly one per HDFS block and at least one per file). This helps increase the throughput of the application compared to a multi-threaded application running on a single machine.

Please note that using HDFS is not mandatory, but without it the application cannot scale to larger volumes. I hope this post helps you understand the logical mapping between a multi-threaded application and a Map-Reduce based implementation from the horizontal vs. vertical scaling perspective.

Thanks, Amit


Hadoop Overview

June 1, 2014

Hadoop-Overview



Good REST Article

February 18, 2012

Hello,

I wanted to share a very nice article written by Martin Fowler about REST web services. It is easy to understand for people like me who keep using web services the traditional way, as RPC.

I am hoping that, at least for new development, I get a chance to try out these principles.

Here is the article.

Bye,

Amit


iBatis or Hibernate ?

December 16, 2011

Hello,
I have been meaning to write this one for a long time; it has now taken almost 10 months to put up a new blog entry.
In this one I will be talking about iBatis and Hibernate. I purposely did not call it a comparison, as I do not think they are comparable to each other: each has a different purpose and a different place in designing data-access solutions. While conducting interviews I often come across candidates who present iBatis as a competitor of Hibernate, which is exactly why I felt I should share my views on the subject.

iBatis is a very good and handy SQL mapper tool. I experienced its usefulness while using the iBatis framework to build a solution that involved invoking Oracle stored procedures returning REFERENCE cursors. You simply configure how the returned result set maps to your Java objects and you are done: you get to work with Java objects in your application without writing the usual, monotonous logic for traversing result sets and initializing objects yourself. But this is where some colleagues start thinking of it as an ORM (Object Relational Mapping) tool, which it is not. iBatis helps you map Java objects to the parameters passed to a stored procedure/function/SQL statement (and result sets back to Java objects) without worrying about JDBC.

ORM addresses an altogether different domain. To understand this we first have to understand the challenges involved in mapping an object model to a relational model, usually called the object-relational impedance mismatch. We will look at a few of them here.
First and foremost is inheritance, which exists in the object model but not in the relational model. Hibernate addresses this with its table-per-class-hierarchy, table-per-subclass, and table-per-concrete-class strategies. We cannot go into the details of these here, and a lot of information is already available on the topic. The crux is that when persisting/retrieving an object hierarchy to/from the database, either a discriminator column or table joins are used, depending on the strategy selected, as sketched below.
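
To make the strategies concrete, here is a minimal, illustrative sketch using JPA annotations (the Payment/CardPayment/ChequePayment entities are hypothetical examples, not something from the original post). SINGLE_TABLE corresponds to table per class hierarchy; switching to JOINED gives table per subclass, and TABLE_PER_CLASS gives table per concrete class.

import javax.persistence.DiscriminatorColumn;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Inheritance;
import javax.persistence.InheritanceType;

// Table per class hierarchy: a single PAYMENT table holds all subclasses,
// and the discriminator column tells Hibernate which subclass each row is.
// InheritanceType.JOINED (table per subclass) would instead join a subclass
// table to the parent table. (Each entity goes in its own source file.)
@Entity
@Inheritance(strategy = InheritanceType.SINGLE_TABLE)
@DiscriminatorColumn(name = "PAYMENT_TYPE")
public class Payment {
    @Id
    private Long id;
    private double amount;
}

@Entity
public class CardPayment extends Payment {
    private String cardNumber;
}

@Entity
public class ChequePayment extends Payment {
    private String chequeNumber;
}
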
Another important feature an ORM tool provides is persisting the whole object graph when the parent is persisted. These are things iBatis does not address; well, you can do them with iBatis, provided you write the code for it yourself. ORM tools can also detect a dirty instance and persist it to the database once it is associated with the persistence context. Apart from this, ORM has a lot of other features, which can be checked here.

In essence, when we want to build a solution that just retrieves data from the database and presents it to the user, iBatis is the simpler and more effective solution. When one has the liberty to derive the relational model from the object model and heavily retrieves/persists object graphs, Hibernate is the appropriate solution. Using Hibernate only for data retrieval makes your solution over-engineered.

Hope this helps someone trying to compare iBatis and Hibernate.


n+1 rather 1+n

January 1, 2011

Hello,
I have been meaning to post this for quite some time. This one is about the famous "n+1" behavior/problem with ORM tools. When I first learnt about it, it took me a while to digest. I felt "1+n" is easier to understand than "n+1", especially for a beginner audience, and that is the reason behind this post.

In the ORM world you often mark relationships as "lazy" so that they are loaded only when accessed. First we will see what this means.

Consider the classic one-to-many Department(One) and Employee(Many) relationship.

From the Department's perspective this is represented as a collection of Employee objects within a Department, as shown below.


import java.util.Collection;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;

/**
 * The Employee entity.
 * Only the bidirectional one-to-many association (plus an id) is shown here.
 */
@Entity
public class Employee {
    @Id
    private Long id;

    @ManyToOne
    private Department dept;
}

/**
 * The Department entity.
 */
@Entity
public class Department {
    @Id
    private Long id;

    @OneToMany(mappedBy = "dept")
    private Collection<Employee> employees;
}

When we mark this relationship as lazy and query for Departments, the ORM fetches the Department objects satisfying the criteria. Due to the "lazy" attribute, the ORM does not initialize the collection of Employees until it is requested.

For example, consider a data set with several Department records, each having a number of Employee records.

When we query the database to get all the departments, the ORM initializes the Department objects but not their Employee collections. This corresponds to one query, namely:

SELECT * FROM DEPARTMENT

Later, whenever we access the Employee collection contained within a Department object, the ORM has to go back to the database and fetch the employees for that department.

The query will be something like

SELECT * FROM EMPLOYEE WHERE DEPTID=?

This means that for however many departments there are (let's say "n" department records), the ORM needs to fire the above query that many times, i.e. n times. So to iterate over all the Departments along with their Employees, the ORM fires "1 + n" database queries in total.

This can be overcome by using eager fetching, which results in a single JOIN query that fetches the Departments together with their Employees, as sketched below.
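
As an illustration (a minimal sketch, not from the original post), a JPQL fetch join collapses this into one query. Here "demo-unit" is a hypothetical persistence unit name and getEmployees() a hypothetical accessor on Department.

import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class DepartmentReport {
    public static void main(String[] args) {
        EntityManagerFactory emf = Persistence.createEntityManagerFactory("demo-unit");
        EntityManager em = emf.createEntityManager();
        try {
            // A single SQL statement joins DEPARTMENT and EMPLOYEE (an inner join,
            // so departments with no employees are not returned). DISTINCT avoids
            // duplicate Department instances caused by the join.
            List<Department> departments = em.createQuery(
                    "SELECT DISTINCT d FROM Department d JOIN FETCH d.employees",
                    Department.class).getResultList();

            // The employees collections are already initialized,
            // so iterating them here triggers no additional queries.
            for (Department d : departments) {
                System.out.println(d + " has " + d.getEmployees().size() + " employees");
            }
        } finally {
            em.close();
            emf.close();
        }
    }
}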

Hope this helps someone confused about the 1+n problem.

Cheers,
Amit


Single SignOn and HTTP Cookies

September 1, 2010

Hello,
In this article we will see how Single Sign-On (SSO) implementations use HTTP cookies.
Recently we integrated one of our intranet portals into a company-wide SSO. I wanted to test some functionality in the portal locally before releasing it to the integration environment, but I was not able to log in to the portal because the codebase was dependent on the SSO solution.

To address this, one of my colleagues suggested hitting the integration environment and logging in there, and then, in the same browser window, typing the URL of the locally deployed application.
Of course this is not going to work. Even if the local application has a similar URI, the browser will not transmit cookies belonging to another domain to the locally deployed application, or for that matter to applications in other domains.

To understand this we need to see how cookies are used in SSO applications.

Let's say we own a portal "foo.com" that is SSO enabled. In a SiteMinder-based SSO solution, a cookie named "SMSESSION" is used to store the client identity. Consider that I am developing a web app named "myapp" that will be part of foo.com. When we want to integrate "myapp" into the SSO solution, we give it a sub-domain, say "myapp.foo.com". Of course, we need to write some code that reads this SMSESSION cookie and logs the respective user in to our application (in this case "myapp").
Now, why do we need the sub-domain in this case?

Domains share their cookie information with their sub-domains.

Hence, when the SiteMinder solution sets the cookie for "foo.com" and we navigate to "myapp.foo.com", the server-side code serving "myapp" receives the SMSESSION cookie that was set at the domain level. By reading and using SMSESSION, "myapp" can log the user in and proceed, along the lines sketched below.
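
As an illustration only (a minimal sketch, not the actual SiteMinder integration), the server-side code in "myapp" might pick up the domain-level cookie like this. SsoLoginServlet and validateAndGetUser are hypothetical names standing in for the real agent/policy-server call.

import java.io.IOException;

import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class SsoLoginServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        String smSession = null;
        // The browser sends the domain-level cookie because myapp.foo.com
        // is a sub-domain of foo.com.
        Cookie[] cookies = request.getCookies();
        if (cookies != null) {
            for (Cookie cookie : cookies) {
                if ("SMSESSION".equals(cookie.getName())) {
                    smSession = cookie.getValue();
                }
            }
        }
        if (smSession == null) {
            // No SSO cookie: e.g. the app was accessed outside the foo.com domain,
            // such as http://localhost:8080/myapp.
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
            return;
        }
        // Hypothetical: validate the token and establish the local session.
        String user = validateAndGetUser(smSession);
        request.getSession(true).setAttribute("user", user);
    }

    private String validateAndGetUser(String smSessionToken) {
        // Placeholder for the call to the SSO agent / policy server.
        return "user-" + smSessionToken.hashCode();
    }
}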

In the figure below I am trying to explain this graphically.

Single SignOn and HTTP Cookies


Hence the workaround mentioned above is not going to work.
In the case of Tomcat, my URL would be "http://localhost:8080/myapp", which is not part of the main domain "foo.com". Because of this, the cookie information will never be shared with the "myapp" instance deployed locally.

Hope this helps someone looking for information on this topic.
Your comments/suggestions/corrections are always welcome.

Cheers !!
Amit