mapreduce reduce side join and top n records pattern with real world example

The problem below revolves around the MovieLens movies dataset. The dataset contains two files, as follows:

File Name      Description / Schema

movies.txt     MovieID – Title – Genres
ratings.txt    UserID – MovieID – Rating – Timestamp

Fields in both files are delimited by "::".

The dataset can be downloaded from: http://grouplens.org/datasets/movielens/1m/

Problem to solve

1. Find the top ten most viewed/rated movies along with their movie names (in descending order of rating count).

We will be using a reduce side join to join the datasets. A reduce side join is arguably the easiest implementation of a join in MapReduce, and therefore is a very attractive choice. It can be used to execute all types of joins: inner joins, outer joins, anti joins and the Cartesian product. This pattern has no limitation on the size of the data sets, and it can join as many data sets together at once as you need. However, a reduce side join will likely require a large amount of network bandwidth, because the bulk of the data is sent to the reduce phase. Unfortunately, if all of the data sets are large, a reduce side join may be your only choice.

Steps to follow

1. The mapper prepares the join operation by taking each input record from each of the data sets (movies and ratings in our case) and extracting the foreign key, which is the movie ID, from each record. The foreign key is written as the output key in both MoviesDataMapper and RatingDataMapper; the movie name is written as the value from MoviesDataMapper and the rating from RatingDataMapper. Each output value is flagged with a unique identifier for its data set, such as 'M' for movies data and 'R' for rating data.

2. The reducer performs the desired join operation by collecting the values of each input group into temporary lists. For example, all records flagged with 'M' are stored in the movies list and all records flagged with 'R' are stored in the rating list. These lists are then iterated over and the records from both sets are joined together. For an inner join, a joined record is output only if both lists are non-empty. For an outer join (left, right, or full), empty lists are still joined with non-empty lists. An anti join outputs a record only when exactly one of the lists is empty. In our case we output, for each movie, the total number of users who have rated it.

3. A second job finds the top ten most rated movies using the top N pattern of MapReduce.

 

Performance Check

A plain reduce side join puts a lot of strain on the cluster's network. Because the foreign key of each input record is extracted and output along with the record, and no data can be filtered ahead of time, pretty much all of the data is sent to the shuffle and sort step. For this reason, reduce side joins typically use relatively more reducers than a typical analytic job, so if possible try to use one of the other join patterns available in MapReduce, such as a replicated join or a composite join.
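For comparison only, and not part of the solution below, here is a minimal sketch of what a replicated (map-side) join could look like when one data set, such as movies.txt, is small enough to be held in memory. The class name and paths are illustrative: the sketch assumes the driver distributes the small file with job.addCacheFile(new URI("/path/to/movies.txt#movies.txt")), so that a symlink named movies.txt appears in each task's working directory.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical replicated-join mapper: the small movies data set is loaded into memory
// in setup() and each rating record is joined against it on the map side.
public class ReplicatedJoinMapper extends Mapper<Object, Text, Text, Text> {

    private Map<String, String> movieIdToTitle = new HashMap<String, String>();
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // "movies.txt" is assumed to be the symlink created by job.addCacheFile(...#movies.txt)
        BufferedReader reader = new BufferedReader(new FileReader("movies.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] field = line.split("::", -1);
            if (field.length == 3) {
                movieIdToTitle.put(field[0], field[1]); // MovieID -> Title
            }
        }
        reader.close();
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // each input record is a rating: UserID::MovieID::Rating::Timestamp
        String[] field = value.toString().split("::", -1);
        if (field.length == 4) {
            String title = movieIdToTitle.get(field[1]);
            if (title != null) { // inner join: drop ratings with no matching movie
                outKey.set(title);
                outValue.set(field[2]);
                context.write(outKey, outValue);
            }
        }
    }
}

Because the join happens entirely on the map side, the large ratings data never has to be shuffled; the trade-off is that the replicated data set must fit in each mapper's memory.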

Driver Code

Let's start with the driver code. As we have two data sets with different representations, we need to parse the two inputs differently. These cases are handled elegantly by the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, since we have movies data that we want to combine with the rating data for our analysis, we might set up the input as follows:

MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, MoviesDataMapper.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, RatingDataMapper.class);

There are two jobs in this solution: the first job joins the movies and rating data sets and counts the number of ratings for each movie, and the second job finds the most viewed/rated movies from that list.

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local paths on Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location for movies data",
                "Replace this string with Input Path location for rating data",
                "Replace this string with output Path location for the first job",
                "Replace this string with output Path location for the second job" };

        /* delete the output directories before running the job */
        FileUtils.deleteDirectory(new File(args[2]));
        FileUtils.deleteDirectory(new File(args[3]));

        /* set the hadoop home directory system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 4) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* first job: join the movies and rating data sets and count the ratings per movie */
        Configuration conf = new Configuration();
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(Driver.class);
        sampleJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[2]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setReducerClass(MoviesRatingJoinReducer.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, MoviesDataMapper.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, RatingDataMapper.class);
        int code = sampleJob.waitForCompletion(true) ? 0 : 1;

        if (code == 0) {

            /* second job: find the top ten most viewed/rated movies from the output of the first job */
            Job job = Job.getInstance(conf);
            job.setJarByClass(Driver.class);
            job.setJobName("Highest_Viewed");
            job.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
            FileInputFormat.addInputPath(job, new Path(args[2]));
            FileOutputFormat.setOutputPath(job, new Path(args[3]));
            job.setMapperClass(HighestViewedMoviesMapper.class);
            job.setReducerClass(HighestViewMoviesReducer.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
}

Mapper Code

In this case, there are two mapper classes for the first job, one for movies (MoviesDataMapper) and one for ratings (RatingDataMapper). In both, we extract the movie ID to use as the output key. We output the movie name and the rating value, prepended in the respective mappers with the character 'M' for movies data or 'R' for rating data, so we know which data set the record came from during the reduce phase.

Note: When you output the value from the map side, the entire record doesn't have to be sent. This is an opportunity to optimize the join by keeping only the fields of data you want to join together. It requires more processing on the map side, but is worth it in the long run. Also, since the foreign key is in the map output key, you don't need to keep it in the value either.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MoviesDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // movies.txt record format: MovieID::Title::Genres
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 3 && field[0].length() > 0) {
            movieId.set(field[0]);
            // tag the movie title with 'M' so the reducer knows which data set it came from
            outvalue.set("M" + field[1]);
            context.write(movieId, outvalue);
        }
    }
}

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RatingDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // ratings.txt record format: UserID::MovieID::Rating::Timestamp
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 4 && field[0].length() > 0) {
            movieId.set(field[1]);
            // tag the rating with 'R' so the reducer knows which data set it came from
            outvalue.set("R" + field[2]);
            context.write(movieId, outvalue);
        }
    }
}

Reducer Code

The first reducer iterates through all the values of each group, looks at the tag on each record, and puts the record into one of two lists. After all values are binned into either list, the actual join logic is executed using the two lists. The join logic differs slightly based on the type of join, but always involves iterating through both lists and writing to the Context object; in a more general implementation the type of join could be pulled from the job configuration in the setup method, but here we hard-code an inner join. For an inner join there has to be matching data in both data sets, so we check the movies list and the rating list, and if both lists are non-empty we consider it a match and write the joined record to the context.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MoviesRatingJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listMovies = new ArrayList<Text>();
    private ArrayList<Text> listRating = new ArrayList<Text>();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        listMovies.clear();
        listRating.clear();

        // bin each value into the movies list or the rating list based on its tag
        for (Text text : values) {
            if (text.charAt(0) == 'M') {
                listMovies.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'R') {
                listRating.add(new Text(text.toString().substring(1)));
            }
        }

        executeJoinLogic(context);
    }

    private void executeJoinLogic(Context context) throws IOException, InterruptedException {
        // inner join: output the movie name and its rating count only when both lists are non-empty
        if (!listMovies.isEmpty() && !listRating.isEmpty()) {
            for (Text moviesData : listMovies) {
                context.write(moviesData, new Text(String.valueOf(listRating.size())));
            }
        }
    }
}
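
As mentioned above, only the join logic at the end of the reducer needs to change for the other join types. Purely as an illustration, and not used in this solution, a hypothetical left outer join variant of the reducer could look like this, emitting every movie even when it has no ratings:

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical left outer join variant of MoviesRatingJoinReducer:
// every movie is written out, with a rating count of 0 when no ratings were seen.
public class MoviesRatingLeftOuterJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listMovies = new ArrayList<Text>();
    private ArrayList<Text> listRating = new ArrayList<Text>();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        listMovies.clear();
        listRating.clear();

        for (Text text : values) {
            if (text.charAt(0) == 'M') {
                listMovies.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'R') {
                listRating.add(new Text(text.toString().substring(1)));
            }
        }

        // left outer join: a movie with an empty rating list is still written, with a count of 0
        for (Text moviesData : listMovies) {
            context.write(moviesData, new Text(String.valueOf(listRating.size())));
        }
    }
}

An anti join would instead write a record only when exactly one of the two lists is empty.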

 

Output from the first job
Toy Story (1995)::2077
GoldenEye (1995)::888
City Hall (1996)::128
Curdled (1996)::20
Ed's Next Move (1996)::8
Extreme Measures (1996)::121
Glimmer Man, The (1996)::101
D3: The Mighty Ducks (1996)::142
Second Job

The second job finds the top ten most viewed/rated movies, taking the output of the first job as its input. We are using the top N MapReduce pattern here. For more information on this pattern, see https://timepasstechies.com/mapreduce-topn/

Mapper Code

Each mapper determines the top ten records of its input split and outputs them to the reduce phase. The mappers are essentially filtering their input split down to their top ten records, and the reducer is responsible for the final ten; just remember to configure the job to use only one reducer. The mapper processes all input records and stores them in a TreeMap. Whenever there are more than ten records in the TreeMap, the first (smallest) element is removed. After all the records have been processed, the top ten records in the TreeMap are output to the reducer in the cleanup method. This method is called once after all key/value pairs have been through map, just as setup is called once before any calls to map.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HighestViewedMoviesMapper extends Mapper<Object, Text, NullWritable, Text> {

    // keeps the ten records with the highest view counts seen by this mapper;
    // note that two movies with the same count share a key, so the later one overwrites the earlier one
    private TreeMap<Integer, Text> highestView = new TreeMap<Integer, Text>();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 2) {
            int views = Integer.parseInt(field[1]);
            highestView.put(views, new Text(field[0] + "::" + field[1]));
            // keep only the ten highest counts by dropping the smallest key
            if (highestView.size() > 10) {
                highestView.remove(highestView.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit this mapper's local top ten once all input records have been processed
        for (Map.Entry<Integer, Text> entry : highestView.entrySet()) {
            context.write(NullWritable.get(), entry.getValue());
        }
    }
}
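
One caveat with keying the TreeMap on the raw view count: two movies with the same count collide on the same key and one of them is silently dropped. If ties matter for your data, a minimal sketch of a tie-safe variant (hypothetical, not part of the solution above) is to build the key from a zero-padded count plus the movie name:

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical tie-safe variant of HighestViewedMoviesMapper: the TreeMap key is the
// zero-padded view count plus the movie name, so equal counts no longer overwrite each other.
public class HighestViewedMoviesTieSafeMapper extends Mapper<Object, Text, NullWritable, Text> {

    private TreeMap<String, Text> highestView = new TreeMap<String, Text>();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        String[] field = values.toString().split("::", -1);
        if (field.length == 2) {
            int views = Integer.parseInt(field[1]);
            // zero-padding keeps string ordering consistent with the numeric ordering of the counts
            String compositeKey = String.format("%010d", views) + "::" + field[0];
            highestView.put(compositeKey, new Text(field[0] + "::" + field[1]));
            if (highestView.size() > 10) {
                highestView.remove(highestView.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, Text> entry : highestView.entrySet()) {
            context.write(NullWritable.get(), entry.getValue());
        }
    }
}

The same adjustment would apply to the reducer that follows.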

Reducer Code

Overall, the reducer determines its top ten records in a way very similar to the mapper. Because we configured the job to have one reducer using job.setNumReduceTasks(1) and used NullWritable as the key, there is a single input group for this reducer containing all the candidate top ten records. The reducer iterates through all of these records and stores them in a TreeMap. If the TreeMap's size goes above ten, the first (lowest) element is removed from the map. After all the values have been iterated over, the values contained in the TreeMap are flushed to the file system in descending order, which is achieved by getting the descending map from the TreeMap prior to outputting the values. This can be done directly in the reduce method because there is only one input group, but doing it in the cleanup method would also work.

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HighestViewMoviesReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    private TreeMap<Integer, Text> highestView = new TreeMap<Integer, Text>();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            String data = value.toString();
            String[] field = data.split("::", -1);
            if (field.length == 2) {
                highestView.put(Integer.parseInt(field[1]), new Text(value));
                if (highestView.size() > 10) {
                    highestView.remove(highestView.firstKey());
                }
            }
        }

        // write the final top ten in descending order of view count
        for (Text t : highestView.descendingMap().values()) {
            context.write(NullWritable.get(), t);
        }
    }
}

Output from the second job
American Beauty (1999)::3428
Star Wars: Episode IV - A New Hope (1977)::2991
Star Wars: Episode V - The Empire Strikes Back (1980)::2990
Star Wars: Episode VI - Return of the Jedi (1983)::2883
Jurassic Park (1993)::2672
Saving Private Ryan (1998)::2653
Terminator 2: Judgment Day (1991)::2649
Matrix, The (1999)::2590
Back to the Future (1985)::2583
Silence of the Lambs, The (1991)::2578

 

17 thoughts on “mapreduce reduce side join and top n records pattern with real world example”

  1. Configuration conf = ConfigurationFactory.getInstance();

    I am getting an error in the above line:

    “The method getInstance() is undefined for the type ConfigurationFactory”

    Could you please tell me why?

    1. I think I missed adding the factory class, but you can just replace that line of code with

      Configuration conf = new org.apache.hadoop.conf.Configuration();

  2. args = new String[] { “Replace this string with Input Path location for movies data”,…..)

    What is the exact input we have to provide? Please give an example path.

    1. It depends on the environment you are running the program in. If you are running it on Windows it will be something like C:\\movies_data, assuming you have copied the input files to that location. If you are running it on a cluster you can provide the HDFS path, like ${nameNode}/${InputPath}

  3. As I can see from the stack trace, there is a type mismatch in the key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable. Please check this.

  4. I am getting the above error even though I have set exactly matching arguments (mapper and reducer):

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

  5. Could you or anyone please explain the flow/steps? I can only see the HighestViewedMoviesMapper in the Driver; what about the other mappers used in the first job and the other steps? Also, could you please share screenshots if possible?

    Thanks for the article, it’s really good 🙂

    1. I am confused about how to run the whole project.
      Is it like I have to run the first part and get its output, and then the output from the first part will be the file used as input for the second process? Could you please explain the flow/steps?

      1. Nick, you don’t have to run the jobs separately. If you analyse the driver code you will see that we are chaining the jobs, and the output of the first job becomes the input to the second job.

        1. Adarsh, thanks for your quick response.
          1. First job: MoviesDataMapper, RatingDataMapper, MoviesRatingJoinReducer

          So the output of MoviesDataMapper will be an input to RatingDataMapper, its final output will go to MoviesRatingJoinReducer,
          and this completes the first job, which gives movie names and their total count of views?

          Output from the first job:
          Toy Story (1995)::2077
          GoldenEye (1995)::888
          City Hall (1996)::128
          Curdled (1996)::20
          Ed’s Next Move (1996)::8
          Extreme Measures (1996)::121
          Glimmer Man, The (1996)::101
          D3: The Mighty Ducks (1996)::142

          2. Second job: HighestViewedMoviesMapper, HighestViewMoviesReducer

          So here we are using the complete output from the first job, and it goes as input to HighestViewedMoviesMapper and then HighestViewMoviesReducer, right?
          Please help 🙂

    2. Nick, as you can see, we are using all the mappers as below.

      This is the code of the first job, where we are using MoviesDataMapper and RatingDataMapper:

      MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, MoviesDataMapper.class);
      MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, RatingDataMapper.class);

      And in the second job we are using the HighestViewedMoviesMapper.

  6. Hi Adarsh,
    In the above code we have stored our data in a local collection (TreeMap) in the reducer, if I am not wrong? Can you explain how we can store it in HDFS?

    Thanks for your valuable post, it helps a lot 🙂

  7. You just need to pass the HDFS location instead of the local path. The TreeMap is only a temporary in-memory collection; whatever you write to the context object will be stored in the path you specify in the code below.

    FileOutputFormat.setOutputPath(job, new Path(args[3]));

    Below is an example:

    hdfs://namenodeserver/hdfslocation

    1. You can either use the Pig shell, or you can use Oozie to schedule it in Cloudera.
