Spark file streaming with a sliding window: calculating a simple moving average using reduceByKeyAndWindow

Problem To Solve: Calculate the simple moving average of each stock's closing price over the last 10 minutes, recomputed every 5 minutes (a 10-minute window that slides by 5 minutes).

Input Data Format:


[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]

Below is the code to calculate the simple moving average from the json data


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;

/**
 * Spark Streaming job that computes the simple moving average of stock
 * closing prices per symbol over a 10-minute window sliding every 5 minutes.
 *
 * Input: text files dropped into path[0], each line a JSON array of records
 * of the form {"timestamp": ..., "symbol": ..., "priceData": {"close": ..., ...}}.
 * Output: one timestamped directory per window under path[1], containing
 * (symbol, average) pairs.
 */
public class SimpleMovingAverage {

    public static void main(String[] args) throws InterruptedException {

        // path[0] = input directory watched for new files,
        // path[1] = base directory for per-window output.
        final String[] path = new String[] {
                "/home/adarsh/Desktop/test/df",
                "/home/adarsh/Desktop/test/dfout/out" };

        // Streaming jobs need more than one core: "local" provides a single
        // thread, which can starve batch processing. Use local[2] as the
        // Spark Streaming guide recommends for local testing.
        SparkConf sparkConf = new SparkConf().setAppName("Test")
                .setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Batch interval = 5 minutes (300000 ms): each micro-batch picks up
        // any new files that appeared in the input directory.
        @SuppressWarnings("resource")
        JavaStreamingContext jstream = new JavaStreamingContext(jsc,
                new Duration(300000));
        JavaDStream<String> dstream = jstream.textFileStream(path[0]);

        // Parse each JSON-array line into (symbol, (count=1, sum=close)) pairs.
        JavaPairDStream<String, AverageTuple> pair = dstream.flatMapToPair(
                new PairFlatMapFunction<String, String, AverageTuple>() {

                    private static final long serialVersionUID = 67676744;

                    public Iterator<Tuple2<String, AverageTuple>> call(String line)
                            throws Exception {

                        List<Tuple2<String, AverageTuple>> list =
                                new ArrayList<Tuple2<String, AverageTuple>>();

                        JSONArray records = new JSONArray(line);

                        for (int i = 0; i < records.length(); i++) {
                            JSONObject record = records.getJSONObject(i);

                            // Use the typed accessors instead of
                            // get(...).toString() and re-parsing priceData
                            // through an intermediate string.
                            String symbol = record.getString("symbol");
                            JSONObject priceData = record.getJSONObject("priceData");

                            list.add(new Tuple2<String, AverageTuple>(symbol,
                                    new AverageTuple(1,
                                            priceData.getDouble("close"))));
                        }

                        return list.iterator();
                    }
                });

        // Aggregate per symbol over a 10-minute window sliding every 5 minutes.
        JavaPairDStream<String, AverageTuple> result = pair.reduceByKeyAndWindow(
                new Function2<AverageTuple, AverageTuple, AverageTuple>() {

                    private static final long serialVersionUID = 76761212;

                    public AverageTuple call(AverageTuple left, AverageTuple right)
                            throws Exception {
                        // Return a fresh tuple rather than mutating an input:
                        // Spark caches and reuses windowed values across
                        // batches, so in-place mutation can corrupt the
                        // window state.
                        return new AverageTuple(
                                left.getCount() + right.getCount(),
                                left.getAverage() + right.getAverage());
                    }
                }, new Duration(600000), new Duration(300000));

        // Write each window's result as a single part-file under a
        // timestamped subdirectory (saveAsTextFile requires a new path).
        result.foreachRDD(new VoidFunction<JavaPairRDD<String, AverageTuple>>() {

            private static final long serialVersionUID = 6767679;

            public void call(JavaPairRDD<String, AverageTuple> rdd)
                    throws Exception {
                rdd.coalesce(1).saveAsTextFile(
                        path[1] + java.io.File.separator + System.currentTimeMillis());
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }

}

Below is the AverageTuple class used in the above code.


import java.io.Serializable;

/**
 * Serializable accumulator for computing an average incrementally.
 *
 * NOTE: despite its name, the {@code average} field holds a running SUM of
 * values; the actual average is only computed on demand in {@link #toString()}
 * as {@code average / count}. The field/accessor names are kept for
 * backward compatibility with existing callers.
 */
public class AverageTuple implements Serializable {

    private static final long serialVersionUID = 332323;

    /** Number of values accumulated so far. */
    private int count;

    /** Running sum of the accumulated values (see class note). */
    private double average;

    public AverageTuple(int count, double average) {
        super();
        this.count = count;
        this.average = average;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public double getAverage() {
        return average;
    }

    public void setAverage(double average) {
        this.average = average;
    }

    /**
     * Renders the computed average (sum / count).
     *
     * Guards against division by zero: an empty tuple would otherwise print
     * "NaN" or "Infinity" into the saved output files.
     */
    @Override
    public String toString() {
        if (count == 0) {
            return "0.0";
        }
        return String.valueOf(average / count);
    }

}

1 thought on “spark file streaming with sliding window to calculate the simple moving average using reduceByKeyAndWindow”

Comments are closed.