Kafka example: custom serializer, deserializer and encoder with Spark Streaming integration

Let's say we want to send a custom object as the Kafka value type. To push this custom object into a Kafka topic we need to implement a custom serializer and deserializer, and to read the data back in Spark Streaming we also need a custom encoder (which doubles as the decoder).

Below is the custom object we want to push into Kafka and receive in the Spark Streaming application.


import java.io.Serializable;

public class Average implements Serializable {

private int count = 1;

private double sum;

private double value;

public Average(int count, double sum, double value) {
super();
this.count = count;
this.sum = sum;
this.value = value;
}

public Average() {
super();
// TODO Auto-generated constructor stub
}

public int getCount() {
return count;
}

public double getValue() {
return value;
}

public void setValue(double value) {
this.value = value;
}

public void setCount(int count) {
this.count = count;
}

public double getSum() {
return sum;
}

public void setSum(double sum) {
this.sum = sum;
}

public double average() {
return getSum() / getCount();
}

@Override
public String toString() {
return "Average [count=" + count + ", sum=" + sum + ", value=" + value + "]";
}
}

Below is the code for the deserializer.


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka {@link Deserializer} that reads a JSON payload into an {@link Average}
 * using Jackson.
 */
public class AverageDeserializer implements Deserializer<Average> {

