Spark RDD API transformations and actions tutorial with examples – tutorial 1

An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes. Users create RDDs in two ways:

1. By loading an external dataset.


In Java

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> radiusData = jsc.textFile("Input Data Folder Path");

In Scala

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val radiusData = sc.textFile("Input Data Folder Path")

2. By distributing a collection of objects (e.g., a list or set) in the driver program.

In Scala


val lines = sc.parallelize(List("One", "Two"))

In Java


JavaRDD<String> lines = jsc.parallelize(Arrays.asList("pandas", "i like pandas"));

Once created, RDDs offer two types of operations: transformations and actions.

Transformations

Transformations are operations on RDDs that return a new RDD; they construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate. Transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise, that is, they work on one element at a time, but this is not true for all transformations.

Let's take an example.

We want to filter out the lines that do not belong to port=27, creating a new RDD that holds just the lines containing “port=27”.

Below is the sample data used:


FTTH00509294,NDABUL,/shelf=0/slot=0/port=27
FTTH00509330,NDABVQ,/shelf=0/slot=2/port=13
FTTH00509311,NDABUN,/shelf=0/slot=0/port=17
FTTH00509363,NDABUN,/shelf=0/slot=0/port=18
FTTH00509491,NDABVQ,/shelf=0/slot=1/port=4
FTTH00587796,NDACQU,/shelf=0/slot=1/port=2
FTTH00509564,NDABUL,/shelf=0/slot=0/port=29
FTTH00509159,NDABUY,/shelf=0/slot=0/port=9
FTTH00587753,NDACQU,/shelf=0/slot=0/port=46
FTTH00509164,NDABUY,/shelf=0/slot=0/port=10

In Java

As we can see below, there are two ways to pass a function to the filter() transformation: the first, and easier to read, is a Java 8 lambda expression that implements the Function interface; the second is the traditional Java way of implementing the interface inline with an anonymous class.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TransformationExample {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> radiusData = jsc.textFile("C:\\codebase\\scala-project\\input data\\inventory");

        // 1. Java 8 lambda: keep only lines whose third field contains "port=27"
        JavaRDD<String> filtered = radiusData.filter(line -> {
            String[] split = line.split(",");
            return split[2].contains("port=27");
        });

        filtered.foreach(x -> System.out.println(x));

        // 2. Anonymous inner class implementing Function<String, Boolean>
        JavaRDD<String> filtered2 = radiusData.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) throws Exception {
                String[] split = line.split(",");
                return split[2].contains("port=13");
            }
        });

        filtered2.foreach(x -> System.out.println(x));

        // Merge the two filtered RDDs into one
        filtered.union(filtered2).foreach(x -> System.out.println(x));

        jsc.close();
    }
}

In Scala

As we can see below, the Scala code is more concise and easier to write.


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object TransformationExample {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("scala spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\inventory")

    // Keep only lines whose third field contains "port=27"
    val filter1 = rdd.filter { x =>
      val split = x.split(",")
      split(2).contains("port=27")
    }

    filter1.foreach { x => println(x) }

    // Reuse the same source RDD to get the port=13 lines
    val filter2 = rdd.filter { x =>
      val split = x.split(",")
      split(2).contains("port=13")
    }

    filter2.foreach { x => println(x) }

    filter1.union(filter2).foreach { x => println(x) }

    sc.stop()
  }
}

Note that the filter() operation does not mutate the existing radiusData. Instead, it returns a reference to an entirely new RDD. radiusData can still be reused later in the program, for instance, to search for other port data. In the Java implementation we reuse radiusData to get the data for port=13, which shows that radiusData itself is unchanged. We also perform a union() on the two transformed RDDs to merge the results, though we could have done this with a single filter().
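The function passed to filter() is ordinary Java, so its logic can be checked without starting a SparkContext. The sketch below is a plain-Java helper; the class PortPredicates and its methods are hypothetical, not part of Spark. It also shows a single combined predicate that keeps port=27 and port=13 lines in one pass instead of two filter() calls plus union(). As a design choice it matches the last path segment exactly with endsWith(), so unlike contains("port=27") it would not also accept a line ending in port=270.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: plain-Java predicates that could be passed to JavaRDD.filter().
final class PortPredicates {

    private PortPredicates() {
    }

    // True if the third comma-separated field ends with "/<port>", e.g. "/port=27".
    static boolean hasPort(String line, String port) {
        String[] split = line.split(",");
        return split.length > 2 && split[2].endsWith("/" + port);
    }

    // True if the line matches any of the given ports (one pass, no union needed).
    static boolean hasAnyPort(String line, String... ports) {
        for (String port : ports) {
            if (hasPort(line, port)) {
                return true;
            }
        }
        return false;
    }

    // Local stand-in for filter(...).collect() on a small in-memory list of lines.
    static List<String> filterLocally(List<String> lines, String... ports) {
        return lines.stream()
                .filter(line -> hasAnyPort(line, ports))
                .collect(Collectors.toList());
    }
}
```

In a Spark job the same method could be passed as radiusData.filter(line -> PortPredicates.hasAnyPort(line, "port=27", "port=13")).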

As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.

Actions

Actions are operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.

Continuing the previous example, let's say we want to count() the number of records and use take() to collect a number of elements from the RDD.

In Scala


println(rdd.count())

rdd.take(10).foreach { x => println(x) }

In Java


System.out.println(radiusData.count());
for (String string : radiusData.take(10)) {
    System.out.println(string);
}

We used take() to retrieve a small number of elements of the RDD at the driver program, then iterated over them locally to print them at the driver.

RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you’d like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

In Scala


rdd.collect().foreach(println(_))

In Java


for (String string : radiusData.collect()) {
    System.out.println(string);
}

Lazy Evaluation

Transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action. When we call a transformation on an RDD, the operation is not immediately performed; instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions, built up through transformations, on how to compute the data.
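To make the "instructions, not data" idea concrete, here is a toy model in plain Java. It is not how Spark is implemented, and the class LazyList and its methods are hypothetical. map() only records a function; nothing runs until an action such as collect() or count() is called, and every action runs the recorded steps again from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of lazy evaluation (hypothetical, not Spark's implementation):
// a "dataset" is a source list plus a recorded chain of functions.
final class LazyList<T> {

    private final List<?> source;
    private final Function<Object, Object> pipeline;

    private LazyList(List<?> source, Function<Object, Object> pipeline) {
        this.source = source;
        this.pipeline = pipeline;
    }

    static <T> LazyList<T> of(List<T> source) {
        return new LazyList<>(source, Function.identity());
    }

    // "Transformation": only composes the function; no element is touched yet.
    @SuppressWarnings("unchecked")
    <R> LazyList<R> map(Function<? super T, ? extends R> f) {
        Function<Object, Object> next = pipeline.andThen(x -> f.apply((T) x));
        return new LazyList<>(source, next);
    }

    // "Action": runs the whole recorded chain over the source, every time it is called.
    @SuppressWarnings("unchecked")
    List<T> collect() {
        List<T> out = new ArrayList<>();
        for (Object x : source) {
            out.add((T) pipeline.apply(x));
        }
        return out;
    }

    long count() {
        return collect().size();
    }
}
```

Counting how often the mapped function runs shows both properties: zero calls after map(), and a full recomputation for each action.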

Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times, once for every action that needs it, unless the RDD is persisted. Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together.

In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.

Passing Functions to Spark

Most of Spark’s transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.

In Scala, the function we pass and the data referenced in it need to be serializable. Furthermore, passing a method or field of an object includes a reference to that whole object, so it is recommended to extract the fields we need into local variables and avoid passing the whole object containing them.


import org.apache.spark.rdd.RDD

// SearchFunctions is not Serializable, so the first two methods fail with a
// "Task not serializable" error; the third one works.
class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
    rdd.map(isMatch)
  }

  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Problem: "query" means "this.query", so we pass all of "this"
    rdd.map(x => x.split(query))
  }

  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}

 

In Java, functions are specified as objects that implement one of Spark’s function interfaces from the org.apache.spark.api.java.function package. There are a number of different interfaces based on the return type of the function.

Function<T, R>: implement the method R call(T), which takes one input and returns one output. It is used with operations like map() and filter().

Function2<T1, T2, R>: implement the method R call(T1, T2), which takes two inputs and returns one output. It is used with operations like reduce(), fold() and aggregate().

FlatMapFunction<T, R>: implement the method Iterator<R> call(T), which takes one input and returns zero or more outputs. It is used with operations like flatMap(). In Spark 1.x this method returned an Iterable<R>; since Spark 2.0 it returns an Iterator<R>.

There are three ways to pass functions to the Spark API in Java:

1. With an anonymous inner class


JavaRDD<String> filtered2 = radiusData.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String line) throws Exception {
        String[] split = line.split(",");
        return split[2].contains("port=13");
    }
});

2. With a named class


import org.apache.spark.api.java.function.Function;

public class NamedClass implements Function<String, Boolean> {

    private String filterQuery;

    public NamedClass(String filterQuery) {
        this.filterQuery = filterQuery;
    }

    // Keeps lines whose third field contains the configured query, e.g. "port=27"
    @Override
    public Boolean call(String line) throws Exception {
        String[] split = line.split(",");
        return split[2].contains(getFilterQuery());
    }

    public String getFilterQuery() {
        return filterQuery;
    }

    public void setFilterQuery(String filterQuery) {
        this.filterQuery = filterQuery;
    }
}

This class can then be passed to the filter function as below:


radiusData.filter(new NamedClass("port=27"));

One benefit of the named class style is that you can give the class constructor parameters, such as the filter query here, so the same class can be reused for different ports.

3. With a Java 8 lambda expression


JavaRDD<String> filtered = radiusData.filter(line -> {
    String[] split = line.split(",");
    return split[2].contains("port=27");
});

Map Transformation

The map() transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD. Note that map()'s return type does not have to be the same as its input type: if we had an RDD of Strings and our map() function parsed the strings and returned a Double, our input RDD type would be RDD[String] and the resulting RDD type would be RDD[Double].

Let's take an example.

Let's say we want to apply a calibration factor to the system data we receive, which is at index 4 in the sample data below.

Below is the data snippet:


NAAACA,/shelf=0/slot=0/port=27,PTP,100000,605521918,6401884577,0,676,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,101500,448588930,6096776528,0,663,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,103000,472867048,5775316908,0,670,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,104500,608929155,6594627955,0,679,,,,

Let's use the map() function to achieve this.

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MapExample {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // map() turns each String line into a Double: the calibrated value of field 4
        JavaRDD<Double> rdd2 = rdd.map((String x) -> {
            // limit -1 keeps trailing empty fields
            String[] split = x.split(",", -1);
            if (split[4].length() > 0) {
                return Double.parseDouble(split[4]) * 0.9876;
            }
            return 0d;
        });

        for (Double value : rdd2.collect()) {
            System.out.println(value);
        }

        jsc.close();
    }
}

In Scala


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapExample extends App {

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // RDD[String] => RDD[Double]: calibrated value of field 4, or 0.0 if it is empty
  val rdd2 = rdd.map { x =>
    val split = x.split(",", -1)
    if (split(4).nonEmpty) split(4).toDouble * 0.9876 else 0.0
  }

  rdd2.foreach { x =>
    println(x)
  }

  sc.stop()
}
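The function passed to map() can also be pulled out into a plain static method and checked on a single line of the sample data without a cluster. This is a minimal sketch assuming the layout above (value at index 4, empty when missing) and the 0.9876 factor from the examples; the class name Calibration is hypothetical.

```java
// Hypothetical helper holding the per-line logic used inside map().
final class Calibration {

    static final double FACTOR = 0.9876;

    private Calibration() {
    }

    // Parses field 4 of a comma-separated record and applies the calibration factor.
    // split(",", -1) keeps trailing empty fields; an empty or missing value yields 0.0.
    static double calibrate(String line) {
        String[] split = line.split(",", -1);
        if (split.length > 4 && !split[4].isEmpty()) {
            return Double.parseDouble(split[4]) * FACTOR;
        }
        return 0d;
    }
}
```

With such a helper, the Java map call could be written as rdd.map(Calibration::calibrate), since a static method reference fits Spark's Function<String, Double>.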

FlatMap

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap(). As with map(), the function passed to flatMap() is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD that consists of the elements from all of the iterators.
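The difference between map() and flatMap() can be seen locally with Java's own Stream API, which has methods of the same names. This is an analogy using java.util.stream, not Spark code, and the class MapVsFlatMap is hypothetical: map() yields one array per line, while flatMap() flattens every field of every line into a single sequence.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Local analogy using java.util.stream (not Spark) for map() vs flatMap().
final class MapVsFlatMap {

    private MapVsFlatMap() {
    }

    // One output element per input line: an array of its fields.
    static List<String[]> mapToFields(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(","))
                .collect(Collectors.toList());
    }

    // Zero or more output elements per input line: the fields themselves.
    static List<String> flatMapToFields(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.toList());
    }
}
```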

Let's take an example: here each line is split on commas, so every field becomes a separate element of the new RDD.

In Java


import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FlatMap {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // Each line becomes zero or more elements, one per comma-separated field.
        // Since Spark 2.0, FlatMapFunction must return an Iterator.
        JavaRDD<String> word = rdd.flatMap(x -> Arrays.asList(x.split(",")).iterator());

        for (String string : word.collect()) {
            System.out.println(string);
        }

        jsc.close();
    }
}

In Scala


object FlatMap extends App {

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // Each line is split into its fields; the resulting RDD holds all fields of all lines
  rdd.flatMap { x => x.split(",").iterator }.foreach { println(_) }

  sc.stop()
}

Distinct

We often have duplicates in the data, and removing them from a dataset is a common use case. If we want only unique elements, we can use the RDD.distinct() transformation to produce a new RDD with only distinct items. Note, however, that distinct() is expensive, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element. Shuffling, and how to reduce it, will be discussed in another article.

Union

The simplest set operation is union(other), which gives back an RDD consisting of the data from both sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark’s union() will contain duplicates.

Intersection

Spark also provides an intersection(other) method, which returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running. While intersection() and union() are two similar concepts, the performance of intersection() is much worse since it requires a shuffle over the network to identify common elements.

Subtract

Sometimes we need to remove some data from consideration. The subtract(other) function takes in another RDD and returns an RDD that has only values present in the first RDD and not the second RDD. Like intersection(), it performs a shuffle.

Cartesian

The cartesian(other) transformation returns all possible pairs (a, b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs. It is very expensive for large RDDs, since an RDD of m elements combined with an RDD of n elements produces m × n pairs.

Below is an example of the set operations described above.

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SetOperation {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\inventory");
        JavaRDD<String> rdd2 = jsc.textFile("C:\\codebase\\scala-project\\input data\\inventory2");

        // One copy of each line of rdd
        JavaRDD<String> distinctRdd = rdd.distinct();
        for (String string : distinctRdd.collect()) {
            System.out.println(string);
        }

        // All lines of both RDDs, duplicates kept
        JavaRDD<String> unionRdd = rdd.union(rdd2);
        for (String string : unionRdd.collect()) {
            System.out.println(string);
        }

        // Lines present in both RDDs, duplicates removed
        JavaRDD<String> intersectionRdd = rdd.intersection(rdd2);
        for (String string : intersectionRdd.collect()) {
            System.out.println(string);
        }

        // Lines of rdd that do not appear in rdd2
        JavaRDD<String> subtractRdd = rdd.subtract(rdd2);
        for (String string : subtractRdd.collect()) {
            System.out.println(string);
        }

        // Every (line of rdd, line of rdd2) pair
        JavaPairRDD<String, String> cartesianRdd = rdd.cartesian(rdd2);
        for (Tuple2<String, String> pair : cartesianRdd.collect()) {
            System.out.println(pair._1() + "-----" + pair._2());
        }

        jsc.close();
    }
}

In Scala


object SetOperationScala extends App {

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\inventory")
  val rdd2 = sc.textFile("C:\\codebase\\scala-project\\input data\\inventory2")

  rdd.distinct().foreach { println(_) }

  rdd.union(rdd2).foreach { println(_) }

  rdd.intersection(rdd2).foreach { println(_) }

  rdd.subtract(rdd2).foreach { println(_) }

  rdd.cartesian(rdd2).foreach { println(_) }

  sc.stop()
}
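The duplicate-handling rules of these set operations can be summarized with a small plain-Java model over lists. This is a local sketch of the semantics, not Spark code (the class SetOps is hypothetical), and it ignores partitioning and output order, which Spark does not guarantee: union() keeps duplicates, distinct() and intersection() remove them, subtract() keeps the remaining duplicates of the first input, and cartesian() produces m × n pairs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical local model of RDD set-operation semantics (order is not meaningful).
final class SetOps {

    private SetOps() {
    }

    // distinct(): one copy of each element.
    static <T> List<T> distinct(List<T> a) {
        return new ArrayList<>(new LinkedHashSet<>(a));
    }

    // union(): plain concatenation, duplicates are kept.
    static <T> List<T> union(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // intersection(): elements present in both inputs, duplicates removed.
    static <T> List<T> intersection(List<T> a, List<T> b) {
        Set<T> inB = new HashSet<>(b);
        Set<T> out = new LinkedHashSet<>();
        for (T x : a) {
            if (inB.contains(x)) {
                out.add(x);
            }
        }
        return new ArrayList<>(out);
    }

    // subtract(): elements of a that never occur in b; duplicates from a remain.
    static <T> List<T> subtract(List<T> a, List<T> b) {
        Set<T> inB = new HashSet<>(b);
        List<T> out = new ArrayList<>();
        for (T x : a) {
            if (!inB.contains(x)) {
                out.add(x);
            }
        }
        return out;
    }

    // cartesian(): every (x, y) pair, so a.size() * b.size() results.
    static <T, U> List<Map.Entry<T, U>> cartesian(List<T> a, List<U> b) {
        List<Map.Entry<T, U>> out = new ArrayList<>();
        for (T x : a) {
            for (U y : b) {
                out.add(new SimpleEntry<T, U>(x, y));
            }
        }
        return out;
    }
}
```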

Reduce Action

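The reduce() action takes a function that operates on two elements of the type in your RDD and returns a new element of the same type. A simple example of such a function is +, which we can use to sum our RDD. With reduce(), we can easily sum the elements of our RDD, count the number of elements (after mapping each one to 1), and perform other types of aggregations. Because Spark reduces each partition separately and then combines the partial results, the function should be commutative and associative.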

Fold Action

The fold() action takes a function with the same signature as needed for reduce(), but in addition takes a zero value to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value (for example, 0 for +, 1 for *, or an empty list for concatenation).
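Why the zero value must be an identity becomes clear when simulating how fold() uses it: Spark folds each partition starting from the zero value, then folds the per-partition results together, again starting from the zero value. The plain-Java sketch below reproduces that for integer addition; the class FoldModel is hypothetical and the partitions are just nested lists. With 0 the result does not depend on the number of partitions; with 10 it does.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model of RDD.fold(): zero is used once per partition and once more when merging.
final class FoldModel {

    private FoldModel() {
    }

    static <T> T fold(List<List<T>> partitions, T zero, BinaryOperator<T> op) {
        T result = zero; // the driver-side merge also starts from the zero value
        for (List<T> partition : partitions) {
            T partial = zero; // each partition starts from the zero value
            for (T x : partition) {
                partial = op.apply(partial, x);
            }
            result = op.apply(result, partial);
        }
        return result;
    }
}
```

With two partitions [1, 2] and [3, 4], a zero of 0 gives 10, while a zero of 10 gives 40 (10 is added three times on top of the sum). This is also why the fold examples below use "0" in field 4 of the zero record.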

Below is an example of reduce() and fold(), both of which sum field 4 of the system data records.

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

public class ReduceExample {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // Adds field 4 of two records and keeps the other fields of the first record
        Function2<String, String, String> sumField4 = (x, y) -> {
            String[] one = x.split(",", -1);
            String[] two = y.split(",", -1);
            if (one[4].length() > 0 && two[4].length() > 0) {
                one[4] = String.valueOf(Double.parseDouble(one[4]) + Double.parseDouble(two[4]));
            } else if (two[4].length() > 0) {
                one[4] = two[4];
            }
            return String.join(",", one);
        };

        String sum = rdd.reduce(sumField4);

        // Field 4 of the zero value is "0", the identity element for addition
        String sumfold = rdd.fold("ALL_DEVICE,ALL_SUBELEMENT,SYSTEM,0,0,0,0,0,0,0,0,0", sumField4);

        System.out.println(sum);
        System.out.println(sumfold);

        jsc.close();
    }
}

In Scala


object ReduceExample extends App {

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // Adds field 4 of two records and keeps the other fields of the first record
  val sumField4 = (x: String, y: String) => {
    val one = x.split(",", -1)
    val two = y.split(",", -1)
    if (one(4).nonEmpty && two(4).nonEmpty) {
      one(4) = (one(4).toDouble + two(4).toDouble).toString
    } else if (two(4).nonEmpty) {
      one(4) = two(4)
    }
    one.mkString(",")
  }

  val sum = rdd.reduce(sumField4)

  // Field 4 of the zero value is "0", the identity element for addition
  val fol = rdd.fold("ALL_DEVICE,ALL_SUBELEMENT,SYSTEM,0,0,0,0,0,0,0,0,0")(sumField4)

  println(sum)
  println(fol)

  sc.stop()
}

Aggregate Action

Both fold() and reduce() require that the return type of our result be the same type as that of the elements in the RDD we are operating over. This works well for operations like sum, but sometimes we want to return a different type.

The aggregate() function frees us from the constraint of having the return be the same type as the RDD we are working on. With aggregate(), like fold(), we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each partition accumulates its own results locally.
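The two functions passed to aggregate() play different roles, which a small plain-Java simulation makes visible: the first (often called seqOp) folds each partition's elements into that partition's accumulator, and the second (combOp) merges the per-partition accumulators. This sketch is hypothetical (AggregateModel is not a Spark class, and partitions are nested lists); its average() method uses a double[] {sum, count} accumulator, the same idea the Average class below implements.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical model of RDD.aggregate(): the result type U may differ from the element type T.
final class AggregateModel {

    private AggregateModel() {
    }

    static <T, U> U aggregate(List<List<T>> partitions, Supplier<U> zero,
                              BiFunction<U, T, U> seqOp, BinaryOperator<U> combOp) {
        U result = zero.get();
        for (List<T> partition : partitions) {
            U acc = zero.get(); // a fresh zero value per partition
            for (T x : partition) {
                acc = seqOp.apply(acc, x);
            }
            result = combOp.apply(result, acc);
        }
        return result;
    }

    // Average of numeric strings, accumulating {sum, count} in a double[].
    static double average(List<List<String>> partitions) {
        double[] sumCount = aggregate(
                partitions,
                () -> new double[] {0, 0},
                (acc, s) -> new double[] {acc[0] + Double.parseDouble(s), acc[1] + 1},
                (x, y) -> new double[] {x[0] + y[0], x[1] + y[1]});
        return sumCount[0] / sumCount[1];
    }
}
```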

In Java

Let's create an Average class with count and sum instance variables.


import java.io.Serializable;

// Serializable because Spark ships the zero value and partial results between driver and executors
public class Average implements Serializable {

    private int count;

    private double sum;

    public Average(int count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public double getSum() {
        return sum;
    }

    public void setSum(double sum) {
        this.sum = sum;
    }

    // Returns NaN when nothing was counted (0.0 / 0)
    public double average() {
        return getSum() / getCount();
    }
}

Let's create an Accumulator class with sumAndCount and combine functions, each implementing the Function2 interface inline.


import org.apache.spark.api.java.function.Function2;

public class Accumulator {

    // seqOp: adds one value (field 4 as a String) to a partition's running Average
    public static final Function2<Average, String, Average> sumAndCount = new Function2<Average, String, Average>() {

        @Override
        public Average call(Average avg, String value) throws Exception {
            // Skip empty fields instead of failing on them
            if (value.isEmpty()) {
                return avg;
            }
            avg.setCount(avg.getCount() + 1);
            avg.setSum(avg.getSum() + Double.parseDouble(value));
            return avg;
        }
    };

    // combOp: merges the Averages computed for two partitions
    public static final Function2<Average, Average, Average> combine = new Function2<Average, Average, Average>() {

        @Override
        public Average call(Average one, Average two) throws Exception {
            one.setCount(one.getCount() + two.getCount());
            one.setSum(one.getSum() + two.getSum());
            return one;
        }
    };
}

Let's write the driver class that uses these functions. Before running the aggregation, we use map() to create an RDD containing just field 4 of each record.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AggregateExample {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // Keep only field 4 of each record
        JavaRDD<String> convertedRdd = rdd.map(x -> x.split(",", -1)[4]);

        // Zero value: nothing counted and nothing summed yet
        Average average = new Average(0, 0);

        Average avg = convertedRdd.aggregate(average, Accumulator.sumAndCount, Accumulator.combine);

        System.out.println(avg.average());

        jsc.close();
    }
}

In Scala

Let's create an Average class with count and sum instance variables.


class Average(var count: Int, var sum: Double) extends Serializable {

  def average(): Double = {
    sum / count
  }
}

Let's create an Aggregator object with sum and combine functions.


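object Aggregator {

  // seqOp: adds one value (field 4 as a String) to a partition's running Average
  val sum = (average: Average, value: String) => {
    // Skip empty fields instead of failing on them
    if (value.nonEmpty) {
      average.count = average.count + 1
      average.sum = average.sum + value.toDouble
    }
    average
  }

  // combOp: merges the Averages computed for two partitions
  val combine = (average1: Average, average2: Average) => {
    average1.count = average1.count + average2.count
    average1.sum = average1.sum + average2.sum
    average1
  }
}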

Let's write the driver object that uses the Aggregator. Before running the aggregation, we use map() to create an RDD containing just field 4 of each record.


object AggregateExample extends App {

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // Keep only field 4 of each record
  val convertedRdd = rdd.map { x =>
    x.split(",", -1)(4)
  }

  val d = convertedRdd.aggregate(new Average(0, 0))(Aggregator.sum, Aggregator.combine)

  println(d.average())
}

Instead of creating the Aggregator object, we can also pass the functions inline inside AggregateExample, as below:


val test = convertedRdd.aggregate(new Average(0, 0))(
  (avg: Average, number: String) => {
    avg.count = avg.count + 1
    avg.sum = avg.sum + number.toDouble
    avg
  },
  (avg1: Average, avg2: Average) => {
    avg1.count = avg1.count + avg2.count
    avg1.sum = avg1.sum + avg2.sum
    avg1
  })

println(test.average())

Other common actions in Spark are:

collect

collect() is commonly used in unit tests where the entire contents of the RDD are expected to fit in memory, as that makes it easy to compare the value of our RDD with our expected result. collect() suffers from the restriction that all of your data must fit on a single machine, as it all needs to be copied to the driver.

take

take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection.

top

If there is an ordering defined on our data, we can also extract the top elements from an RDD using top(). top() will use the default ordering on the data, but we can supply our own comparison function to extract the top elements.

takeSample

The takeSample(withReplacement, num, seed) function allows us to take a sample of our data either with or without replacement.

foreach

foreach() action lets us perform computations on each element in the RDD without bringing it back locally.

count and countByValue

count() returns a count of the elements, and countByValue() returns a map of each unique value to its count.
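To show the shape of what these actions return, here is a local sketch over an in-memory list using plain Java collections rather than Spark; the class LocalActions and its methods are hypothetical. In Spark's Java API, count() returns a long, countByValue() a java.util.Map<T, Long>, and top(n) and take(n) a java.util.List<T>.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local equivalents of count(), countByValue() and top(n) on a java.util.List.
final class LocalActions {

    private LocalActions() {
    }

    static <T> long count(List<T> data) {
        return data.size();
    }

    // Each distinct value mapped to the number of times it occurs.
    static <T> Map<T, Long> countByValue(List<T> data) {
        return data.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // The n largest elements under natural ordering, largest first (like top(n)).
    static <T extends Comparable<? super T>> List<T> top(List<T> data, int n) {
        return data.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

Applied to the device column of the inventory sample data, countByValue() maps each of the five device names to 2.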

Persisting RDD

Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times. Below is an example in Scala where result is computed twice, once for each action:


val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))

To avoid computing an RDD multiple times, we can ask Spark to persist the data using the persist() method. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. Spark has many levels of persistence to choose from, depending on our goals. In Scala and Java, the default persist() stores the data in the JVM heap as unserialized objects (cache() is shorthand for persist() with this default level). The commonly used levels are MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER and DISK_ONLY.

import org.apache.spark.storage.StorageLevel

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

Notice that we called persist() on the RDD before the first action. The persist() call on its own does not force evaluation. If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don’t have to worry about your job breaking if you ask Spark to cache too much data.

However, caching unnecessary data can lead to eviction of useful data and more recomputation time. Finally, RDDs come with a method called unpersist() that lets you manually remove them from the cache.
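The effect of persist() and unpersist() can be imitated with a toy memoizing wrapper in plain Java. This is a hypothetical model (the class CachedResult is not Spark code) that ignores storage levels, eviction and partitions: without persisting, every action re-runs the computation; persist() alone computes nothing; once persisted, the first action computes and stores the result and later actions reuse it; unpersist() drops the stored result and stops caching.

```java
import java.util.List;
import java.util.function.Supplier;

// Toy model (hypothetical, not Spark): caching the result of an expensive computation.
final class CachedResult<T> {

    private final Supplier<List<T>> compute;
    private boolean persisted;
    private List<T> cached;

    CachedResult(Supplier<List<T>> compute) {
        this.compute = compute;
    }

    // Like persist(): only marks the result for caching; nothing is computed yet.
    CachedResult<T> persist() {
        persisted = true;
        return this;
    }

    // Like unpersist(): drops the stored result and stops caching.
    CachedResult<T> unpersist() {
        persisted = false;
        cached = null;
        return this;
    }

    // An "action": recomputes every time unless persisted.
    List<T> collect() {
        if (!persisted) {
            return compute.get();
        }
        if (cached == null) {
            cached = compute.get();
        }
        return cached;
    }

    long count() {
        return collect().size();
    }
}
```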

 
