Spark textFileStream example: computing the Relative Strength Index (RSI) of stocks with a sliding window and reduceByKeyAndWindow

The Relative Strength Index (RSI) is a momentum indicator that measures the magnitude of recent price changes. It is primarily used to identify overbought or oversold conditions in the trading of an asset.

Problem to solve: Calculate the Relative Strength Index (RSI) of the four stocks over the last 10 minutes, using a window that slides every 5 minutes. An asset is considered overbought when its RSI is above 70 and oversold when it is below 30.

Input Data Format:


[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]

Below is the code to calculate the Relative Strength Index from the JSON data


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;

public class RelativeStrengthIndex {

public static void main(String[] args) throws InterruptedException {

final String[] path = new String[]{
"/home/adarsh/Desktop/test/input",
"/home/adarsh/Desktop/test/dfout/out"};

SparkConf sparkConf = new SparkConf().setAppName("Test")
.setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

@SuppressWarnings("resource")
JavaStreamingContext jstream = new JavaStreamingContext(jsc,
new Duration(300000));
JavaDStream<String> dstream = jstream
.textFileStream(path[0]);

JavaPairDStream<String, RelativeTuple> pair = dstream.flatMapToPair(
new PairFlatMapFunction<String, String, RelativeTuple>() {

/**
*
*/
private static final long serialVersionUID = 67676744;

public Iterator<Tuple2<String, RelativeTuple>> call(String t)
throws Exception {

List<Tuple2<String, RelativeTuple>> list = new ArrayList<Tuple2<String, RelativeTuple>>();

JSONArray js1 = new JSONArray(t);

for (int i = 0; i < js1.length(); i++) {

String symbol = js1.getJSONObject(i).get("symbol")
.toString();

JSONObject jo = new JSONObject(js1.getJSONObject(i)
.get("priceData").toString());

Double closingPrice=jo.getDouble("close");
Double openingPrice=jo.getDouble("open");
RelativeTuple relativeTuple=null;

if(closingPrice>openingPrice)
{
relativeTuple=new RelativeTuple(1, 0, closingPrice, 0);
}
else
{
relativeTuple=new RelativeTuple(0, 1, openingPrice, 0);
}

list.add(new Tuple2<String, RelativeTuple>(symbol,relativeTuple));

}

return list.iterator();

}
});

JavaPairDStream<String, RelativeTuple> result=pair.reduceByKeyAndWindow(
new Function2<RelativeTuple, RelativeTuple, RelativeTuple>() {

/**
*
*/
private static final long serialVersionUID = 76761212;

public RelativeTuple call(RelativeTuple result, RelativeTuple value)
throws Exception {
result.setUpward(result.getUpward() + value.getUpward());
result.setUpwardCount(result.getUpwardCount()+value.getUpwardCount());
result.setDownward(result.getDownward() + value.getDownward());
result.setDownwardCount(result.getDownwardCount()+value.getDownwardCount());


return result;
}
}, new Duration(600000), new Duration(300000));

result.foreachRDD(new VoidFunction<JavaPairRDD<String,RelativeTuple>>() {

/**
*
*/
private static final long serialVersionUID = 6767679;

public void call(JavaPairRDD<String, RelativeTuple> t)
throws Exception {

t.coalesce(1).saveAsTextFile(path[1]+java.io.File.separator + System.currentTimeMillis());

}



});

jstream.start();
jstream.awaitTermination();
}

}

Below is the RelativeTuple class used in the above code


import java.io.Serializable;

/**
 * Serializable accumulator for the data needed to compute a Relative Strength
 * Index: the number of upward and downward periods and the summed magnitude of
 * each. {@link #toString()} renders the RSI derived from these four values.
 */
public class RelativeTuple implements Serializable {

    private static final long serialVersionUID = 54545454L;

    /** Number of periods counted as upward. */
    private int upwardCount;

    /** Number of periods counted as downward. */
    private int downwardCount;

    /** Sum of the upward magnitudes. */
    private double upward;

    /** Sum of the downward magnitudes. */
    private double downward;

    /**
     * Creates a tuple with the given counts and sums.
     *
     * @param upwardCount   number of upward periods
     * @param downwardCount number of downward periods
     * @param upward        summed magnitude of the upward periods
     * @param downward      summed magnitude of the downward periods
     */
    public RelativeTuple(int upwardCount, int downwardCount, double upward,
            double downward) {
        super();
        this.upwardCount = upwardCount;
        this.downwardCount = downwardCount;
        this.upward = upward;
        this.downward = downward;
    }

    /** @return the number of upward periods */
    public int getUpwardCount() {
        return upwardCount;
    }

    /** @param upwardCount the number of upward periods */
    public void setUpwardCount(int upwardCount) {
        this.upwardCount = upwardCount;
    }

    /** @return the number of downward periods */
    public int getDownwardCount() {
        return downwardCount;
    }

    /** @param downwardCount the number of downward periods */
    public void setDownwardCount(int downwardCount) {
        this.downwardCount = downwardCount;
    }

    /** @return the summed magnitude of the upward periods */
    public double getUpward() {
        return upward;
    }

    /** @param upward the summed magnitude of the upward periods */
    public void setUpward(double upward) {
        this.upward = upward;
    }

    /** @return the summed magnitude of the downward periods */
    public double getDownward() {
        return downward;
    }

    /** @param downward the summed magnitude of the downward periods */
    public void setDownward(double downward) {
        this.downward = downward;
    }

    /**
     * Computes the Relative Strength Index from the accumulated values.
     *
     * <p>The average gain is {@code upward / upwardCount} and the average loss is
     * {@code downward / downwardCount}; an average whose sum is not positive or
     * whose count is zero is treated as 0. The result is
     * {@code 100 - 100 / (1 + averageGain / averageLoss)}, with two boundary cases:
     * 0 when there is no average gain, and exactly 100 when there is a gain but no
     * average loss.
     *
     * @return the RSI, in the range 0 to 100
     */
    public double relativeStrengthIndex() {
        // Guard the counts as well as the sums so an inconsistent tuple
        // (sum > 0 with a zero count) cannot produce an infinite average.
        double averageGain = (upward > 0 && upwardCount > 0)
                ? upward / upwardCount : 0d;
        double averageLoss = (downward > 0 && downwardCount > 0)
                ? downward / downwardCount : 0d;

        if (averageGain == 0d) {
            return 0d;
        }
        if (averageLoss == 0d) {
            // Gains with no losses: the ratio is unbounded, so the index is at
            // its upper limit rather than an approximation of it.
            return 100d;
        }

        double rs = averageGain / averageLoss;
        return 100d - 100d / (1d + rs);
    }

    /**
     * Returns the RSI from {@link #relativeStrengthIndex()} as a decimal string.
     */
    @Override
    public String toString() {
        return String.valueOf(relativeStrengthIndex());
    }

}