MapReduce example using a replicated join between one large and many small data sets, performed entirely on the map side

A replicated join is extremely useful, but it places a strict size limit on all but one of the data sets to be joined. All the data sets except the very large one are read into memory during the setup phase of each map task, so they are limited by the JVM heap. If you can live within this limitation, you get a drastic benefit because there is no reduce phase at all, and therefore no shuffling or sorting. The join is done entirely in the map phase, with the very large data set being the input for the MapReduce job.

There is an additional restriction that a replicated join is really useful only for an inner or a left outer join where the large data set is the left data set. The other join types require a reduce phase to group the "right" data set with the entirety of the left data set. Although there may not be a match for the data stored in memory for a given map task, there could be a match in another input split. Because of this, we will restrict this pattern to inner and left outer joins.

Steps to follow

1. The mapper is responsible for reading all files from the distributed cache during the setup phase and storing them into in-memory lookup tables. After this setup phase completes, the mapper processes each record and joins it with all the data stored in-memory. If the foreign key is not found in the in-memory structures, the record is either omitted or output, based on the join type.
2. No combiner, partitioner, or reducer is used for this pattern. It is map-only.
3. The output is a number of part files equivalent to the number of map tasks. The part files contain the full set of joined records. If a left outer join is used, the input to the MapReduce analytic will be output in full, with possible null values.

Implementing this pattern in Pig

Pig supports replicated joins through a simple modification to the standard join syntax. Only inner and left outer joins are supported, for the same reasons explained above. The order of the data sets in the JOIN statement matters: every data set except the first (leftmost) one is loaded into memory, so the large data set must be listed first.

Let's say we have a broadband speed metric data set, which is huge, and we need to join it with the broadband service_id to pincode mapping data set to run some analytics on the actual broadband speed at each location. The code below can be used for this; make sure bigdataset is listed first, because order matters when doing a replicated join in Pig.


bigdataset = LOAD 'broadband_speed_metric_dataset' AS (schema_here);
smalldataset = LOAD 'service_id_to_pincode_mapper' AS (schema_here);
joined_data = JOIN bigdataset BY service_id, smalldataset BY service_id USING 'replicated';
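
If the analytic also needs to keep broadband records that have no matching pincode, the same syntax extends to a left outer replicated join (Pig supports only inner and left outer for this join type). A minimal sketch reusing the relations above:

left_joined = JOIN bigdataset BY service_id LEFT OUTER, smalldataset BY service_id USING 'replicated';

Records from bigdataset with no match in smalldataset come through with null values for the smalldataset fields.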

Performance analysis

A replicated join can be the fastest type of join executed because no reducer is required, but it comes at a cost. There is a limit on the amount of data that can be stored safely inside the JVM, which largely depends on how much memory you are willing to give to each map task. Experiment with your data sets to see how much you can fit into memory before fully implementing this pattern.
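
If the in-memory lookup data is close to the limit, the map task container and its JVM heap can be raised through the standard Hadoop 2.x memory properties before falling back to a reduce-side join. A minimal sketch in the driver; the values here are assumptions and must be tuned for your cluster:

Configuration conf = new Configuration();
// Container size for each map task in MB (assumed value, tune for your cluster).
conf.set("mapreduce.map.memory.mb", "2048");
// JVM heap for the map task, typically around 80% of the container size.
conf.set("mapreduce.map.java.opts", "-Xmx1638m");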

Problem to solve

Let's take a telecom domain example. We have two data sets: the first maps each service_id to a pincode, and the second is the line management data, which we will call the dsl data. Given the two data sets, we join them on the foreign key, which here is the service_id, and fetch the required fields from both: the full dsl record plus the pincode from the service_id to pincode data set. Because the service_id to pincode mapping file is small, we load it into memory and join it with the dsl data, which we expect to be much larger.

Sample input data

Here is a sample dsl input file, attached as sample_dsl_service_id.csv.

Here is a sample service_id to pincode mapping input file, attached as service_id_to_pincode.csv.
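
The attached files are not reproduced here; based on the mapper code below, the dsl file is expected to be a comma-separated record with the service_id in the first field, and the mapping file a comma-separated service_id,pincode pair. A few purely hypothetical rows to illustrate that layout:

sample_dsl_service_id.csv (hypothetical rows):
SRV1001,VDSL,24.5,UP
SRV1002,ADSL,6.2,UP

service_id_to_pincode.csv (hypothetical rows):
SRV1001,560037
SRV1002,600042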

Mapper code

During the setup phase of the mapper, the service_id to pincode mapping file is read from the filesystem and stored in memory. Each record is parsed, and the service_id and pincode are pulled out. The service_id and pincode are then added to a HashMap for retrieval in the map method. This is where an out-of-memory error could occur, because the entire contents of the file are stored, plus the overhead of the data structure itself. If that happens, you will either have to increase the JVM heap size or use a plain reduce-side join.

After setup, consecutive calls to the map method are performed. For each input record, the service_id is pulled from the dsl data, where it is available at index 0. The service_id is then used to look up a value in the HashMap built during the setup phase. If a pincode is found, the input value is output along with the retrieved pincode. If no value is found but a left outer join is being executed, the input value is output with an empty Text object.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplicatedJoinMapper extends Mapper<Object, Text, Text, Text> {

    private static final Text EMPTY_TEXT = new Text("");
    private HashMap<String, String> serviceid_To_pincode = new HashMap<String, String>();
    private Text outvalue = new Text();
    private String joinType = null;
    private String service_id_file_location = null;

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {

        // The dsl record is comma separated, with the service_id in the first field.
        String data = values.toString();
        String[] field = data.split(",", -1);
        String service_id = field[0];

        // Look up the pincode for this service_id in the in-memory table built in setup().
        String pincode = serviceid_To_pincode.get(service_id);
        outvalue.set(EMPTY_TEXT);
        if (null != pincode && pincode.length() > 0) {
            // Match found: emit the full dsl record with the joined pincode.
            outvalue.set(pincode);
            context.write(values, outvalue);
        } else if (joinType.equalsIgnoreCase("leftouter")) {
            // Left outer join: emit the dsl record with an empty value when there is no match.
            context.write(values, EMPTY_TEXT);
        }
    }

    @Override
    public void setup(Context context) {
        joinType = context.getConfiguration().get("join.type");
        service_id_file_location = context.getConfiguration().get("service.id.file.path");

        // Read the small service_id to pincode mapping file once per map task
        // and cache it in a HashMap for lookups in the map() method.
        try {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(service_id_file_location));
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String[] field = line.split(",", -1);
                String service_id = field[0];
                String pincode = field[1];
                if (null != service_id && service_id.length() > 0 && null != pincode && pincode.length() > 0) {
                    serviceid_To_pincode.put(service_id, pincode);
                }
            }
            bufferedReader.close();
        } catch (Exception ex) {
            System.out.println(ex.getLocalizedMessage());
        }
    }
}
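
Note that this mapper reads the mapping file from a regular filesystem path passed through the job configuration, which is convenient for local testing. On a real cluster the same file would normally be shipped to every map task through the distributed cache, as described in the steps above. A minimal sketch of that variant, assuming the Hadoop 2.x API, a hypothetical HDFS path, and the default behavior of localizing cached files into the task working directory (it also needs java.net.URI and org.apache.hadoop.fs.Path imports):

// Driver side: register the small mapping file with the distributed cache.
job.addCacheFile(new URI("hdfs:///data/service_id_to_pincode.csv"));

// Mapper setup() side: open the localized copy by its file name.
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
    String fileName = new Path(cacheFiles[0].getPath()).getName();
    BufferedReader bufferedReader = new BufferedReader(new FileReader(fileName));
    // ...same parsing loop as in setup() above: split on commas and fill serviceid_To_pincode...
}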

Driver Code

Finally, we use the driver class to check that everything works as expected. The output is a number of part files equal to the number of map tasks, and the part files contain the full set of joined records. If a left outer join is used, the input to the MapReduce analytic will be output in full, with possible null values.


import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverReplicatedJoin {

    public static void main(String[] args) throws Exception {

        /*
         * I have used local paths on Windows; change the paths as per your
         * local machine.
         */
        args = new String[] { "Replace this string with Input Path location for dsl data",
                "Replace this string with Input file location of service_id_to_pincode mapping file",
                "Replace this string with output Path location" };

        /* Delete the output directory before running the job. */
        FileUtils.deleteDirectory(new File(args[2]));

        /* Set the hadoop system parameter. */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 3) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        // Inner join by default; set to "leftouter" to keep unmatched dsl records.
        conf.set("join.type", "inner");
        // Location of the small mapping file read in the mapper's setup phase.
        conf.set("service.id.file.path", args[1]);

        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverReplicatedJoin.class);
        job.setJobName("Replicate Join");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(ReplicatedJoinMapper.class);
        // Map-only job: no reducers, so no shuffle or sort phase.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
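
If the job is packaged into a jar and run on an actual cluster (with the hard-coded paths replaced by real HDFS paths), a typical run and output check might look like the following; the jar name and output path here are placeholders:

hadoop jar replicated-join-example.jar DriverReplicatedJoin
hadoop fs -cat /user/output/replicated_join/part-m-00000 | head

Each line of the part files is the full dsl record, a tab, and the joined pincode, since TextOutputFormat separates key and value with a tab by default.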