Spark Streaming provides an abstraction called DStreams, or discretized streams, which is built on top of RDDs. A DStream is a sequence of data arriving over time. Internally, each DStream is represented as a sequence of RDDs, one arriving at each time step.
DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once built, they offer two types of operations: (1) transformations, which yield a new DStream, and (2) output operations, which write data to an external system.
DStreams provide many of the same operations available on RDDs, plus new operations related to time, such as sliding windows.
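As a minimal sketch of these two kinds of operations, assume a plain text stream read from a local socket (the host, port, and filter condition below are placeholders, not part of the Kafka example that follows):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamBasics {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DStreamBasics");
        // Batch interval of 3 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));

        // Input DStream from a socket source (host and port are placeholders)
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Transformation: yields a new DStream
        JavaDStream<String> errors = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) {
                return line.contains("ERROR");
            }
        });

        // Output operation: prints the first elements of each batch
        errors.print();

        jssc.start();
        jssc.awaitTermination();
    }
}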
Let's take an example of fetching data from a Kafka topic.
We will start by creating a StreamingContext, which is the main entry point for streaming functionality. This also sets up an underlying SparkContext that it will use to process the data. It takes as input a batch interval specifying how often to process new data, which we set to 3 seconds.
Next, we use the KafkaUtils createDirectStream method to create a DStream based on the data received on a Kafka topic. Then we transform the DStream with filter() to keep only the metrics of type media, and finally we save the result as a Hadoop file. This sets up only the computation that will be done when the system receives data. To start receiving data, we must explicitly call start() on the StreamingContext. Spark Streaming will then start to schedule Spark jobs on the underlying SparkContext. This happens in a separate thread, so to keep our application from exiting we also need to call awaitTermination() to wait for the streaming computation to finish.
ConfigFile conf = new ConfigFile(Constants.CONFIG, FileType.property);
String broker = "localhost:9093";

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Batch interval of 3 seconds
JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(3000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", broker);
Set<String> topic_media = Collections.singleton("media");
Set<String> topic_sip = Collections.singleton("sip");

// Direct (receiver-less) Kafka stream for the "media" topic
JavaPairInputDStream<String, String> directKafkaStreamMedia = KafkaUtils.createDirectStream(
        jstream, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topic_media);

// Keep only the records whose key is "media"
JavaPairDStream<String, String> filteredKafkaStreamMedia = directKafkaStreamMedia
        .filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> arg0) throws Exception {
                return arg0._1.equals("media");
            }
        });

// Write each batch out as Hadoop files
filteredKafkaStreamMedia.saveAsHadoopFiles("media_data", ".dat");

jstream.start();
jstream.awaitTermination();
Note that a streaming context can be started only once, and must be started after we set up all the DStreams and output operations we want.
To run the above program in local mode, create a JAR file and use the command below:
spark-submit --master local[10] --deploy-mode client --class com.consumer.SparkDstreamConsumer --name "Call Quality" --executor-memory 2g /home/cloudera/SparkDstreamConsumer-0.0.1-SNAPSHOT.jar
Architecture
Spark Streaming uses a micro-batch architecture, where the streaming computation is treated as a continuous series of batch computations on small batches of data. Spark Streaming receives data from various input sources and groups it into small batches. New batches are created at regular time intervals. At the beginning of each time interval a new batch is created, and any data that arrives during that interval gets added to that batch. At the end of the time interval the batch is done growing. The size of the time intervals is determined by a parameter called the batch interval. The batch interval is typically between 500 milliseconds and several seconds, as configured by the application developer. Each input batch forms an RDD, and is processed using Spark jobs to create other RDDs. The processed results can then be pushed out to external systems in batches.
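To make the batch-as-RDD model concrete, the fragment below continues the socket sketch above and assumes the Spark 1.x Java API used elsewhere in this post; foreachRDD exposes the RDD generated for each batch together with its batch time, so an ordinary Spark job can run on it:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

// Each batch interval produces one RDD; log its batch time and size.
lines.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {
        System.out.println("Batch at " + time + " contains " + rdd.count() + " records");
        return null; // the Spark 1.x Java API uses a Void return type here
    }
});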
The programming abstraction in Spark Streaming is a discretized stream or a DStream, which is a sequence of RDDs, where each RDD has one time slice of the data in the stream.
Apart from transformations, DStreams support output operations, such as print(). Output operations are similar to RDD actions in that they write data to an external system, but in Spark Streaming they run periodically on each time step, producing output in batches.
For each input source, Spark Streaming launches receivers, which are tasks running within the application’s executors that collect data from the input source and save it as RDDs. These receive the input data and replicate it (by default) to another executor for fault tolerance. This data is stored in the memory of the executors in the same way as cached RDDs. The StreamingContext in the driver program then periodically runs Spark jobs to process this data and combine it with RDDs from previous time steps.
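For receiver-based sources, the storage level (and therefore the replication) can be set explicitly. The sketch below assumes the same socket source as in the earlier snippet and uses the standard StorageLevels constants; note that the direct Kafka stream created earlier does not go through a receiver at all, so this setting does not apply to it.

import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.api.java.JavaDStream;

// Serialized in memory, spills to disk, and replicated to a second executor (the "_2")
JavaDStream<String> lines = jssc.socketTextStream(
        "localhost", 9999, StorageLevels.MEMORY_AND_DISK_SER_2);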
The received data by default is replicated across two nodes, so Spark Streaming can tolerate single worker failures. Using just lineage, however, recomputation could take a long time for data that has been built up since the beginning of the program. Thus Spark Streaming also includes a mechanism called checkpointing that saves state periodically to a reliable filesystem (e.g., HDFS or S3). Typically, you might set up checkpointing every 5–10 batches of data. When recovering lost data, Spark Streaming needs only to go back to the last checkpoint.
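In the Kafka example above, checkpointing could be enabled with a single call on the JavaStreamingContext before start(); the directory below is only a placeholder.

// Save streaming state periodically to a reliable filesystem (HDFS path is a placeholder)
jstream.checkpoint("hdfs:///user/cloudera/spark-checkpoints");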
Transformations
Transformations on DStreams can be grouped into two categories: stateless and stateful.
1. In stateless transformations the processing of each batch does not depend on the data of its previous batches. They include the common RDD transformations like map(), filter(), and reduceByKey().
2. Stateful transformations, in contrast, use data or intermediate results from previous batches to compute the results of the current batch. They include transformations based on sliding windows and on tracking state across time; a windowed example is sketched below.
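As a sketch of a stateful transformation, assuming the filteredKafkaStreamMedia pair DStream from the Kafka example above, the following counts records per key over a sliding window. The 30-second window and 6-second slide are illustrative values; both must be multiples of the 3-second batch interval.

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Map each record to (key, 1) so we can count per key
JavaPairDStream<String, Integer> ones = filteredKafkaStreamMedia.mapToPair(
        new PairFunction<Tuple2<String, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, String> record) {
                return new Tuple2<String, Integer>(record._1, 1);
            }
        });

// Windowed count: sum the 1s over the last 30 seconds, recomputed every 6 seconds
JavaPairDStream<String, Integer> countsOverWindow = ones.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        },
        new Duration(30000),  // window length
        new Duration(6000));  // slide interval

countsOverWindow.print();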