Spark textFileStream example to process JSON data

Problem to solve: Calculate the trading volume of the stocks every 10 minutes and decide which stock to purchase. (The code below uses a 30-second window with a 15-second slide so the example is quick to test; swap in 10-minute durations for the real problem.)

Input Data Format:


[{"timestamp": "2018-09-25 10:09:00", "symbol": "MSFT", "priceData": {"high": "114.6100", "close": "114.5400", "open": "114.5200", "low": "114.4900", "volume": "53615"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "ADBE", "priceData": {"high": "265.9100", "close": "265.9100", "open": "265.5820", "low": "265.5820", "volume": "8283"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "GOOGL", "priceData": {"high": "1183.1600", "close": "1183.1600", "open": "1183.1600", "low": "1183.1600", "volume": "2335"}}, {"timestamp": "2018-09-25 10:09:00", "symbol": "FB", "priceData": {"high": "162.2700", "close": "162.0700", "open": "161.8300", "low": "161.8000", "volume": "239757"}}]

Below is the complete code to calculate the trading volume of each stock from the JSON data:


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;

import scala.Tuple2;

public class ShareByVolume {

    public static void main(String[] args) throws InterruptedException {

        // path[0] is the directory monitored for new input files;
        // path[1] is the base directory for the per-window output
        final String[] path = new String[] {
                "/home/adarsh/Desktop/test/df",
                "/home/adarsh/Desktop/test/dfout/out" };

        SparkConf sparkConf = new SparkConf().setAppName("Test")
                .setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // batch interval of 15 seconds (use 10-minute durations
        // in production, per the problem statement)
        @SuppressWarnings("resource")
        JavaStreamingContext jstream = new JavaStreamingContext(jsc,
                new Duration(15000));
        JavaDStream<String> dstream = jstream.textFileStream(path[0]);

        // parse each line (a JSON array of quotes) into (symbol, volume) pairs
        JavaPairDStream<String, Double> pair = dstream.flatMapToPair(
                new PairFlatMapFunction<String, String, Double>() {

                    private static final long serialVersionUID = 67676744;

                    public Iterator<Tuple2<String, Double>> call(String t)
                            throws Exception {

                        List<Tuple2<String, Double>> list = new ArrayList<Tuple2<String, Double>>();

                        JSONArray js1 = new JSONArray(t);

                        for (int i = 0; i < js1.length(); i++) {

                            String symbol = js1.getJSONObject(i).get("symbol")
                                    .toString();

                            JSONObject jo = new JSONObject(js1.getJSONObject(i)
                                    .get("priceData").toString());

                            list.add(new Tuple2<String, Double>(symbol,
                                    jo.getDouble("volume")));
                        }

                        return list.iterator();
                    }
                });

        // sum the volume per symbol over a 30-second window,
        // sliding every 15 seconds
        JavaPairDStream<String, Double> result = pair.reduceByKeyAndWindow(
                new Function2<Double, Double, Double>() {

                    private static final long serialVersionUID = 76761212;

                    public Double call(Double sum, Double value)
                            throws Exception {
                        return sum + value;
                    }
                }, new Duration(30000), new Duration(15000));

        // swap to (volume, symbol) so the pairs can be sorted by volume
        JavaPairDStream<Double, String> resultSwapped = result.mapToPair(
                new PairFunction<Tuple2<String, Double>, Double, String>() {

                    public Tuple2<Double, String> call(Tuple2<String, Double> t)
                            throws Exception {
                        return t.swap();
                    }
                });

        // sort each window's RDD by volume in descending order
        JavaPairDStream<Double, String> sorted = resultSwapped.transformToPair(
                new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {

                    public JavaPairRDD<Double, String> call(
                            JavaPairRDD<Double, String> v1) throws Exception {
                        return v1.sortByKey(false);
                    }
                });

        // write each window's result into a timestamped output directory
        sorted.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {

            private static final long serialVersionUID = 6767679;

            public void call(JavaPairRDD<Double, String> t) throws Exception {
                t.coalesce(1).saveAsTextFile(path[1] + java.io.File.separator
                        + System.currentTimeMillis());
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }
}
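Note that textFileStream only picks up files created in the monitored directory after the streaming context has started, so the input files have to be copied or moved into /home/adarsh/Desktop/test/df while the job is running. Assuming the sample array above arrives as a single file within one window, the timestamped output directory would contain the (volume, symbol) pairs sorted in descending order of volume, so the stock to purchase appears first:

(239757.0,FB)
(53615.0,MSFT)
(8283.0,ADBE)
(2335.0,GOOGL)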
