Problem to solve: Top twenty rated movies (condition: the movie must have been rated by at least 40 users)
The problem described below revolves around the MovieLens movies dataset. The dataset contains the following data files:
File Name      Schema
movies.dat     MovieID – Title – Genres
ratings.dat    UserID – MovieID – Rating – Timestamp
users.dat      UserID – Gender – Age – Occupation – ZipCode
Fields within each file are separated by "::".
The dataset can be downloaded from this link http://grouplens.org/datasets/movielens/1m/
We will be using a reduce side join to join the datasets. A reduce side join is arguably one of the easiest joins to implement in MapReduce, which makes it a very attractive choice. It can be used to execute all types of joins: inner joins, outer joins, anti joins and Cartesian products. The pattern places no limit on the size of the datasets, and it can join as many datasets together at once as you need. The downside is that a reduce side join is likely to consume a large amount of network bandwidth, because the bulk of the data is sent to the reduce phase. Unfortunately, if all of the datasets are large, a reduce side join may be your only choice.
Steps to follow
1. The mapper prepares the join by taking each input record from each dataset (the movies and rating data in our case) and extracting the foreign key, which is the MovieID, from each record. The foreign key is written as the output key by both MoviesDataMapper and RatingDataMapper; the movie title is written as the value by MoviesDataMapper and the rating by RatingDataMapper. Each output value is flagged with a unique identifier for its dataset, such as 'M' for movies data and 'R' for rating data.
2. The reducer performs the desired join operation by collecting the values of each input group into temporary lists. For example, all records flagged with 'M' are stored in the movies list and all records flagged with 'R' are stored in the rating list. These lists are then iterated over and the records from both sets are joined together; for an inner join, a joined record is output only if both lists are non-empty (see the illustration after this list). We also calculate the average rating during this reduce phase.
3. We find the top twenty rated movies using the top-N pattern of MapReduce.
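To make the data flow concrete, here is an illustrative sketch of the intermediate records for one movie, assuming MovieID 1 is "Toy Story (1995)" and, purely for the example, that three users rated it 5, 4 and 3. The value format matches the mappers and reducer shown below.

Map output (key, value):
(1, "MToy Story (1995)")   emitted by MoviesDataMapper
(1, "R5")                  emitted by RatingDataMapper
(1, "R4")
(1, "R3")

Reduce input group for key 1: "MToy Story (1995)", "R5", "R4", "R3"
Joined output (written only for movies with at least 40 ratings): Toy Story (1995)::4.0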
Performance Check
A plain reduce side join puts a lot of strain on the cluster's network. Because the foreign key of each input record is extracted and output along with the record, and no data can be filtered ahead of time, pretty much all of the data is sent to the shuffle and sort step. For this reason, reduce side joins typically need relatively more reducers than a typical analytic job, so where possible prefer one of the other join patterns available in MapReduce, such as a replicated join or a composite join.
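For comparison, a replicated (map-side) join would be feasible here only because movies.dat is small enough to hold in memory. The sketch below is not part of this solution; it assumes the movies file location is passed through the job configuration under a made-up key, "moviesdata.path", and it joins each rating to its title entirely on the map side (a reducer would still be needed to compute the averages).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplicatedJoinMapper extends Mapper<Object, Text, Text, Text> {

    private Map<String, String> movieIdToTitle = new HashMap<String, String>();
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small movies dataset into memory once per mapper.
        Path moviesPath = new Path(context.getConfiguration().get("moviesdata.path"));
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(moviesPath)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // movies.dat record: MovieID::Title::Genres
                String[] field = line.split("::", -1);
                if (field.length == 3) {
                    movieIdToTitle.put(field[0], field[1]);
                }
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // ratings.dat record: UserID::MovieID::Rating::Timestamp
        String[] field = value.toString().split("::", -1);
        if (field.length == 4) {
            String title = movieIdToTitle.get(field[1]);
            if (title != null) { // inner join: drop ratings with no matching movie
                outKey.set(title);
                outValue.set(field[2]);
                context.write(outKey, outValue);
            }
        }
    }
}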
Driver Code
Let's start with the driver code. Since we have two datasets with different record formats, we need to parse the two inputs differently. These cases are handled elegantly by the MultipleInputs class, which lets you specify the InputFormat and Mapper to use on a per-path basis. For example, since we have movies data that we want to combine with the rating data for our analysis, we might set up the input as follows:
MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, MoviesDataMapper.class);
MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, RatingDataMapper.class);
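Note that when MultipleInputs is used this way there is no single job-wide mapper, so job.setMapperClass(...) and FileInputFormat.addInputPath(...) are not called for these two paths; each path is tied to its own mapper and input format.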
There are two jobs in this solution: the first job joins the movies and rating datasets and also finds the average rating for each movie, and the second job finds the twenty highest rated movies from that list.
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine.
         */
        args = new String[] {
                "Replace this string with Input Path location for movies data",
                "Replace this string with Input Path location for rating data",
                "Replace this string with output Path location for the first job",
                "Replace this string with output Path location for the second job" };

        if (args.length != 4) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[2]));

        Configuration conf = ConfigurationFactory.getInstance();

        // First job: join the movies and rating data and compute the average rating per movie.
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(Driver.class);
        sampleJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[2]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setReducerClass(MoviesRatingJoinReducer.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, MoviesDataMapper.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, RatingDataMapper.class);

        int code = sampleJob.waitForCompletion(true) ? 0 : 1;

        // Second job: find the twenty highest rated movies from the output of the first job.
        if (code == 0) {
            Job job = Job.getInstance(conf);
            job.setJarByClass(Driver.class);
            job.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
            job.setJobName("Highest_Rated_Movies");
            FileInputFormat.addInputPath(job, new Path(args[2]));
            FileOutputFormat.setOutputPath(job, new Path(args[3]));
            job.setMapperClass(HighestRatedMoviesMapper.class);
            job.setReducerClass(HighestRatedMoviesReducer.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
}
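If you package these classes into a jar and replace the hard-coded args block with real paths (or simply delete it and rely on the command-line arguments), the chained jobs can be submitted with something along these lines; the jar name is just a placeholder:

hadoop jar movies-join.jar Driver <movies input> <ratings input> <join output> <top20 output>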
Mapper Code
In this case, there are two mapper classes for the first job: one for the movies data, i.e. MoviesDataMapper, and one for the rating data, i.e. RatingDataMapper. In both, we extract the movie id to use as the output key. We output the movie name from MoviesDataMapper and the rating value from RatingDataMapper, prepended with the character 'M' for movies data or 'R' for rating data, so we know which dataset the record came from during the reduce phase.
Note: When you output the value from the map side, the entire record doesn't have to be sent. This is an opportunity to optimize the join by keeping only the fields of data you want to join together. It requires more processing on the map side, but is worth it in the long run. Also, since the foreign key is in the map output key, you don't need to keep it in the value either.
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MoviesDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // movies.dat record: MovieID::Title::Genres
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 3 && field[0].length() > 0) {
            movieId.set(field[0]);
            // flag the value with 'M' so the reducer knows it is a movie title
            outvalue.set("M" + field[1]);
            context.write(movieId, outvalue);
        }
    }
}
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RatingDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // ratings.dat record: UserID::MovieID::Rating::Timestamp
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 4 && field[0].length() > 0) {
            movieId.set(field[1]);
            // flag the value with 'R' so the reducer knows it is a rating
            outvalue.set("R" + field[2]);
            context.write(movieId, outvalue);
        }
    }
}
Reducer Code
The reducer code iterates through all the ratings for a given movie and keeps one local variable: a running sum. Once the iteration is complete, the average is computed by dividing the sum by the number of ratings. The movie name is then written to the file system together with this average, but only for movies that satisfy the at-least-40-ratings condition from the problem statement.
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MoviesRatingJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listMovies = new ArrayList<Text>();
    private ArrayList<Text> listRating = new ArrayList<Text>();
    private Text outvalue = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        listMovies.clear();
        listRating.clear();
        // Separate the flagged values into a movies list and a rating list.
        for (Text text : values) {
            if (text.charAt(0) == 'M') {
                listMovies.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'R') {
                listRating.add(new Text(text.toString().substring(1)));
            }
        }
        executeJoinLogic(context);
    }

    private void executeJoinLogic(Context context) throws IOException, InterruptedException {
        // Inner join: output only if both lists are non-empty.
        if (!listMovies.isEmpty() && !listRating.isEmpty()) {
            double sum = 0;
            for (Text ratingData : listRating) {
                sum = sum + Double.parseDouble(ratingData.toString());
            }
            // Only movies rated by at least 40 users qualify.
            if (listRating.size() >= 40) {
                double average = sum / listRating.size();
                outvalue.set(String.valueOf(average));
                for (Text moviesData : listMovies) {
                    context.write(moviesData, outvalue);
                }
            }
        }
    }
}
Output from the first job
Toy Story (1995)::4.146846413095811
GoldenEye (1995)::3.5405405405405403
City Hall (1996)::3.0625
Extreme Measures (1996)::2.9421487603305785
Glimmer Man, The (1996)::2.6633663366336635
D3: The Mighty Ducks (1996)::2.3732394366197185
Second Job
The second job finds the twenty highest rated movies, and it takes the output of the first job as its input. We are using the top-N MapReduce pattern here.
Mapper Code
Each mapper determines the top twenty records of its input split and outputs them to the reduce phase. The mappers are essentially filtering their input split down to its top twenty records, and the reducer is responsible for the final twenty. Just remember to configure the job to use only one reducer. The mapper processes all input records and stores them in a TreeMap. If there are more than twenty records in the TreeMap, the first (lowest-keyed) element is removed. After all the records have been processed, the top twenty records in the TreeMap are output to the reducer in the cleanup method. This method is called once after all key/value pairs have been through map, just as setup is called once before any calls to map.
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HighestRatedMoviesMapper extends Mapper<Object, Text, NullWritable, Text> {

    // TreeMap keyed by average rating; note that two movies with exactly the
    // same average would collide on the same key.
    private TreeMap<Double, Text> highestRated = new TreeMap<Double, Text>();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // Input record from the first job: Title::AverageRating
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 2) {
            double rating = Double.parseDouble(field[1]);
            highestRated.put(rating, new Text(field[0] + "::" + field[1]));
            // Keep only the twenty highest entries seen so far.
            if (highestRated.size() > 20) {
                highestRated.remove(highestRated.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's local top twenty to the single reducer.
        for (Map.Entry<Double, Text> entry : highestRated.entrySet()) {
            context.write(NullWritable.get(), entry.getValue());
        }
    }
}
Reducer Code
Overall, the reducer determines its top twenty records in a way that is very similar to the mapper. Because we configured the job to have a single reducer using job.setNumReduceTasks(1) and used NullWritable as the key, there is one input group for this reducer that contains all the potential top twenty records. The reducer iterates through all of these records and stores them in a TreeMap. If the TreeMap's size exceeds twenty, the first element (the lowest key) is removed. After all the values have been iterated over, the values contained in the TreeMap are flushed to the file system in descending order. This ordering is achieved by taking the descending map from the TreeMap before outputting the values. This can be done directly in the reduce method because there is only one input group, but doing it in the cleanup method would also work.
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HighestRatedMoviesReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    private TreeMap<Double, Text> highestRated = new TreeMap<Double, Text>();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // Each value is Title::AverageRating from a mapper's local top twenty.
            String data = value.toString();
            String[] field = data.split("::", -1);
            if (field.length == 2) {
                highestRated.put(Double.parseDouble(field[1]), new Text(value));
                if (highestRated.size() > 20) {
                    highestRated.remove(highestRated.firstKey());
                }
            }
        }
        // Write the final top twenty, highest average first.
        for (Text t : highestRated.descendingMap().values()) {
            context.write(NullWritable.get(), t);
        }
    }
}
Configuration Factory
import org.apache.hadoop.conf.Configuration;

public class ConfigurationFactory {

    private ConfigurationFactory() {
    }

    private final static Configuration conf = new Configuration();

    public static Configuration getInstance() {
        return conf;
    }
}
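The factory simply hands every job the same Configuration instance. If you run against a cluster rather than the local file system, this is also a convenient place to set cluster-specific properties before the instance is returned; for example (the namenode address below is only a placeholder):

conf.set("fs.defaultFS", "hdfs://namenode:8020");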
Output from the second job
Sanjuro (1962)::4.608695652173913
Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)::4.560509554140127
Shawshank Redemption, The (1994)::4.554557700942973
Godfather, The (1972)::4.524966261808367
Close Shave, A (1995)::4.52054794520548
Usual Suspects, The (1995)::4.517106001121705