Problem to solve: Calculate the maximum profit (average closing price – average opening price) per stock over the last 10 minutes, using a window that slides every 5 minutes.
Input Data Format:
[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]
Below is the code to calculate the maximum profit from the JSON data:
import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.json.JSONArray; import org.json.JSONObject; import scala.Tuple2; public class MaximumProfit { public static void main(String[] args) throws InterruptedException { final String[] path = new String[]{ "/home/adarsh/Desktop/test/input", "/home/adarsh/Desktop/test/dfout/out"}; SparkConf sparkConf = new SparkConf().setAppName("Test") .setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); @SuppressWarnings("resource") JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(300000)); JavaDStream<String> dstream = jstream.textFileStream(path[0]); JavaPairDStream<String, MaximumTuple> pair = dstream.flatMapToPair( new PairFlatMapFunction<String, String, MaximumTuple>() { /** * */ private static final long serialVersionUID = 67676744; public Iterator<Tuple2<String, MaximumTuple>> call(String t) throws Exception { List<Tuple2<String, MaximumTuple>> list = new ArrayList<Tuple2<String, MaximumTuple>>(); JSONArray js1 = new JSONArray(t); for (int i = 0; i < js1.length(); i++) { String symbol = js1.getJSONObject(i).get("symbol") .toString(); JSONObject jo = new JSONObject(js1.getJSONObject(i) .get("priceData").toString()); list.add(new Tuple2<String, MaximumTuple>(symbol, new MaximumTuple(1, jo.getDouble("close"),jo.getDouble("open")))); } return list.iterator(); } }); JavaPairDStream<String, MaximumTuple> 
result=pair.reduceByKeyAndWindow( new Function2<MaximumTuple, MaximumTuple, MaximumTuple>() { /** * */ private static final long serialVersionUID = 76761212; public MaximumTuple call(MaximumTuple result, MaximumTuple value) throws Exception { result.setClosingPrice( result.getClosingPrice() + value.getClosingPrice()); result.setOpeningPrice(result.getOpeningPrice()+value.getOpeningPrice()); result.setCount(result.getCount() + value.getCount()); return result; } }, new Duration(600000), new Duration(300000)); result.foreachRDD(new VoidFunction<JavaPairRDD<String,MaximumTuple>>() { /** * */ private static final long serialVersionUID = 6767679; public void call(JavaPairRDD<String, MaximumTuple> t) throws Exception { t.coalesce(1).saveAsTextFile(path[1]+java.io.File.separator + System.currentTimeMillis()); } }); jstream.start(); jstream.awaitTermination(); } }
Below is the MaximumTuple class used in the above code
package com.freelance.project;

import java.io.Serializable;

/**
 * Mutable accumulator holding a number of quotes together with the sums of
 * their closing and opening prices.
 *
 * <p>The average profit derived from these sums is available through
 * {@link #getProfit()}; {@link #toString()} prints that same value.
 */
public class MaximumTuple implements Serializable {

    private static final long serialVersionUID = 44545;

    /** Number of quotes accumulated. */
    private int count;

    /** Sum of the closing prices of the accumulated quotes. */
    private double closingPrice;

    /** Sum of the opening prices of the accumulated quotes. */
    private double openingPrice;

    /**
     * Creates an accumulator with the given initial values.
     *
     * @param count        number of quotes represented
     * @param closingPrice sum of closing prices
     * @param openingPrice sum of opening prices
     */
    public MaximumTuple(int count, double closingPrice, double openingPrice) {
        this.count = count;
        this.closingPrice = closingPrice;
        this.openingPrice = openingPrice;
    }

    /** @return the number of quotes accumulated */
    public int getCount() {
        return count;
    }

    /** @param count the number of quotes accumulated */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the sum of closing prices */
    public double getClosingPrice() {
        return closingPrice;
    }

    /** @param closingPrice the sum of closing prices */
    public void setClosingPrice(double closingPrice) {
        this.closingPrice = closingPrice;
    }

    /** @return the sum of opening prices */
    public double getOpeningPrice() {
        return openingPrice;
    }

    /** @param openingPrice the sum of opening prices */
    public void setOpeningPrice(double openingPrice) {
        this.openingPrice = openingPrice;
    }

    /**
     * Returns the average profit: average closing price minus average opening price.
     *
     * <p>Exposed as a number so callers can sort or compare tuples without
     * parsing {@link #toString()}. When {@code count} is 0 the divisions are by
     * zero and the result is NaN or infinite.
     *
     * @return {@code (closingPrice / count) - (openingPrice / count)}
     */
    public double getProfit() {
        return (closingPrice / count) - (openingPrice / count);
    }

    /**
     * Returns the average profit as text.
     *
     * @return {@link #getProfit()} formatted with {@link String#valueOf(double)}
     */
    @Override
    public String toString() {
        return String.valueOf(getProfit());
    }
}
Please help me sort the output by maximum profit. What changes do I need to make so that the stock with the maximum profit is shown at the top?