Stateless Transformations in Spark Streaming: An Example

Stateless transformations such as map(), flatMap(), filter(), repartition(), reduceByKey(), and groupByKey() are simple RDD transformations that are applied independently to every batch.

Keep in mind that although these functions look like they apply to the whole stream, internally each DStream is composed of multiple RDDs (one per batch interval), and each stateless transformation applies separately to each of those RDDs. For example, reduceByKey() reduces data within each time step, but not across time steps. Stateful transformations, on the other hand, allow us to combine data across time.
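
To make the difference concrete, here is a minimal, self-contained sketch of a per-batch word count. It is illustrative only and not part of the movie example that follows; the class name, the socket source, and the host/port are placeholders. Because reduceByKey() is stateless, the counts printed for one batch do not carry over to the next.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class PerBatchWordCount {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf().setAppName("per-batch-word-count").setMaster("local[2]");
        // 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

        // Placeholder source: each line received on the socket is treated as one word.
        JavaDStream<String> words = jssc.socketTextStream("localhost", 9999);

        // reduceByKey() combines counts only within a single 10-second batch;
        // nothing is carried over to the next batch interval.
        JavaPairDStream<String, Integer> perBatchCounts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        perBatchCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}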

Let's take an example of calculating the average rating of movies using Spark Streaming and Kafka.

We will use the dataset below; each line contains the movie name, the rating, and a timestamp (the timestamp is unused in this example).


Spider Man,4,978302174
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
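
If all five records land in the same batch interval, the consumer we build below should print an average of (4 + 4 + 4) / 3 = 4.0 for Spider Man and (5 + 4) / 2 = 4.5 for Bat Man. If the records are spread across batches, each batch reports only the average of the ratings it received, because the computation is stateless.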

Let's write the Kafka producer class, which will read this dataset and push it into the movie topic we created earlier.


import java.io.FileReader;
import java.io.IOException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import au.com.bytecode.opencsv.CSVReader;

public class MovieDataProducer {

public static void main(String[] args) throws IOException {

MovieDataProducer.sendMessage(KafkaProducerServer.getInstance());

}

public static void sendMessage(Producer<String, Average> producer) throws IOException {

CSVReader reader = new CSVReader(
new FileReader("C:\\codebase\\scala-project\\inputdata\\movies_data_2\\ratings.csv"));
String[] nextLine;

while ((nextLine = reader.readNext()) != null) {

// nextLine[0] is the movie name (used as the message key) and nextLine[1] is the rating.
// Each record starts out with count = 1 and sum = value = the rating itself.
producer.send(new ProducerRecord<String, Average>("movie", nextLine[0], new Average(1, Double.parseDouble(nextLine[1]), Double.parseDouble(nextLine[1]))));

System.out.println("Sent message for " + nextLine[0]);
}

reader.close();

// Flush so the buffered records are actually delivered before the JVM exits.
producer.flush();

}

}

KafkaProducerServer.getInstance() returns an instance of org.apache.kafka.clients.producer.KafkaProducer, which is constructed with a Properties object containing the required configuration.


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class KafkaProducerServer {

private KafkaProducerServer() {

}

public static final Producer<String, Average> producer = new KafkaProducer<>(ProducerProperties.getInstance());

public static Producer<String, Average> getInstance() {

return producer;

}

}


import java.util.Properties;

public class ProducerProperties {

private ProducerProperties() {

}

public static final Properties props = new Properties();

static {

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 500);
props.put("linger.ms", 500);
props.put("buffer.memory", 500);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.producer.AverageSerializer");
props.put("serializer.class", "com.producer.AverageEncoder");
}

public static Properties getInstance() {
return props;
}

}

Let's write the Spark consumer class, which reads the data from the Kafka topic as a DStream and computes the average rating of each movie.


import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class MovieDataConsumer {

public static void main(String[] args) throws InterruptedException {

String broker = "localhost:9092";
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 50-second batch interval
JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(50000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", broker);
Set<String> topic_movie = Collections.singleton("movie");

// Direct stream from the movie topic: keys are decoded as String and values as Average
// using the custom AverageEncoder, which also implements kafka.serializer.Decoder.
JavaPairInputDStream<String, Average> directKafkaStreamMedia = KafkaUtils.createDirectStream(jstream,
String.class, Average.class, StringDecoder.class, AverageEncoder.class, kafkaParams, topic_movie);

// For every batch, reduce the Average objects by movie key and print the per-batch averages.
directKafkaStreamMedia.foreachRDD(new VoidFunction<JavaPairRDD<String, Average>>() {

private static final long serialVersionUID = 1787878;

@Override
public void call(JavaPairRDD<String, Average> arg0) throws Exception {

JavaPairRDD<String, Average> pair = arg0.reduceByKey(new Function2<Average, Average, Average>() {

@Override
public Average call(Average arg0, Average arg1) throws Exception {

arg0.setSum(arg0.getSum() + arg1.getValue());
arg0.setCount(arg0.getCount() + arg1.getCount());

return arg0;
}
});

List<Tuple2<String, Average>> list = pair.collect();

for (Tuple2<String, Average> string : list) {

System.out.println(string._1 + " " + string._2.getSum()/string._2.getCount());

}

}
});

jstream.start();
jstream.awaitTermination();

}

}
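
Note that because the reduceByKey() above runs inside foreachRDD() on each individual batch RDD, the averages printed cover only the ratings received during that 50-second batch interval; nothing is carried over between batches. To accumulate ratings across batches we would need a stateful transformation such as updateStateByKey(), which is beyond the scope of this stateless example.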

Since we push a custom Average object into the Kafka topic, we need to implement a custom AverageSerializer and AverageDeserializer, as well as a custom encoder AverageEncoder, so that Spark Streaming can decode the data.

Below is the code.
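
First, here is a minimal sketch of the Average value object these classes operate on. The fields, the three-argument constructor, and the accessors are inferred from how Average is used in MovieDataProducer and MovieDataConsumer above, so treat this as an assumption rather than the exact class.

import java.io.Serializable;

// Minimal sketch of the Average value object (inferred from its usage above).
public class Average implements Serializable {

    private int count;    // number of ratings combined so far
    private double sum;   // running total of the ratings
    private double value; // the single rating this record was created from

    // No-arg constructor so Jackson can deserialize the JSON back into an Average.
    public Average() {
    }

    public Average(int count, double sum, double value) {
        this.count = count;
        this.sum = sum;
        this.value = value;
    }

    public int getCount() { return count; }

    public void setCount(int count) { this.count = count; }

    public double getSum() { return sum; }

    public void setSum(double sum) { this.sum = sum; }

    public double getValue() { return value; }

    public void setValue(double value) { this.value = value; }
}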


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class AverageDeserializer implements Deserializer<Average> {

@Override
public void close() {
// nothing to clean up
}

@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
// TODO Auto-generated method stub

}

@Override
public Average deserialize(String arg0, byte[] arg1) {
// Turn the JSON bytes produced by AverageSerializer back into an Average object.
ObjectMapper mapper = new ObjectMapper();
Average average = null;
try {
average = mapper.readValue(arg1, Average.class);
} catch (Exception e) {
e.printStackTrace();
}
return average;
}

}


import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AverageSerializer implements Serializer<Average>{

@Override
public void close() {
// nothing to clean up
}

@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
// TODO Auto-generated method stub

}

@Override
public byte[] serialize(String arg0, Average arg1) {
// Serialize the Average object to JSON bytes before writing it to the Kafka topic.
byte[] value = null;
ObjectMapper objectMapper = new ObjectMapper();

try {
value = objectMapper.writeValueAsString(arg1).getBytes();
} catch (JsonProcessingException e) {
e.printStackTrace();
}

return value;
}

}


import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class AverageEncoder implements Encoder<Average>, Decoder<Average> {

public AverageEncoder() {
}

// Kafka instantiates Encoder/Decoder implementations through this constructor,
// so it must be present even though the properties are not used here.
public AverageEncoder(VerifiableProperties verifiableProperties) {
}

@Override
public Average fromBytes(byte[] arg0) {

// Decode the JSON bytes read from Kafka back into an Average object.
ObjectMapper objectMapper = new ObjectMapper();

try {
return objectMapper.readValue(arg0, Average.class);
} catch (IOException e) {
e.printStackTrace();
}

return null;
}

@Override
public byte[] toBytes(Average arg0) {

// Encode the Average object as JSON bytes before handing it to Kafka.
ObjectMapper objectMapper = new ObjectMapper();

try {
return objectMapper.writeValueAsString(arg0).getBytes();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}

}