Batch and Window Sizes – The most common question is what minimum batch size Spark Streaming can use. In general, 500 milliseconds has proven to be a good minimum size for many applications. The best approach is to start with a larger batch size (around 10 seconds) and work your way down to a smaller one. If the processing times reported in the Streaming UI remain consistent, you can continue to decrease the batch size; if they are increasing, you may have reached the limit for your application.
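As a sketch of this tuning loop (the application name and the 10-second starting interval are illustrative), you would create the StreamingContext with a conservative batch interval and then shrink it in subsequent runs while watching the Streaming UI:

Start with a large batch interval and tune downward

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Begin with a conservative 10-second batch interval; in later runs, lower it
// (e.g., Seconds(2), then Milliseconds(500)) as long as the processing times
// reported in the Streaming UI stay stable.
val conf = new SparkConf().setAppName("TuningExample")
val ssc = new StreamingContext(conf, Seconds(10))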
In a similar way, for windowed operations, the interval at which you compute a result (i.e., the slide interval) has a big impact on performance. Consider increasing this interval for expensive computations if it is a bottleneck.
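For example, assuming the imports and ssc from the sketch above and a hypothetical DStream of (word, count) pairs named pairs, a windowed count that is recomputed only every 10 seconds (rather than every batch) looks like this:

Lengthen the slide interval of an expensive windowed reduction

// Count occurrences over the last 30 seconds, but only recompute the result
// every 10 seconds. Increasing the slide interval (the third argument) makes
// this expensive reduction run less often.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // reduce function
  Seconds(30),                // window duration
  Seconds(10))                // slide interval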
Level of Parallelism – A common way to reduce the processing time of batches is to increase the parallelism.
There are three ways to increase the parallelism:
1. Increasing the number of receivers
Receivers can sometimes act as a bottleneck if there are too many records for a single machine to read in and distribute. You can add more receivers by creating multiple input DStreams (which creates multiple receivers), and then applying union to merge them into a single stream.
2. Explicitly repartitioning received data
If receivers cannot be increased anymore, you can further redistribute the received data by explicitly repartitioning the input stream (or the union of multiple streams) using DStream.repartition.
3. Increasing parallelism in aggregation
For operations like reduceByKey(), you can specify the parallelism as a second parameter, just as you can for regular RDDs. All three techniques are combined in the sketch after this list.
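A minimal sketch combining all three techniques, assuming the StreamingContext ssc from earlier; the socket source, host name, port, receiver count, and partition counts are hypothetical placeholders:

Increase parallelism with more receivers, repartitioning, and aggregation partitions

// 1. Create several input DStreams so multiple receivers share the ingest
//    load, then merge them with union into a single stream.
val numReceivers = 4
val inputStreams = (1 to numReceivers).map(_ => ssc.socketTextStream("logserver", 9999))
val unioned = ssc.union(inputStreams)

// 2. Explicitly repartition the received data to spread it across more cores.
val repartitioned = unioned.repartition(20)

// 3. Pass a partition count to reduceByKey() to raise the parallelism of the
//    aggregation itself.
val wordCounts = repartitioned
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 20)  // second argument = level of parallelism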
Garbage Collection and Memory Usage – Another aspect that can cause problems is Java’s garbage collection. You can minimize unpredictably large pauses due to GC by enabling Java’s Concurrent Mark-Sweep garbage collector, which consumes more resources overall but introduces fewer pauses.
We can control the GC by adding -XX:+UseConcMarkSweepGC to the spark.executor.extraJavaOptions configuration parameter.
Enable the Concurrent Mark-Sweep GC
spark-submit --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC App.jar
In addition to using a garbage collector that is less likely to introduce pauses, you can make a big difference by reducing GC pressure. Caching RDDs in serialized form (instead of as native objects) means the GC tracks a few large byte arrays rather than many small objects, which is why, by default, RDDs generated by Spark Streaming are stored in serialized form. Using Kryo serialization further reduces the memory required for the in-memory representation of cached data.
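As an illustration (the application name is a placeholder, and wordCounts stands in for whatever DStream you cache), Kryo and explicit serialized persistence can be combined like this:

Cache in serialized form with Kryo

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Use Kryo to shrink the serialized representation of cached data.
val conf = new SparkConf()
  .setAppName("SerializedCaching")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Persist the stream's RDDs in serialized form (Spark Streaming's default) so
// the GC sees a few large byte arrays instead of many small objects.
wordCounts.persist(StorageLevel.MEMORY_ONLY_SER)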
Spark also allows us to control how cached/persisted RDDs are evicted from the cache. By default, Spark evicts them in least-recently-used (LRU) order. Spark will also explicitly evict RDDs older than a certain time period if you set spark.cleaner.ttl. By preemptively evicting RDDs that we are unlikely to need again, we may be able to reduce GC pressure.
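Following the same spark-submit pattern as above (the one-hour TTL is an arbitrary example; the value is given in seconds), the cleaner can be enabled like this:

Evict RDDs and metadata older than one hour

spark-submit --conf spark.cleaner.ttl=3600 App.jar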