spark accumulator and broadcast example in java and scala – tutorial 10

When we normally pass functions to Spark, such as a map() function or a condition for filter(), they can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates from these copies are not propagated back to the driver. Accumulators provide a simple syntax for aggregating values from the worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes; for example, we can count the number of valid and invalid records in a dataset.

Let's look at an example of using accumulators.

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Accumulator {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

        // Fully qualified to avoid a clash with this class name
        org.apache.spark.Accumulator<Integer> acc = jsc.accumulator(0, "Count");

        for (String string : rdd.collect()) {
            System.out.println(string);
            acc.add(1);
        }

        // The aggregated value is read back on the driver
        System.out.println(acc.value());

    }

}

In Scala


import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Accumulator extends App {

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd: RDD[String] = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

  // Integer accumulator, created on the driver
  val accuCount = sc.accumulator(0, "total count")

  rdd.collect().foreach { x =>
    accuCount += 1
    println(x)
  }

  // The aggregated value can only be read back on the driver
  println("total count " + accuCount.value)

}

Of course, it is possible to aggregate values from an entire RDD back to the driver program using actions like reduce(), but sometimes we need a simple way to aggregate values that, in the process of transforming an RDD, are generated at a different scale or granularity than that of the RDD itself. For example, accumulators let us count the number of valid and invalid records during a normal transformation, without doing a separate filter() or reduce(), as sketched below.
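For instance, a minimal sketch along those lines, reusing the sc and rdd from the example above (the accumulator names and the blank-line check are just placeholders):


val validRecords = sc.accumulator(0, "valid records")
val blankLines = sc.accumulator(0, "blank lines")

val lengths = rdd.map { line =>
  // Count as a side effect of the normal transformation
  if (line.trim.isEmpty) blankLines += 1 else validRecords += 1
  line.length
}

lengths.count() // an action is needed to actually run the job
println("valid: " + validRecords.value + ", blank: " + blankLines.value)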

Note that tasks on worker nodes cannot access the accumulator’s value()—from the point of view of these tasks, accumulators are write-only variables. This allows accumulators to be implemented efficiently, without having to communicate every update.
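A quick sketch of what that means in practice, again reusing the sc and rdd from above:


val counter = sc.accumulator(0, "counter")

rdd.foreach { line =>
  counter += 1
  // counter.value here would fail: tasks see accumulators as write-only
}

println(counter.value) // reading the value is only allowed on the driver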

Spark automatically deals with failed or slow machines by re-executing failed or slow tasks. For example, if the node running a partition of a map() operation crashes, Spark will rerun it on another node; and even if the node does not crash but is simply much slower than other nodes, Spark can preemptively launch a speculative copy of the task on another node, and take its result if that finishes. Even if no nodes fail, Spark may have to rerun a task to rebuild a cached value that falls out of memory. The net result is therefore that the same function may run multiple times on the same data depending on what happens on the cluster.

The end result is that for accumulators used in actions, Spark applies each task’s update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach().

For accumulators used in RDD transformations instead of actions, this guarantee does not exist. An accumulator update within a transformation can occur more than once. One such case of a probably unintended multiple update occurs when a cached but infrequently used RDD is first evicted from the LRU cache and is then subsequently needed. This forces the RDD to be recalculated from its lineage, with the unintended side effect that calls to update an accumulator within the transformations in that lineage are sent again to the driver. Within transformations, accumulators should, consequently, be used only for debugging purposes.
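The difference can be sketched roughly like this, assuming the sc and rdd from the earlier example:


val mapSideCount = sc.accumulator(0, "map side count")
val actionSideCount = sc.accumulator(0, "action side count")

val mapped = rdd.map { line =>
  mapSideCount += 1 // may be applied again if this partition is recomputed
  line
}

mapped.foreach { line =>
  actionSideCount += 1 // updates from an action are applied only once per task
}

println("map side: " + mapSideCount.value + ", action side: " + actionSideCount.value)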

Out of the box, Spark supports accumulators of type Int, Double, Long, and Float. In addition to these, Spark also includes an API to define custom accumulator types and custom aggregation operations. Custom accumulators need to extend AccumulatorParam (or AccumulatorV2 in Spark 2.x and later).
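For example, a custom accumulator that merges sets of strings might be sketched as below; StringSetParam and badRecords are hypothetical names used only for illustration:


import org.apache.spark.AccumulatorParam

// Custom accumulator type: merges sets of strings
object StringSetParam extends AccumulatorParam[Set[String]] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

// Collect, for example, the distinct blank lines seen while processing
val badRecords = sc.accumulator(Set.empty[String])(StringSetParam)

rdd.foreach { line =>
  if (line.trim.isEmpty) badRecords += Set(line)
}

println(badRecords.value)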

Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. They come in handy, for example, if your application needs to send a large, read-only lookup table to all the nodes.

Spark automatically sends all variables referenced in your closures to the worker nodes. While this is convenient, it can also be inefficient because:

(1)  the default task launching mechanism is optimized for small task sizes, and

(2)  you might, in fact, use the same variable in multiple parallel operations, but Spark will send it separately for each operation.

Let's take an example: here we load the lookUpData and pass it into a function where we process the RDD.


import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Broadcaster extends App {

  def loadLookUpTable(): Map[String, String] = {
    // Logic to load the lookup table
    Map("One" -> "2323")
  }

  val lookUpData: Map[String, String] = loadLookUpTable()

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd: RDD[String] = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

  loadMovieData(lookUpData, rdd)

  def loadMovieData(map: Map[String, String], rdd: RDD[String]) = {

    rdd.map { x =>
      (x, 1)
    }.reduceByKey((x, y) => {
      // Some complex logic here that uses the map for lookup; referencing the
      // map in this closure means it is shipped with every task
      val lookedUp = map.getOrElse("test_key", "")
      x + y
    })

  }

}

This program would work, but if the lookup table were large, say several megabytes in size, it would be expensive to send that map from the master alongside each task. In addition, if we used the same lookUpData object in a later operation, it would be sent to each node all over again.

We can fix this by making lookUpData a broadcast variable. A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.

Using broadcast variables, our previous example looks like this. The data in a broadcast variable is accessed through the value property in Scala and the value() method in Java.


import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Broadcaster extends App {

  def loadLookUpTable(): Map[String, String] = {
    // Logic to load the lookup table
    Map("One" -> "2323")
  }

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd: RDD[String] = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

  // Wrap the lookup table in a broadcast variable; it is shipped to each node only once
  val lookUpData: Broadcast[Map[String, String]] = sc.broadcast(loadLookUpTable())

  loadMovieData(lookUpData, rdd)

  def loadMovieData(map: Broadcast[Map[String, String]], rdd: RDD[String]) = {

    rdd.map { x =>
      (x, 1)
    }.reduceByKey((x, y) => {
      // Some complex logic here that uses the broadcast map for lookup
      val lookedUp = map.value.getOrElse("test_key", "")
      x + y
    })

  }

}

In Java it looks like this:


final Broadcast<Map<String, String>> lookUpData = jsc.broadcast(loadLookUpTable());

lookUpData.value(); // returns the map

The variable will be sent to each node only once and should be treated as read-only; updates will not be propagated to other nodes. The easiest way to satisfy the read-only requirement is to broadcast a primitive value or a reference to an immutable object. In such cases, you won't be able to change the value of the broadcast variable except within the driver code. However, sometimes it can be more convenient or more efficient to broadcast a mutable object. If you do that, it is up to you to maintain the read-only condition.
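For instance, a broadcast of an immutable Scala Map is read-only by construction; the rates table below is purely hypothetical and reuses the sc, rdd, and Broadcast import from the example above:


// Workers can only look values up, never modify the shared copy
val rates: Broadcast[Map[String, Double]] = sc.broadcast(Map("USD" -> 1.0, "EUR" -> 0.9))

val converted = rdd.map { code => rates.value.getOrElse(code, 0.0) }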

When we are broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time to send the value over the network can quickly become a bottleneck if it takes a long time to either serialize a value or to send the serialized value over the network. In particular, Java Serialization, the default serialization library used in Spark's Scala and Java APIs, can be very inefficient out of the box for anything except arrays of primitive types. You can optimize serialization by selecting a different serialization library using the spark.serializer property.
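For example, a minimal sketch of switching to Spark's built-in Kryo serializer through that property:


import org.apache.spark.SparkConf

// Set the serializer before the SparkContext is created
val conf = new SparkConf()
  .setAppName("scala spark")
  .setMaster("local")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")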