    /** Shared mapper: building an ObjectMapper per record is needlessly expensive. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        // Nothing to configure.
    }

    /**
     * Converts the raw record bytes into an {@link Average}.
     *
     * @param arg0 topic the record was read from (used only in error messages)
     * @param arg1 JSON payload; may be {@code null}
     * @return the decoded object, or {@code null} when the payload is {@code null}
     * @throws IllegalArgumentException if the payload cannot be mapped to an
     *         {@link Average}; the Jackson exception is kept as the cause
     */
    @Override
    public Average deserialize(String arg0, byte[] arg1) {
        if (arg1 == null) {
            // A record without a value has nothing to decode.
            return null;
        }
        try {
            return MAPPER.readValue(arg1, Average.class);
        } catch (java.io.IOException e) {
            // Fail loudly instead of returning null, which would only surface
            // later as a NullPointerException far away from the real cause.
            throw new IllegalArgumentException(
                    "Cannot deserialize Average from topic " + arg0, e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }

}

Below is the code for the serializer.


import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka {@link Serializer} that writes an {@link Average} as a JSON document
 * using Jackson.
 */
public class AverageSerializer implements Serializer<Average> {

    /** Shared mapper: building an ObjectMapper per record is needlessly expensive. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        // Nothing to configure.
    }

    /**
     * Converts the given object into its JSON byte representation.
     *
     * @param arg0 topic the record is destined for (used only in error messages)
     * @param arg1 object to serialize
     * @return the JSON document as UTF-8 bytes
     * @throws IllegalArgumentException if Jackson cannot serialize the object;
     *         the Jackson exception is kept as the cause
     */
    @Override
    public byte[] serialize(String arg0, Average arg1) {
        try {
            // writeValueAsBytes always emits UTF-8, unlike String.getBytes(),
            // which depends on the platform default charset.
            return MAPPER.writeValueAsBytes(arg1);
        } catch (JsonProcessingException e) {
            // Returning null here would silently publish a null-valued record.
            throw new IllegalArgumentException(
                    "Cannot serialize Average for topic " + arg0, e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }

}

Below is the code for the custom encoder, which also implements the decoder used on the Spark side.


import java.io.IOException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

/**
 * Encoder/decoder pair for the {@code kafka.serializer} API that converts an
 * {@link Average} to and from JSON bytes using Jackson.
 */
public class AverageEncoder implements Encoder<Average>, Decoder<Average> {

    /** Shared mapper: building an ObjectMapper per message is needlessly expensive. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    VerifiableProperties verifiableProperties;

    Class<Average> clazz;

    AverageSerializer averageSerializer = new AverageSerializer();

    /** Creates an encoder without any properties. */
    public AverageEncoder() {
    }

    /**
     * Creates an encoder and keeps the supplied properties.
     *
     * @param verifiableProperties properties handed over by the caller
     */
    public AverageEncoder(VerifiableProperties verifiableProperties) {
        this.verifiableProperties = verifiableProperties;
    }

    /**
     * Decodes a JSON payload into an {@link Average}.
     *
     * @param arg0 JSON payload
     * @return the decoded object
     * @throws IllegalArgumentException if the payload cannot be mapped to an
     *         {@link Average}; the Jackson exception is kept as the cause
     */
    @Override
    public Average fromBytes(byte[] arg0) {
        try {
            return MAPPER.readValue(arg0, Average.class);
        } catch (IOException e) {
            // Fail loudly instead of returning null, which would only surface
            // later as a NullPointerException in the stream processing code.
            throw new IllegalArgumentException("Cannot decode Average from message bytes", e);
        }
    }

    /**
     * Encodes an {@link Average} as a JSON document.
     *
     * @param arg0 object to encode
     * @return the JSON document as UTF-8 bytes
     * @throws IllegalArgumentException if Jackson cannot serialize the object;
     *         the Jackson exception is kept as the cause
     */
    @Override
    public byte[] toBytes(Average arg0) {
        try {
            // writeValueAsBytes always emits UTF-8, unlike String.getBytes(),
            // which depends on the platform default charset.
            return MAPPER.writeValueAsBytes(arg0);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot encode Average to message bytes", e);
        }
    }

}

Let's see how to use these classes in Kafka and Spark.

Let's write the Kafka producer class, which reads the ratings data and pushes it into the movie topic we created.


import java.io.FileReader;
import java.io.IOException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import au.com.bytecode.opencsv.CSVReader;

/**
 * Command-line producer that reads rating rows from a CSV file and publishes
 * one {@link Average} per row to the {@code movie} topic.
 */
public class MovieDataProducer {

    /**
     * Publishes the whole CSV file and then closes the producer.
     *
     * @param args unused
     * @throws IOException if the CSV file cannot be read
     */
    public static void main(String[] args) throws IOException {

        Producer<String, Average> producer = KafkaProducerServer.getInstance();
        try {
            MovieDataProducer.sendMessage(producer);
        } finally {
            // send() only hands records to the producer asynchronously; closing
            // waits for outstanding sends so records are not lost when the JVM exits.
            producer.close();
        }

    }

    /**
     * Reads every row of the ratings file and sends it to the {@code movie}
     * topic. Column 0 becomes the record key; column 1 is parsed as a double
     * and used as both {@code sum} and {@code value} of an {@link Average}
     * with a count of 1.
     *
     * @param producer producer to publish with; it is not closed by this method
     * @throws IOException if the CSV file cannot be opened or read
     */
    public static void sendMessage(Producer<String, Average> producer) throws IOException {

        // try-with-resources closes the reader even if parsing or sending fails.
        try (CSVReader reader = new CSVReader(
                new FileReader("C:\\codebase\\scala-project\\inputdata\\movies_data_2\\ratings.csv"))) {

            String[] nextLine;
            while ((nextLine = reader.readNext()) != null) {

                double rating = Double.parseDouble(nextLine[1]);

                producer.send(new ProducerRecord<String, Average>("movie", nextLine[0],
                        new Average(1, rating, rating)));

                System.out.println("Send message");
            }
        }

    }

}

KafkaProducerServer.getInstance() returns an instance of org.apache.kafka.clients.producer.KafkaProducer, which is constructed from a Properties object containing all the required producer settings.


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

/**
 * Holder for the single producer instance shared by the application. The
 * producer is created once, when this class is initialised, from the settings
 * returned by {@code ProducerProperties.getInstance()}.
 */
public class KafkaProducerServer {

    /** The shared producer; also reachable through {@link #getInstance()}. */
    public static final Producer<String, Average> producer =
            new KafkaProducer<String, Average>(ProducerProperties.getInstance());

    /** Not instantiable: this class only exposes static members. */
    private KafkaProducerServer() {
    }

    /**
     * Returns the shared producer.
     *
     * @return the same producer instance on every call
     */
    public static Producer<String, Average> getInstance() {
        return KafkaProducerServer.producer;
    }

}


import java.util.Properties;

/**
 * Holder for the producer configuration used by this example. The settings
 * are assembled once, when the class is initialised, and the same
 * {@link Properties} object is handed out on every call.
 */
public class ProducerProperties {

    /** The shared configuration; also reachable through {@link #getInstance()}. */
    public static final Properties props = buildDefaults();

    /** Not instantiable: this class only exposes static members. */
    private ProducerProperties() {
    }

    /**
     * Returns the shared configuration.
     *
     * @return the same mutable {@link Properties} instance on every call
     */
    public static Properties getInstance() {
        return props;
    }

    /** Assembles the settings for the example producer. */
    private static Properties buildDefaults() {
        Properties settings = new Properties();

        // Connection and delivery settings.
        settings.put("bootstrap.servers", "localhost:9092");
        settings.put("acks", "0");
        settings.put("retries", 0);

        // Batching settings.
        // NOTE(review): buffer.memory of 500 looks very small — confirm it is intended.
        settings.put("batch.size", 500);
        settings.put("linger.ms", 500);
        settings.put("buffer.memory", 500);

        // Key/value conversion.
        settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        settings.put("value.serializer", "com.producer.AverageSerializer");
        // NOTE(review): presumably only relevant to the older producer API — verify.
        settings.put("serializer.class", "com.producer.AverageEncoder");

        return settings;
    }

}

Let's write the Spark consumer class, which reads the data from the Kafka topic as a DStream and computes the average rating of the movies.


import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.bt.ims.util.ConfigFile;
import com.bt.ims.util.Constants;
import com.bt.ims.util.FileType;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Spark Streaming job that reads {@code (key, Average)} records from the
 * {@code movie} Kafka topic and, for every batch, prints the mean per key.
 */
public class MovieDataConsumer {

    /**
     * Starts the streaming context and blocks until it terminates.
     *
     * @param args unused
     * @throws InterruptedException if the wait for termination is interrupted
     */
    public static void main(String[] args) throws InterruptedException {

        String broker = "localhost:9092";
        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(50000));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topic_movie = Collections.singleton("movie");

        // Keys are decoded as strings; values are decoded with AverageEncoder.
        JavaPairInputDStream<String, Average> directKafkaStreamMedia = KafkaUtils.createDirectStream(jstream,
                String.class, Average.class, StringDecoder.class, AverageEncoder.class, kafkaParams, topic_movie);

        directKafkaStreamMedia.foreachRDD(new VoidFunction<JavaPairRDD<String, Average>>() {

            private static final long serialVersionUID = 1787878;

            @Override
            public void call(JavaPairRDD<String, Average> rdd) throws Exception {

                JavaPairRDD<String, Average> totals = rdd.reduceByKey(new Function2<Average, Average, Average>() {

                    @Override
                    public Average call(Average left, Average right) throws Exception {
                        // The reduce function must be associative: either argument
                        // may already be a partial aggregate, so combine the sums
                        // (not one side's single value) together with the counts.
                        // A new object is returned so the inputs are not mutated.
                        return new Average(left.getCount() + right.getCount(),
                                left.getSum() + right.getSum(), left.getValue());
                    }
                });

                List<Tuple2<String, Average>> list = totals.collect();

                for (Tuple2<String, Average> entry : list) {
                    System.out.println(entry._1 + " " + entry._2.getSum() / entry._2.getCount());
                }

            }
        });

        jstream.start();
        jstream.awaitTermination();

    }

}

1 thought on “kafka example for custom serializer, deserializer and encoder with spark streaming integration”

  1. For this one could you provide the pom file?

    the createDirectStream() seems not working for many combination of the spark version I tried.

Leave a Reply

Your email address will not be published. Required fields are marked *