Compression formats and their effects in HDFS and MapReduce programs

File compression brings two major benefits: it reduces the space needed to store files, and it speeds up data transfer across the network or to or from disk. When dealing with large volumes of data, both of these savings can be significant, so it pays to carefully consider how to use compression in Hadoop. Hadoop mainly uses the DEFLATE, gzip, bzip2, LZO, LZ4, and Snappy compression formats. Of these, only bzip2 is splittable out of the box; the other formats are not (although LZO can be made splittable by indexing it, as discussed later).

All compression algorithms exhibit a space/time trade-off: faster compression and decompression speeds usually come at the expense of smaller space savings. The compression tools give some control over this trade-off at compression time by offering nine different levels: -1 means optimize for speed, and -9 means optimize for space.

GZIP – a general-purpose compressor that sits in the middle of the space/time trade-off. Codec: org.apache.hadoop.io.compress.GzipCodec.

BZIP2 – compresses more effectively than gzip, but is slower. bzip2's decompression speed is faster than its compression speed, but it is still slower than the other formats. This format is also splittable. Codec: org.apache.hadoop.io.compress.BZip2Codec.

LZO, LZ4, and Snappy – all optimize for speed and are around an order of magnitude faster than gzip, but compress less effectively. Codecs: LzopCodec, Lz4Codec, and SnappyCodec, respectively.

Note: The splittable property indicates whether the compression format supports splitting (that is, whether you can seek to any point in the stream and start reading from some point further on). Splittable compression formats are especially suitable for MapReduce, because a large file can be split and its pieces processed in parallel by different map tasks.

Note: A codec is the implementation of a compression-decompression algorithm. In Hadoop, a codec is represented by an implementation of the CompressionCodec interface. So, for example, GzipCodec encapsulates the compression and decompression algorithm for gzip.
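
As an illustration of the CompressionCodec API, here is a minimal sketch (the class name and buffer size are illustrative) that uses GzipCodec to compress data read from standard input and write it to standard output:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Instantiate the codec; any CompressionCodec implementation could be used here.
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        // Wrap System.out so that everything written to it is gzip-compressed.
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish(); // flush the compressed stream without closing System.out
    }
}

Piping some text through this program and then through gunzip should reproduce the original input.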

Note: For performance, it is preferable to use a native library for compression and decompression. For example, in one test, using the native gzip libraries reduced decompression times by up to 50% and compression times by around 10% (compared to the built-in Java implementation). Native libraries are available for all of the compression formats (DEFLATE, gzip, bzip2, LZO, LZ4, and Snappy), whereas a Java implementation is available only for DEFLATE, gzip, and bzip2. The native libraries are picked up using the Java system property java.library.path.
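
To check whether the native Hadoop library has been found on java.library.path, you can call NativeCodeLoader from your own code; a tiny sketch (class name illustrative):

import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibraryCheck {
    public static void main(String[] args) {
        // True if libhadoop (the bundle of native code) was loaded from java.library.path.
        System.out.println("Native hadoop library loaded: " + NativeCodeLoader.isNativeCodeLoaded());
    }
}

Recent Hadoop releases also provide a hadoop checknative command that reports which native codecs are available.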

Note: If you are using a native library and you are doing a lot of compression or decompression in your application, consider using CodecPool, which allows you to reuse compressors and decompressors, thereby amortizing the cost of creating these objects. A compressor is obtained with CodecPool.getCompressor(codec) and returned to the pool with CodecPool.returnCompressor(compressor), as the sketch below shows.
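
The following sketch (class name illustrative) reworks the earlier example to borrow a Compressor from the pool and return it when done:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamCompressor {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        // Borrow a Compressor from the pool instead of creating a new one.
        Compressor compressor = CodecPool.getCompressor(codec);
        try {
            CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
            IOUtils.copyBytes(System.in, out, 4096, false);
            out.finish();
        } finally {
            // Always return the compressor so it can be reused by later callers.
            CodecPool.returnCompressor(compressor);
        }
    }
}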

Effect of the splittable property on MapReduce

When considering how to compress data that will be processed by MapReduce, it is important to understand whether the compression format supports splitting. Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 128 MB, the file will be stored as eight blocks, and a MapReduce job using this file as input will create eight input splits, each processed independently as input to a separate map task. Imagine now that the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS will store the file as eight blocks.

However, creating a split for each block won’t work, because it is impossible to start reading at an arbitrary point in the gzip stream and therefore impossible for a map task to read its split independently of the others. The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks. The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.

In this case, MapReduce will do the right thing and not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting. This will work, but at the expense of locality: a single map will process the eight HDFS blocks, most of which will not be local to the map. Also, with fewer maps, the job is less granular and so may take longer to run.

A bzip2 file, on the other hand, does provide a synchronization marker between blocks (a 48-bit approximation of pi), so it does support splitting.

MapReduce and compression

If your input files are compressed, they will be decompressed automatically as they are read by MapReduce, using the filename extension to determine which codec to use.
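
The extension-to-codec mapping that MapReduce relies on is exposed through CompressionCodecFactory, so your own code can use it too. As a rough sketch (class name illustrative), the program below infers the codec for a file from its extension and writes a decompressed copy next to it:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {
    public static void main(String[] args) throws Exception {
        String uri = args[0]; // e.g. a path ending in .gz
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Pick the codec from the filename extension (e.g. .gz maps to GzipCodec).
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("No codec found for " + uri);
            System.exit(1);
        }

        // Strip the codec's extension to form the output filename (file.gz becomes file).
        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}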

In order to compress the output of a MapReduce job, in the job configuration, set the mapreduce.output.fileoutputformat.compress property to true and set the mapreduce.output.fileoutputformat.compress.codec property to the classname of the compression codec you want to use. Alternatively, you can use the static convenience methods on FileOutputFormat:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
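
Equivalently, the same two properties can be set directly on the configuration before the job is created. A short sketch (a fragment; it assumes the usual imports for Configuration, Job, GzipCodec, and CompressionCodec):

Configuration conf = new Configuration();
// Same effect as the FileOutputFormat convenience methods above.
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    GzipCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf);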

If you are emitting sequence files for your output, you can set the mapreduce.output.fileoutputformat.compress.type property to control the type of compression to use. The default is RECORD, which compresses individual records. Changing this to BLOCK, which compresses groups of records, is recommended because it compresses better.
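
For example, assuming job is an org.apache.hadoop.mapreduce.Job and the usual imports are in place, block-compressed sequence file output can be configured roughly like this (the choice of SnappyCodec is illustrative):

job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
// BLOCK compresses groups of records together, which usually compresses better than RECORD.
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);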

Even if your MapReduce application reads and writes uncompressed data, it may benefit from compressing the intermediate output of the map phase. The map output is written to disk and transferred across the network to the reducer nodes, so by using a fast compressor such as LZO, LZ4, or Snappy, you can get performance gains simply because the volume of data to transfer is reduced. This is achieved by setting the property mapreduce.map.output.compress to true, and optionally specifying the codec to use with the property mapreduce.map.output.compress.codec; the default codec is org.apache.hadoop.io.compress.DefaultCodec.

Configuration conf = new Configuration();
// Compress the map output and use gzip for it.
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);

Finally, which compression format to use?

Hadoop applications process large datasets, so you should strive to take advantage of compression. Which compression format you use depends on such considerations as file size, format, and the tools you are using for processing. Below are some guidelines, arranged roughly in order of most to least effective.

1. Use a container file format such as sequence files, Avro datafiles, ORC files, or Parquet files, all of which support both compression and splitting. A fast compressor such as LZO, LZ4, or Snappy is generally a good choice.

2. Use a compression format that supports splitting, such as bzip2 (although bzip2 is fairly slow), or one that can be indexed to support splitting, such as LZO.

3. Split the file into chunks in the application, and compress each chunk separately using any supported compression format (it doesn’t matter whether it is splittable). In this case, you should choose the chunk size so that the compressed chunks are approximately the size of an HDFS block.

4. Store the files uncompressed.

Note: For large files, you should not use a compression format that does not support splitting on the whole file, because you lose locality and make your MapReduce applications very inefficient.