Spark textFileStream with sliding window and reduceByKeyAndWindow example

Problem to solve: calculate the maximum profit (average closing price − average opening price) for each stock over the last 10 minutes, using a window that slides every 5 minutes.

Input Data Format:


[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]

Below is the code to calculate the maximum profit from the JSON data:


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;

public class MaximumProfit {

public static void main(String[] args) throws InterruptedException {

final String[] path = new String[]{
"/home/adarsh/Desktop/test/input",
"/home/adarsh/Desktop/test/dfout/out"};

SparkConf sparkConf = new SparkConf().setAppName("Test")
.setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

@SuppressWarnings("resource")
JavaStreamingContext jstream = new JavaStreamingContext(jsc,
new Duration(300000));
JavaDStream<String> dstream = jstream.textFileStream(path[0]);

JavaPairDStream<String, MaximumTuple> pair = dstream.flatMapToPair(
new PairFlatMapFunction<String, String, MaximumTuple>() {

/**
*
*/
private static final long serialVersionUID = 67676744;

public Iterator<Tuple2<String, MaximumTuple>> call(String t)
throws Exception {

List<Tuple2<String, MaximumTuple>> list = new ArrayList<Tuple2<String, MaximumTuple>>();

JSONArray js1 = new JSONArray(t);

for (int i = 0; i < js1.length(); i++) {

String symbol = js1.getJSONObject(i).get("symbol")
.toString();

JSONObject jo = new JSONObject(js1.getJSONObject(i)
.get("priceData").toString());

list.add(new Tuple2<String, MaximumTuple>(symbol,
new MaximumTuple(1,
jo.getDouble("close"),jo.getDouble("open"))));

}

return list.iterator();

}
});

JavaPairDStream<String, MaximumTuple> result=pair.reduceByKeyAndWindow(
new Function2<MaximumTuple, MaximumTuple, MaximumTuple>() {

/**
*
*/
private static final long serialVersionUID = 76761212;

public MaximumTuple call(MaximumTuple result, MaximumTuple value)
throws Exception {
result.setClosingPrice(
result.getClosingPrice() + value.getClosingPrice());
result.setOpeningPrice(result.getOpeningPrice()+value.getOpeningPrice());
result.setCount(result.getCount() + value.getCount());
return result;
}
}, new Duration(600000), new Duration(300000));

result.foreachRDD(new VoidFunction<JavaPairRDD<String,MaximumTuple>>() {

/**
*
*/
private static final long serialVersionUID = 6767679;

public void call(JavaPairRDD<String, MaximumTuple> t)
throws Exception {

t.coalesce(1).saveAsTextFile(path[1]+java.io.File.separator + System.currentTimeMillis());

}



});

jstream.start();
jstream.awaitTermination();
}

}

Below is the MaximumTuple class used in the above code:


package com.freelance.project;

import java.io.Serializable;

/**
 * Serializable accumulator holding a sample count together with the summed
 * closing and opening prices for those samples.
 *
 * <p>Its string form is the difference between the average closing price and
 * the average opening price.
 */
public class MaximumTuple implements Serializable {

    private static final long serialVersionUID = 44545;

    /** Number of samples represented by the two price totals. */
    private int count;

    /** Total of the closing prices. */
    private double closingPrice;

    /** Total of the opening prices. */
    private double openingPrice;

    /**
     * Creates a tuple from an initial count and price totals.
     *
     * @param count        number of samples
     * @param closingPrice total closing price
     * @param openingPrice total opening price
     */
    public MaximumTuple(int count, double closingPrice, double openingPrice) {
        this.count = count;
        this.closingPrice = closingPrice;
        this.openingPrice = openingPrice;
    }

    /** @return the number of samples */
    public int getCount() {
        return this.count;
    }

    /** @param count the new number of samples */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the total closing price */
    public double getClosingPrice() {
        return this.closingPrice;
    }

    /** @param closingPrice the new total closing price */
    public void setClosingPrice(double closingPrice) {
        this.closingPrice = closingPrice;
    }

    /** @return the total opening price */
    public double getOpeningPrice() {
        return this.openingPrice;
    }

    /** @param openingPrice the new total opening price */
    public void setOpeningPrice(double openingPrice) {
        this.openingPrice = openingPrice;
    }

    /**
     * Renders the average closing price minus the average opening price.
     *
     * @return the decimal string of {@code closingPrice / count - openingPrice / count}
     */
    @Override
    public String toString() {
        final double averageClose = closingPrice / count;
        final double averageOpen = openingPrice / count;
        return Double.toString(averageClose - averageOpen);
    }

}

1 thought on “spark textFileStream with sliding window and reduceByKeyAndWindow example”

  1. Please help me sort the output by maximum profit. What changes do I need to make so that the stock with the maximum profit is shown at the top?

Comments are closed.