Hadoop MapReduce example with custom InputFormat, InputSplit, RecordReader, OutputFormat and RecordWriter for generating test data

Generating data isn’t common. Typically you’ll generate a bunch of the data at once, then use it over and over again. However, when you do need to generate data, MapReduce is an excellent system for doing it. The most common use case for this pattern is generating random data. Building a representative data set is useful for large-scale testing when the real data set is still too small. It can also be a useful data set for researching a proof of concept for an analytic at scale.

Unfortunately, implementing this pattern isn’t straightforward in Hadoop, because one of the foundational pieces of the framework is assigning one map task to each input split and one map function call to each record. In this case there are no input splits and no records, so we have to fool the framework into thinking there are.

To implement this pattern in Hadoop, implement a custom InputFormat and let a RecordReader generate the random data. The map function is completely oblivious to the origin of the data, so the data can be built on the fly instead of being loaded from a file in HDFS.

Below are the steps involved:

1. The InputFormat creates the fake splits from nothing. The number of splits it creates should be configurable.

2. The RecordReader takes its fake split and generates random records from it.

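3. In most cases, an identity mapper is used to just write the data out as it comes in. In the newer org.apache.hadoop.mapreduce API, the base Mapper class behaves as an identity mapper when no mapper class is set on the job.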

Note: The major consideration here in terms of performance is how many worker map tasks are needed to generate the data. In general, more map tasks generate data faster because they make better use of the cluster's parallelism. However, it makes little sense to fire up more map tasks than you have map slots, since they are all doing the same thing.
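Before looking at the Hadoop classes, the three steps above can be modeled in plain Java. This is only a sketch of the idea and not Hadoop code: FakeSplit, GeneratingReader and the record layout are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Plain-Java model of the pattern. None of these types are Hadoop classes;
// all names here are hypothetical and exist only for illustration.
final class GeneratorPatternSketch {

    // Step 1: a split that carries no data. It exists only so that a task is scheduled.
    static final class FakeSplit {
    }

    // Step 2: a reader that invents records instead of reading them from its split.
    static final class GeneratingReader {
        private final int recordsToCreate;
        private final Random random = new Random();
        private int created = 0;
        private String currentKey;

        GeneratingReader(FakeSplit ignoredSplit, int recordsToCreate) {
            this.recordsToCreate = recordsToCreate;
        }

        boolean nextKeyValue() {
            if (created >= recordsToCreate) {
                return false; // no more input for this task
            }
            currentKey = "record_" + created + "," + random.nextInt(15000);
            created++;
            return true;
        }

        String getCurrentKey() {
            return currentKey;
        }
    }

    // The number of fake splits is configurable and decides how many tasks run.
    static List<FakeSplit> getSplits(int numTasks) {
        List<FakeSplit> splits = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            splits.add(new FakeSplit());
        }
        return splits;
    }

    // Step 3: an identity "map" that writes each record out as it comes in.
    static List<String> run(int numTasks, int recordsPerTask) {
        List<String> output = new ArrayList<>();
        for (FakeSplit split : getSplits(numTasks)) {
            GeneratingReader reader = new GeneratingReader(split, recordsPerTask);
            while (reader.nextKeyValue()) {
                output.add(reader.getCurrentKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // 3 tasks, 4 records each
        System.out.println(run(3, 4).size() + " records generated");
    }
}
```

The total amount of data is simply the number of tasks multiplied by the number of records per task, which is why both values are worth making configurable.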

Role of InputFormat

1. Validate the input configuration for the job (checking that the data is there).

2. Split the input blocks and files into logical chunks of type InputSplit, each of which is assigned to a map task for processing. For example, with a 64 megabyte block size, a 160 megabyte file in HDFS will generate three input splits along the byte ranges 0MB-64MB, 64MB-128MB and 128MB-160MB. Each map task will be assigned exactly one of these input splits, and then the RecordReader implementation is responsible for generating key/value pairs out of all the bytes it has been assigned.

3. Create the RecordReader implementation to be used to create key/value pairs from the raw InputSplit. These pairs are sent one by one to their mapper. Typically, the RecordReader has the additional responsibility of fixing boundaries, because the input split boundary is arbitrary and probably will not fall on a record boundary. For example, the TextInputFormat reads text files using a LineRecordReader to create a key/value pair for each line of text (separated by a newline character) in each map task. The key is the byte offset of the line within the file and the value is the string of characters up to the newline character. Because it is very unlikely that the chunk of bytes for each input split will be lined up with a newline character, the LineRecordReader will read past its given end in order to make sure a complete line is read. This bit of data comes from a different data block and may therefore not be stored on the same node, in which case it is streamed from a DataNode hosting the block. This streaming is all handled by an instance of the FSDataInputStream class, and we don’t have to deal with any knowledge of where these blocks are.
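The byte ranges in the 160 megabyte example from step 2 can be reproduced with a few lines of arithmetic. This is a simplified sketch, not the logic of Hadoop's FileInputFormat, which applies additional rules that are omitted here; SplitRangesSketch and compute are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified split arithmetic; the class and method names are hypothetical.
final class SplitRangesSketch {

    // Returns [start, end) ranges that cover a file of the given size.
    static List<long[]> compute(long fileSize, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < fileSize; start += splitSize) {
            // The last split is shorter when the file size is not a multiple of the split size.
            long end = Math.min(start + splitSize, fileSize);
            ranges.add(new long[] { start, end });
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Sizes are in megabytes to match the example in the text.
        for (long[] range : compute(160, 64)) {
            System.out.println(range[0] + "MB-" + range[1] + "MB");
        }
    }
}
```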

Role of RecordReader

The RecordReader abstract class creates key/value pairs from a given InputSplit. While the InputSplit represents the byte-oriented view of the split, the RecordReader makes sense out of it for processing by a mapper. This is why Hadoop and MapReduce are considered schema on read. It is in the RecordReader that the schema is defined, based solely on the record reader implementation, which changes based on what the expected input is for the job. Bytes are read from the input source and turned into a WritableComparable key and a Writable value. Custom data types are very common when creating custom input formats, as they are a nice object-oriented way to present information to a mapper.

A RecordReader uses the data within the boundaries created by the input split to generate key/value pairs. In the context of file-based input, the start is the byte position in the file where the RecordReader should start generating key/value pairs. The end is where it should stop reading records. These are not hard boundaries as far as the API is concerned: there is nothing stopping a developer from reading the entire file for each map task. While reading the entire file is not advised, reading outside of the boundaries is often necessary to ensure that a complete record is generated.
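To make the soft boundaries concrete, here is a plain-Java sketch of the convention a line-oriented reader can follow: skip the first line unless the split starts at offset 0, then keep reading lines as long as they start at or before the end of the split. It only mimics the idea and is not Hadoop's LineRecordReader; LineSplitSketch and readSplit are hypothetical names, and the input is assumed to be single-byte characters so that string indexes equal byte offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Models how a line reader treats split boundaries; names are hypothetical.
final class LineSplitSketch {

    // Returns the lines owned by the split covering offsets [start, end).
    static List<String> readSplit(String data, int start, int end) {
        List<String> lines = new ArrayList<>();
        int pos = start;
        if (start != 0) {
            // The line containing 'start' belongs to the previous split, so skip it.
            int newline = data.indexOf('\n', start);
            if (newline < 0) {
                return lines;
            }
            pos = newline + 1;
        }
        // A line is read in full if it starts at or before 'end',
        // even when that means reading past the end of the split.
        while (pos <= end && pos < data.length()) {
            int newline = data.indexOf('\n', pos);
            int lineEnd = (newline < 0) ? data.length() : newline;
            lines.add(data.substring(pos, lineEnd));
            pos = lineEnd + 1;
        }
        return lines;
    }

    public static void main(String[] args) {
        String data = "alpha\nbravo\ncharlie\ndelta\n";
        // Three splits whose boundaries fall in the middle of lines.
        System.out.println(readSplit(data, 0, 10));
        System.out.println(readSplit(data, 10, 20));
        System.out.println(readSplit(data, 20, data.length()));
    }
}
```

With this convention every line is produced exactly once across the splits, even though no split boundary lines up with a newline.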

Role of OutputFormat

1. Validate the output configuration for the job.
2. Create the RecordWriter implementation that will write the output of the job.

On the flip side of the FileInputFormat, there is a FileOutputFormat to work with file-based output. Because most output from a MapReduce job is written to HDFS, the many file-based output formats that come with the API will solve most of your needs. The default used by Hadoop is the TextOutputFormat, which stores key/value pairs to HDFS at a configured output directory with a tab delimiter. Each reduce task writes an individual part file to the configured output directory. The TextOutputFormat also validates that the output directory does not exist prior to starting the MapReduce job. The TextOutputFormat uses a LineRecordWriter to write key/value pairs for each map task or reduce task, depending on whether there is a reduce phase or not.

Role of RecordWriter

The RecordWriter abstract class writes key/value pairs to a file system, or another output. Unlike its RecordReader counterpart, it does not contain an initialize phase. However, the constructor can always be used to set up the record writer for whatever is needed. Any parameters can be passed in during construction, because the record writer instance is created via OutputFormat.getRecordWriter.
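As a rough illustration of a tab-delimited line writer that is configured through its constructor, consider the plain-Java sketch below. It is not the Hadoop LineRecordWriter: the class and method names are hypothetical, and a null value stands in for NullWritable, in which case only the key is written.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Plain-Java stand-in for a line-oriented record writer; names are hypothetical.
final class TabSeparatedWriterSketch {
    private final Writer out;
    private final String separator;

    // All set-up happens in the constructor because there is no initialize phase.
    TabSeparatedWriterSketch(Writer out, String separator) {
        this.out = out;
        this.separator = separator;
    }

    // Builds one output line; the separator appears only when both parts are present.
    static String formatLine(String key, String value, String separator) {
        if (key == null && value == null) {
            return "";
        }
        StringBuilder line = new StringBuilder();
        if (key != null) {
            line.append(key);
        }
        if (key != null && value != null) {
            line.append(separator);
        }
        if (value != null) {
            line.append(value);
        }
        return line.append('\n').toString();
    }

    void write(String key, String value) throws IOException {
        out.write(formatLine(key, value, separator));
    }

    void close() throws IOException {
        out.close();
    }

    public static void main(String[] args) throws IOException {
        StringWriter sink = new StringWriter();
        TabSeparatedWriterSketch writer = new TabSeparatedWriterSketch(sink, "\t");
        writer.write("device_id_1", "13694");                      // key, tab, value
        writer.write("device_id_1,port_id_1,000000,13694", null);  // key only
        writer.close();
        System.out.print(sink);
    }
}
```

The second call mirrors the job in this post, where the whole record is carried in the key and the value is empty.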

Problem To Solve

Let's solve a telecom problem. Given a list of device IDs and port IDs, generate a DSL file containing an entire day's data, which can be used for testing. The EMS generates one data point every 15 minutes, so we expect 96 data points for the day. The timestamps run from 000000 to 234500 in HHMMSS format.

Driver Code

The driver parses the three command line arguments (number of map tasks, number of records per task, and output directory) to configure this job. It sets our custom input format and calls the static methods to configure it further. All the output is written to the given output directory. The identity mapper is used for this job, and the reduce phase is disabled by setting the number of reduce tasks to zero.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Expected arguments: <numMapTasks> <numRecordsPerTask> <outputDir>
        if (args.length != 3) {
            System.err.println("Usage: Driver <numMapTasks> <numRecordsPerTask> <outputDir>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        int numMapTasks = Integer.parseInt(args[0]);
        int numRecordsPerTask = Integer.parseInt(args[1]);
        Path outputDir = new Path(args[2]);

        /* Delete the output directory before running the job. Going through the
         * Hadoop FileSystem API works for local paths as well as for HDFS. */
        outputDir.getFileSystem(conf).delete(outputDir, true);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        // Map-only job: no mapper is set, so the default identity Mapper is used
        job.setNumReduceTasks(0);
        job.setInputFormatClass(DslInputFormat.class);
        DslInputFormat.setNumMapTasks(job, numMapTasks);
        DslInputFormat.setNumRecordPerTask(job, numRecordsPerTask);
        TextOutputFormat.setOutputPath(job, outputDir);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 2);
    }
}

InputSplit code

The DummyInputSplit class simply extends InputSplit and implements Writable. The overridden methods have no real implementation; those that must return a value return a basic default. This input split is used to trick the framework into assigning a task to generate the random data.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class DummyInputSplit extends InputSplit implements Writable {

    // The split carries no state, so there is nothing to serialize
    @Override
    public void write(DataOutput out) throws IOException {
    }

    // Likewise, there is nothing to deserialize
    @Override
    public void readFields(DataInput in) throws IOException {
    }

    // No real data backs this split, so its length is reported as zero
    @Override
    public long getLength() throws IOException, InterruptedException {
        return 0;
    }

    // No data locality: the split can be processed on any node
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }

}

InputFormat code

The input format has two main purposes: returning the list of input splits for the framework to generate map tasks from, and then creating the DslRecordReader for the map task. We override the getSplits method to return a configured number of DummyInputSplit splits. This number is pulled from the configuration. When the framework calls createRecordReader, a DslRecordReader is instantiated, initialized, and returned.


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DslInputFormat extends InputFormat<Text, NullWritable> {

    public static final String NUM_MAP_TASKS = "random.map.tasks";
    public static final String NUM_RECORDS_PER_TASK = "random.num.records.per.map.task";
    public static final String DEVICE_PORT_LIST = "random.device.port.file";

    /*
     * The implementation of getSplits typically uses the given JobContext
     * object to retrieve the configured input and return a List of InputSplit
     * objects. The input splits have a method to return an array of machines
     * associated with the locations of the data in the cluster, which gives
     * clues to the framework as to which node should process the map task.
     * This method is also a good place to verify the configuration and throw
     * any necessary exceptions, because the method is used on the front-end
     * (i.e. before the job is submitted to the cluster).
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        int numSplits = context.getConfiguration().getInt(NUM_MAP_TASKS, -1);
        if (numSplits <= 0) {
            throw new IOException(NUM_MAP_TASKS + " must be set to a positive number");
        }
        // Create a number of input splits equivalent to the number of tasks
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < numSplits; ++i) {
            splits.add(new DummyInputSplit());
        }
        return splits;
    }

    /*
     * This method is used on the back-end to generate an implementation of
     * RecordReader. Typically, a new instance is created and immediately
     * returned, because the record reader has an initialize method that is
     * called by the framework.
     */
    @Override
    public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        DslRecordReader dslRecordReader = new DslRecordReader();
        // Initializing here is optional; it is harmless because initialize only reads configuration
        dslRecordReader.initialize(split, context);
        return dslRecordReader;
    }

    public static void setNumMapTasks(Job job, int i) {
        job.getConfiguration().setInt(NUM_MAP_TASKS, i);
    }

    public static void setNumRecordPerTask(Job job, int i) {
        job.getConfiguration().setInt(NUM_RECORDS_PER_TASK, i);
    }

}

 

RecordReader Code

This record reader is where the data is actually generated. It is given our DummyInputSplit during initialization, but simply ignores it. The number of records to create is pulled from the job configuration. For each call to nextKeyValue, a random record is created using a simple random number generator. The counter is incremented to keep track of how many records have been generated. Once all the records are generated, the record reader returns false, signaling the framework that there is no more input for the mapper.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DslRecordReader extends RecordReader<Text, NullWritable> {

    private int numRecordsToCreate = 0;
    private int createdRecords = 0;
    private final Text key = new Text();
    private final NullWritable value = NullWritable.get();
    private final Random random = new Random();
    // Device/port pairs to generate data for; hard-coded for this example
    private final ArrayList<String> devicePortList = new ArrayList<String>(Arrays.asList("device_id_1,port_id_1"));
    // Current timestamp in HHMMSS format
    private String time = "000000";

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // The split is ignored; only the configured record count matters
        this.numRecordsToCreate = context.getConfiguration().getInt(DslInputFormat.NUM_RECORDS_PER_TASK, 0);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (createdRecords >= numRecordsToCreate) {
            return false;
        }
        // Emit exactly one record per call, cycling through the device/port list
        int deviceIndex = createdRecords % devicePortList.size();
        // nextInt(15000) yields 0..14999 and avoids the negative result of Math.abs(Integer.MIN_VALUE)
        key.set(devicePortList.get(deviceIndex) + "," + time + "," + random.nextInt(15000));
        createdRecords++;
        // Move to the next 15-minute slot once every device has a record for the current one
        if (deviceIndex == devicePortList.size() - 1) {
            time = TimeStampUtil.getTimeStamp(time);
        }
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public NullWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Guard against division by zero when no records are requested
        return numRecordsToCreate <= 0 ? 1.0f : (float) createdRecords / (float) numRecordsToCreate;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release: no files or streams are opened
    }

}
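The record reader calls TimeStampUtil.getTimeStamp, which is not shown in this post. The following is a minimal sketch of what such a helper might look like, assuming it takes an HHMMSS string and returns the timestamp 15 minutes later, wrapping around at midnight. Only the class and method names come from the code above; the body is an assumption.

```java
// Hypothetical implementation of the helper used by DslRecordReader.
final class TimeStampUtil {

    private static final int INTERVAL_MINUTES = 15;
    private static final int MINUTES_PER_DAY = 24 * 60;

    private TimeStampUtil() {
        // Utility class, not meant to be instantiated
    }

    // Takes a timestamp in HHMMSS format and returns the one 15 minutes later.
    // Seconds are assumed to always be 00, as in the sample data.
    static String getTimeStamp(String time) {
        int hours = Integer.parseInt(time.substring(0, 2));
        int minutes = Integer.parseInt(time.substring(2, 4));
        // Work in minutes since midnight and wrap around after 234500
        int total = (hours * 60 + minutes + INTERVAL_MINUTES) % MINUTES_PER_DAY;
        return String.format("%02d%02d00", total / 60, total % 60);
    }

    public static void main(String[] args) {
        // Walk through all 96 data points of one day
        String time = "000000";
        for (int i = 0; i < MINUTES_PER_DAY / INTERVAL_MINUTES; i++) {
            System.out.println(time);
            time = getTimeStamp(time);
        }
    }
}
```

Starting from 000000, 95 calls lead to 234500, which gives the 96 data points per day described in the problem statement.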

Output Sample

device_id_1,port_id_1,000000,13694
device_id_1,port_id_1,001500,14205
device_id_1,port_id_1,003000,6794
device_id_1,port_id_1,004500,3361
device_id_1,port_id_1,010000,7588
device_id_1,port_id_1,011500,7949
device_id_1,port_id_1,013000,4930
device_id_1,port_id_1,014500,12377
device_id_1,port_id_1,020000,7628
device_id_1,port_id_1,021500,4245