Spark’s primary abstraction is the Resilient Distributed Dataset (RDD): a fault-tolerant, distributed collection of elements that can be operated on in parallel. RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.
Creating RDDs
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create an RDD is to take an existing collection in your program and pass it to SparkContext’s parallelize() method.
For example, in Scala, Python, and Java respectively:
val input = sc.parallelize(List(1, 2, 3, 4))
numbers = sc.parallelize([1, 2, 3, 4])
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
The other way is to read from a file:
val lines = sc.textFile("README.md")
lines = sc.textFile("README.md")
JavaRDD<String> lines = sc.textFile("README.md");
RDD Operations
RDDs support two types of operations: transformations and actions. Transformations create a new RDD from an existing one. Actions return a result to the driver program. All transformations in Spark are lazy: they do not compute their result right away, but only record the transformations applied to the base dataset (for example, a file). The transformations are computed only when an action requires a result to be returned to the driver program or written to storage.
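Lazy evaluation can be illustrated without a cluster. The following is a toy, plain-Python sketch and not Spark's implementation: the class `LazyDataset` and the helper `square` are hypothetical names made up for this illustration. Transformations only record a step; the recorded steps run when an action such as `take()` is called.

```python
import itertools


class LazyDataset:
    """Toy stand-in for an RDD (hypothetical): records steps, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, fn):
        # Transformation: return a new dataset with one more recorded step.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: nothing is computed here either.
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _run(self):
        # Chain the recorded steps as lazy iterators.
        items = iter(self._source)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return items

    def take(self, n):
        # Action: pull only as many elements as requested.
        return list(itertools.islice(self._run(), n))


calls = []  # records every element that actually gets processed


def square(x):
    calls.append(x)
    return x ** 2


num = LazyDataset([4, 6, 6, 1, 3, 0, 2, 2, 2])
squares = num.map(square)          # only recorded, square() has not run yet
calls_before_action = len(calls)
first_three = squares.take(3)      # the action triggers the computation
calls_after_action = len(calls)
```

In this toy model `square` runs only for the three elements that `take(3)` needs. Real Spark schedules work per partition, so the exact amount of work it does for `take()` can differ.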
Transformations
Let’s create an RDD of numbers and apply some transformations to it. We will use PySpark for this example.
Small tip: if you want to suppress the Spark logging output, do the following:
sc.setLogLevel("ERROR")
num = sc.parallelize([4, 6, 6, 1, 3, 0, 2, 2, 2])
The map(function) transformation returns a new RDD by applying a function to each element of the original one.
result = num.map(lambda x: x**2)
So far, Spark has only recorded the transformation. To get the actual result we need an action, such as take(), which returns the specified number of elements from the RDD.
result.take(10)
[16, 36, 36, 1, 9, 0, 4, 4, 4]
The filter(function) transformation returns a new RDD that retains only the elements for which the function evaluates to true.
result = num.filter(lambda x: x >= 3)
result.take(10)
[4, 6, 6, 3]
The distinct() transformation returns a new RDD with all duplicates from the original dataset removed.
result = num.distinct()
result.take(10)
[0, 1, 2, 3, 4, 6]
Some transformations combine two RDDs. Let’s create a second RDD:
num2 = sc.parallelize([5, 5, 8, 2, 2, 1, 7, 3, 3])
The union(other) transformation returns a new dataset that contains all elements from both RDDs, including duplicates.
result = num.union(num2)
result.take(20)
[4, 6, 6, 1, 3, 0, 2, 2, 2, 5, 5, 8, 2, 2, 1, 7, 3, 3]
The intersection(other) transformation returns a dataset that contains only the elements found in both RDDs, with duplicates removed.
result = num.intersection(num2)
result.take(20)
[2, 1, 3]
The subtract(other) transformation returns a dataset with the elements of the original RDD that do not appear in the other RDD.
result = num.subtract(num2)
result.take(20)
[0, 4, 6, 6]
We can also compute the Cartesian product of two datasets. The cartesian(other) transformation returns a dataset of all pairs (a, b), where a belongs to the original dataset and b to the other one.
result = num.cartesian(num2)
result.take(20)
[(4, 5), (4, 5), (4, 8), (4, 2), (4, 2), (4, 1), (4, 7), (4, 3), (4, 3), (6, 5), (6, 5), (6, 8), (6, 2), (6, 2), (6, 1), (6, 7), (6, 3), (6, 3), (6, 5), (6, 5)]
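The two-RDD transformations differ in how they treat duplicates, which is easy to miss in the outputs above. As a rough plain-Python model of their semantics (ordinary lists standing in for RDDs, so element order here is not what Spark would return):

```python
import itertools

# Plain lists standing in for the two RDDs used above.
num = [4, 6, 6, 1, 3, 0, 2, 2, 2]
num2 = [5, 5, 8, 2, 2, 1, 7, 3, 3]

# union keeps every element of both datasets, duplicates included.
union = num + num2

# intersection keeps each common value once.
intersection = sorted(set(num) & set(num2))

# subtract drops every element that occurs in the other dataset,
# but keeps duplicates of the remaining ones (note the two 6s).
other_values = set(num2)
subtract = sorted(x for x in num if x not in other_values)

# cartesian pairs every element of num with every element of num2.
cartesian = list(itertools.product(num, num2))
```

With nine elements in each dataset, the Cartesian product has 81 pairs, so this transformation gets expensive quickly on large inputs.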
Actions
As mentioned earlier, actions return a value to the driver program. For example, count() returns the number of elements in the dataset:
num.count()
9
The countByValue() action counts the occurrences of each element in the RDD and returns a dictionary of (value, count) pairs.
num.countByValue()
defaultdict(<type 'int'>, {0: 1, 1: 1, 2: 3, 3: 1, 4: 1, 6: 2})
The collect() action returns all elements of the dataset as a list. Because the whole dataset is brought to the driver, it is best used on small datasets.
num.collect()
[4, 6, 6, 1, 3, 0, 2, 2, 2]
The top(n) action returns the n largest elements of the RDD, in descending order.
num.top(3)
[6, 6, 4]
The takeOrdered(n) action returns the n smallest elements of the RDD, in ascending order.
num.takeOrdered(5)
[0, 1, 2, 2, 2]
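For intuition, top and takeOrdered behave like Python's `heapq.nlargest` and `heapq.nsmallest` applied to the whole dataset. The sketch below uses a plain list in place of the RDD. The `key` function in the last line is only an illustration; to my knowledge the PySpark versions of both actions accept a similar optional `key` argument, but check the API documentation for your Spark version.

```python
import heapq

# Plain list standing in for the RDD used above.
num = [4, 6, 6, 1, 3, 0, 2, 2, 2]

# Like num.top(3): the three largest elements, in descending order.
largest_three = heapq.nlargest(3, num)

# Like num.takeOrdered(5): the five smallest elements, in ascending order.
smallest_five = heapq.nsmallest(5, num)

# A key function changes the ordering; negating it reverses ascending order.
largest_via_key = heapq.nsmallest(3, num, key=lambda x: -x)
```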
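The most common action on an RDD is reduce(function), which takes a function that combines two elements of the RDD into one element of the same type. The function should be commutative and associative so that it can be computed correctly in parallel.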
num.reduce(lambda x, y: x + y)
26
Now let’s look at the fold() action. It works much like reduce(), but also takes a zero value that serves as the initial value of the accumulation. The zero value should be the identity element for the operation (0 for addition, 1 for multiplication).
num.fold(0, lambda x,y : x + y)
26
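The zero value matters more than it may seem, because it is applied once per partition and once more when the partial results are merged. The following plain-Python model sketches that behavior. The helper `fold_partitions` and the two-way split of the data are assumptions made for illustration; the real partitioning depends on your Spark configuration.

```python
from functools import reduce


def fold_partitions(partitions, zero_value, op):
    """Hypothetical model of fold: fold each partition, then fold the partial results."""
    partials = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partials, zero_value)


def add(x, y):
    return x + y


# Assumed split of [4, 6, 6, 1, 3, 0, 2, 2, 2] into two partitions.
partitions = [[4, 6, 6, 1], [3, 0, 2, 2, 2]]

# With the identity element for addition, the result is the plain sum.
neutral = fold_partitions(partitions, 0, add)

# A non-neutral zero value is added once per partition plus once in the merge.
skewed = fold_partitions(partitions, 1, add)
```

In this model the non-neutral zero value inflates the sum by the number of partitions plus one, so the result would change with the partitioning.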
The aggregate() action frees us from the constraint that the return value must have the same type as the elements of the RDD we are working on. Let’s take a closer look at this function and walk through a simple example step by step:
num = sc.parallelize([4, 6, 6, 1, 3, 1, 2, 2, 2])
# Despite its name, sumCount holds (product of the elements, number of elements).
sumCount = num.aggregate(
    (1, 0),
    lambda tup, value: (value * tup[0], tup[1] + 1),
    lambda tup, value_tup: (value_tup[0] * tup[0], value_tup[1] + tup[1]))
sumCount
(3456, 9)
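(1, 0) is the starting value: a tuple holding the running product and the running count. The first lambda takes the accumulated tuple and a single value as input and folds the elements within each partition. The second lambda takes two such tuples and merges the results of different partitions. The result (3456, 9) is therefore the product of all elements and their count.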
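A common use of this pattern is computing a mean, where the accumulator is a (sum, count) tuple while the elements are plain numbers. The sketch below models the two-function flow in plain Python. The helper `aggregate_partitions`, the names `seq_op` and `comb_op`, and the split into two partitions are assumptions made for illustration, not Spark's actual implementation.

```python
from functools import reduce


def aggregate_partitions(partitions, zero_value, seq_op, comb_op):
    """Hypothetical model of aggregate: seq_op inside partitions, comb_op across them."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)


def seq_op(acc, value):
    # Fold one element into the (sum, count) accumulator.
    return (acc[0] + value, acc[1] + 1)


def comb_op(left, right):
    # Merge two (sum, count) accumulators from different partitions.
    return (left[0] + right[0], left[1] + right[1])


# Assumed split of [4, 6, 6, 1, 3, 1, 2, 2, 2] into two partitions.
partitions = [[4, 6, 6, 1], [3, 1, 2, 2, 2]]

total, count = aggregate_partitions(partitions, (0, 0), seq_op, comb_op)
mean = total / count
```

As with fold, the starting value (0, 0) is neutral for both functions, so applying it once per partition and once in the merge does not change the result.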
RDD of key-value pairs
Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.
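To give a feel for what "acting on each key" means, here is a plain-Python sketch that groups values by key and reduces each group. The helper `reduce_values_by_key` and the sample pairs are hypothetical. Spark offers key-based operations of this kind on pair RDDs (reduceByKey is one of them), but this sketch says nothing about how they are implemented.

```python
from collections import defaultdict
from functools import reduce


def reduce_values_by_key(pairs, op):
    """Hypothetical helper: group values by key, then reduce each group with op."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(op, values) for key, values in grouped.items()}


# Made-up (key, value) pairs, e.g. one pair per word occurrence.
pairs = [("spark", 1), ("rdd", 1), ("spark", 1), ("rdd", 1), ("spark", 1)]

counts = reduce_values_by_key(pairs, lambda x, y: x + y)
```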
Pair RDDs support the operations available on regular RDDs as well as additional key-based ones. That’s the topic of our next blog post.
Conclusion
In this second article in our series of tutorials on Apache Spark, we have walked through the RDD, Spark’s primary abstraction. See the RDD Programming Guide to learn more about the commands and operations you can use.
In the next article, we will talk about Data Frames in Apache Spark.
Feel free to leave comments below and tell us what else you would like to see in our tutorials. Good luck!