Problem to solve: calculate the simple moving average (SMA) of each stock's closing price over the last 10 minutes, recomputed every 5 minutes (that is, a 10-minute window that slides by 5 minutes).
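Input data format (the job reads its input line by line, so each line of an input file must contain one complete JSON array):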
[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]
Below is the code that calculates the simple moving average from the JSON data:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;
public class SimpleMovingAverage {
public static void main(String[] args) throws InterruptedException {
final String[] path = new String[]{
"/home/adarsh/Desktop/test/df",
"/home/adarsh/Desktop/test/dfout/out"};
SparkConf sparkConf = new SparkConf().setAppName("Test")
.setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
@SuppressWarnings("resource")
JavaStreamingContext jstream = new JavaStreamingContext(jsc,
new Duration(300000));
JavaDStream<String> dstream = jstream
.textFileStream(path[0]);
JavaPairDStream<String, AverageTuple> pair = dstream.flatMapToPair(
new PairFlatMapFunction<String, String, AverageTuple>() {
/**
*
*/
private static final long serialVersionUID = 67676744;
public Iterator<Tuple2<String, AverageTuple>> call(String t)
throws Exception {
List<Tuple2<String, AverageTuple>> list = new ArrayList<Tuple2<String, AverageTuple>>();
JSONArray js1 = new JSONArray(t);
for (int i = 0; i < js1.length(); i++) {
String symbol = js1.getJSONObject(i).get("symbol")
.toString();
JSONObject jo = new JSONObject(js1.getJSONObject(i)
.get("priceData").toString());
list.add(new Tuple2<String, AverageTuple>(symbol,
new AverageTuple(1,
jo.getDouble("close"))));
}
return list.iterator();
}
});
JavaPairDStream<String, AverageTuple> result=pair.reduceByKeyAndWindow(
new Function2<AverageTuple, AverageTuple, AverageTuple>() {
/**
*
*/
private static final long serialVersionUID = 76761212;
public AverageTuple call(AverageTuple result, AverageTuple value)
throws Exception {
result.setAverage(
result.getAverage() + value.getAverage());
result.setCount(result.getCount() + value.getCount());
return result;
}
}, new Duration(600000), new Duration(300000));
result.foreachRDD(new VoidFunction<JavaPairRDD<String,AverageTuple>>() {
/**
*
*/
private static final long serialVersionUID = 6767679;
public void call(JavaPairRDD<String, AverageTuple> t)
throws Exception {
t.coalesce(1).saveAsTextFile(path[1]+java.io.File.separator + System.currentTimeMillis());
}
});
jstream.start();
jstream.awaitTermination();
}
}
Below is the `AverageTuple` class used by the code above:
import java.io.Serializable;
/**
 * Serializable (count, sum) pair for computing an arithmetic mean incrementally.
 *
 * <p>Despite its name, the {@code average} property holds the running <em>sum</em> of the values
 * folded in so far. The mean itself is produced only by {@link #toString()}, which divides that
 * sum by {@code count}.
 */
public class AverageTuple implements Serializable {

  private static final long serialVersionUID = 332323;

  /** Number of values represented by this tuple. */
  private int count;

  /** Running sum of the represented values (not a mean, despite the name). */
  private double average;

  /**
   * Creates a tuple.
   *
   * @param count number of values represented
   * @param average running sum of those values
   */
  public AverageTuple(int count, double average) {
    this.average = average;
    this.count = count;
  }

  /** Returns the number of values represented. */
  public int getCount() {
    return this.count;
  }

  /** Replaces the number of values represented. */
  public void setCount(int count) {
    this.count = count;
  }

  /** Returns the running sum (despite the method name). */
  public double getAverage() {
    return this.average;
  }

  /** Replaces the running sum (despite the method name). */
  public void setAverage(double average) {
    this.average = average;
  }

  /**
   * Returns the mean {@code average / count} rendered as a decimal string.
   *
   * <p>Floating-point division is used, so a {@code count} of 0 yields {@code "NaN"} (sum 0) or
   * {@code "Infinity"}/{@code "-Infinity"} (non-zero sum).
   */
  @Override
  public String toString() {
    final double mean = this.average / this.count;
    return Double.toString(mean);
  }
}
How can I solve this problem?