pig tutorial 13 – pig example to implement custom store function

The Pig load/store API is aligned with Hadoop’s InputFormat and OutputFormat classes. This enables you to create new LoadFunc and StoreFunc implementations based on existing Hadoop InputFormat and OutputFormat classes with minimal code. The complexity of reading the data and creating a record lies in the InputFormat, while the complexity of writing the data lies in the OutputFormat. This lets Pig easily read and write data in new storage formats as and when a Hadoop InputFormat and OutputFormat become available for them.

The StoreFunc abstract class has the main methods for storing data, and for most use cases it should suffice to extend it. There is an optional interface which can be implemented to achieve extended functionality:

StoreMetadata – This interface has methods to interact with metadata systems to store the schema and statistics. It is optional and should only be implemented if metadata needs to be stored.
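For reference, an implementation of StoreMetadata might look roughly like the sketch below. This is only an illustration and is not part of the storer built in this tutorial; the class name CustomStoreWithMetadata and the ".schema" side-file name are assumptions made for the example.

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreMetadata;

// Sketch only: extends the CustomStoreFunction shown later in this post.
public class CustomStoreWithMetadata extends CustomStoreFunction implements StoreMetadata {

    @Override
    public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
        // Persist the schema next to the data; the ".schema" file name is just an example.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        try (OutputStream out = fs.create(new Path(location, ".schema"))) {
            out.write(schema.toString().getBytes("UTF-8"));
        }
    }

    @Override
    public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
        // Nothing is stored here; implement only if statistics are available.
    }
}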

The methods which need to be overridden in StoreFunc are explained below:

getOutputFormat() – This method will be called by Pig to get the OutputFormat used by the storer. The methods in the OutputFormat (and the underlying RecordWriter and OutputCommitter) will be called by Pig in the same manner (and in the same context) as by Hadoop in a map-reduce Java program. The checkOutputSpecs() method of the OutputFormat will be called by Pig to check the output location up front. It will also be called as part of the Hadoop call sequence when the job is launched, so implementations should ensure that it can be called multiple times without inconsistent side effects.

setStoreLocation() – This method is called by Pig to communicate the store location to the storer. The storer should use it to pass the same information to the underlying OutputFormat. Pig calls this method multiple times, so implementations should ensure there are no inconsistent side effects due to the repeated calls.

prepareToWrite() – In the new API, data is written through the OutputFormat provided by the StoreFunc. In prepareToWrite(), the RecordWriter associated with that OutputFormat is passed to the StoreFunc. The RecordWriter can then be used in putNext() to write a tuple representing a record of data in the manner expected by the RecordWriter.

putNext() – The meaning of putNext() has not changed: it is called by the Pig runtime to write the next tuple of data. In the new API, this is the method in which the implementation uses the underlying RecordWriter to write the Tuple out.

In this example we assume that only the basic data types (boolean, int, long, float, double, chararray, and bytearray) are written; complex types such as maps, tuples, and bags are not handled.

Custom Store Code

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.PigException;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

public class CustomStoreFunction extends StoreFunc {

    protected RecordWriter writer = null;
    private byte fieldDel = '\t';
    private static final int BUFFER_SIZE = 1024;
    private static final String UTF8 = "UTF-8";
    private ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);

    public CustomStoreFunction() {
    }

    public CustomStoreFunction(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte) delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
                case 't':
                    this.fieldDel = (byte) '\t';
                    break;
                case 'x':
                    // delimiter given as a hex escape, e.g. \x09
                    this.fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue();
                    break;
                case 'u':
                    // delimiter given as \u followed by a decimal value
                    this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue();
                    break;
                default:
                    throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("CustomStoreFunction delimiter must be a single character");
        }
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new TextOutputFormat<WritableComparable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // Records are written as the value with a null key, so no key/value separator is needed.
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", "");
        FileOutputFormat.setOutputPath(job, new Path(location));
        // Enable compression when the output location suggests it.
        if (location.endsWith(".bz2")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        } else if (location.endsWith(".gz")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        }
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        int sz = t.size();
        for (int i = 0; i < sz; i++) {
            Object field;
            try {
                field = t.get(i);
            } catch (ExecException ee) {
                throw ee;
            }

            putField(field);

            // Write the field delimiter between fields, but not after the last one.
            if (i != sz - 1) {
                mOut.write(fieldDel);
            }
        }
        // Hand the serialized record to the underlying RecordWriter and reset the buffer.
        Text text = new Text(mOut.toByteArray());
        try {
            writer.write(null, text);
            mOut.reset();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

@SuppressWarnings("unchecked")
private void putField(Object field) throws IOException {
// string constants for each delimiter

switch (DataType.findType(field)) {
case DataType.NULL:
break; // just leave it empty

case DataType.BOOLEAN:
mOut.write(((Boolean) field).toString().getBytes());
break;

case DataType.INTEGER:
mOut.write(((Integer) field).toString().getBytes());
break;

case DataType.LONG:
mOut.write(((Long) field).toString().getBytes());
break;

case DataType.FLOAT:
mOut.write(((Float) field).toString().getBytes());
break;

case DataType.DOUBLE:
mOut.write(((Double) field).toString().getBytes());
break;

case DataType.BYTEARRAY: {
byte[] b = ((DataByteArray) field).get();
mOut.write(b, 0, b.length);
break;
}

case DataType.CHARARRAY:
mOut.write(((String) field).getBytes(UTF8));
break;

default: {
int errCode = 2108;
String msg = "Could not determine data type of field: " + field;
throw new ExecException(msg, errCode, PigException.BUG);
}

}

}
}
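
To use the storer from a Pig script, compile the class, package it into a jar, and register the jar before the STORE statement. The snippet below is only a usage sketch; the jar name, input file, schema, and output path are placeholders.

REGISTER customstorefunction.jar;

A = LOAD 'input.txt' USING PigStorage(',') AS (id:int, name:chararray, salary:float);

STORE A INTO 'output' USING CustomStoreFunction('|');

Because the output path here does not end in .bz2 or .gz, the setStoreLocation() implementation above leaves compression disabled; storing into 'output.gz', for example, would switch on gzip compression.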