MapReduce replicated join, reduce-side join, and average pattern with a real-world example

Problem to solve: We want to know how the genres rank by average rating for each profession and age group. The age groups to be considered are 18-35, 36-50 and 50+.

The results should be formatted as shown below:

Occupation::Age Group::Rank 1::Rank 2::Rank 3::Rank 4::Rank 5
K-12 student::18-35::Film-Noir::War::Animation::Musical::Drama
K-12 student::50+::Documentary::Film-Noir::Mystery::Drama::Musical
academic/educator::18-35::Film-Noir::Documentary::War::Animation::Drama
academic/educator::36-50::Film-Noir::Documentary::War::Mystery::Crime
academic/educator::50+::Film-Noir::Mystery::War::Drama::Documentary

The problem revolves around the MovieLens movies dataset, which contains the following data files:

File Name     Schema
movies.dat    MovieID – Title – Genres
ratings.dat   UserID – MovieID – Rating – Timestamp
users.dat     UserID – Gender – Age – Occupation – ZipCode

The dataset can be downloaded from this link http://grouplens.org/datasets/movielens/1m/

We will also be using a mapping file, mapping.txt (attached to this post), to map each profession id to its profession name.
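The UsersDataMapper below parses this file as plain comma-separated id,profession pairs, so it is expected to look roughly like the sample below (the occupation names come from the MovieLens README; the exact file attached here may differ):

0,other or not specified
1,academic/educator
10,K-12 student
12,programmer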

The example solves this problem with three MapReduce jobs.

1. The first job joins the users and ratings datasets. In the users mapper we also perform a replicated join against the profession-id-to-profession mapping file, so the profession name is carried into the output.
2. The second job joins the output of the first job with the movies dataset and splits each record into one record per genre, so that rankings can be calculated per genre.
3. The third job aggregates the data by occupation and age group and ranks the genres by average rating, as required by the solution. The record layout at each stage is sketched below.
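
For orientation, these are the field layouts as a record moves through the pipeline (taken from the sample outputs shown later in this post):

Job 1 output : AgeGroup::Occupation::MovieID::Rating
Job 2 output : Genre::AgeGroup::Occupation::Rating (one record per genre of the movie)
Job 3 output : Occupation::AgeGroup::Rank1::Rank2::Rank3::Rank4::Rank5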

Driver Code

Let's start with the driver code. Since we have three datasets with different representations, each input has to be parsed differently. These cases are handled elegantly by the MultipleInputs class, which lets you specify the InputFormat and Mapper to use on a per-path basis. For example, the ratings data that we want to combine with the users data for our analysis is wired up using MultipleInputs.

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        /*
         * I have used my local paths in Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location for rating data",
                "Replace this string with Input Path location for users data",
                "Replace this string with output Path location for the first job",
                "Replace this string with output Path location for the second job",
                "Replace this string with Input Path location for movies data",
                "Replace this string with Input mapping file location for profession id and profession",
                "Replace this string with output Path location for the third/final job" };

        /* delete the output directories before running the jobs */
        FileUtils.deleteDirectory(new File(args[2]));
        FileUtils.deleteDirectory(new File(args[3]));
        FileUtils.deleteDirectory(new File(args[6]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 7) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = ConfigurationFactory.getInstance();
        /* make the mapping file location available to the UsersDataMapper */
        conf.set("id.to.profession.mapping.file.path", args[5]);

        /* first job: join the users data with the ratings data */
        Job sampleJob = Job.getInstance(conf);
        sampleJob.setJarByClass(Driver.class);
        sampleJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[2]));
        sampleJob.setOutputKeyClass(Text.class);
        sampleJob.setOutputValueClass(Text.class);
        sampleJob.setReducerClass(UsersRatingJoinReducer.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[0]), TextInputFormat.class, RatingDataMapper.class);
        MultipleInputs.addInputPath(sampleJob, new Path(args[1]), TextInputFormat.class, UsersDataMapper.class);
        int code = sampleJob.waitForCompletion(true) ? 0 : 1;

        int code2 = 1;
        if (code == 0) {

            /* second job: join the first job's output with the movies data */
            Job sampleJob2 = Job.getInstance(conf);
            sampleJob2.setJarByClass(Driver.class);
            sampleJob2.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
            TextOutputFormat.setOutputPath(sampleJob2, new Path(args[3]));
            sampleJob2.setOutputKeyClass(Text.class);
            sampleJob2.setOutputValueClass(Text.class);
            sampleJob2.setReducerClass(MoviesRatingJoinReducer.class);
            MultipleInputs.addInputPath(sampleJob2, new Path(args[2]), TextInputFormat.class,
                    UsersRatingDataMapper.class);
            MultipleInputs.addInputPath(sampleJob2, new Path(args[4]), TextInputFormat.class, MoviesDataMapper.class);
            code2 = sampleJob2.waitForCompletion(true) ? 0 : 1;

        }

        if (code2 == 0) {

            /* third job: aggregate by occupation and age group and rank the genres */
            Job job = Job.getInstance(conf);
            job.setJarByClass(Driver.class);
            job.getConfiguration().set("mapreduce.output.textoutputformat.separator", "::");
            job.setJobName("Find_Highest_Rank");
            FileInputFormat.addInputPath(job, new Path(args[3]));
            FileOutputFormat.setOutputPath(job, new Path(args[6]));
            job.setMapperClass(AggregatedDataMapper.class);
            job.setReducerClass(AggregatedReducer.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);

        }

    }
}
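
Once the placeholder args assignment is removed, the same driver can be launched from the command line with the seven paths in the order expected by the args array above. A sketch (the jar name and paths are only illustrative):

hadoop jar movie-genre-ranking.jar Driver \
    /data/ratings.dat /data/users.dat \
    /out/job1 /out/job2 \
    /data/movies.dat /data/mapping.txt \
    /out/final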

Configuration Factory code

All three jobs share a single Configuration instance through this factory, so a property set once in the driver (such as the mapping file path) is visible to every job.
import org.apache.hadoop.conf.Configuration;

public class ConfigurationFactory {

    private ConfigurationFactory() {

    }

    private final static Configuration conf = new Configuration();

    public static Configuration getInstance() {
        return conf;
    }

}

Mapper Code

In this case there are two mapper classes for the first job: UsersDataMapper for the users data and RatingDataMapper for the ratings data. In both, we extract the user id to use as the output key. UsersDataMapper outputs the user's age group and profession, and RatingDataMapper outputs the movie id and rating value, each prefixed with the character ‘U’ for a users record or ‘R’ for a ratings record so that we know which dataset a record came from during the reduce phase. In UsersDataMapper we also perform the replicated join: the profession-id-to-profession mapping file is loaded into memory in the mapper's setup method, and each record's profession id is resolved to a profession name in map().

Note : When you output the value from the map side, the entire record doesn’t have to be sent. This is an opportunity to optimize the join by keeping only the fields of data you want to join together. It requires more processing on the map side, but is worth it in the long run. Also, since the foreign key is in the map output key, you don’t need to keep that in the value, either.
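
Concretely, the two mappers below emit values of the form U&lt;AgeGroup&gt;::&lt;Profession&gt; and R&lt;MovieID&gt;::&lt;Rating&gt;, keyed by the user id, so the reducer for one user might receive something like this (illustrative values):

U18-35::programmer
R2470::5
R1091::3

Only the fields needed downstream travel in the value, and the user id travels only in the key.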

 

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RatingDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text userId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // ratings.dat record: UserID::MovieID::Rating::Timestamp
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 4 && field[0].length() > 0) {

            // key: user id, value: 'R' tag plus movie id and rating
            userId.set(field[0]);
            outvalue.set("R" + field[1] + "::" + field[2]);
            context.write(userId, outvalue);

        }

    }

}

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UsersDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text userId = new Text();
    private Text outvalue = new Text();
    private HashMap<Integer, String> profession_mapping = new HashMap<Integer, String>();
    private String profession_id_file_location = null;

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // users.dat record: UserID::Gender::Age::Occupation::ZipCode
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 5 && field[0].length() > 0) {

            // replicated join: resolve the profession id using the in-memory map
            String profession = profession_mapping.get(Integer.parseInt(field[3]));
            String ageRange = AgeStringFactory.getAgeString(field[2]);
            if (ageRange != null) {
                // key: user id, value: 'U' tag plus age group and profession
                userId.set(field[0]);
                outvalue.set("U" + ageRange + "::" + profession);
                context.write(userId, outvalue);
            }

        }

    }

    @Override
    public void setup(Context context) {
        // load the profession id -> profession name mapping file into memory
        profession_id_file_location = context.getConfiguration().get("id.to.profession.mapping.file.path");

        try {

            BufferedReader bufferedReader = new BufferedReader(new FileReader(profession_id_file_location));
            String line;
            while ((line = bufferedReader.readLine()) != null) {

                String[] field = line.split(",", -1);

                profession_mapping.put(Integer.parseInt(field[0]), field[1]);

            }
            bufferedReader.close();

        } catch (Exception ex) {
            System.out.println(ex.getLocalizedMessage());
        }

    }

}
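
Note that the setup() method above reads the mapping file from an ordinary file path, which works when the job runs locally. On a real cluster every mapper task needs its own copy of the mapping file; a common way to ship it (a sketch, not part of the original code) is Hadoop's distributed cache:

// In the driver, before submitting the first job: ship the mapping file to every
// task node. The "#mapping.txt" fragment asks Hadoop to create a symlink with
// that name in the task's working directory (requires java.net.URI).
sampleJob.addCacheFile(new URI(args[5] + "#mapping.txt"));

// In UsersDataMapper.setup(): read the local symlinked copy instead of the
// configuration-supplied path.
BufferedReader bufferedReader = new BufferedReader(new FileReader("mapping.txt"));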

The UsersDataMapper uses a helper class to convert the age into an age group.

public class AgeStringFactory {

    public static String getAgeString(String userAge) {
        int age = Integer.parseInt(userAge);

        if (age < 18) {
            return null;
        } else if (age >= 18 && age <= 35) {
            return "18-35";
        } else if (age >= 36 && age <= 50) {
            return "36-50";
        } else if (age > 50) {
            return "50+";
        }

        return null;
    }

}

Reducer Code

The first reducer iterates through all the values of each group, looks at the tag on each record, and puts the record into one of two lists. Once all values are binned, the actual join logic is executed using the two lists. Since we are doing an inner join, there has to be matching data in both datasets, so we only produce output when neither the users list nor the ratings list is empty, writing the cross product of the two lists to the Context object.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UsersRatingJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listUsers = new ArrayList<Text>();
    private ArrayList<Text> listRating = new ArrayList<Text>();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        listUsers.clear();
        listRating.clear();

        // bin each value into the users list or the ratings list based on its tag
        for (Text text : values) {

            if (text.charAt(0) == 'U') {
                listUsers.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'R') {
                listRating.add(new Text(text.toString().substring(1)));
            }

        }

        executeJoinLogic(context);

    }

    private void executeJoinLogic(Context context) throws IOException, InterruptedException {

        // inner join: emit the cross product only when both lists have data
        if (!listUsers.isEmpty() && !listRating.isEmpty()) {
            for (Text usersData : listUsers) {

                for (Text ratingData : listRating) {

                    context.write(usersData, ratingData);
                }
            }

        }
    }

}

Output sample from the first job (format: AgeGroup::Occupation::MovieID::Rating)

18-35::academic/educator::1091::3
18-35::academic/educator::2470::5
18-35::academic/educator::2617::4
18-35::academic/educator::3347::5

Second Job
Mapper Code

The second job takes the output of the previous job and the movies dataset as input. UsersRatingDataMapper parses the output of the previous job and MoviesDataMapper parses the movies dataset. In both, we extract the movie id to use as the output key. We output the movie title and genres from MoviesDataMapper, and the age group, occupation and rating from UsersRatingDataMapper, prefixed with the character ‘M’ for a movies record or ‘C’ for a combined user-and-rating record from the previous job, so we know which dataset each record came from during the reduce phase.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UsersRatingDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // first job output record: AgeGroup::Occupation::MovieID::Rating
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 4) {
            // key: movie id, value: 'C' tag plus age group, occupation and rating
            movieId.set(field[2]);
            outvalue.set("C" + field[0] + "::" + field[1] + "::" + field[3]);
            context.write(movieId, outvalue);

        }

    }

}

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MoviesDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // movies.dat record: MovieID::Title::Genres
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 3 && field[0].length() > 0) {
            // key: movie id, value: 'M' tag plus title and pipe-separated genres
            movieId.set(field[0]);
            outvalue.set("M" + field[1] + "::" + field[2]);
            context.write(movieId, outvalue);

        }

    }

}

Reducer Code

The reducer joins the two datasets on the movie id and splits the pipe-separated genres field, writing one record per genre.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MoviesRatingJoinReducer extends Reducer<Text, Text, Text, Text> {

    private ArrayList<Text> listMovies = new ArrayList<Text>();
    private ArrayList<Text> listRating = new ArrayList<Text>();
    private Text outvalue = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        listMovies.clear();
        listRating.clear();

        // bin each value into the movies list or the user/rating list based on its tag
        for (Text text : values) {

            if (text.charAt(0) == 'M') {
                listMovies.add(new Text(text.toString().substring(1)));
            } else if (text.charAt(0) == 'C') {
                listRating.add(new Text(text.toString().substring(1)));
            }

        }

        executeJoinLogic(context);

    }

    private void executeJoinLogic(Context context) throws IOException, InterruptedException {

        // inner join on movie id, emitting one record per genre of the movie
        if (!listMovies.isEmpty() && !listRating.isEmpty()) {
            for (Text moviesData : listMovies) {

                String[] data = moviesData.toString().split("::");
                String[] genres = data[1].split("\\|");

                for (Text ratingData : listRating) {

                    for (String genre : genres) {
                        outvalue.set(genre);
                        context.write(outvalue, ratingData);
                    }

                }
            }

        }
    }

}

Output sample from the second job (format: Genre::AgeGroup::Occupation::Rating)

Animation::18-35::programmer::5
Children’s::18-35::programmer::5
Comedy::18-35::programmer::5
Animation::36-50::other or not specified::4

Final Job
Mapper Code

The AggregatedDataMapper parses the output of the previous job and emits the occupation and age group as the key and the genre and rating as the value. For example, the record Animation::18-35::programmer::5 becomes the key programmer::18-35 with the value Animation::5.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AggregatedDataMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyData = new Text();
    private Text outvalue = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        // second job output record: Genre::AgeGroup::Occupation::Rating
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (null != field && field.length == 4) {
            // key: Occupation::AgeGroup, value: Genre::Rating
            keyData.set(field[2] + "::" + field[1]);
            outvalue.set(field[0] + "::" + field[3]);
            context.write(keyData, outvalue);

        }

    }

}

Reducer Code

The AggregatedReducer iterates through the values for each occupation and age group, computes the average rating per genre, and writes out the top five genres ranked by average rating.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AggregatedReducer extends Reducer<Text, Text, Text, Text> {

    private Map<String, GenreRanking> map = new HashMap<String, GenreRanking>();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        map.clear();

        // accumulate the sum and count of ratings per genre for this occupation/age-group key
        for (Text text : values) {

            String[] value = text.toString().split("::");
            String genre = value[0];
            double rating = Double.parseDouble(value[1]);

            GenreRanking ranking = map.get(genre);

            if (ranking != null) {
                ranking.setSum(ranking.getSum() + rating);
                ranking.setCount(ranking.getCount() + 1);
            } else {
                GenreRanking rankingNew = new GenreRanking();
                rankingNew.setGenre(genre);
                rankingNew.setSum(rating);
                rankingNew.setCount(1);
                map.put(genre, rankingNew);
            }

        }

        // sort the genres by average rating in descending order; a sorted list is used
        // rather than a TreeMap keyed on the average so that two genres with exactly
        // the same average do not overwrite each other
        List<Map.Entry<String, GenreRanking>> ranked = new ArrayList<Map.Entry<String, GenreRanking>>(map.entrySet());
        Collections.sort(ranked, new Comparator<Map.Entry<String, GenreRanking>>() {
            public int compare(Map.Entry<String, GenreRanking> a, Map.Entry<String, GenreRanking> b) {
                double averageA = a.getValue().getSum() / a.getValue().getCount();
                double averageB = b.getValue().getSum() / b.getValue().getCount();
                return Double.compare(averageB, averageA);
            }
        });

        // emit the top five genres, separated by "::"
        StringBuilder sb = new StringBuilder();
        int count = 0;
        for (Map.Entry<String, GenreRanking> entry : ranked) {
            if (count == 5) {
                break;
            }
            if (count > 0) {
                sb.append("::");
            }
            sb.append(entry.getKey());
            count++;
        }

        context.write(key, new Text(sb.toString()));

    }

}

GenreRanking Code
/* Simple bean holding the running sum and count of ratings for a genre */
public class GenreRanking {

    private String genre;
    private double sum;
    private int count;

    public String getGenre() {
        return genre;
    }

    public void setGenre(String genre) {
        this.genre = genre;
    }

    public double getSum() {
        return sum;
    }

    public void setSum(double sum) {
        this.sum = sum;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

}

Final Output Sample

K-12 student::18-35::Film-Noir::War::Animation::Musical::Drama
K-12 student::50+::Documentary::Film-Noir::Mystery::Drama::Musical
academic/educator::18-35::Film-Noir::Documentary::War::Animation::Drama
academic/educator::36-50::Film-Noir::Documentary::War::Mystery::Crime
academic/educator::50+::Film-Noir::Mystery::War::Drama::Documentary
artist::18-35::Film-Noir::Documentary::War::Drama::Mystery
artist::36-50::War::Film-Noir::Documentary::Crime::Mystery
artist::50+::War::Musical::Fantasy::Adventure::Animation
clerical/admin::18-35::Film-Noir::War::Musical::Documentary::Animation
clerical/admin::36-50::Film-Noir::War::Animation::Documentary::Mystery
