Pig Tutorial 12 – Pig Example to Implement a Custom Load Function

The Pig load/store API is aligned with Hadoop's InputFormat and OutputFormat classes. This enables you to create new LoadFunc and StoreFunc implementations based on existing Hadoop InputFormat and OutputFormat classes with minimal code. The complexity of reading the data and creating a record lies in the InputFormat, while the complexity of writing the data lies in the OutputFormat. This enables Pig to easily read and write data in new storage formats as and when a Hadoop InputFormat and OutputFormat become available for them.

Load Functions

The LoadFunc abstract class provides the main methods for loading data, and for most use cases it suffices to extend it. There are three other optional interfaces that can be implemented to achieve extended functionality:

LoadMetadata – has methods to deal with metadata – most loader implementations don't need to implement this unless they interact with some metadata system. The getSchema() method in this interface provides a way for loader implementations to communicate the schema of the data back to Pig. If a loader implementation returns data comprised of fields of real types (rather than DataByteArray fields), it should provide the schema describing the data returned through the getSchema() method. The other methods are concerned with other types of metadata like partition keys and statistics. Implementations can return null from these methods if they are not applicable for that implementation.
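As a minimal sketch, a loader could expose a fixed schema through LoadMetadata as shown below. The class extends the CustomLoader developed later in this post, and the field names and types are assumptions chosen for illustration.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.impl.util.Utils;

// Sketch of a loader exposing a fixed schema through LoadMetadata.
// Extends the CustomLoader shown later in this post; field names and types
// are assumptions for illustration only.
public class SchemaAwareLoader extends CustomLoader implements LoadMetadata {

    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        // describe the whole tuple returned by getNext(): validity flag plus the four input columns
        return new ResourceSchema(Utils.getSchemaFromString(
                "flag:chararray, eltname:chararray, subelement:chararray, serviceid:chararray, timestamp:chararray"));
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        return null; // no statistics available
    }

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        return null; // data is not partitioned
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter) throws IOException {
        // nothing to do; there are no partition keys
    }
}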

LoadPushDown – has methods to push operations from Pig runtime into loader implementations. Currently only the pushProjection() method is called by Pig to communicate to the loader the exact fields that are required in the Pig script. The loader implementation can choose to honor the request (return only those fields required by Pig script) or not honor the request (return all fields in the data). If the loader implementation can efficiently honor the request, it should implement LoadPushDown to improve query performance. (Irrespective of whether the implementation can or cannot honor the request, if the implementation also implements getSchema(), the schema returned in getSchema() should describe the entire tuple of data.)
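A minimal sketch of honoring projection push-down, again extending the CustomLoader from later in this post, could look like the following. The class name and the idea of simply recording the requested field indexes are illustrative assumptions; a real loader would also propagate the indexes to the backend and trim the tuples it returns from getNext().

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.pig.LoadPushDown;
import org.apache.pig.impl.logicalLayer.FrontendException;

// Sketch of a loader that accepts projection push-down from Pig.
// Extends the CustomLoader shown later in this post; names are illustrative.
public class ProjectingLoader extends CustomLoader implements LoadPushDown {

    private List<Integer> requiredFieldIndexes = null;

    @Override
    public List<OperatorSet> getFeatures() {
        // advertise that this loader can handle field projection
        return Arrays.asList(OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
        // remember which field positions the script actually uses;
        // a real loader would also pass this to the backend (e.g. via UDFContext)
        // and return only those fields from getNext()
        requiredFieldIndexes = new ArrayList<Integer>();
        for (RequiredField field : requiredFieldList.getFields()) {
            requiredFieldIndexes.add(field.getIndex());
        }
        // true tells Pig the request will be honored
        return new RequiredFieldResponse(true);
    }
}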

LoadCaster – has methods to convert byte arrays to specific types. A loader implementation should implement this if casts (implicit or explicit) from DataByteArray fields to other types need to be supported.
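The default LoadFunc.getLoadCaster() implementation returns the built-in Utf8StorageConverter, which casts UTF-8 byte arrays to Pig types. A loader whose on-disk data is not UTF-8 text would return its own LoadCaster instead; the sketch below (class name assumed for illustration) simply makes the default explicit.

import java.io.IOException;
import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.Utf8StorageConverter;

// Sketch: explicitly returning the default UTF-8 caster from a loader.
// A loader with a non-UTF-8 on-disk format would instead return its own
// LoadCaster implementing the bytesTo* conversion methods.
public class CasterAwareLoader extends CustomLoader {

    @Override
    public LoadCaster getLoadCaster() throws IOException {
        return new Utf8StorageConverter();
    }
}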

The LoadFunc abstract class is the main class to extend for implementing a loader. The methods that need to be overridden are explained below:

getInputFormat() – This method is called by Pig to get the InputFormat used by the loader. The methods in the InputFormat (and underlying RecordReader) are called by Pig in the same manner (and in the same context) as by Hadoop in a MapReduce Java program.

If a custom loader using a text-based InputFormat or a file-based InputFormat would like to read files in all subdirectories under a given input directory recursively, then it should use the PigTextInputFormat and PigFileInputFormat classes provided in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer. The Pig InputFormat classes work around a current limitation in the Hadoop TextInputFormat and FileInputFormat classes which only read one level down from the provided input directory.
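For example, a variant of the loader developed below could switch to PigTextInputFormat to pick up files in nested subdirectories. This is only a sketch and the class name is illustrative.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;

// Sketch: a variant of the CustomLoader below that reads all files under
// the input directory recursively by using PigTextInputFormat.
public class RecursiveCustomLoader extends CustomLoader {

    @Override
    public InputFormat<LongWritable, Text> getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }
}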

setLocation() – This method is called by Pig to communicate the load location to the loader. The loader should use this method to communicate the same information to the underlying InputFormat. This method is called multiple times by Pig – implementations should bear this in mind and ensure there are no inconsistent side effects due to the multiple calls.

prepareToRead() – Through this method the RecordReader associated with the InputFormat provided by the LoadFunc is passed to the LoadFunc. The RecordReader can then be used by the implementation in getNext() to return a tuple representing a record of data back to Pig.

getNext() – This method is called by the Pig runtime to get the next tuple in the data. The implementation should use the underlying RecordReader to construct the tuple to return.

Custom loader implementation

Let's say we want to load the data below. Although we could use PigStorage for this, we need to filter some invalid characters from the input and also modify the sub-element name, so we will write a custom loader instead. The code is self-explanatory.

Input Data

#EltName,Sub-ElementName,ServiceID,TimeStamp
ABCDDI,ABCDDI./shelf=0/port=21,OGEA02699269,08/03/2012
ABCDJC,ABCDJC./shelf=0/port=31,OGEA05357149,06/18/2016
ABCDEG,ABCDEG./shelf=0/port=15,OGEA16722054,07/24/2015


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomLoader extends LoadFunc {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomLoader.class);
    private RecordReader<LongWritable, Text> reader = null;
    private TupleFactory tupleFactory = TupleFactory.getInstance();
    // number of columns expected in each input record
    private static final int COLUMNS_COUNT = 4;
    private static final String DELIMITER = ",";

    public CustomLoader() {

    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // hand the load location over to the underlying InputFormat
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat<LongWritable, Text> getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {

        // the output tuple holds a VALID/INVALID flag followed by the four input columns
        List<String> resultList = null;
        String[] tupleArray = null;
        boolean validRecord = false;
        Tuple tuple = tupleFactory.newTuple(COLUMNS_COUNT + 1);
        try {
            if (!reader.nextKeyValue()) {
                // no more records in this split
                return null;
            }
            resultList = new ArrayList<String>(COLUMNS_COUNT + 1);
            for (int i = 0; i < COLUMNS_COUNT + 1; i++) {
                resultList.add("");
            }
            resultList.set(0, "INVALID");

            Text value = (Text) reader.getCurrentValue();

            if (value != null && value.getLength() > 0) {
                tupleArray = value.toString().split(DELIMITER, -1);
                // skip the header line and records with the wrong number of columns
                if (tupleArray.length == COLUMNS_COUNT && !value.toString().contains("ServiceID")) {
                    validRecord = true;
                    for (int i = 0; i < COLUMNS_COUNT; i++) {
                        if (i == 1 && tupleArray[1].startsWith(tupleArray[0] + ".")) {
                            // strip the leading "<EltName>." prefix from the sub-element name
                            tupleArray[1] = tupleArray[1].substring(tupleArray[0].length() + 1);
                        }
                        resultList.set(i + 1, tupleArray[i]);
                        // reject records containing invalid (NUL) characters
                        if (tupleArray[i].contains("\u0000")) {
                            validRecord = false;
                        }
                    }

                    if (validRecord) {
                        resultList.set(0, "VALID");
                    }

                }
            }

            tuple = this.tupleFactory.newTupleNoCopy(resultList);

        } catch (InterruptedException e) {
            LOGGER.info(e.getLocalizedMessage());
        }
        return tuple;

    }
}
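
To use the loader, package the class into a jar, register it in the Pig script, and reference it in the LOAD statement. The jar name and input path below are placeholders; the FILTER simply keeps the records the loader flagged as VALID.

REGISTER customloader.jar;

ports = LOAD '/user/hadoop/input/ports.csv' USING CustomLoader();
valid_ports = FILTER ports BY $0 == 'VALID';
DUMP valid_ports;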