Stateful transformations in Spark Streaming with examples

Stateful transformations are operations on DStreams that track data across time; that is, some data from previous batches is used to generate the results for a new batch. The two main types are windowed operations, which act over a sliding window of time periods, and updateStateByKey(), which is used to track state across events for each key.

Stateful transformations require checkpointing to be enabled in the StreamingContext for fault tolerance. Checkpointing is enabled as shown below.


// StreamingContext with a 50-second batch interval
JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(50000));

// Enable checkpointing to a reliable directory, such as one on HDFS
jstream.checkpoint("hdfs://nameservice//user/checkpoint/");

Windowed transformations

Windowed operations compute results across a longer time period than the StreamingContext’s batch interval, by combining results from multiple batches.

All windowed operations need two parameters, the window duration and the sliding duration, both of which must be a multiple of the StreamingContext's batch interval. The window duration controls how many previous batches of data are considered. If we had a source DStream with a batch interval of 10 seconds and wanted to create a sliding window of the last 30 seconds (or last 3 batches), we would set the windowDuration to 30 seconds. The sliding duration, which defaults to the batch interval, controls how frequently the new DStream computes results. If we had the same source DStream with a batch interval of 10 seconds and wanted to compute our window only on every second batch, we would set our sliding duration to 20 seconds.
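For the 30-second window computed every 20 seconds described above, the call would look like the sketch below, where lines is a hypothetical JavaDStream created from a StreamingContext with a 10-second batch interval:

// Window of the last 30 seconds (3 batches), recomputed every 20 seconds (2 batches);
// both durations are multiples of the 10-second batch interval.
JavaDStream<String> windowedLines = lines.window(new Duration(30000), new Duration(20000));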

The simplest window operation we can do on a DStream is window(), which returns a new DStream with the data for the requested window. In other words, each RDD in the DStream resulting from window() will contain data from multiple batches, which we can then process with count(), transform(), and so on.
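Continuing the hypothetical snippet above, counting the elements of each window is then just a matter of calling count() on the windowed DStream:

// One Long per window: the number of elements that fell inside it
JavaDStream<Long> windowCounts = windowedLines.count();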

In the code below, the batch interval is set to 5 seconds, the window duration to 10 seconds, and the sliding duration to 5 seconds. A result is therefore computed every 5 seconds, as configured by the sliding duration, but each result is calculated over the data of the last two batches, as the window duration is 10 seconds.

Below is the complete example.


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.bt.ims.util.ConfigFile;
import com.bt.ims.util.Constants;
import com.bt.ims.util.FileType;

import kafka.serializer.StringDecoder;

public class MovieDataConsumer {

    public static void main(String[] args) throws InterruptedException {

        ConfigFile conf = new ConfigFile(Constants.CONFIG, FileType.property);
        String target = "C:\\codebase\\scala-project\\outputdatastream\\movies";
        String broker = "localhost:9092";

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Batch interval of 5 seconds
        JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(5000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topic_movie = Collections.singleton("movie");

        JavaPairInputDStream<String, Average> directKafkaStreamMedia = KafkaUtils.createDirectStream(jstream,
                String.class, Average.class, StringDecoder.class, AverageEncoder.class, kafkaParams, topic_movie);

        // Window duration of 10 seconds and sliding duration of 5 seconds
        JavaPairDStream<String, Average> windowed = directKafkaStreamMedia.window(new Duration(10000),
                new Duration(5000));

        // Aggregate the Average objects of each window by key
        JavaPairDStream<String, Average> windowedReduced = windowed
                .reduceByKey(new Function2<Average, Average, Average>() {

                    @Override
                    public Average call(Average arg0, Average arg1) throws Exception {
                        arg0.setSum(arg0.getSum() + arg1.getValue());
                        arg0.setCount(arg0.getCount() + arg1.getCount());
                        return arg0;
                    }
                });

        // Write the result of every window computation to a new output directory
        windowedReduced.foreachRDD(new VoidFunction<JavaPairRDD<String, Average>>() {

            @Override
            public void call(JavaPairRDD<String, Average> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }
}
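The example assumes a custom Average value class and an AverageEncoder Kafka decoder from the same project, neither of which is shown in this post. Judging from the getters and setters used above and the sample output shown later, the Average bean would look roughly like the sketch below; the field types and layout are inferred, not taken from the original source. AverageEncoder is passed as the value decoder class to createDirectStream(), so it would implement kafka.serializer.Decoder<Average>.

// Rough sketch of the Average value class used in the examples;
// the actual class in the original project may differ.
public class Average implements java.io.Serializable {

    private int count;      // number of ratings aggregated so far
    private double sum;     // running sum of the rating values
    private double value;   // the rating carried by this individual record

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    public double getSum() { return sum; }
    public void setSum(double sum) { this.sum = sum; }

    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }

    @Override
    public String toString() {
        return "Average [count=" + count + ", sum=" + sum + ", value=" + value + "]";
    }
}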

While we could build all other windowed operations on top of window(), Spark Streaming also provides a number of windowed operations directly, for efficiency and convenience. First, reduceByWindow() and reduceByKeyAndWindow() allow us to perform reductions over each window more efficiently. Below is an example that uses both.


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.bt.ims.util.ConfigFile;
import com.bt.ims.util.Constants;
import com.bt.ims.util.FileType;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class MovieDataConsumer {

    public static void main(String[] args) throws InterruptedException {

        ConfigFile conf = new ConfigFile(Constants.CONFIG, FileType.property);
        String target = "C:\\codebase\\scala-project\\outputdatastream\\movies";
        String broker = "localhost:9092";

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Batch interval of 5 seconds
        JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(5000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topic_movie = Collections.singleton("movie");

        JavaPairInputDStream<String, Average> directKafkaStreamMedia = KafkaUtils.createDirectStream(jstream,
                String.class, Average.class, StringDecoder.class, AverageEncoder.class, kafkaParams, topic_movie);

        // reduceByKeyAndWindow() combines the windowing and the per-key reduction in a single call
        JavaPairDStream<String, Average> pair = directKafkaStreamMedia
                .reduceByKeyAndWindow(new Function2<Average, Average, Average>() {

                    @Override
                    public Average call(Average arg0, Average arg1) throws Exception {
                        arg0.setSum(arg0.getSum() + arg1.getValue());
                        arg0.setCount(arg0.getCount() + arg1.getCount());
                        return arg0;
                    }
                }, new Duration(10000), new Duration(5000));

        pair.foreachRDD(new VoidFunction<JavaPairRDD<String, Average>>() {

            @Override
            public void call(JavaPairRDD<String, Average> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        // reduceByWindow() reduces each whole window to a single value, ignoring the keys
        JavaDStream<Tuple2<String, Average>> reducedByWindow = directKafkaStreamMedia.reduceByWindow(
                new Function2<Tuple2<String, Average>, Tuple2<String, Average>, Tuple2<String, Average>>() {

                    @Override
                    public Tuple2<String, Average> call(Tuple2<String, Average> arg0, Tuple2<String, Average> arg1)
                            throws Exception {
                        arg0._2.setSum(arg0._2.getSum() + arg1._2.getValue());
                        arg0._2.setCount(arg0._2.getCount() + arg1._2.getCount());
                        return arg0;
                    }
                }, new Duration(10000), new Duration(5000));

        reducedByWindow.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Average>>>() {

            @Override
            public void call(JavaRDD<Tuple2<String, Average>> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        // The same per-key aggregation built on top of window() and reduceByKey(), for comparison
        JavaPairDStream<String, Average> windowed = directKafkaStreamMedia.window(new Duration(10000),
                new Duration(5000));

        JavaPairDStream<String, Average> windowedReduced = windowed
                .reduceByKey(new Function2<Average, Average, Average>() {

                    @Override
                    public Average call(Average arg0, Average arg1) throws Exception {
                        arg0.setSum(arg0.getSum() + arg1.getValue());
                        arg0.setCount(arg0.getCount() + arg1.getCount());
                        return arg0;
                    }
                });

        windowedReduced.foreachRDD(new VoidFunction<JavaPairRDD<String, Average>>() {

            @Override
            public void call(JavaPairRDD<String, Average> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }
}
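reduceByKeyAndWindow() also has a more efficient variant that takes an inverse reduce function: instead of recomputing the whole window each time, Spark adds the batch that enters the window and "subtracts" the batch that leaves it. This variant requires checkpointing to be enabled. Below is a hedged sketch using a hypothetical JavaPairDStream<String, Integer> named counts rather than the Average class, since cleanly subtracting an Average depends on its internals.

// Incremental windowed count per key: add counts entering the window and
// subtract counts leaving it. Requires jstream.checkpoint(...) to be set.
JavaPairDStream<String, Integer> windowedCounts = counts.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;   // reduce: a new batch enters the window
            }
        },
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a - b;   // inverse reduce: an old batch leaves the window
            }
        },
        new Duration(10000), new Duration(5000));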

For counting data, DStreams offer countByWindow() and countByValueAndWindow() as shorthands. countByWindow() gives us a DStream representing the number of elements in each window. countByValueAndWindow() gives us a DStream with the counts for each value.

Below is an example


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.bt.ims.util.ConfigFile;
import com.bt.ims.util.Constants;
import com.bt.ims.util.FileType;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class MovieDataConsumer {

    public static void main(String[] args) throws InterruptedException {

        ConfigFile conf = new ConfigFile(Constants.CONFIG, FileType.property);
        String target = "C:\\codebase\\scala-project\\outputdatastream\\movies";
        String broker = "localhost:9092";

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Batch interval of 5 seconds
        JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(5000));

        // countByWindow() and countByValueAndWindow() require checkpointing to be enabled
        jstream.checkpoint("C:\\codebase\\scala-project\\Checkdatastream\\movies");
        // jstream.checkpoint("hdfs://nameservice1//user/checkpoint/");

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topic_movie = Collections.singleton("movie");

        JavaPairInputDStream<String, Average> directKafkaStreamMedia = KafkaUtils.createDirectStream(jstream,
                String.class, Average.class, StringDecoder.class, AverageEncoder.class, kafkaParams, topic_movie);

        // The number of elements in each window
        JavaDStream<Long> count = directKafkaStreamMedia.countByWindow(new Duration(10000), new Duration(5000));

        // The count of each distinct value in each window
        JavaPairDStream<Tuple2<String, Average>, Long> countByValue = directKafkaStreamMedia
                .countByValueAndWindow(new Duration(10000), new Duration(5000));

        // This will just print the total number of records in each window
        count.foreachRDD(new VoidFunction<JavaRDD<Long>>() {

            @Override
            public void call(JavaRDD<Long> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        // The output will be in the format
        // ((Bat Man,Average [count=1, sum=4.0, value=4.0]),1)
        // ((Iron Man,Average [count=1, sum=4.0, value=4.0]),1)
        // ((Iron Man,Average [count=1, sum=2.0, value=2.0]),1)
        // ((Iron Man,Average [count=1, sum=4.0, value=4.0]),1)
        countByValue.foreachRDD(new VoidFunction<JavaPairRDD<Tuple2<String, Average>, Long>>() {

            @Override
            public void call(JavaPairRDD<Tuple2<String, Average>, Long> arg0) throws Exception {
                arg0.saveAsTextFile(target + java.io.File.separator + System.currentTimeMillis());
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }
}

To use multiple receivers, it is important to understand how receivers are executed in the Spark cluster. Each receiver runs as a long-running task within Spark's executors, and hence occupies one of the CPU cores allocated to the application. In addition, there need to be cores available for processing the data. This means that in order to run multiple receivers, you should have at least as many cores as the number of receivers, plus however many are needed to run your computation. For example, if we want to run 10 receivers in our streaming application, we have to allocate at least 11 cores.

Do not run Spark Streaming programs locally with the master configured as “local” or “local[1]”. That allocates only one CPU core for tasks, and if a receiver is running on it, there is no resource left to process the received data. Use at least “local[2]” so that there are enough cores.
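For the local examples in this post that just means changing the master in the SparkConf, for instance as below. The direct Kafka stream used above does not run a receiver, but the advice applies to any receiver-based input source.

// At least two local cores: one for a receiver, one for processing the data.
// "local[*]" uses all available cores of the machine instead of a fixed number.
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local[2]");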