A reduce side join is arguably one of the easiest implementations of a join in MapReduce, and therefore is a very attractive choice. It can be used to execute all types of joins: inner joins, outer joins, anti joins and Cartesian products. This pattern has no limitation on the size of the data sets, and it can join as many data sets together at once as you need. The trade-off is that a reduce side join will likely require a large amount of network bandwidth, because the bulk of the data is sent to the reduce phase. Unfortunately, if all of the data sets are large, a reduce side join may be your only choice.
Use cases
1. Multiple large data sets are being joined by a foreign key. If all but one of the data sets can be fit into memory, try using the replicated join.
2. You want the flexibility of being able to execute any join operation.
Problem To Solve
Let's take a telecom domain example. We have two different data sets: the first is hlog data, which helps in determining the speed of the line, and the second is line management data, which we will call dsl data. Given these two data sets, join them on a foreign key and fetch the required fields from each. We are fetching the supplier_id, system_id, vendor_id and version_id from the dsl data set and the hlog from the hlog data set, and we are using the combination of device id and port id as the foreign key to join the two data sets.
Steps to follow
1. The mapper prepares the join operation by taking each input record from each of the data sets (the dsl and hlog data sets in our case) and extracting the foreign key, which is a combination of the device and port id, from each record. The foreign key is written as the output key, and the entire input record as the output value. This output value is flagged with a unique identifier for the data set, such as D for dsl data and H for hlog data.
2. If required, the default hash partitioner can be used, or a custom partitioner can be created to distribute the intermediate key/value pairs more evenly across the reducers (a minimal sketch of a custom partitioner follows this list).
3. The reducer performs the desired join operation by collecting the values of each input group into temporary lists. For example, all records flagged with D are stored in the dsl list and all records flagged with H are stored in the hlog list. These lists are then iterated over and the records from both sets are joined together. For an inner join, a joined record is output only if all the lists are non-empty. For an outer join (left, right, or full), empty lists are still joined with non-empty lists. The anti join is done by checking that exactly one list is empty; the records of the non-empty list are written with an empty writable.
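Step 2 is optional, but if the default hash partitioning skews the load, a custom partitioner is one way to rebalance it. Below is a minimal sketch, assuming the composite "deviceId,portId" key produced by the mappers later in this post; the class name DevicePortPartitioner and the choice to hash only on the device id are illustrative assumptions, not part of the original code.
[Java]
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: hashes only the device id portion of the
// "deviceId,portId" composite key, so all ports of one device land on the
// same reducer. Adjust the split logic to match your own key layout.
public class DevicePortPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        String deviceId = key.toString().split(",", -1)[0];
        return (deviceId.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
[/Java]
It would be registered in the driver with sampleJob.setPartitionerClass(DevicePortPartitioner.class).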
Achieving in pig
Pig has support for inner joins and left, right, and full outer joins.
[Java]
Inner Join
A = JOIN dsl by device_id, hlog by device_id;
Outer Join
A = JOIN dsl by device_id [left|right|full] outer, hlog by device_id;
[/Java]
Performance Check
A plain reduce side join puts a lot of strain on the cluster's network. Because the foreign key of each input record is extracted and output along with the record, and no data can be filtered ahead of time, pretty much all of the data is sent to the shuffle and sort step. For this reason, reduce side joins typically use relatively more reducers than a typical analytic, so if possible try one of the other join patterns available in MapReduce, such as a replicated join or a composite join.
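If a reduce side join is unavoidable, one small knob worth turning is the reducer count in the driver. A one-line sketch (the value 20 is an arbitrary placeholder, size it to your cluster and data volume):
[Java]
// Arbitrary example value; tune the reducer count to your cluster and data volume
sampleJob.setNumReduceTasks(20);
[/Java]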
Here is a sample hlog input data attached sample_hlog.csv
Here is a sample dsl input data attached sample_dsl.csv
Driver Code
Let's start with the driver code. As we have two data sets with different representations, we need to parse the two inputs differently. These cases are handled elegantly by using the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, since we have hlog data that we want to combine with dsl data for our analysis, we might set up the input as follows:
[Java]
MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, SpeedHlogDeltaDataMapper.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, DsllDataMapper.class);
[/Java]
We set the required join type with conf.set("join.type", "inner"). The join type can be inner, leftouter, rightouter, fullouter or anti, all of which are implemented in the reducer.
[Java]
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class DriverReduceSideJoin {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local path in Windows; change the path as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location for hlog data",
                "Replace this string with Input Path location for dsl data",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[2]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 3) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = ConfigurationFactory.getInstance();
        conf.set("join.type", "inner");
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(DriverReduceSideJoin.class);
        sampleJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[2]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setReducerClass(SpeedHlogDslJoinReducer.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, SpeedHlogDeltaDataMapper.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, DsllDataMapper.class);

        @SuppressWarnings("unused")
        int code = sampleJob.waitForCompletion(true) ? 0 : 1;
    }
}
[/Java]
Mapper Code
In this case, there are two mapper classes, one for hlog, i.e. SpeedHlogDeltaDataMapper, and one for dsl, i.e. DsllDataMapper. In both, we extract the device and port id, which are at indexes 5 and 6 in the hlog data and 2 and 3 in the dsl data, to use as the output key. A delimiter is placed between the device and port id so that, for example, device 12 with port 345 and device 123 with port 45 do not collapse into the same key. We output the input value prepended with the character 'H' for hlog data or 'D' for dsl data so we know which data set the record came from during the reduce phase.
[Java]
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DsllDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();
    public static final String COMMA = ",";

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        String[] field = data.split(",", -1);
        if (null != field && field.length > 63) {
            String neid = field[2];
            String portid = field[3];
            // Delimit the device and port id so different id pairs cannot produce the same key
            outkey.set(neid + COMMA + portid);
            // Flag the record with 'D' so the reducer knows it came from the dsl data set
            outvalue.set("D" + field[0] + COMMA + field[5] + COMMA + field[7] + COMMA + field[62] + COMMA + field[63]);
            context.write(outkey, outvalue);
        }
    }
}
[/Java]
[Java]
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SpeedHlogDeltaDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();
    public static final String COMMA = ",";

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] values = value.toString().split(",", -1);
        String neid = values[5];
        String portid = values[6];
        // Delimit the device and port id so different id pairs cannot produce the same key
        outkey.set(neid + COMMA + portid);
        // Flag the record with 'H' so the reducer knows it came from the hlog data set
        outvalue.set("H" + values[4] + COMMA + values[5] + COMMA + values[6] + COMMA + values[7] + COMMA + values[8]);
        context.write(outkey, outvalue);
    }
}
[/Java]
Note : When you output the value from the map side, the entire record doesn’t have to be sent. This is an opportunity to optimize the join by keeping only the fields of data you want to join together. It requires more processing on the map side, but is worth it in the long run. Also, since the foreign key is in the map output key, you don’t need to keep that in the value, either.
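Applied to the hlog mapper above, that note means values[5] and values[6] (the device and port id, which already travel in the output key) could be dropped from outvalue. A possible slimmed-down version, keeping the remaining fields unchanged:
[Java]
// Same record, minus the device and port id that are already present in the key
outvalue.set("H" + values[4] + COMMA + values[7] + COMMA + values[8]);
[/Java]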
Reducer code
The reducer code iterates through all the values of each group, looks at the flag each record is tagged with, and puts the record into one of two lists. After all values are binned into either list, the actual join logic is executed using the two lists. The join logic differs slightly based on the type of join, but always involves iterating through both lists and writing to the Context object. The type of join is pulled from the job configuration. Below is an explanation of each join type.
We are considering the hlog data as the left side data set and the dsl data as the right side data set.
Inner join – There has to be matching data in both data sets, so we check the hlog list and the dsl list; if both lists are non-empty, we output every pairing of the two lists to the context.
Left outer join – As the hlog data is the left side data set, we iterate over the hlog list and, for each hlog record, join it with every dsl record if the dsl (right side) list is not empty. If the right list is empty, we output each hlog record with an empty string.
Right outer join – As the dsl data is the right side data set, we iterate over the dsl list and, for each dsl record, join it with every hlog record if the hlog (left side) list is not empty. If the left list is empty, we output each dsl record with an empty string.
Full outer join – A full outer join is more complex, in that we want to keep all records while joining them where appropriate. If the hlog list is not empty, then for every element in hlog, join with the dsl list when the dsl list is not empty, or output the hlog record by itself if the dsl list is empty. If the hlog list is empty, just output the dsl list.
Anti join – For an anti join, if exactly one of the lists is empty, output the records from the non-empty list with an empty Text object.
Note : Be considerate of follow-on data parsing to ensure proper field delimiters. Outputting an empty Text object is actually unwise. A record that contains the proper structure, but with null fields, should be generated instead of outputting an empty object. This will ensure proper parsing for follow-on analytics.
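One way to follow that advice, sketched under the assumption that a dsl value carries five comma-separated fields; the NULL_DSL_RECORD constant is illustrative and not part of the reducer that follows:
[Java]
// Hypothetical placeholder with the same number of comma-separated fields as a real
// dsl value, so downstream parsers always see the expected column count.
private static final Text NULL_DSL_RECORD = new Text("null,null,null,null,null");

// For example, in the left outer join branch you would write the placeholder
// instead of EMPTY_TEXT when the dsl list is empty:
// context.write(hlogData, NULL_DSL_RECORD);
[/Java]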
[Java]
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SpeedHlogDslJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listH = new ArrayList<Text>();
    private ArrayList<Text> listD = new ArrayList<Text>();
    private static final Text EMPTY_TEXT = new Text("");

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        listH.clear();
        listD.clear();
        // Bin each record into the hlog or dsl list based on its flag, stripping the flag
        for (Text text : values) {
            if (text.charAt(0) == 'H') {
                listH.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'D') {
                listD.add(new Text(text.toString().substring(1)));
            }
        }
        executeJoinLogic(context);
    }

    private void executeJoinLogic(Context context) throws IOException, InterruptedException {
        String joinType = context.getConfiguration().get("join.type");
        if (joinType.equalsIgnoreCase("inner")) {
            // Inner join: output a pairing only when both lists have records
            if (!listH.isEmpty() && !listD.isEmpty()) {
                for (Text hlogData : listH) {
                    for (Text dslData : listD) {
                        context.write(hlogData, dslData);
                    }
                }
            }
        } else if (joinType.equalsIgnoreCase("leftouter")) {
            // Left outer join: every hlog record is output, joined with dsl data when present
            for (Text hlogData : listH) {
                if (!listD.isEmpty()) {
                    for (Text dslData : listD) {
                        context.write(hlogData, dslData);
                    }
                } else {
                    context.write(hlogData, EMPTY_TEXT);
                }
            }
        } else if (joinType.equalsIgnoreCase("rightouter")) {
            // Right outer join: every dsl record is output, joined with hlog data when present
            for (Text dslData : listD) {
                if (!listH.isEmpty()) {
                    for (Text hlogData : listH) {
                        context.write(dslData, hlogData);
                    }
                } else {
                    context.write(dslData, EMPTY_TEXT);
                }
            }
        } else if (joinType.equalsIgnoreCase("fullouter")) {
            // Full outer join: keep all records from both sides, joining where possible
            if (!listH.isEmpty()) {
                for (Text hlogData : listH) {
                    if (!listD.isEmpty()) {
                        for (Text dslData : listD) {
                            context.write(hlogData, dslData);
                        }
                    } else {
                        context.write(hlogData, EMPTY_TEXT);
                    }
                }
            } else {
                for (Text dslData : listD) {
                    context.write(dslData, EMPTY_TEXT);
                }
            }
        } else if (joinType.equalsIgnoreCase("anti")) {
            // Anti join: output records only when exactly one of the two lists is empty
            if (listH.isEmpty() ^ listD.isEmpty()) {
                for (Text hlogData : listH) {
                    context.write(hlogData, EMPTY_TEXT);
                }
                for (Text dslData : listD) {
                    context.write(EMPTY_TEXT, dslData);
                }
            }
        }
    }
}
[/Java]
Final Thoughts
Consider the added optimization of using a Bloom filter to filter out some of the mapper output. This helps reduce the amount of data being sent to the reducers and, in effect, reduces the run time of our analytic. Please check this link for more information on Bloom filter usage: https://timepasstechies.com/map-reduce-bloom-filter-examplepattern-optimization-sample-data/
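As a rough idea of how that pre-filtering could look inside the hlog mapper, here is a minimal sketch. It assumes a Bloom filter has already been trained on the dsl foreign keys and shipped as the first file on the distributed cache; the field name dslKeyFilter and the cache layout are illustrative assumptions, not taken from the linked post.
[Java]
// Sketch for SpeedHlogDeltaDataMapper. Requires org.apache.hadoop.util.bloom.BloomFilter,
// org.apache.hadoop.util.bloom.Key, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path
// and java.io.DataInputStream in addition to the existing imports.
private BloomFilter dslKeyFilter = new BloomFilter();

@Override
protected void setup(Context context) throws IOException {
    // Assumes the serialized filter is the first file on the distributed cache
    Path filterFile = new Path(context.getCacheFiles()[0]);
    try (DataInputStream in = FileSystem.get(context.getConfiguration()).open(filterFile)) {
        dslKeyFilter.readFields(in);
    }
}

// In map(), after building outkey, skip keys the filter has never seen:
// if (!dslKeyFilter.membershipTest(new Key(outkey.toString().getBytes()))) {
//     return; // no matching dsl record can exist, so do not shuffle this record
// }
[/Java]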