Spark Performance Tuning and Optimization – Tutorial 14

Tuning Spark often simply means changing the Spark application’s runtime configuration. The primary configuration mechanism in Spark is the SparkConf class. A SparkConf instance is required when you are creating a new SparkContext.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Create a SparkConf and override configuration values before building the context
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
sparkConf.set("spark.master", "local[4]"); // overrides the master set above
sparkConf.set("spark.ui.port", "36000");   // override the default UI port
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

A SparkConf instance contains key/value pairs of configuration options the user would like to override. To use a SparkConf object, you create one, call set() to add configuration values, and then supply it to the SparkContext constructor.

The spark-submit tool provides built-in flags for the most common Spark configuration parameters and a generic --conf flag that accepts any Spark configuration value.


spark-submit --master spark://hostname:7077 --deploy-mode cluster --class com.consumer.SparkDstreamConsumer --name "Call Quality" --total-executor-cores 5 --executor-memory 5g spark-0.0.1-SNAPSHOT.jar

By default, spark-submit will look for a file called conf/spark-defaults.conf in the Spark directory and attempt to read whitespace-delimited key/value pairs from this file. You can also customize the exact location of the file using the --properties-file flag to spark-submit.

Spark has a specific precedence order. The highest priority is given to configurations declared explicitly in the user’s code using the set() function on a SparkConf object. Next are flags passed to spark-submit, then values in the properties file, and finally default values.

Common Spark configuration values include:

spark.executor.memory (--executor-memory) – Amount of memory to use per executor process.

spark.executor.cores (--executor-cores) – Number of cores to use per executor.

spark.cores.max (--total-executor-cores) – Maximum total number of cores to request across all executors.

spark.speculation – Setting this to true will enable speculative execution of tasks. This means tasks that are running slowly will have a second copy launched on another node. Enabling this can help cut down on straggler tasks in large clusters.

spark.storage.blockManagerTimeoutIntervalMs – An internal timeout used for tracking the liveness of executors.

spark.executor.extraJavaOptions, spark.executor.extraClassPath, spark.executor.extraLibraryPath – Extra JVM options, classpath entries, and library paths to use when launching executors.

spark.serializer – Class to use for serializing objects that will be sent over the network or need to be cached in serialized form.

The default Java serialization works with any serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. The value can be any subclass of org.apache.spark.serializer.Serializer.
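
As a rough illustration of how a few of these values might be set in code, here is a minimal Scala sketch; the configuration keys are the standard ones named above, but the specific values are only examples.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("tuning-example")
  // Launch a backup copy of slow-running tasks on another node
  .set("spark.speculation", "true")
  // Cap the total number of cores the application may use (Standalone/Mesos)
  .set("spark.cores.max", "10")
  // Extra JVM options passed to every executor (the value here is illustrative)
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  // Use Kryo instead of the default Java serializer
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")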

Execution of a Spark Application

When executing, Spark translates the logical representation of an RDD into a physical execution plan by merging multiple operations into tasks. To display the lineage of an RDD, Spark provides a toDebugString() method. Below is an example:


(1) MapPartitionsRDD[3] at mapValues at FilterKeyValue.java:32 []
| MapPartitionsRDD[2] at mapToPair at FilterKeyValue.java:20 []
| C:\codebase\scala-project\inputdata\system_data MapPartitionsRDD[1] at textFile at FilterKeyValue.java:18 []
| C:\codebase\scala-project\inputdata\system_data HadoopRDD[0] at textFile at FilterKeyValue.java:18 []
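
The lineage above was printed from the tutorial’s FilterKeyValue example. As a minimal Scala sketch, a pipeline like the following would produce a similar lineage; the input path and the transformations are only placeholders.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local"))

// textFile -> map -> mapValues builds a short chain of parent RDDs
val pairs = sc.textFile("C:\\codebase\\scala-project\\inputdata\\system_data")
  .map { line => val cols = line.split(","); (cols(0), cols(1)) }
  .mapValues(_.toUpperCase)

// Print the chain of ancestor RDDs that the scheduler will walk backward through
println(pairs.toDebugString)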

Before an action is called, RDDs simply store the metadata that will help compute them later. Spark’s scheduler creates a physical execution plan to compute the RDDs needed for performing the action. Here, when we call collect() on the RDD, every partition of the RDD must be materialized and then transferred to the driver program. Spark’s scheduler starts at the final RDD being computed and works backward to find what it must compute. It visits that RDD’s parents, its parents’ parents, and so on, recursively developing a physical plan necessary to compute all ancestor RDDs.

In the simplest case, the scheduler outputs a computation stage for each RDD in this graph where the stage has tasks for each partition in that RDD. Those stages are then executed in reverse order to compute the final required RDD.

Spark’s internal scheduler may truncate the lineage of the RDD graph if an existing RDD has already been persisted in cluster memory or on disk. A second case in which this truncation can happen is when an RDD is already materialized as a side effect of an earlier shuffle, even if it was not explicitly persisted with persist(). This is an under-the-hood optimization that takes advantage of the fact that Spark shuffle outputs are written to disk, and exploits the fact that portions of the RDD graph are often recomputed.

Once the stage graph is defined, tasks are created and dispatched to an internal scheduler, which varies depending on the deployment mode being used. Stages in the physical plan can depend on each other, based on the RDD lineage, so they will be executed in a specific order. For instance, a stage that outputs shuffle data must occur before one that relies on that data being present. A physical stage will launch tasks that each do the same thing but on specific partitions of data.

Performance Tuning Using Code-Level Changes

Level of Parallelism

Every RDD has a fixed number of partitions that determine the degree of parallelism to use when executing operations on the RDD. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions. Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance. Most of the functions accept a second parameter giving the number of partitions to use when creating the grouped or aggregated RDD.

Below is an example in Java where 10 is the number of partitions.


// pairRdd is assumed to be a JavaPairRDD<String, Integer> built earlier
pairRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {

    @Override
    public Integer call(Integer arg0, Integer arg1) throws Exception {
        // Sum the two values for the same key
        return arg0 + arg1;
    }

}, 10); // 10 is the number of partitions of the resulting RDD

Sometimes, we want to change the partitioning of an RDD outside the context of grouping and aggregation operations. For those cases, Spark provides the repartition() function, which shuffles the data across the network to create a new set of partitions. Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions. To know whether you can safely call coalesce(), you can check the current number of partitions using rdd.partitions.size in Scala (or rdd.partitions().size() in Java) and make sure that you are coalescing it to fewer partitions than it currently has.
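
A minimal Scala sketch of this check; the RDD name and the target partition counts are only illustrative.

// records is assumed to be an existing RDD built earlier in the application
val currentPartitions = records.partitions.size
println(s"Current number of partitions: $currentPartitions")

// Shrinking: coalesce avoids a full shuffle when reducing the partition count
val smaller = records.coalesce(currentPartitions / 2)

// Growing (or rebalancing): repartition always shuffles the data
val larger = records.repartition(currentPartitions * 2)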

The logical representation of an RDD is a single collection of objects. During physical execution an RDD is divided into a set of partitions with each partition containing some subset of the total data. When Spark schedules and runs tasks, it creates a single task for data stored in one partition, and that task will require, by default, a single core in the cluster to execute. Out of the box, Spark will infer what it thinks is a good degree of parallelism for RDDs, and this is sufficient for many use cases. Input RDDs typically choose parallelism based on the underlying storage systems. For example, HDFS input RDDs have one partition for each block of the underlying HDFS file. RDDs that are derived from shuffling other RDDs will have parallelism set based on the size of their parent RDDs.

The degree of parallelism can affect performance in two ways. First, if there is too little parallelism, Spark might leave resources idle. For example, if your application has 100 cores allocated to it, and you are running a stage with only 10 tasks, you might be able to increase the level of parallelism to utilize more cores. If there is too much parallelism, small overheads associated with each partition can add up and become significant. A sign of this is that you have tasks that complete almost instantly—in a few milliseconds—or tasks that do not read or write any data.

Spark offers two ways to tune the degree of parallelism for operations. The first is that, during operations that shuffle data, you can always give a degree of parallelism for the produced RDD as a parameter. The second is that any existing RDD can be redistributed to have more or fewer partitions. The repartition() operator will randomly shuffle an RDD into the desired number of partitions. If you know you are shrinking the RDD, you can use the coalesce() operator; this is more efficient than repartition() since it avoids a shuffle operation. If you think you have too much or too little parallelism, it can help to redistribute your data with these operators.

As an example, let’s say we are reading a large amount of data from HDFS, but then immediately performing a filter() operation that is likely to exclude all but a tiny fraction of the dataset. By default the RDD returned by filter() will have the same number of partitions as its parent and might have many empty or small partitions. In this case you can improve the application’s performance by coalescing down to a smaller RDD.
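
A short Scala sketch of this pattern, assuming an existing SparkContext named sc; the path, filter condition, and target partition count are placeholders.

// Reading from HDFS gives one partition per HDFS block, potentially thousands of partitions
val logs = sc.textFile("hdfs:///data/application-logs")

// The filter keeps only a tiny fraction of the records,
// but the resulting RDD still has as many partitions as its parent
val errors = logs.filter(line => line.contains("ERROR"))

// Collapse the mostly empty partitions into a small number of fuller ones
val compactErrors = errors.coalesce(5)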

Serialization Format

When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. This comes into play during shuffle operations, where potentially large amounts of data are transferred. By default Spark will use Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation, but cannot serialize all types of objects out of the box. Almost all applications will benefit from shifting to Kryo for serialization.

To use Kryo serialization, you can set the spark.serializer setting to org.apache.spark.serializer.KryoSerializer. For best performance, you’ll also want to register classes with Kryo that you plan to serialize. Registering a class allows Kryo to avoid writing full class names with individual objects, a space savings that can add up over thousands or millions of serialized records. If you want to force this type of registration, you can set spark.kryo.registrationRequired to true, and Kryo will throw errors if it encounters an unregistered class.


import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration
sparkConf.set("spark.kryo.registrationRequired", "true")
// Movie and Util are classes from this application whose instances will be serialized
sparkConf.registerKryoClasses(Array(classOf[Movie], classOf[Util]))

Whether using Kryo or Java’s serializer, you may encounter a NotSerializableException if your code refers to a class that does not extend Java’s Serializable interface. It can be difficult to track down which class is causing the problem in this case, since many different classes can be referenced from user code. Many JVMs support a special option to help debug this situation: “-Dsun.io.serialization.extendedDebugInfo=true”. You can enable this option using the --driver-java-options and --executor-java-options flags to spark-submit.
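
The same JVM option can also be supplied through configuration properties. A hedged Scala sketch using the spark.executor.extraJavaOptions property listed earlier (the driver-side equivalent still has to be passed at launch time):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Ask executor JVMs to report which object graph path led to the non-serializable class
conf.set("spark.executor.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
// The driver-side option must be supplied when the driver JVM is launched,
// for example via spark-submit --driver-java-options, since the driver is already running here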

Memory Management

Spark uses memory in different ways, so understanding and tuning Spark’s use of memory can help optimize your application. Inside of each executor, memory is used for a few purposes:

RDD storage

When you call persist() or cache() on an RDD, its partitions will be stored in memory buffers. Spark will limit the amount of memory used when caching to a certain fraction of the JVM’s overall heap, set by spark.storage.memoryFraction. If this limit is exceeded, older partitions will be dropped from memory.

Shuffle and aggregation buffers

When performing shuffle operations, Spark will create intermediate buffers for storing shuffle output data. These buffers are used to store intermediate results of aggregations in addition to buffering data that is going to be directly output as part of the shuffle. Spark will attempt to limit the total amount of memory used in shuffle-related buffers to spark.shuffle.memoryFraction.

User code

Spark executes arbitrary user code, so user functions can themselves require substantial memory. For instance, if a user application allocates large arrays or other objects, these will contend for overall memory usage. User code has access to everything left in the JVM heap after the space for RDD storage and shuffle storage are allocated.

By default Spark will leave 60% of space for RDD storage, 20% for shuffle memory, and the remaining 20% for user programs. In some cases users can tune these options for better performance. If your user code is allocating very large objects, it might make sense to decrease the storage and shuffle regions to avoid running out of memory.
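
A hedged Scala sketch of shifting these fractions using the spark.storage.memoryFraction and spark.shuffle.memoryFraction properties mentioned above; the values are only an example for a job whose user code allocates large objects.

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Shrink the cache region from the default 60% of the heap...
conf.set("spark.storage.memoryFraction", "0.4")
// ...keep shuffle buffers at the default 20%...
conf.set("spark.shuffle.memoryFraction", "0.2")
// ...leaving roughly 40% of the heap for user code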

In addition to tweaking memory regions, you can improve certain elements of Spark’s default caching behavior for some workloads. Spark’s default cache() operation persists the RDD using the MEMORY_ONLY storage level. This means that if there is not enough space to cache new RDD partitions, old ones will simply be deleted and, if they are needed again, they will be recomputed. It is sometimes better to call persist() with the MEMORY_AND_DISK storage level, which instead drops RDD partitions to disk and simply reads them back into memory from a local store if they are needed again. This can be much cheaper than recomputing blocks and can lead to more predictable performance. This is particularly useful if your RDD partitions are very expensive to recompute, for example, if you are reading data from a database.

A second improvement on the default caching policy is to cache serialized objects instead of raw Java objects, which you can accomplish using the MEMORY_ONLY_SER or MEMORY_AND_DISK_SER storage levels. Caching serialized objects will slightly slow down the cache operation due to the cost of serializing objects, but it can substantially reduce time spent on garbage collection in the JVM, since many individual records can be stored as a single serialized buffer. This is because the cost of garbage collection scales with the number of objects on the heap, not the number of bytes of data, and this caching method will take many objects and serialize them into a single giant buffer. Consider this option if you are caching large amounts of data (e.g., in the gigabytes) as objects, or if you are seeing long garbage collection pauses. Such pauses would be visible in the application UI under the GC Time column for each task.
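
A minimal Scala sketch of both options, assuming two existing RDDs named movies and ratings:

import org.apache.spark.storage.StorageLevel

// Spill partitions to local disk instead of dropping them when memory fills up
movies.persist(StorageLevel.MEMORY_AND_DISK)

// Cache partitions as serialized buffers to reduce the number of objects
// the garbage collector has to scan
ratings.persist(StorageLevel.MEMORY_ONLY_SER)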

Tuning the Cluster and Environment

Hardware Provisioning

The hardware resources you give to Spark will have a significant effect on the completion time of your application. The main parameters that affect cluster sizing are the amount of memory given to each executor, the number of cores for each executor, the total number of executors, and the number of local disks to use for scratch data.

In all deployment modes, executor memory is set with spark.executor.memory or the --executor-memory flag to spark-submit. The options for the number of executors and the cores per executor differ depending on the deployment mode. In YARN you can set spark.executor.cores or the --executor-cores flag, and the --num-executors flag to determine the total executor count.

In Mesos and Standalone mode, Spark will greedily acquire as many cores and executors as are offered by the scheduler. However, both Mesos and Standalone mode support setting spark.cores.max to limit the total number of cores across all executors for an application. Local disks are used for scratch storage during shuffle operations.
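
As a hedged sketch, these sizing options can also be expressed as configuration properties on a SparkConf; spark.executor.instances is assumed here as the property behind --num-executors on YARN, and the specific sizes are only examples.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "5g")     // memory per executor (--executor-memory)
  .set("spark.executor.cores", "4")       // cores per executor (--executor-cores)
  .set("spark.executor.instances", "10")  // total executors on YARN (--num-executors)
  .set("spark.cores.max", "40")           // cap on total cores (Standalone/Mesos)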

Broadly speaking, Spark applications will benefit from having more memory and cores. Spark’s architecture allows for linear scaling: adding twice the resources will often make your application run twice as fast. An additional consideration when sizing a Spark application is whether you plan to cache intermediate datasets as part of your workload. If you do plan to use caching, the more of your cached data can fit in memory, the better the performance will be. The Spark storage UI will give details about what fraction of your cached data is in memory. One approach is to start by caching a subset of your data on a smaller cluster and extrapolate the total memory you will need to fit larger amounts of the data in memory.

In addition to memory and cores, Spark uses local disk volumes to store intermediate data required during shuffle operations along with RDD partitions that are spilled to disk. Using a larger number of local disks can help accelerate the performance of Spark applications. In YARN mode, the configuration for local disks is read directly from YARN, which provides its own mechanism for specifying scratch storage directories. In Standalone mode, you can set the SPARK_LOCAL_DIRS environment variable in spark-env.sh when deploying the Standalone cluster and Spark applications will inherit this config when they are launched. In Mesos mode, or if you are running in another mode and want to override the cluster’s default storage locations, you can set the spark.local.dir option. In all cases you specify the local directories using a single comma-separated list. It is common to have one local directory for each disk volume available to Spark. Writes will be evenly striped across all local directories provided. Larger numbers of disks will provide higher overall throughput.
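
A short Scala sketch of pointing Spark at multiple scratch disks through spark.local.dir; the directory paths are placeholders, and in YARN mode this setting is taken from YARN’s own configuration instead.

import org.apache.spark.SparkConf

val conf = new SparkConf()
// One scratch directory per physical disk, supplied as a comma-separated list
conf.set("spark.local.dir", "/mnt/disk1/spark-scratch,/mnt/disk2/spark-scratch")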

One caveat to the more is better guideline is when sizing memory for executors. Using very large heap sizes can cause garbage collection pauses to hurt the throughput of a Spark job. It can sometimes be beneficial to request smaller executors (say, 64 GB or less) to mitigate this issue. Mesos and YARN can, out of the box, support packing multiple, smaller executors onto the same physical host, so requesting smaller executors doesn’t mean your application will have fewer overall resources. In Spark’s Standalone mode, you need to launch multiple workers (determined using SPARK_WORKER_INSTANCES) for a single application to run more than one executor on a host. This limitation will likely be removed in a later version of Spark. In addition to using smaller executors, storing data in serialized form can also make garbage collection more efficient.

Data Partitioning

In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Spark programs can choose to control their RDDs’ partitioning to reduce communication. Partitioning will not be helpful in all applications—for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.

Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node.

For example, you might choose to hash-partition an RDD into 10 partitions so that keys that have the same hash value modulo 10 appear on the same node. Or you might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node.
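
A hedged Scala sketch of both choices, assuming an existing pair RDD named pairs whose keys can be hashed and sorted:

import org.apache.spark.{HashPartitioner, RangePartitioner}

// Hash partitioning: keys with the same hash value modulo 10 land in the same partition
val hashed = pairs.partitionBy(new HashPartitioner(10))

// Range partitioning: keys are placed into 10 sorted, contiguous ranges,
// with range boundaries sampled from the data in pairs
val ranged = pairs.partitionBy(new RangePartitioner(10, pairs))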

Consider an application that keeps a large table of movie information in memory—say, an RDD of (MovieName, MovieInfo) pairs, where MovieInfo contains information about the movie. The application periodically combines this table with a smaller dataset of the latest ratings from the past hour, a table of (MovieName, Rating) pairs, so that we can display information about the movies that are currently trending. We can perform this combination with Spark’s join() operation as below:


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// Holder for the movie details parsed from each input line;
// the exact fields are assumed from the constructor call below
case class Movie(name: String, rating: Double, genre: String)

object Test extends App {

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data")

  // Build (MovieName, MovieInfo) pairs from the comma-separated input
  val moviesRdd = rdd.map { x =>
    val data = x.split(",")
    (data(0), Movie(data(1), data(2).toDouble, data(3)))
  }

  val test = new Test2()
  test.analyseData("Input Location", "Output Location", moviesRdd)

}


import org.apache.spark.rdd.RDD

class Test2 {

  def analyseData(fileLocationInput: String, fileLocationOutput: String, rdd: RDD[(String, Movie)]) = {

    // Reuse the SparkContext that created the movies RDD instead of creating a new one
    val sc = rdd.sparkContext
    val rdd2 = sc.textFile(fileLocationInput)

    // Build (MovieName, Rating) pairs from the ratings input
    val ratingRdd = rdd2.map { x =>
      val data = x.split(",")
      (data(1), data(2))
    }

    rdd.join(ratingRdd).reduceByKey((x, y) => {
      // Logic to determine the trending movies
      x
    }).saveAsTextFile(fileLocationOutput)

  }

}

This code will run fine as is, but it will be inefficient. This is because the join() operation is invoked each time the analyseData() function is called, and it does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine. Because we expect the movies table to be much larger than the small amount of rating data seen each hour, this wastes a lot of work: the movies data is hashed and shuffled across the network on every call, even though it does not change.

To make the process efficient, use the partitionBy() transformation on the movies data to hash-partition it at the start of the program. We do this by passing an org.apache.spark.HashPartitioner object to partitionBy(), as below:


import org.apache.spark.HashPartitioner

val moviesRdd = rdd.map { x =>
  val data = x.split(",")
  (data(0), Movie(data(1), data(2).toDouble, data(3)))
}
  // Hash-partition into 50 partitions once, up front, and keep the result cached
  .partitionBy(new HashPartitioner(50))
  .persist()

The analyseData() method can remain unchanged: the ratingRdd is local to analyseData() and is used only once within this method, so there is no advantage in specifying a partitioner for ratingRdd. Because we called partitionBy() when building the movies RDD, Spark will now know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call moviesRdd.join(ratingRdd), Spark will shuffle only the ratings RDD, sending ratings with each particular movie name to the machine that contains the corresponding hash partition of the movies data. The result is that a lot less data is communicated over the network, and the program runs significantly faster.

RDDs can never be modified once created. Therefore it is important to persist and save the result of partitionBy(), not the original rdd. Also, the 50 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD; in general, make this at least as large as the number of cores in your cluster.

Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause re-evaluation of the RDD’s complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

In Scala and Java, you can determine how an RDD is partitioned using its partitioner property. This returns a scala.Option object, which is a Scala class for a container that may or may not contain one item. You can call isDefined() on the Option to check whether it has a value, and get() to get this value. If present, the value will be a spark.Partitioner object. This is essentially a function telling the RDD which partition each key goes into.
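
A minimal Scala sketch of inspecting the partitioner on the moviesRdd built above:

// partitioner is an Option[Partitioner]: defined after partitionBy(), empty otherwise
val partitionerOpt = moviesRdd.partitioner

if (partitionerOpt.isDefined) {
  // For the RDD above this prints the HashPartitioner with 50 partitions
  println("Partitioned by: " + partitionerOpt.get)
} else {
  println("No partitioner set")
}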

Many of Spark’s operations involve shuffling data by key across the network. All of these will benefit from partitioning. Some operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().

For operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master. For binary operations, such as cogroup() and join(), pre-partitioning will cause at least one of the RDDs (the one with the known partitioner) to not be shuffled. If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur.

Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data. For example, suppose you call join() to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is hash-partitioned, and operations like reduceByKey() on the join result will be significantly faster.

The flip side, however, is that for transformations that cannot be guaranteed to produce a known partitioning, the output RDD will not have a partitioner set. For example, if you call map() on a hash-partitioned RDD of key/value pairs, the function passed to map() can in theory change the key of each element, so the result will not have a partitioner. Spark does not analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues() and flatMapValues(), which guarantee that each tuple’s key remains the same.
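
A short Scala sketch of the difference, using the partitioned moviesRdd from above and assuming the Movie class has a name field as sketched earlier:

// mapValues only touches the values, so the hash partitioner is preserved
val renamed = moviesRdd.mapValues(movie => movie.name.toUpperCase)
println(renamed.partitioner)  // Some(HashPartitioner)

// map could change the keys, so Spark drops the partitioner even though we kept the key
val copied = moviesRdd.map { case (name, movie) => (name, movie) }
println(copied.partitioner)   // None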

For binary operations, which partitioner is set on the output depends on the parent RDDs’ partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent.

All that said, here are all the operations that result in a partitioner being set on the output RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if parent has a partitioner), and filter() (if parent has a partitioner). All other operations will produce a result with no partitioner.