MapReduce example: joining and converting row-based structured data into a hierarchical format like JSON or XML

The structured-to-hierarchical pattern is used to convert the format of data: it transforms row-based data into a hierarchical format such as JSON or XML. If you have multiple data sets that require you to first do a join, an expensive operation, and then extract the data that lets you do your real work, you should take advantage of hierarchical data to avoid repeating those joins, and this pattern is how you build it. So there are basically a couple of scenarios in which this pattern is applicable: you have data sources that are linked by some set of foreign keys, or your data is structured and row-based.

Steps to achieve hierarchical data

1. If you wish to combine multiple data sources into a hierarchical data structure, a Hadoop class called MultipleInputs from org.apache.hadoop.mapreduce.lib.input is extremely valuable. MultipleInputs allows you to specify different input paths and different mapper classes for each input. The configuration is done in the driver. If you are loading data from only one source in this pattern, you don’t need this step.
2. The mappers load the data and parse the records into one cohesive format so that your work in the reducers is easier. The output key should reflect how you want to identify the root of each hierarchical record.
3. The reducer receives the data from all the different sources key by key. All of the data for a particular grouping is provided for you in one iterator, so all that is left for you to do is build the hierarchical data structure from the list of data items.

Use cases for the pattern

1. Pre-joining data – Data arrives in disjoint, structured data sets, and for analytical purposes it is easier to bring the data together into more complex objects. By doing this, you set up your data to take advantage of the NoSQL model of analysis.
2. Preparing data for HBase or MongoDB – Stores such as HBase and MongoDB are a natural fit for this kind of nested data, so you can use this method to bring the data together in preparation for loading (a minimal loading sketch follows this list).
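As an illustration of the second use case, here is a minimal sketch of pushing one hierarchical JSON record into HBase. The table name sample_hierarchical, the column family cf and the qualifier json are assumptions made for this sketch, not part of the original example; the table would have to be created beforehand, for instance via the HBase shell.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseJsonLoader {

    /* Stores one hierarchical JSON record under its row key.
     * Table name, column family and qualifier are hypothetical. */
    public static void storeRecord(String rowKey, String json) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("sample_hierarchical"))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("json"), Bytes.toBytes(json));
            table.put(put);
        }
    }
}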

Achieving the same using Pig

The COGROUP operation in Pig does a great job of bringing data together while preserving the original structure. However, doing any sort of real analysis on a complex record with the predefined keywords is more challenging out of the box. For this, a user-defined function is the right way to go (a sketch of one follows the script below).

Employee_Info = LOAD '/input/data/employee' USING PigStorage(',');
Department_Info = LOAD '/input/data/department' USING PigStorage(',');
grouped_data = COGROUP Employee_Info BY $1, Department_Info BY $1;
analyzed_data = FOREACH grouped_data GENERATE udfFunction(group, $1, $2);
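To give a feel for what that user-defined function might look like, here is a minimal sketch of a Java EvalFunc. The class name UdfFunction and the decision to simply count the records in each grouped bag are assumptions for illustration; real analysis logic would replace the body of exec().

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class UdfFunction extends EvalFunc<String> {

    /* Receives the group key and the two bags produced by COGROUP and
     * returns a simple summary string; real analysis logic would go here. */
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3) {
            return null;
        }
        Object groupKey = input.get(0);
        DataBag employees = (DataBag) input.get(1);
        DataBag departments = (DataBag) input.get(2);
        return groupKey + "," + employees.size() + "," + departments.size();
    }
}

The Pig script would then REGISTER the jar containing this class and DEFINE udfFunction with its fully qualified class name before using it in the FOREACH.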
Performance Considerations

There are two performance concerns to pay attention to when using this pattern. First, be aware of how much data is being sent to the reducers from the mappers; second, be aware of the memory footprint of the object that the reducer builds. A related concern is the possibility of hot spots in the data that could result in an obscenely large record. With large data sets, it is conceivable that a particular output record will have a lot of data associated with it. Imagine a specific key that has millions of records: if you are building some sort of XML or JSON object, all of that data may be held in memory at one point before the object is written out. This can blow out the heap of the Java Virtual Machine, which obviously should be avoided.
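One way to protect the reducer from such hot spots, assuming you are willing to truncate pathologically large groups, is to cap how many values you buffer per key. The reducer below is only a sketch; the threshold value and the counter names are illustrative, not part of the original pattern.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CappedHierarchicalReducer extends Reducer<Text, Text, NullWritable, Text> {

    /* Illustrative cap on how many values are buffered for a single key. */
    private static final int MAX_VALUES_PER_KEY = 100000;

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        List<String> buffered = new ArrayList<String>();
        for (Text value : values) {
            if (buffered.size() >= MAX_VALUES_PER_KEY) {
                // Record how often a hot key was truncated instead of blowing out the heap.
                context.getCounter("hierarchical", "truncated-keys").increment(1);
                break;
            }
            buffered.add(value.toString());
        }
        // Build the hierarchical record from the (possibly truncated) buffer and write it out here.
    }
}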

Problem To Solve

Let's take a Telecom domain example. We take two different data sets: one is hlog data, which helps in determining the speed of the line, and the second is line management data, which we will call dsl data. Given these two data sets, which are in a row-based structure, we join them to fetch all the required fields and convert the data into a hierarchical JSON format. We fetch the supplier_id, system_id, vendor_id and version_id from the dsl data set and the hlog from the hlog data set, and we use the device id and port id as the foreign key to join the two data sets. Below is the expected output.


{
  "Test_Device_id_345_/shelf=0/slot=1/port=0": {
    "supplier_id": "Test_Vendor_Id",
    "system_id": "B5004244434D0000",
    "hlog": "03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff03ff0af01b101b201b201b301b401b601b701b701b801b801ba01b901ba01b901b901ba",
    "vendor_id": "B5004244434D0000",
    "version_id": "A2pv6C038m"
  }
}
Sample input data

Here is a sample of the hlog input data, attached as sample_hlog.csv

Here is a sample of the dsl input data, attached as sample_dsl.csv

Driver Code

Let's start with the driver code. As we have two different data sets with different representations, we need to parse the two inputs differently. These cases are handled elegantly by the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, since we have hlog data that we want to combine with the dsl data for our analysis, we might set up the input as follows:

MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class,
SpeedHlogDeltaDataMapper.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class,
DsllDataMapper.class);

This code replaces the usual calls to FileInputFormat.addInputPath() and job.setMapperClass(). The important thing is that the map outputs have the same types, since the reducers see the aggregated map outputs and are not aware of which mapper produced them.
The MultipleInputs class also has an overloaded version of addInputPath() that doesn't take a mapper; this is useful when you have only one mapper (set using the Job's setMapperClass() method) but multiple input formats:

public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass)
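For illustration, such a single-mapper setup might look like the snippet below; CommonLogMapper and the two input paths are hypothetical names, not part of this example's code.

// A single mapper (set via setMapperClass) handles both inputs; only the input formats differ.
job.setMapperClass(CommonLogMapper.class);
MultipleInputs.addInputPath(job, new Path("/input/data/plaintext"), TextInputFormat.class);
MultipleInputs.addInputPath(job, new Path("/input/data/sequence"), SequenceFileInputFormat.class);

Coming back to our example, the complete driver is shown below.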

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class DriverStructuredToHierarchical {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

/*
* I have used my local paths on Windows; change the paths as per your
* local machine
*/

args = new String[] { "Replace this string with Input Path location for hlog data",
"Replace this string with Input Path location for dsl data",
"Replace this string with output Path location" };

/* delete the output directory before running the job */

FileUtils.deleteDirectory(new File(args[2]));

/* set the hadoop system parameter */

System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

if (args.length != 3) {
System.err.println("Please specify the input and output path");
System.exit(-1);
}

Configuration conf = ConfigurationFactory.getInstance();
Job sampleJob = Job.getInstance(conf);
sampleJob.setJarByClass(DriverStructuredToHierarchical.class);
TextOutputFormat.setOutputPath(sampleJob, new Path(args[2]));
sampleJob.setOutputKeyClass(Text.class);
sampleJob.setOutputValueClass(Text.class);
sampleJob.setReducerClass(SpeedHlogDslJoinReducer.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class,
SpeedHlogDeltaDataMapper.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, DsllDataMapper.class);
sampleJob.getConfiguration().set("validCount", "1");
sampleJob.getConfiguration().set("totalCount", "1");
@SuppressWarnings("unused")
int code = sampleJob.waitForCompletion(true) ? 0 : 1;

}
}

Mapper Code

In this case, there are two mapper classes: one for the hlog data, SpeedHlogDeltaDataMapper, and one for the dsl data, DsllDataMapper. In both, we extract the device id and port id, which sit at indexes 5 and 6 in the hlog data and at indexes 2 and 3 in the dsl data, and use them as the output key. We output the input value prepended with the character 'H' for an hlog record or 'D' for a dsl record so that we know which data set each record came from during the reduce phase.


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SpeedHlogDeltaDataMapper extends Mapper<Object, Text, Text, Text> {

private Text outkey = new Text();
private Text outvalue = new Text();
public static final String COMMA = ",";

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String[] values = value.toString().split(",", -1);
// Device id (index 5) and port id (index 6) form the join key with the dsl data.
String neid = values[5];
String portid = values[6];
outkey.set(neid + portid);
// Prefix the value with 'H' so the reducer knows this record came from the hlog data set.
outvalue.set("H" + values[4] + COMMA + values[5] + COMMA + values[6] + COMMA
+ values[7] + COMMA + values[8]);
context.write(outkey, outvalue);

}
}

 

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DsllDataMapper extends Mapper<Object, Text, Text, Text> {

private Text outkey = new Text();
private Text outvalue = new Text();
public static final String COMMA = ",";

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String data = value.toString();
String[] field = data.split(",", -1);
// field[63] is the highest index read below, so the record needs at least 64 fields.
if (field.length > 63) {

// Device id (index 2) and port id (index 3) form the join key with the hlog data.
String neid = field[2];
String portid = field[3];
outkey.set(neid + portid);
// Prefix the value with 'D' so the reducer knows this record came from the dsl data set.
outvalue.set("D" + field[0] + COMMA + field[5] + COMMA + field[7] + COMMA + field[62] + COMMA
+ field[63]);
context.write(outkey, outvalue);

}

}

}

Json Builder

We are using the JSON API from the org.json library, which can be pulled from Maven using the dependency below.

<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20170516</version>
</dependency>

We will use the code below to convert the final data into JSON format before writing the output in the reducer. We pass in the parent key, which is a combination of the device id and the port id, along with a map used to frame the key/value pairs in the JSON.


 

import java.util.Map;
import org.json.JSONObject;

public class JsonBuilder {

public String buildJson(String parentKey, Map<String, String> jsonMap) {

JSONObject jsonString = new JSONObject();
for (Map.Entry<String, String> entry : jsonMap.entrySet()) {

jsonString.put(entry.getKey(), entry.getValue());

}

return new JSONObject().put(parentKey, jsonString).toString();
}

}
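To see what the builder produces, a quick standalone check might look like this; the class below is purely illustrative, assumes it sits in the same package as JsonBuilder, and uses made-up values rather than the job's real data.

import java.util.HashMap;
import java.util.Map;

public class JsonBuilderDemo {

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<String, String>();
        fields.put("supplier_id", "Test_Vendor_Id");
        fields.put("hlog", "03ff03ff");
        // Prints something like {"deviceA_/shelf=0/slot=1/port=0":{"supplier_id":"Test_Vendor_Id","hlog":"03ff03ff"}}
        System.out.println(new JsonBuilder().buildJson("deviceA_/shelf=0/slot=1/port=0", fields));
    }
}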

 

Reducer code

The reducer builds the hierarchical JSON object using the code above. All the values are iterated over to pick out the required fields. We know which record came from which data set by the flag we added to the value; these flags are removed before the values are added to their respective lists. We then check that there is a mapping between the hlog and the dsl data by checking the corresponding lists. If a mapping is found, we retrieve the hlog from the hlog data and the vendor_id, system_id, version_id and supplier_id from the dsl data. Finally, using our JsonBuilder, we convert the data into JSON.

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SpeedHlogDslJoinReducer extends Reducer<Text, Text, NullWritable, Text> {

private ArrayList<Text> listH = new ArrayList<Text>();
private ArrayList<Text> listD = new ArrayList<Text>();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

listH.clear();
listD.clear();

// Separate the values by source using the flag character added in the mappers,
// stripping the flag before buffering each record.
for (Text text : values) {

if (text.charAt(0) == 'H') {
listH.add(new Text(text.toString().substring(1)));
} else if (text.charAt(0) == 'D') {
listD.add(new Text(text.toString().substring(1)));
}

}
try {
executeConversionLogic(context);
} catch (ParseException e) {

throw new IOException("A ParseException was wrapped in an IOException: " + e.getMessage());

}

}

private void executeConversionLogic(Context context) throws IOException, InterruptedException, ParseException {

// Emit output only when the key has records from both data sets, i.e. the join succeeds.
if (!listH.isEmpty() && !listD.isEmpty()) {

for (Text hlogText : listH) {
String[] hlog = hlogText.toString().split(",");

for (Text dslText : listD) {

String[] dsl = dslText.toString().split(",", -1);
Map<String, String> maps = new HashMap<String, String>();
maps.put("vendor_id", dsl[2]);
maps.put("system_id", dsl[3]);
maps.put("version_id", dsl[4]);
maps.put("supplier_id", dsl[0]);
maps.put("hlog", hlog[4]);
JsonBuilder jsonBuilder = new JsonBuilder();
String json = jsonBuilder.buildJson(hlog[1] + "_" + hlog[2], maps);
context.write(NullWritable.get(), new Text(json));
break;

}

}
}
}

}