An aggregate function is an eval function that takes a bag and returns a scalar value. One interesting and useful property of many aggregate functions is that they can be computed incrementally in a distributed fashion. We call these functions algebraic. COUNT is an example of an algebraic function because we can count the number of elements in a subset of the data and then sum the counts to produce a final output. In the Hadoop world, this means that the partial computations can be done by the map and combiner, and the final result can be computed by the reducer. It is very important for performance to make sure that aggregate functions that are algebraic are implemented as such.
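To make the idea concrete, here is a minimal, Pig-free sketch of computing a count algebraically: each partition is counted independently, and the partial counts are then summed. The class and method names are illustrative only, not part of Pig's API.

```java
import java.util.Arrays;
import java.util.List;

public class AlgebraicCountSketch {
    // "Initial" step: count the elements of a single partition (map side).
    static long partialCount(List<String> partition) {
        return partition.size();
    }

    // "Intermed"/"Final" step: sum the partial counts (combiner/reducer side).
    static long sumPartials(List<Long> partials) {
        long total = 0;
        for (long p : partials) total += p;
        return total;
    }

    public static void main(String[] args) {
        List<String> partition1 = Arrays.asList("a", "b", "c");
        List<String> partition2 = Arrays.asList("d", "e");
        long total = sumPartials(Arrays.asList(
                partialCount(partition1), partialCount(partition2)));
        System.out.println(total); // prints 5
    }
}
```

Because addition is associative, it does not matter how the data is split across partitions or how many combiner passes run; the final sum is the same.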

For a function to be algebraic, it needs to implement the Algebraic interface, which consists of definitions of three classes derived from EvalFunc. The contract is that the exec function of the Initial class is called once and is passed the original input tuple. Its output is a tuple that contains partial results. The exec function of the Intermed class can be called zero or more times; it takes as input a tuple containing partial results produced by the Initial class or by prior invocations of the Intermed class, and produces a tuple with another partial result. Finally, the exec function of the Final class is called once and produces the final result as a scalar type.

From the MapReduce perspective, the exec function of the Initial class is invoked once by the map process and produces partial results. The exec function of the Intermed class is invoked by each combiner invocation (which can happen zero or more times) and also produces partial results. The exec function of the Final class is invoked once by the reducer and produces the final result.

###### The algebraic interface

```java
public interface Algebraic {
    /**
     * Get the initial function.
     * @return A function name of f_init. f_init should be an eval func.
     * The return type of f_init.exec() has to be Tuple.
     */
    public String getInitial();

    /**
     * Get the intermediate function.
     * @return A function name of f_intermed. f_intermed should be an eval func.
     * The return type of f_intermed.exec() has to be Tuple.
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final should be an eval func
     * parametrized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}
```
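Note that each getter returns a class *name* rather than an instance: the runtime can ship the name to each node and instantiate the appropriate stage reflectively. The sketch below illustrates that pattern in plain Java; the stand-in class name is hypothetical, not Pig's actual loading code.

```java
// Sketch: instantiating a processing stage from its class name, as a runtime
// could do with the strings returned by getInitial()/getIntermed()/getFinal().
public class StageLoaderSketch {
    static Object newStage(String className) throws Exception {
        // The named class must have a public no-argument constructor.
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // java.util.ArrayList stands in for an EvalFunc subclass name here.
        Object stage = newStage("java.util.ArrayList");
        System.out.println(stage.getClass().getName()); // prints java.util.ArrayList
    }
}
```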

Below is an example of COUNT, which implements the Algebraic interface.

```java
public class COUNT extends EvalFunc<Long> implements Algebraic {
    public Long exec(Tuple input) throws IOException {
        return count(input);
    }
    public String getInitial() { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal() { return Final.class.getName(); }

    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(count(input));
        }
    }
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }
    static public class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    static protected Long count(Tuple input) throws ExecException {
        Object values = input.get(0);
        if (values instanceof DataBag)
            return ((DataBag) values).size();
        else if (values instanceof Map)
            return new Long(((Map) values).size());
        else
            throw new ExecException("Cannot count a " + values.getClass().getName());
    }

    static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
        DataBag values = (DataBag) input.get(0);
        long sum = 0;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            sum += (Long) t.get(0);
        }
        return sum;
    }
}
```

###### Accumulator Interface

In Pig, problems with memory usage can occur when data resulting from a group or cogroup operation must be placed in a bag and passed in its entirety to a UDF.

This problem is partially addressed by Algebraic UDFs that use the combiner and can deal with data being passed to them incrementally during different processing phases (map, combiner, and reduce). However, there are a number of UDFs that are not Algebraic, don’t use the combiner, but still don’t need to be given all data at once.

The new Accumulator interface is designed to decrease memory usage by targeting such UDFs. For functions that implement this interface, Pig guarantees that the data for the same key is passed continuously but in small increments. Here is the interface a UDF needs to implement to work with incremental data:

```java
public interface Accumulator<T> {
    /**
     * Process tuples. Each DataBag may contain 0 to many tuples for the current key.
     */
    public void accumulate(Tuple b) throws IOException;

    /**
     * Called when all tuples from the current key have been passed to the accumulator.
     * @return the value for the UDF for this key.
     */
    public T getValue();

    /**
     * Called after getValue() to prepare processing for the next key.
     */
    public void cleanup();
}
```

There are several things to note here:

1. Each UDF must extend the EvalFunc class and implement all necessary functions there.

2. If a function is algebraic but can be used in a FOREACH statement with accumulator functions, it needs to implement the Accumulator interface in addition to the Algebraic interface.

3. The interface is parameterized with the return type of the function.

4. The accumulate function is guaranteed to be called one or more times, passing one or more tuples in a bag to the UDF. (Note that the tuple passed to the accumulator has the same content as the one passed to exec – all the parameters passed to the UDF – one of which should be a bag.)

5. The getValue function is called after all the tuples for a particular key have been processed to retrieve the final value.

6. The cleanup function is called after getValue but before the next value is processed.
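As a rough illustration of the lifecycle described above (not Pig's actual COUNT implementation; a plain List stands in where Pig would pass a Tuple wrapping a DataBag), an accumulating count keeps a running total, returns it from getValue, and resets it in cleanup:

```java
import java.util.Arrays;
import java.util.List;

public class CountAccumulatorSketch {
    private long intermediateCount = 0;

    // Called one or more times with a batch of tuples for the current key.
    public void accumulate(List<?> bag) {
        intermediateCount += bag.size();
    }

    // Called once all batches for the current key have been accumulated.
    public Long getValue() {
        return intermediateCount;
    }

    // Called after getValue() to reset state before the next key.
    public void cleanup() {
        intermediateCount = 0;
    }

    public static void main(String[] args) {
        CountAccumulatorSketch acc = new CountAccumulatorSketch();
        acc.accumulate(Arrays.asList("a", "b"));  // first increment for key 1
        acc.accumulate(Arrays.asList("c"));       // second increment for key 1
        System.out.println(acc.getValue());       // prints 3
        acc.cleanup();                            // ready for key 2
        acc.accumulate(Arrays.asList("d"));
        System.out.println(acc.getValue());       // prints 1
    }
}
```

Because only a running total is held in memory, no single bag of all tuples for a key ever needs to be materialized.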