Pig Tutorial 8 – Performance Tuning and Optimizing a Pig Script

Below are some pointers to consider when optimizing a Pig script.

1. If types are not specified in the LOAD statement, Pig assumes the type double for numeric computations. A lot of the time your data would fit in a much smaller type, such as int or long. Specifying the real type speeds up arithmetic computation and has the additional advantage of early error detection, so always use a schema on load.
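
For example (a minimal sketch; the file and field names are illustrative), declaring the numeric types up front keeps the arithmetic in int/long instead of the default double:

A = LOAD 'employees.txt' using PigStorage(',') AS (name:chararray, age:int, salary:long);
B = foreach A generate name, salary * 12;

Without the AS clause, age and salary would be loaded as bytearray and promoted to double in the multiplication.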

2. Pig does not determine when a field is no longer needed and drop it from the row. So if some fields are not required, we should not carry them through the relation; instead, project only the needed fields with a foreach, which will greatly reduce the amount of data being carried through the map and reduce phases by Pig.

Example


A = LOAD '/home/sa081876/blogdata/service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray,param1:chararray,param2:chararray,param3:chararray );

B = LOAD '/home/sa081876/blogdata/metric.txt' using PigStorage(',') AS (service_id:chararray , cpu:chararray,memory:int );

C = JOIN A BY service_id, B BY service_id;

D = group C by portid;

E = foreach D generate group, AVG(C.memory);

There is no need for param1, param2, param3, or cpu to participate in this query, and there is no need to carry them past the join. Changing the query above to the query below will greatly reduce the amount of data being carried through the map and reduce phases by Pig.


A = LOAD '/home/sa081876/blogdata/service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray,param1:chararray,param2:chararray,param3:chararray );

A1 = foreach A generate service_id, neid,portid;

B = LOAD '/home/sa081876/blogdata/metric.txt' using PigStorage(',') AS (service_id:chararray , cpu:chararray,memory:int );

B1 = foreach B generate service_id, memory;

C = JOIN A1 BY service_id, B1 BY service_id;

D = group C by portid;

E = foreach D generate group, AVG(C.memory);

3. In most cases it is beneficial to apply filters as early as possible to reduce the amount of data flowing through the pipeline. Script 1 below is clearly more efficient than Script 2 because it reduces the amount of data going into the join. One case where pushing a filter up might not be a good idea is when the cost of applying the filter is very high and only a small amount of data is filtered out.


Script 1

A = LOAD 'service.txt';
B = LOAD 'metric.txt';
C = filter A by $1 == 101;
D = join C by $1, B by $1;
E = group D by $1;

Script 2

A = LOAD 'service.txt';
B = LOAD 'metric.txt';
C = join A by $1, B by $1;
D = group C by $1;
E = filter D by $0 == 101;

4. Reduce your operator pipeline – Consider the following script, which pulls two values out of a map in one foreach and concatenates them in a second foreach:


A = load 'data' as (in: map[]);

B = foreach A generate in#'k1' as k1, in#'k2' as k2;

C = foreach B generate CONCAT(k1, k2);

While the example above is easier to read, you might want to consider combining the two foreach statements to improve your query performance:


A = load 'data' as (in: map[]);

B = foreach A generate CONCAT(in#'k1', in#'k2');

5. Make Your UDFs Algebraic – Queries that can take advantage of the combiner generally run much faster (sometimes several times faster) than versions that don’t. The latest code significantly improves combiner usage; however, you need to make sure you do your part. If you have a UDF that works on grouped data and is, by nature, algebraic (meaning its computation can be decomposed into multiple steps), make sure you implement it as such: the UDF needs to implement the Algebraic interface.
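
As a rough sketch (the class name MyCount and its inner classes are illustrative, not part of Pig; only the Algebraic and EvalFunc APIs come from Pig), an algebraic COUNT-style UDF tells Pig which classes compute the initial (map), intermediate (combiner) and final (reduce) steps:

import java.io.IOException;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Illustrative COUNT-style UDF: the computation is decomposed into
// initial (map), intermediate (combiner) and final (reduce) steps.
public class MyCount extends EvalFunc<Long> implements Algebraic {

    private static final TupleFactory tf = TupleFactory.getInstance();

    // Plain execution path, used when the combiner cannot be applied.
    @Override
    public Long exec(Tuple input) throws IOException {
        return ((DataBag) input.get(0)).size();
    }

    // Tell Pig which classes implement each stage of the computation.
    public String getInitial()  { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal()    { return Final.class.getName(); }

    // Map side: emit a partial count of the tuples seen so far.
    public static class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            long count = ((DataBag) input.get(0)).size();
            return tf.newTuple(Long.valueOf(count));
        }
    }

    // Combiner: add up the partial counts.
    public static class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return tf.newTuple(Long.valueOf(sumPartials(input)));
        }
    }

    // Reduce side: produce the final count.
    public static class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sumPartials(input);
        }
    }

    private static long sumPartials(Tuple input) throws IOException {
        long total = 0;
        for (Tuple t : (DataBag) input.get(0)) {
            total += (Long) t.get(0);
        }
        return total;
    }
}

With this in place, a query such as B = group A by t; C = foreach B generate group, MyCount(A); lets Pig run the Initial and Intermed stages in the map and combiner phases instead of shipping whole bags to the reducers.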

6. Implement the Accumulator Interface – If your UDF can’t be made Algebraic but is able to deal with getting input in chunks rather than all at once, consider implementing the Accumulator interface to reduce the amount of memory used by your script. If your function is Algebraic and can also be used in conjunction with Accumulator functions, you will need to implement the Accumulator interface as well as the Algebraic interface.
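
Again as a sketch (the class name MySum is hypothetical; the Accumulator and EvalFunc APIs are Pig's), a SUM-style UDF that receives each grouped bag in chunks could look like this:

import java.io.IOException;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Illustrative SUM-style UDF: Pig can feed the grouped bag to accumulate()
// in chunks instead of materializing the whole bag in memory.
public class MySum extends EvalFunc<Long> implements Accumulator<Long> {

    private Long intermediateSum = null;

    // Regular execution path when the whole bag is passed in one call.
    @Override
    public Long exec(Tuple input) throws IOException {
        accumulate(input);
        Long result = getValue();
        cleanup();
        return result;
    }

    // Called repeatedly, each call carrying a bag with the next chunk of tuples.
    @Override
    public void accumulate(Tuple input) throws IOException {
        DataBag bag = (DataBag) input.get(0);
        for (Tuple t : bag) {
            Object value = t.get(0);
            if (value == null) {
                continue; // skip nulls, as the built-in SUM does
            }
            if (intermediateSum == null) {
                intermediateSum = 0L;
            }
            intermediateSum += ((Number) value).longValue();
        }
    }

    // Called once all chunks for the current key have been accumulated.
    @Override
    public Long getValue() {
        return intermediateSum;
    }

    // Reset state so the same instance can be reused for the next key.
    @Override
    public void cleanup() {
        intermediateSum = null;
    }
}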

7. Drop Nulls Before a Join – With the introduction of nulls, join and cogroup semantics were altered to work with nulls. The semantics of cogrouping with nulls are that nulls from a given input are grouped together, but nulls across inputs are not grouped together. This preserves the semantics of grouping (nulls are collected together from a single input to be passed to aggregate functions like COUNT) and the semantics of join (nulls are not joined across inputs). Since flattening an empty bag results in an empty row, in a standard join the rows with a null key will always be dropped anyway, so filtering them out before the join, as below, avoids carrying them through the pipeline only to be discarded:


A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
A1 = filter A by t is not null;
B1 = filter B by x is not null;
C = join A1 by t, B1 by x;

8. Regular Join Optimizations – Optimization for regular joins ensures that the last table in the join is not brought into memory but streamed through instead. This reduces the amount of memory used, which means you can avoid spilling data and should be able to scale your query to larger data volumes. To take advantage of this optimization, make sure that the table with the largest number of tuples per key is the last table in your query, as in the example below where the larger relation comes last:


small = load 'small_file' as (t, u, v);
large = load 'large_file' as (x, y, z);
C = join small by t, large by x;

9. Use the PARALLEL Clause – Use the PARALLEL clause to increase the parallelism of a job. PARALLEL sets the number of reduce tasks for the MapReduce jobs generated by Pig; the default value is 1 (one reduce task). PARALLEL only affects the number of reduce tasks; map parallelism is determined by the input file, one map for each HDFS block. If you don’t specify PARALLEL, you still get the same map parallelism but only one reduce task. The number of reducers you need for a particular construct that forms a MapReduce boundary depends entirely on your data: the number of intermediate keys generated by the mappers, the partitioner, and the distribution of the map (and combiner) output keys. As a rule of thumb, a reducer should process about 500 MB of data to be efficient.

You can include the PARALLEL clause with any operator that starts a reduce phase. This includes COGROUP, CROSS, DISTINCT, GROUP, JOIN (inner), JOIN (outer), and ORDER. You can also set the number of reduce tasks for all reduce-side operators in a script with the set default_parallel command.
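
For example (a minimal sketch; the file name and reducer counts are illustrative):

set default_parallel 20;
A = load 'myfile' as (t, u, v);
B = group A by t PARALLEL 18;

Here the GROUP runs with 18 reduce tasks, while any other reduce-side operator in the script falls back to the default of 20.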

10. Use the LIMIT Operator – Often you are not interested in the entire output but rather in a sample or the top results. In such cases, using LIMIT can yield much better performance, as the Pig framework pushes the limit as early in the pipeline as possible to minimize the amount of data travelling through it.
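
For example (illustrative file and field names), to keep only the first 500 rows after a sort:

A = load 'myfile' as (t, u, v);
B = order A by t;
C = limit B 500;
dump C;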

11. Prefer DISTINCT over GROUP BY – GENERATE

When it comes to extracting the unique values from a column in a relation, one of two approaches can be used:

Example Using GROUP BY – GENERATE


A = load 'myfile' as (t, u, v);
B = foreach A generate u;
C = group B by u;
D = foreach C generate group as uniquekey;
dump D;

Example Using DISTINCT


A = load 'myfile' as (t, u, v);
B = foreach A generate u;
C = distinct B;
dump C;

In Pig 0.1.x, DISTINCT is just GROUP BY/PROJECT under the hood. In Pig 0.2.0 it is not, and it is much faster and more efficient. Therefore, the use of DISTINCT is recommended over GROUP BY – GENERATE.