Hive tutorial 11 – Hive example for writing custom user-defined aggregate functions

User-defined aggregate functions (UDAFs) operate over multiple rows and output a single row, or one row per group, as a result, just like the built-in MAX and COUNT functions. A UDAF must be a subclass of org.apache.hadoop.hive.ql.exec.UDAF containing one or more nested static classes implementing org.apache.hadoop.hive.ql.exec.UDAFEvaluator. Make sure that the inner class that implements UDAFEvaluator is defined as public; otherwise, Hive won't be able to use reflection to determine the UDAFEvaluator implementation.

We must also implement the five required methods: init, iterate, terminatePartial, merge, and terminate. Both UDFs and UDAFs can alternatively be implemented by extending the GenericUDF and GenericUDAFEvaluator classes, which avoid Java reflection and therefore perform better; Hive's built-in UDF implementations internally extend these generic classes. Generic functions also support complex data types, such as MAP, ARRAY, and STRUCT, as arguments, which the UDF and UDAF classes do not.
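To make the five-method lifecycle concrete before diving into the real code, here is a minimal plain-Java sketch (no Hive dependencies; the CountEvaluator and simulate names are illustrative, not part of the Hive API) that mimics how Hive drives the methods: iterate on each input row in a map task, terminatePartial to ship the partial state, merge on the reducer, and terminate for the final result.

```java
// Plain-Java sketch of the UDAFEvaluator lifecycle (illustrative only, no Hive classes).
public class UdafLifecycleDemo {

    // A toy evaluator that counts rows, mirroring the five required methods.
    public static class CountEvaluator {
        private long count;

        public void init() { count = 0; }                 // reset internal state
        public boolean iterate(Object row) {              // one call per input row
            count++;
            return true;
        }
        public long terminatePartial() { return count; }  // partial state shipped to the reducer
        public boolean merge(long other) {                // combine another partial state
            count += other;
            return true;
        }
        public long terminate() { return count; }         // final result
    }

    // Simulate two map tasks whose partial results are merged on one reducer.
    public static long simulate(int rowsInSplit1, int rowsInSplit2) {
        CountEvaluator mapper1 = new CountEvaluator();
        mapper1.init();
        for (int i = 0; i < rowsInSplit1; i++) mapper1.iterate(null);

        CountEvaluator mapper2 = new CountEvaluator();
        mapper2.init();
        for (int i = 0; i < rowsInSplit2; i++) mapper2.iterate(null);

        CountEvaluator reducer = new CountEvaluator();
        reducer.init();
        reducer.merge(mapper1.terminatePartial());
        reducer.merge(mapper2.terminatePartial());
        return reducer.terminate();
    }
}
```

The real Hive runtime decides how many partials exist and when each method is called; this sketch only shows the order of the calls.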

We are not creating a generic UDAF here; we are creating a one-off aggregation function, so we do not have to worry about a resolver. Implementing a generic UDAF requires two parts: an evaluator, which implements the actual aggregation logic, and a resolver, which handles type checking and operator overloading (if required) and helps Hive find the correct evaluator class for a given set of argument types.

Let's consider the input data below:


1920,1920./shelf=0/slot=5/port=1,BBG199999999,656,
1920,1920./shelf=0/slot=4/port=6,BBG199999999,6565,
1920,1920./shelf=0/slot=5/port=24,BBG1920050,0,
1920,1920./shelf=0/slot=5/port=0,BBG1920050,54545,

We need to find the average of the metric, but we should only consider records whose value is greater than 0 and less than 1120; all other values are considered impaired and must be excluded from the average calculation.
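As a quick sanity check of this rule, a small plain-Java helper (a hypothetical illustration, not part of the UDAF itself) applying the 0 < value < 1120 filter:

```java
// Hypothetical helper showing which metric values survive the 0 < value < 1120 filter.
public class ValidMetricFilter {

    public static boolean isValid(double value) {
        return value > 0 && value < 1120;
    }

    // Average of the valid values only.
    public static double validAverage(double[] values) {
        double sum = 0;
        int count = 0;
        for (double v : values) {
            if (isValid(v)) {
                sum += v;
                count++;
            }
        }
        return sum / count;  // 0.0 / 0 yields NaN for an all-invalid group
    }
}
```

Applied to the sample data: for BBG199999999 only 656 passes the filter (6565 exceeds 1120), and for BBG1920050 neither 0 nor 54545 passes.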

Hive Custom UDAF Code

The code is commented and self-explanatory.


import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class CustomHiveUdaf extends UDAF {

    // The evaluator must be a public static inner class implementing
    // UDAFEvaluator so that Hive can find it through reflection.
    public static class ValidAvgEvaluator implements UDAFEvaluator {

        // Partial-aggregation state: the running sum and count of valid values.
        public static class Column {
            double valid_sum = 0;
            int valid_count = 0;
        }

        private Column column = null;

        public ValidAvgEvaluator() {
            super();
            init();
        }

        // Step 1 - init() initializes the evaluator and resets its internal
        // state. A fresh Column indicates that no values have been
        // aggregated yet.
        public void init() {
            column = new Column();
        }

        // Step 2 - iterate() is called every time there is a new value to be
        // aggregated. The evaluator updates its internal state, skipping
        // impaired values. Returning true indicates that the input was valid.
        public boolean iterate(double value) throws HiveException {
            if (column == null) {
                throw new HiveException("Item is not initialized");
            }
            if (value > 0 && value < 1120) {
                column.valid_sum += value;
                column.valid_count += 1;
            }
            return true;
        }

        // Step 3 - terminatePartial() is called when Hive wants a result for
        // the partial aggregation. It must return an object that encapsulates
        // the state of the aggregation.
        public Column terminatePartial() {
            return column;
        }

        // Step 4 - merge() is called when Hive decides to combine one partial
        // aggregation with another.
        public boolean merge(Column other) {
            if (other == null) {
                return true;
            }
            column.valid_sum += other.valid_sum;
            column.valid_count += other.valid_count;
            return true;
        }

        // Step 5 - terminate() is called when the final result of the
        // aggregation is needed. Note that 0.0 / 0 yields NaN for a group
        // with no valid values.
        public double terminate() {
            return column.valid_sum / column.valid_count;
        }
    }
}
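To see how the partial states combine, here is a plain-Java simulation (no Hive runtime; the class and method names mirror the UDAF's logic but are otherwise illustrative) that runs one group's rows through two simulated map splits and merges the partials:

```java
// Plain-Java simulation of the UDAF's logic running over two map splits (no Hive runtime).
public class UdafMergeDemo {

    // Same partial-aggregation state as the Column class in the UDAF.
    public static class Column {
        double valid_sum = 0;
        int valid_count = 0;
    }

    public static void iterate(Column c, double value) {
        if (value > 0 && value < 1120) {   // skip impaired values
            c.valid_sum += value;
            c.valid_count += 1;
        }
    }

    public static void merge(Column into, Column other) {
        into.valid_sum += other.valid_sum;
        into.valid_count += other.valid_count;
    }

    public static double terminate(Column c) {
        return c.valid_sum / c.valid_count;
    }

    // Two splits for one group are aggregated independently, then merged.
    public static double run(double[] split1, double[] split2) {
        Column partial1 = new Column();
        for (double v : split1) iterate(partial1, v);

        Column partial2 = new Column();
        for (double v : split2) iterate(partial2, v);

        Column finalState = new Column();
        merge(finalState, partial1);
        merge(finalState, partial2);
        return terminate(finalState);
    }
}
```

Because each partial only carries a sum and a count, merging is a simple addition of the two states, which is what makes the aggregation distributable.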

Once you have the code ready, package your Java class into a jar file, add it to the Hive classpath, and create a temporary function:


hive> ADD JAR HiveUdaf.jar;

hive> CREATE TEMPORARY FUNCTION VALID_AVG as 'com.hadoop.hive.custom.CustomHiveUdaf';

hive> select service_id,VALID_AVG(throughput) from service_table group by service_id;

The output will be


BBG199999999	656.0
BBG1920050	NaN