Hive tutorial 8 – Hive performance tuning with data file optimization: file format, compression, and storage optimization

Hive supports the TEXTFILE, SEQUENCEFILE, RCFILE, ORC, and PARQUET file formats. The three ways to specify a file format are as follows, with a short example after the list:

1. CREATE TABLE … STORED AS <File_Format>
2. ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT <File_Format>
3. SET hive.default.fileformat=<File_Format> (the default file format for new tables)
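For example, with the placeholder table names sales_data and legacy_table, the three options look like this:

-- 1. At table creation time
CREATE TABLE sales_data (id INT, amount DOUBLE) STORED AS ORC;

-- 2. On an existing table or partition (changes metadata only; existing files are not rewritten)
ALTER TABLE legacy_table SET FILEFORMAT ORC;

-- 3. As the session default for tables created without STORED AS
SET hive.default.fileformat=ORC;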

The file formats supported by Hive and their optimizations are described below.

TEXTFILE – This is the default file format for Hive. Data in a text file is not compressed, but it can be compressed with compression tools such as GZip, Bzip2, and Snappy. However, most of these compressed files are not splittable as input during processing. As a result, a single, huge map task ends up processing one big file.

SEQUENCEFILE – This is a binary storage format for key/value pairs. The benefit of a sequence file is that it is more compact than a text file and fits well with the MapReduce output format. Sequence files can be compressed at the record or block level, where block-level compression gives a better compression ratio. To enable block-level compression, we need the following settings:

SET hive.exec.compress.output=true;

SET io.seqfile.compression.type=BLOCK;
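As a rough sketch (the table names logs_text and logs_seq are placeholders), a block-compressed sequence-file table could then be populated like this:

CREATE TABLE logs_seq (line STRING) STORED AS SEQUENCEFILE;

-- With the two settings above in effect, the written files use block-level compression
INSERT OVERWRITE TABLE logs_seq SELECT line FROM logs_text;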

Unfortunately, both text and sequence files, as row-level storage file formats, are not an optimal solution, since Hive has to read a full row even if only one column is requested. Hybrid row-columnar storage file formats, such as RCFILE, ORC, and PARQUET, were created to resolve this problem.

RCFILE – This is short for Record Columnar File. It is a flat file consisting of binary key/value pairs that shares much in common with a sequence file. RCFile splits data horizontally into row groups, and one or several row groups are stored in an HDFS file. Within each row group, RCFile saves the data in a columnar format by storing the first column across all rows, then the second column across all rows, and so on. This format is splittable and allows Hive to skip irrelevant parts of the data and get results faster and more cheaply.

ORC – This is short for Optimized Row Columnar. The ORC format can be considered an improved version of RCFILE. It provides a larger block size of 256 MB by default (RCFILE uses 4 MB and SEQUENCEFILE uses 1 MB), optimized for large sequential reads on HDFS for more throughput and fewer files, which reduces the load on the NameNode. Unlike RCFILE, which relies on the metastore to know the data types, an ORC file understands the data types by using specific encoders, so it can optimize compression depending on the type. It also stores basic statistics on columns, such as MIN, MAX, SUM, and COUNT, as well as a lightweight index that can be used to skip blocks of rows that do not match a query.
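For instance, an ORC table can be declared as follows; the orc.compress table property chooses the codec used inside the ORC file (table and column names are placeholders):

CREATE TABLE employee_orc (
  id     INT,
  name   STRING,
  salary DOUBLE
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");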

For integer types (tinyint, smallint, int, bigint) and floating-point types (float, double), the column statistics include the minimum, maximum, and sum. For integer types, if the sum overflows long at any point during the calculation, no sum is recorded.

message IntegerStatistics {
  optional sint64 minimum = 1;
  optional sint64 maximum = 2;
  optional sint64 sum = 3;
}

message DoubleStatistics {
  optional double minimum = 1;
  optional double maximum = 2;
  optional double sum = 3;
}

For strings, the minimum value, maximum value, and the sum of the lengths of the values are recorded.


message StringStatistics {
  optional string minimum = 1;
  optional string maximum = 2;
  // sum will store the total length of all strings
  optional sint64 sum = 3;
}
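These statistics and the lightweight index are what allow the reader to skip row groups at query time. As a sketch, predicate push-down into ORC is typically controlled by settings such as the following (defaults vary between Hive versions), reusing the placeholder employee_orc table:

SET hive.optimize.ppd=true;           -- enable predicate push-down
SET hive.optimize.index.filter=true;  -- use the ORC indexes/statistics to skip row groups

SELECT name FROM employee_orc WHERE id > 1000;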

PARQUET – This is another hybrid row-columnar file format with a design similar to that of ORC. What's more, Parquet is supported by a wider range of projects in the Hadoop ecosystem, whereas ORC is mainly supported by Hive and Pig.
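A Parquet table is declared the same way (STORED AS PARQUET is natively supported in recent Hive versions; the table name is a placeholder, and the parquet.compression property is optional):

CREATE TABLE employee_parquet (
  id   INT,
  name STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");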

Considering the maturity of Hive, it is suggested to use the ORC format if Hive is the main tool used in your Hadoop environment. If you use several tools in the Hadoop ecosystem, PARQUET is a better choice in terms of adaptability.

Hadoop Archive File (HAR) is another format for packing HDFS files into archives. It is an option for storing a large number of small files in HDFS, since storing many small files directly in HDFS is not very efficient. However, HAR still has some limitations that make it unpopular, such as an immutable archive process, not being splittable, and compatibility issues.
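As a sketch, Hive itself can archive entire partitions into HAR when archiving is enabled (the table and partition below are placeholders; an archived partition must be unarchived before it can be modified):

SET hive.archive.enabled=true;
ALTER TABLE page_views ARCHIVE PARTITION (dt='2023-01-01');

-- and to reverse the operation:
ALTER TABLE page_views UNARCHIVE PARTITION (dt='2023-01-01');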

Compression

Compression techniques in Hive can significantly reduce the amount of data transferred between mappers and reducers (through intermediate output compression) as well as the size of the output data stored in HDFS (through output compression). As a result, the overall Hive query has better performance. To compress intermediate files produced by Hive between multiple MapReduce jobs, we need to set hive.exec.compress.intermediate=true, which is false by default.
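For example, the following can be set before running a multi-stage query (the BLOCK compression type mirrors the sequence-file setting shown earlier and is optional):

SET hive.exec.compress.intermediate=true;
SET hive.intermediate.compression.type=BLOCK;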

Below are the compression codecs that can be used, listed as Compression - Codec - Extension - Splittable:

Deflate - org.apache.hadoop.io.compress.DefaultCodec - .deflate - N
GZip - org.apache.hadoop.io.compress.GzipCodec - .gz - N
Bzip2 - org.apache.hadoop.io.compress.BZip2Codec - .bz2 - Y
LZO - com.hadoop.compression.lzo.LzopCodec - .lzo - N
LZ4 - org.apache.hadoop.io.compress.Lz4Codec - .lz4 - N
Snappy - org.apache.hadoop.io.compress.SnappyCodec - .snappy - N

Hadoop has a default codec (.deflate). GZip has a higher compression ratio, but also a higher CPU cost. Bzip2 is splittable but is too slow for compression considering its huge CPU cost. LZO files are not natively splittable, but we can preprocess them (using com.hadoop.compression.lzo.LzoIndexer) to create an index that determines the file splits. When it comes to the balance of CPU cost and compression ratio, LZ4 or Snappy do a better job. Since the majority of codecs do not support splitting after compression, it is suggested to avoid compressing big files in HDFS.

SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

Intermediate compression will only save disk space for queries that require multiple map and reduce stages. To further save disk space, the actual Hive output files can be compressed. When the hive.exec.compress.output property is set to true, Hive will use the codec configured by the mapred.output.compression.codec property to compress the output data stored in HDFS.

SET hive.exec.compress.output=true;

SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
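Putting it together, a minimal sketch of writing Snappy-compressed output (table and column names are placeholders):

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- The files produced for sales_summary are written with Snappy compression
INSERT OVERWRITE TABLE sales_summary
SELECT region, SUM(amount) FROM sales_raw GROUP BY region;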

Storage optimization

Data that is used or scanned frequently can be identified as hot data. Usually, query performance on hot data is critical for overall performance. Increasing the HDFS replication factor for hot data increases the chance of the data being hit locally by Hive jobs and improves performance. However, this is a trade-off against storage space.

hdfs dfs -setrep -R -w 4 /user/hive/warehouse/testdata

On the other hand, too many files or too much redundancy could exhaust the NameNode's memory, especially lots of small files smaller than the HDFS block size. Hadoop itself already has some solutions to deal with the small-files issue, such as the following:

1. Hadoop Archive (HAR): This is a toolkit to pack small files into larger archive files.

2. SequenceFile format: This is a format to consolidate small files into bigger files.

3. CombineFileInputFormat: A type of InputFormat that combines small files before map and reduce processing. It is the default InputFormat for Hive.

4. HDFS federation: It makes NameNodes extensible and able to manage more files.

We can also leverage other tools in the Hadoop ecosystem if we have them installed, such as the following:

1. HBase, which has a smaller block size and a better file format, can deal with small-file access issues.

2. Flume NG can be used as a pipe to merge small files into big ones.

3. A scheduled offline file-merge program can merge small files in HDFS or before loading them into HDFS.

For Hive, we can use the following configuration settings to merge the files produced by query results and avoid re-creating small files (see the example settings below):

1. hive.merge.mapfiles: This merges small files at the end of a map-only job. By default, it is true.

2. hive.merge.mapredfiles: This merges small files at the end of a MapReduce job. Set it to true since its default is false.

3. hive.merge.size.per.task: This defines the size of merged files at the end of the job. The default value is 256 MB.

4. hive.merge.smallfiles.avgsize: This is the threshold for triggering file merge. The default value is 16 MB.

When the average output file size of a job is less than the value specified by hive.merge.smallfiles.avgsize, and both hive.merge.mapfiles (for map-only jobs) and hive.merge.mapredfiles (for MapReduce jobs) are set to true, Hive will start an additional MapReduce job to merge the output files into big files.
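For example, the following session settings turn on merging for both kinds of jobs and keep the default size thresholds (values are in bytes):

SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;     -- target size of the merged files (~256 MB)
SET hive.merge.smallfiles.avgsize=16000000; -- average-size threshold that triggers the merge (~16 MB)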