Problem to solve: Calculate the simple moving average of the closing price of each stock over the last 10 minutes, using a window that slides every 5 minutes.
Input Data Format:
[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]
Below is the code that calculates the simple moving average from the JSON data:
import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.json.JSONArray; import org.json.JSONObject; import scala.Tuple2; public class SimpleMovingAverage { public static void main(String[] args) throws InterruptedException { final String[] path = new String[]{ "/home/adarsh/Desktop/test/df", "/home/adarsh/Desktop/test/dfout/out"}; SparkConf sparkConf = new SparkConf().setAppName("Test") .setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); @SuppressWarnings("resource") JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(300000)); JavaDStream<String> dstream = jstream .textFileStream(path[0]); JavaPairDStream<String, AverageTuple> pair = dstream.flatMapToPair( new PairFlatMapFunction<String, String, AverageTuple>() { /** * */ private static final long serialVersionUID = 67676744; public Iterator<Tuple2<String, AverageTuple>> call(String t) throws Exception { List<Tuple2<String, AverageTuple>> list = new ArrayList<Tuple2<String, AverageTuple>>(); JSONArray js1 = new JSONArray(t); for (int i = 0; i < js1.length(); i++) { String symbol = js1.getJSONObject(i).get("symbol") .toString(); JSONObject jo = new JSONObject(js1.getJSONObject(i) .get("priceData").toString()); list.add(new Tuple2<String, AverageTuple>(symbol, new AverageTuple(1, jo.getDouble("close")))); } return list.iterator(); } }); JavaPairDStream<String, AverageTuple> result=pair.reduceByKeyAndWindow( new 
Function2<AverageTuple, AverageTuple, AverageTuple>() { /** * */ private static final long serialVersionUID = 76761212; public AverageTuple call(AverageTuple result, AverageTuple value) throws Exception { result.setAverage( result.getAverage() + value.getAverage()); result.setCount(result.getCount() + value.getCount()); return result; } }, new Duration(600000), new Duration(300000)); result.foreachRDD(new VoidFunction<JavaPairRDD<String,AverageTuple>>() { /** * */ private static final long serialVersionUID = 6767679; public void call(JavaPairRDD<String, AverageTuple> t) throws Exception { t.coalesce(1).saveAsTextFile(path[1]+java.io.File.separator + System.currentTimeMillis()); } }); jstream.start(); jstream.awaitTermination(); } }
Below is the AverageTuple class used in the code above:
import java.io.Serializable;

/**
 * Serializable accumulator pairing a sample count with a {@code double} value.
 *
 * <p>{@link #toString()} renders the stored value divided by the count, so the
 * {@code average} field acts as a running total while tuples are being combined.
 */
public class AverageTuple implements Serializable {

    private static final long serialVersionUID = 332323L;

    /** Value that {@link #toString()} divides by {@link #count}. */
    private double average;

    /** Number of samples represented by this tuple. */
    private int count;

    /**
     * Creates a tuple with the given count and value.
     *
     * @param count number of samples
     * @param average value associated with those samples
     */
    public AverageTuple(int count, double average) {
        this.average = average;
        this.count = count;
    }

    /** @return the stored sample count */
    public int getCount() {
        return this.count;
    }

    /** @param count the new sample count */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the stored value */
    public double getAverage() {
        return this.average;
    }

    /** @param average the new stored value */
    public void setAverage(double average) {
        this.average = average;
    }

    /**
     * Formats the stored value divided by the count.
     *
     * @return the decimal string form of {@code average / count}
     */
    @Override
    public String toString() {
        final double mean = this.average / this.count;
        return Double.toString(mean);
    }
}
How can I solve this problem?