MapReduce example: a composite join of many very large formatted inputs on the map side

Composite joins are particularly useful if you want to join very large data sets together. However, the data sets must first be sorted by foreign key, partitioned by foreign key, and read in a very particular manner in order to use this type of join. With that said, if your data can already be read in such a way, or you can prepare it so that it can, a composite join has a huge leg-up over the other join types because it is a map-only job.

Hadoop has built-in support for a composite join using the CompositeInputFormat. This join utility is restricted to inner and full outer joins. The inputs for each mapper must be partitioned and sorted in a specific way, and each input data set must be divided into the same number of partitions. In addition, all the records for a particular foreign key must be in the same partition. Usually, this occurs only if the outputs of several jobs have the same number of reducers and the same foreign key, and the output files aren't splittable, i.e., each file is no bigger than the HDFS block size or is compressed with a non-splittable codec such as gzip. In many cases, one of the other patterns, such as a reduce-side join or a replicated join, is more applicable than a composite join. If you find yourself having to format the data prior to using a composite join, you are probably better off just using a reduce-side join, unless this prepared output is used by many analytics.

Below are the conditions for using this pattern (a short configuration sketch follows the list):

1. An inner or full outer join is desired.
2. All the data sets are sufficiently large.
3. All data sets can be read with the foreign key as the input key to the mapper.
4. All data sets have the same number of partitions.
5. Each partition is sorted by foreign key, and all the foreign keys reside in the associated partition of each data set. That is, partition X of data sets A and B contain the same foreign keys and these foreign keys are present only in partition X.
6. The data sets do not change often (if they have to be prepared).
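
To make conditions 4 and 5 concrete, here is a minimal sketch of the knobs that make two upstream jobs produce co-partitioned, sorted output: both jobs emit the foreign key as the output key, use the same partitioner, and run the same number of reducers. This is only an illustration (the class and method names are made up for this sketch); the preparation jobs later in this post take a simpler map-only approach.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class CoPartitionedPrepConfig {

    /*
     * Apply the same key class, partitioner and reducer count to both
     * preparation jobs so that partition X of each output contains exactly
     * the same set of foreign keys, sorted by that key.
     */
    public static void configure(Job dslPrepJob, Job servicePrepJob) {
        for (Job job : new Job[] { dslPrepJob, servicePrepJob }) {
            job.setOutputKeyClass(Text.class);              // foreign key (service id) as output key
            job.setOutputValueClass(Text.class);
            job.setPartitionerClass(HashPartitioner.class); // same partitioning function
            job.setNumReduceTasks(4);                       // same number of partitions
        }
    }
}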

Performance analysis

A composite join can be executed relatively quickly over large data sets. However, the MapReduce framework can only set up the job so that one of the two data sets is data local; the respective files that are partitioned by the same key cannot be assumed to be on the same node. Any data preparation needs to be taken into account in the performance of this analytic. The preparation job is typically a MapReduce job, but if the data sets rarely change, the sorted and partitioned data sets can be reused over and over. Thus, the cost of producing these prepared data sets is amortized over all of the runs.

Problem to solve

Let's take a telecom domain example. We take two different data sets: the first is service id to pin code data, which maps each service id to a pin code, and the second is line management data, which we will call the dsl data. Given these two data sets, join them using the composite join pattern.

Sample input data

Here is a sample dsl input file, attached as sample_dsl_service_id.csv

Here is a sample service id to pin code mapping input file, attached as service_id_to_pincode.csv

This data will be preprocessed before being used in the composite join.
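
The attached files are not reproduced here; the rows below are purely hypothetical and only illustrate the comma-separated layout that the pre-processing mappers below assume, with the service id in the first field (and, for the mapping file, the pin code in the second field):

service_id_to_pincode.csv:   SRV1001,560001
sample_dsl_service_id.csv:   SRV1001,<remaining line management fields>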

Preparation of data for composite join

Driver Code for service id to pin code mapping data pre-processing


import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DriverDataPrepService {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local paths on Windows; change the paths as per your
         * local machine
         */
        args = new String[] {
                "Replace this string with Input Path location for service to pin code mapping input data",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = new Configuration();

        // map-only job that re-emits the mapping data keyed by service id
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(DriverDataPrepService.class);
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[1]));
        TextInputFormat.setInputPaths(sampleJob, new Path(args[0]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setMapperClass(ServiceInventoryDataMapper.class);
        sampleJob.setNumReduceTasks(0);
        System.exit(sampleJob.waitForCompletion(true) ? 0 : 1);
    }
}

Mapper Code for service id to pin code mapping data pre-processing

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ServiceInventoryDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();
        String[] field = data.split(",", -1);
        String service_id = field[0];
        if (null != service_id && service_id.length() > 0) {
            // key: service id, value: pin code
            outkey.set(service_id);
            outvalue.set(field[1]);
            context.write(outkey, outvalue);
        }
    }
}

Driver Code for dsl data pre-processing

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DriverDataPrepDsl {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local paths on Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location for dsl input data",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = new Configuration();

        // map-only job that re-emits the dsl data keyed by service id
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(DriverDataPrepDsl.class);
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[1]));
        TextInputFormat.setInputPaths(sampleJob, new Path(args[0]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setMapperClass(DsllDataMapper.class);
        sampleJob.setNumReduceTasks(0);
        System.exit(sampleJob.waitForCompletion(true) ? 0 : 1);
    }
}


Mapper Code for dsl data pre-processing

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DsllDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    public static final String COMMA = ",";

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();
        String[] field = data.split(COMMA, -1);
        String service_id = field[0];
        if (null != service_id && service_id.length() > 0) {
            // key: service id, value: the entire dsl record
            outkey.set(service_id);
            context.write(outkey, value);
        }
    }
}
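
A note on the prepared output: both preparation jobs are map-only (setNumReduceTasks(0)) and write with the default TextOutputFormat, so each output line is the service id and the value separated by a tab character. That tab-separated layout is exactly what the KeyValueTextInputFormat used by the composite join driver below expects. A line of the prepared service id to pin code data would therefore look like SRV1001<TAB>560001 (hypothetical values, where <TAB> is a literal tab character).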

Driver Code Composite Join

The driver code handles most of the work in the job configuration stage. It sets up the type of input format used to parse the data sets, as well as the join type to execute. The framework then handles executing the actual join when the data is read. To meet the preconditions of a composite join, both the service id to pin code data and the dsl data have been preprocessed by MapReduce and output using the TextOutputFormat. The key of each data set is the service id, and the value is the rest of the record (the pin code in one data set, the full dsl record in the other). Hadoop has a KeyValueTextInputFormat that can parse these formatted data sets exactly as required.

Each data set was sorted by the foreign key, the caveat being that they are sorted as Text objects rather than LongWritable objects. That is, service id 12345 comes before service id 2. This is because the CompositeInputFormat uses Text objects as the key for comparisons when doing the join. Each data set should be gzipped to prevent it from being split. The driver code demonstrates how to configure MapReduce to handle the join, while the mapper code is trivial.

The driver takes three paths for the job: the path to the pre-processed dsl data, the path to the pre-processed service id to pin code data, and the analytic output directory. The join type ("inner" here; "outer" is also supported) is passed directly to the compose call. The most important piece of configuration is setting the input format and then configuring the join expression.

The input format has a static helper function to create the join expression itself. It takes in the join type (inner or outer), the input format class used to parse all the data sets, and then as many Path or String objects as desired, which represent the data sets to join together.

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

public class DriverCompositeJoin {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local paths on Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location for dsl pre processed data",
                "Replace this string with Input Path location for service id to pin code pre processed data",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[2]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 3) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        JobConf job = new JobConf("CompositeJoin");
        job.setJarByClass(DriverCompositeJoin.class);
        job.setJobName("Composite Join");
        job.setInputFormat(CompositeInputFormat.class);
        // inner join of the two prepared data sets, each parsed with KeyValueTextInputFormat
        job.set("mapred.join.expr",
                CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, args[0], args[1]));
        TextOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setMapperClass(CompositeMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        RunningJob runjob = JobClient.runJob(job);
        while (!runjob.isComplete()) {
            Thread.sleep(1000);
        }
        System.exit(runjob.isSuccessful() ? 0 : 1);
    }
}
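
The driver above uses the older mapred API, which is where CompositeInputFormat originally lived. Recent Hadoop releases also ship a port of the join package under org.apache.hadoop.mapreduce.lib.join. Purely as a rough sketch, and assuming that port's CompositeInputFormat exposes the same compose() helper and a JOIN_EXPR property like its mapred counterpart, an equivalent driver for the newer API might look like this (the class names here are mine, not part of the original example):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverCompositeJoinNewApi {

    /* mapreduce-API equivalent of the CompositeMapper shown below */
    public static class NewApiCompositeMapper extends Mapper<Text, TupleWritable, Text, Text> {
        @Override
        protected void map(Text key, TupleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write((Text) value.get(0), (Text) value.get(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // inner join of the two prepared, tab-separated data sets
        conf.set(CompositeInputFormat.JOIN_EXPR,
                CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, args[0], args[1]));

        Job job = Job.getInstance(conf, "Composite Join (mapreduce API)");
        job.setJarByClass(DriverCompositeJoinNewApi.class);
        job.setInputFormatClass(CompositeInputFormat.class);
        job.setMapperClass(NewApiCompositeMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
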
Mapper Code

The input to the mapper is the foreign key and a TupleWritable. This tuple contains a number of Text objects equal to the number of data sets. In terms of position, the ordering of the Text objects maps directly to how the join was configured: the first input path is the zeroth index, the second input path is the first index, and so on. The mapper simply grabs the objects from the tuple and outputs them. There are only two data sets to be joined in this example, so they are output as the key and value. If more were used, the strings would need to be concatenated in some manner prior to being output (see the sketch after the mapper code below).

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class CompositeMapper extends MapReduceBase implements Mapper<Text, TupleWritable, Text, Text> {

    public void map(Text key, TupleWritable value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // value.get(0) is the prepared dsl record and value.get(1) is the pin code,
        // following the order in which the input paths were composed in the driver
        output.collect((Text) value.get(0), (Text) value.get(1));
    }
}
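
As mentioned above, if more than two data sets were composed together, the tuple would carry one value per data set and the mapper would need to concatenate them before emitting a single output value. Below is a minimal sketch of such a mapper against the same mapred API; the class name and the comma delimiter are illustrative choices, not part of the original example.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class MultiDatasetCompositeMapper extends MapReduceBase implements Mapper<Text, TupleWritable, Text, Text> {

    private final Text outvalue = new Text();

    public void map(Text key, TupleWritable value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        // one entry per joined data set, in the order the paths were composed;
        // has(i) guards against positions left empty by an outer join
        for (int i = 0; i < value.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(value.has(i) ? value.get(i).toString() : "");
        }
        outvalue.set(sb.toString());
        output.collect(key, outvalue);
    }
}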