Apache Spark – aggregateByKey

January 18, 2021

Let us say we have an input RDD[(K,V)] and we want to group the values for each key K, process them, and output an RDD[(K,U)].

In an earlier post we saw how reduceByKey is more efficient than using groupByKey and then reducing the grouped result.

We cannot use reduceByKey here because reduceByKey does not allow you to change the type of the output value. In such cases “aggregateByKey” comes in handy.

“aggregateByKey” gives us the ability to process the values for a key within each partition and produce a result of a different type. In a second step, Spark merges these per-partition results across partitions to transform the input RDD into an RDD whose values have the new datatype.

There are multiple variants of “aggregateByKey”; I will demonstrate the use of

aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Suppose we have tweets from different users and we want to bring together all the tweets for each user. We also need to make sure each tweet occurs only once in the output.

val part1 = List((1, "Tweet-1"), (2, "Tweet-2"), (1, "Tweet-3"), (1, "Tweet-1"), (2, "Tweet-4"), (1, "Tweet-5"), (3, "Tweet-6"))
val part2 = List((1, "Tweet-4"), (2, "Tweet-5"), (1, "Tweet-6"), (2, "Tweet-7"), (3, "Tweet-8"), (4, "Tweet-9"))
val part3 = List((3, "Tweet-6"), (4, "Tweet-7"), (4, "Tweet-1"), (2, "Tweet-10"))

val inputdata = part1 ::: part2 ::: part3
val input = sc.parallelize(inputdata, 3)
// input is of type RDD[(Int, String)]

Our input RDD is of type (Int, String), representing (userId, tweet). Based on the scenario above, the output RDD should be of type (Int, Set[String]).

We could compute this with groupByKey() followed by a map operation on the grouped data to produce a Set of unique tweets per user. However, that would require all the tweets for a user to be brought onto one partition, causing data to be shuffled well before we actually start merging the tweets into a Set. Wouldn’t it be nice to remove duplicates while still working within each partition? Building a Set per key within each partition does exactly that (this is the seqOp, explained further below) and reduces the amount of data that needs to be shuffled at the end. A sketch of the groupByKey alternative is shown below for comparison.
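A minimal sketch of that groupByKey approach, using the input RDD defined above (viaGroupByKey is just an illustrative name):

val viaGroupByKey = input.groupByKey().mapValues(tweets => tweets.toSet)
// groupByKey shuffles every tweet (duplicates included); duplicates are removed only after the shuffle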

“aggregateByKey”, as shown below, helps us combine the values for a key within a given partition to form an interim Set[String]. This is the seqOp, or sequence operation. We also need to provide a neutral zero value to start the seqOp with; in our case it is an empty Set[String].

Hence our seqOp should add the tweet to the Set available for that key in that partition. seqOp takes two values: the first is the existing Set[String] for the key being processed. If a key is seen for the first time within a partition, the neutral zero value (the empty Set) is used; if Spark has already seen that key in this partition, it uses the Set built so far instead of the empty one.

(setOfTweets, tweet) => setOfTweets + tweet

Once all the partitions have been processed, the values produced by the seqOp step are merged to compute the final output for each key. Refer to the definition of combOp in the signature above: it takes two values of type U (in our case Set[String]) and outputs a U. Our combOp simply merges the two Sets.

(setOfTweets1, setOfTweets2) => setOfTweets1 ++ setOfTweets2

Putting all of this together, it looks like the following:

val output = input.aggregateByKey(Set.empty[String])(
  (setOfTweets, tweet) => setOfTweets + tweet,
  (setOfTweets1, setOfTweets2) => setOfTweets1 ++ setOfTweets2)

A diagrammatic representation of the transformation is included to help you visualize how Spark works through this.
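If you would like to peek at what each partition contributes before the final merge, here is an illustrative sketch. It is not how aggregateByKey is implemented internally; it just reuses mapPartitionsWithIndex to build the per-partition sets that the seqOp would effectively produce:

input.mapPartitionsWithIndex { (index, iter) =>
  // group this partition's records by user id and collect the tweets into a Set,
  // mimicking the intermediate result of the seqOp for this partition
  val perKeySets = iter.toList
    .groupBy { case (userId, _) => userId }
    .map { case (userId, tweets) => (userId, tweets.map(_._2).toSet) }
  Iterator((index, perKeySets))
}.collect.foreach(println)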

The complete Spark snippet illustrating “aggregateByKey” is included below for your reference.

val part1 = List((1, "Tweet-1"), (2, "Tweet-2"), (1, "Tweet-3"), (1, "Tweet-1"), (2, "Tweet-4"), (1, "Tweet-5"), (3, "Tweet-6"))
val part2 = List((1, "Tweet-4"), (2, "Tweet-5"), (1, "Tweet-6"), (2, "Tweet-7"), (3, "Tweet-8"), (4, "Tweet-9"))
val part3 = List((3, "Tweet-6"), (4, "Tweet-7"), (4, "Tweet-1"), (2, "Tweet-10"))

val inputdata = part1 ::: part2 ::: part3

val input = sc.parallelize(inputdata, 3)

val output = input.aggregateByKey(Set.empty[String])(
  (setOfTweets, tweet) => setOfTweets + tweet,
  (setOfTweets1, setOfTweets2) => setOfTweets1 ++ setOfTweets2)

output.collect.foreach(println)

Output :
(3,Set(Tweet-6, Tweet-8))
(4,Set(Tweet-9, Tweet-7, Tweet-1))
(1,Set(Tweet-5, Tweet-6, Tweet-4, Tweet-1, Tweet-3))
(2,Set(Tweet-5, Tweet-2, Tweet-10, Tweet-7, Tweet-4))
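As an aside, the same result could also be obtained with reduceByKey by first wrapping each tweet in a singleton Set. This works, but it allocates a one-element Set per record before any reduction happens, which is exactly the per-record overhead that the zero value of aggregateByKey avoids. A minimal sketch (viaReduceByKey is just an illustrative name):

val viaReduceByKey = input.mapValues(tweet => Set(tweet)).reduceByKey(_ ++ _)
// mapValues allocates a one-element Set per record before any reduction happens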

Number of Records in an RDD partition

November 9, 2020

I was recently answering a question on stackoverflow.com about how data is partitioned when there is less data than there are partitions. I ended up writing a simple code snippet to see how many records land in each partition when the number of elements in an RDD is less than the number of partitions specified.

I thought I would put down the couple of lines of code, which may be useful if you are looking for a way to count the number of records in each partition.

This can also be useful to check whether your partitions are skewed. Sometimes you run into an OutOfMemoryError because one partition is much bigger than the others; usually this happens when many elements share the same hash key and therefore end up in the same partition.

For example, if the key is null then all such elements end up with the same hash code and land in the same partition.
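To see which partition a given key would land in, you can ask Spark's HashPartitioner directly; a small sketch (a partition count of 8 is chosen just for illustration):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8)
println(partitioner.getPartition("some-key")) // partition index derived from the key's hashCode
println(partitioner.getPartition(null))       // null keys always map to partition 0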

Here is the code that helps you find the number of records in each partition.

Below we have 4 elements in the RDD and 8 partitions.


val rdd = sc.parallelize(List(1, 2, 3, 4), 8)
rdd.mapPartitionsWithIndex((index, iter) => {
  // materialize the iterator first: taking its length would otherwise exhaust it
  val records = iter.toList
  println(s"partition $index has ${records.length} records")
  records.iterator
}).collect.foreach(println)
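Note that the println above runs on the executors, so on a real cluster the messages show up in the executor logs rather than on the driver. A sketch that brings the counts back to the driver instead:

rdd.mapPartitionsWithIndex((index, iter) =>
  Iterator((index, iter.size))   // emit one (partitionIndex, recordCount) pair per partition
).collect.foreach { case (index, count) =>
  println(s"partition $index has $count records")
}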


Apache Spark groupByKey and reduceByKey

March 7, 2020

I was answering a question on stackoverflow and came up with a pictorial representation of groupByKey and reduceByKey. I thought I would share it here so that I do not lose it over time.
The Spark documentation itself is quite clear about this, so I am not providing a lot of text here.

Both examples below sum up the values for each key.
1. groupByKey – Shuffles the data first based on the keys and only then gives us the opportunity to work with the values.

Diagram: groupByKey

2. reduceByKey – Runs the reduce function locally on each partition first, then shuffles those partial results and runs the reduce function once more to produce the final result.
Hence reduceByKey acts like a combiner in the MapReduce world: it reduces the amount of data shuffled during the process.

Diagram: reduceByKey
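A minimal sketch of both operations summing values per key (the sample pairs below are made up purely for illustration):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2), ("b", 3), ("a", 4)))

// groupByKey: every value is shuffled first, summing happens only after the shuffle
val sumsViaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey: partial sums are computed within each partition, then only those partials are shuffled
val sumsViaReduce = pairs.reduceByKey(_ + _)

// both produce the same (key, sum) pairs
sumsViaReduce.collect.foreach(println)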


GroupBy and count using Spark DataFrame

June 18, 2019

Here we group a DataFrame by key and count the occurrences of each key.

import spark.implicits._ // already in scope in spark-shell; needed for toDF and $ elsewhere

val datardd = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1, "c" -> 1))

// name the columns explicitly so they can be referenced in groupBy/agg
val mydf = datardd.toDF("name", "count")

mydf.groupBy($"name").agg("count" -> "count").
  withColumnRenamed("count(count)", "noofoccurrences").
  orderBy($"noofoccurrences".desc).show

name noofoccurrences
a 2
b 1
c 1
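As a side note, if all you need is the number of rows per key, the same result can be obtained more directly with count() on the grouped data:

mydf.groupBy($"name").count().
  withColumnRenamed("count", "noofoccurrences").
  orderBy($"noofoccurrences".desc).show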

Kill Tomcat service running on Windows

October 25, 2018

If you terminate a running Spring Boot application from within Eclipse, at times the port that the embedded Tomcat listens on does not get freed up. I found the commands below on one of the StackOverflow posts, and they are really handy.

C:\yourdir>netstat -aon |find /i "listening" |find "8080"

TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 68

Now grab the PID (68 in this case) and run the below command to kill it.

C:\yourdir>taskkill /F /PID 68


Big + Far Math Challenge @ ICC

April 22, 2017

Hello,

Recently I participated in and won first prize in the Big + Far Math Challenge hosted by ICC. The challenge description can be found here – http://big-far.webflow.io/

Participating in it was quite an exciting learning experience for me. I got to explore different technical areas while gathering data and preparing visualizations from it.

I have shared the source code and a static version of the visualization on GitHub. The dynamic version was hosted on Apache Solr running on my local desktop.

You can visit the project page @ https://amitlondhe.github.io/bigfarmathchallenge/ from which you can navigate to the visualizations that I came up with.

I am also sharing the presentation given to the judges as part of the assessment, in case you are looking for more details.

– Amit


Running Apache Spark on Windows

July 10, 2016

Running Hadoop on Windows is not trivial; however, running Apache Spark on Windows proved not too difficult. I came across a couple of blogs and StackOverflow discussions that made this possible. My notes below are the outcome of that reference material.

  1. Download http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-without-hadoop.tgz ( http://spark.apache.org/downloads.html )
  2. Download Hadoop distribution for Windows from http://www.barik.net/archive/2015/01/19/172716/
  3. Create hadoop-env.cmd in the {HADOOP_INSTALL_DIR}/conf directory with the following content:
    SET JAVA_HOME=C:\Progra~1\Java\jdk1.7.0_80
    
  4. In a new command window, run hadoop-env.cmd followed by {HADOOP_INSTALL_DIR}/bin/hadoop classpath.
    The output of this command is used to initialize SPARK_DIST_CLASSPATH in spark-env.cmd (you may need to create that file, as described in the next step).
  5. Create spark-env.cmd in {SPARK_INSTALL_DIR}/conf with the following content:
     REM spark-env.cmd content
     SET HADOOP_HOME=C:\amit\hadoop\hadoop-2.6.0
     SET HADOOP_CONF_DIR=%HADOOP_HOME%\conf
     set SPARK_DIST_CLASSPATH=<Output of hadoop classpath>
     SET JAVA_HOME=C:\Progra~1\Java\jdk1.7.0_80
    
  6. Now run the examples or spark-shell from the {SPARK_INSTALL_DIR}/bin directory. Note that you may have to run spark-env.cmd explicitly prior to running the examples or spark-shell.
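Once spark-shell starts cleanly, a quick smoke test such as the following (just an illustrative sanity check) confirms the setup works:

val testRdd = sc.parallelize(1 to 100)
println(testRdd.sum()) // prints 5050.0 if everything is wired up correctly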



Big O notation

May 14, 2016

Hello,

Came across a very nice introductory article on Big O notation.

https://rob-bell.net/2009/06/a-beginners-guide-to-big-o-notation/


Big Data For Social Good Challenge

March 16, 2015

Hello,

During this winter, I participated in the Big Data For Social Good Challenge, which I just stumbled upon while searching for something else.

The challenge was about using IBM Bluemix’s “Analytics For Hadoop” service to process a data set that is at least 500 MB in size.

This was a wonderful opportunity to get some hands-on experience with IBM Bluemix (IBM gives participants extended trial access). Apart from this, I was also keen to build a data visualization app on my own.

I selected CitiBike data for one year (2013-2014). Initially I did not have a clue about what insights I could gather from the dataset, but as soon as I ran some Apache Pig scripts and started looking at the output, I could see more and more use cases. I could not address all the use cases I had thought of, as I soon ran into deadline pressure: I still had to finish the video demonstration and put together a write-up about the project.

Overall it was a very enriching experience as I did so many things for the very first time.

Listing some of them below

  • IBM BigSheets and BigSQL
  • Using Chart.js library
  • Using Google Maps JavaScript APIs –  It was remarkably simpler than I thought. Much appreciate these APIs from Google.
  • Creating the custom Map icon – Never realized it would be this difficult
  • HTML 5/CSS challenges when putting up the UI
  • Last but not least, GitHub’s easy way to publish your work online.

Now that the challenge is in the public voting and judging phase, I would appreciate it if you could take a look at

http://ibmhadoop.challengepost.com/submissions/33509-citibike-looking-back-in-a-year

and provide your feedback and vote if you like it.


Introduction to Apache Pig

September 28, 2014

Hello,

I created this presentation as an introduction to Apache Pig. I hope you find it useful for understanding the basics of Apache Pig.

Introduction to Apache Pig

Thanks,
Amit