Basic Spark RDD transformations
RDDs (Resilient Distributed Datasets) are the basic unit of storage in Spark. You can think of an RDD as a collection distributed over multiple machines.
Most of the time, higher-level structured APIs are used in Spark applications; under the hood, these get converted to low-level RDDs and transformations.
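As a minimal sketch (the session setup and variable names here are illustrative, not part of the original examples), an RDD can be created from a local Python collection:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext  # the RDD API lives on the SparkContext

# distribute a local list across the cluster as an RDD
input_rdd = sc.parallelize(["apple,red", "banana,yellow"])

The later snippets assume a SparkContext named sc like this one.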
Transformations
As the name suggests, RDD transformations are functions that do some work over each record in an RDD and transform the data into a new, required form.
List of important transformations
map
In the map transformation, we get exactly one output record per input record.
It accepts a function that is run on each record to produce the corresponding output record.
Usage -
output_rdd = input_rdd.map(lambda x: (x.split(",")[0], x.split(",")[1]))
output_rdd.collect()
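For instance, with the made-up comma-separated records from the sketch above, each input string becomes one output tuple:

input_rdd = sc.parallelize(["apple,red", "banana,yellow"])
input_rdd.map(lambda x: (x.split(",")[0], x.split(",")[1])).collect()
# [('apple', 'red'), ('banana', 'yellow')]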
flatMap
Here we can have multiple output records per input record.
It is just like the map transformation, but it additionally merges the output results together; in other words, it flattens out the final output records.
Usage -
output_rdd = input_rdd.flatMap(lambda x: x.split(" "))
output_rdd.collect()
In the above example, if the two input records are "Hello World" and "Its rainy today", then the output records will be "Hello", "World", "Its", "rainy", "today".
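A runnable version of that example (sample data built with the sc from earlier):

input_rdd = sc.parallelize(["Hello World", "Its rainy today"])
input_rdd.flatMap(lambda x: x.split(" ")).collect()
# ['Hello', 'World', 'Its', 'rainy', 'today']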
mapValues
It is like the map transformation, but it works on a pair RDD (an RDD having a tuple of two elements per record). It can be used when we only want to apply the transformation to the second element (the value) of each pair.
Usage -
output_rdd = input_rdd.mapValues(lambda x: len(x))
output_rdd.collect()
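For instance, with illustrative pair data, only the values are transformed while the keys pass through untouched:

input_rdd = sc.parallelize([("apple", "red"), ("banana", "yellow")])
input_rdd.mapValues(lambda x: len(x)).collect()
# [('apple', 3), ('banana', 6)]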
flatMapValues
It is like the flatMap transformation, but it works only on pair RDDs. You can use it when you want to flatten each value and attach the flattened values to the same key, similar to a lateral view in Hive.
Usage -
output_rdd = input_rdd.flatMapValues(lambda x: x.split(","))
output_rdd.collect()
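With some made-up data, each flattened value is paired back with its original key:

input_rdd = sc.parallelize([("fruits", "apple,banana"), ("colors", "red")])
input_rdd.flatMapValues(lambda x: x.split(",")).collect()
# [('fruits', 'apple'), ('fruits', 'banana'), ('colors', 'red')]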
groupByKey
It works on pair RDDs and groups the data per key (the key is the first element in a pair RDD). We need to use groupByKey very carefully: it generally involves a lot of data shuffling, it reduces parallelism by collecting all the values for a key onto fewer machines as-is, and it can even lead to out-of-memory issues.
Usage -
output_rdd = input_rdd.groupByKey()
output_rdd.collect()
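The grouped values come back as an iterable, so a common trick when inspecting results is to convert them to a list (sample data is illustrative):

input_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
input_rdd.groupByKey().mapValues(list).collect()
# [('a', [1, 3]), ('b', [2])]  (ordering may vary)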
reduceByKey
It works on pair RDDs and aggregates the data per key based on the function provided to it. First, local aggregation happens on each partition, which preserves parallelism; then the final aggregation happens on different machines after the partially aggregated data is shuffled.
Usage -
output_rdd = input_rdd.reduceByKey(lambda x, y: x + y)
output_rdd.collect()
The function provided to reduceByKey should accept two values as parameters, combine them, and return one value. Applied repeatedly like this, all the values for a key are aggregated into a single value.
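A sketch of the classic word count built on this idea (the input words are made up for illustration):

words = sc.parallelize(["spark", "rdd", "spark"])
pairs = words.map(lambda w: (w, 1))   # one (word, 1) pair per word
pairs.reduceByKey(lambda x, y: x + y).collect()
# [('spark', 2), ('rdd', 1)]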
sortBy
sortBy accepts a function that returns the value from each record on which the sorting will be based. It also accepts a boolean parameter that decides whether sorting happens in ascending or descending order; by default its value is True and sorting happens in ascending order, so for descending order we need to pass False.
Usage -
output_rdd = input_rdd.sortBy(lambda x: x[1])
output_rdd.collect()
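For example, sorting illustrative pairs by their second element in descending order:

input_rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 2)])
input_rdd.sortBy(lambda x: x[1], ascending=False).collect()
# [('a', 3), ('c', 2), ('b', 1)]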
sortByKey
It is just like sortBy, but it only works on pair RDDs and sorts based on the key.
Usage -
output_rdd = input_rdd.sortByKey()
output_rdd.collect()
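A quick illustration with made-up pairs:

input_rdd = sc.parallelize([("b", 2), ("c", 3), ("a", 1)])
input_rdd.sortByKey().collect()
# [('a', 1), ('b', 2), ('c', 3)]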
filter
This transformation filters the records based on the function provided to it: when the function returns True the record is allowed to pass, and when it returns False the record is dropped.
Usage -
output_rdd = input_rdd.filter(lambda x: len(x[1]) > 10)
output_rdd.collect()
The above example filters the data; the output RDD will contain only those records whose second element's length is greater than 10.
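With some illustrative data:

input_rdd = sc.parallelize([("a", "short"), ("b", "a much longer value")])
input_rdd.filter(lambda x: len(x[1]) > 10).collect()
# [('b', 'a much longer value')]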
join
join basically joins two RDDs based on the key, and both RDDs need to be pair RDDs.
Usage -
output_rdd = input_rdd1.join(input_rdd2)
output_rdd.collect()
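For each key present in both RDDs, the result pairs the key with a tuple of the two values (sample data is illustrative):

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("c", "y")])
rdd1.join(rdd2).collect()
# [('a', (1, 'x'))]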
repartition
As the name suggests, it is used to change the number of partitions (increase or decrease) in an RDD. It accepts one parameter: the desired number of partitions.
Generally, it is used to increase the number of partitions and thereby increase parallelism. The main focus of repartition is to make partitions of roughly equal size; it does a full shuffle of the data to achieve that, without trying to minimize shuffling.
Usage -
output_rdd = input_rdd.repartition(4)
output_rdd.collect()
The above example will repartition input_rdd to 4 partitions.
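The partition count can be checked before and after (the initial count of 2 here is just an assumption for illustration):

input_rdd = sc.parallelize(range(100), 2)
input_rdd.getNumPartitions()                  # 2
input_rdd.repartition(4).getNumPartitions()   # 4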
coalesce
This is like repartition but is used only to decrease the number of partitions. Its main focus is on minimizing shuffling: it does less shuffling by first merging partitions that live on the same machine. Even if you try to increase the number of partitions using coalesce by passing a larger number as the parameter, it will not do anything and will simply ignore it.
Usage -
output_rdd = input_rdd.coalesce(4)
output_rdd.collect()
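A quick sketch showing both behaviours (the partition counts are illustrative):

input_rdd = sc.parallelize(range(100), 8)
input_rdd.coalesce(4).getNumPartitions()    # 4
input_rdd.coalesce(16).getNumPartitions()   # still 8; coalesce will not increase partitions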