MapReduce example to shuffle and anonymize data using a random key

The shuffling pattern can be used when we want to randomize a data set to support repeatable random sampling. For example, the first hundred records of the shuffled output form a simple random sample, and every time we pull the first hundred records we get the same sample. This allows analytics that run over a random sample to have repeatable results, and a separate job does not have to be run to produce a new simple random sample every time one is needed.
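Once the shuffled output is on HDFS, taking that repeatable sample is just a matter of reading the first hundred lines of a part file. Here is a minimal sketch using the Hadoop FileSystem API; the path part-r-00000 and the sample size of 100 are assumptions for illustration.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SampleReader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // hypothetical output location; replace with the job's output directory
        Path part = new Path("/user/output/shuffled/part-r-00000");

        // read the first hundred records; because the data is already
        // shuffled, this is a simple random sample, and re-reading the
        // same file always yields the same sample
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(part)))) {
            String line;
            int count = 0;
            while ((line = reader.readLine()) != null && count < 100) {
                System.out.println(line);
                count++;
            }
        }
    }
}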

Another scenario where we need to shuffle data is when we need to anonymize it. Anonymizing data has become important for organizations that want to maintain their users' privacy but still run analytics. The order of the data can provide information that might lead to the identity of a user, so by shuffling the entire data set the organization takes an extra step toward anonymization.

Steps to achieve data shuffling

1. The mapper outputs each record as the value, with a random number as the key.
2. The framework sorts the random keys during the shuffle, randomizing the order in which records reach the reducer, which simply emits the values.

Achieving this in Pig

We can GROUP BY a random key and then FLATTEN the grouping. Because RANDOM() assigns each record its own key, the groups are tiny, and the subsequent FLATTEN restores the individual records in shuffled order. Behind the scenes, this effectively implements the shuffle pattern described above.


Employee_random = GROUP employee_data BY RANDOM();
Shuffled_data = FOREACH Employee_random GENERATE FLATTEN(employee_data);

Performance of the shuffling pattern

Since the reducer for each record is chosen at random, the data distribution across reducers will be very well balanced, and with more reducers the data will be more spread out. The size of the output files is also predictable: roughly the size of the data set divided by the number of reducers. For example, a 10 GB data set processed with 100 reducers produces files of roughly 100 MB each. This makes it easy to get a specific desired file size as output. Because the pattern shuffles all of the data over the network and writes all of it back to HDFS, a relatively high number of reducers should be used.
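In the MapReduce implementation below, the reducer count is controlled with a single call on the job object. A sketch of the relevant line; the value 100 is only an assumption and should be tuned to the cluster and the desired output file size:

// in the driver, after creating the job:
// spread the shuffled data across ~100 output files of predictable size
job.setNumReduceTasks(100);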

Problem to solve

1. Given employee information, anonymize each record by removing the first and last name, and then randomly shuffle the records within the data set.

Here is a sample of the input data from the attached employee_info.csv file.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Mapper Code

In the mapper we remove the first and last name with the helper method stringArrayToString to anonymize the data. To shuffle the data, we set the output key to a random number generated by Java's Random class.

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AnonymizeEmployeeMapper extends Mapper<Object, Text, IntWritable, Text> {

    private final IntWritable randomKey = new IntWritable();
    private final Text outValue = new Text();
    private final Random randomNumber = new Random();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        String[] field = data.split(",", -1);
        // only process well-formed records with all nine columns
        if (field.length == 9) {
            // a random key determines which reducer receives the record
            // and where it sorts within that reducer's input
            randomKey.set(randomNumber.nextInt());
            // drop the first two columns (first and last name)
            outValue.set(stringArrayToString(field));
            context.write(randomKey, outValue);
        }
    }

    // rebuild the record as a comma-separated string, skipping the
    // first two fields to strip the identifying names
    private static String stringArrayToString(String[] input) {
        StringBuilder inputData = new StringBuilder();
        for (int i = 2; i < input.length; i++) {
            inputData.append(input[i]);
            if (i < input.length - 1) {
                inputData.append(",");
            }
        }
        return inputData.toString();
    }
}
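One caveat worth noting: the CSV header line also has nine fields, so it passes the length check and would be shuffled into the output along with the data. A minimal guard inside the map method, assuming the header always begins with the literal First Name as in the sample above, might look like this:

// skip the CSV header row; matching on the "First Name" literal
// is an assumption based on the sample input shown above
if (data.startsWith("First Name")) {
    return;
}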
Reducer Code

This reducer class simply outputs the values, stripping out the random keys.


import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ShufflingValueReducer extends Reducer<IntWritable, Text, Text, NullWritable> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // emit each record as the key with a null value, discarding
        // the random key that was used only to drive the shuffle
        for (Text t : values) {
            context.write(t, NullWritable.get());
        }
    }
}


Driver Code

Finally, we use the driver class to wire everything together and verify that it works as expected. The output employee data will be anonymized and shuffled.


import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class DriverShufflingRecord {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per
         * your local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = ConfigurationFactory.getInstance();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverShufflingRecord.class);
        job.setJobName("Shuffling_Record");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(AnonymizeEmployeeMapper.class);
        job.setReducerClass(ShufflingValueReducer.class);
        // the mapper emits (IntWritable, Text) while the reducer emits
        // (Text, NullWritable), so both pairs must be declared explicitly
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
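After the job completes, the output contains the anonymized records in random order. Using the sample input above and assuming the header row is skipped as suggested earlier, one possible part-file ordering might look like the following; the exact order will differ on every run:

clerk iii,police,f,salary,,53076.00,
lieutenant,fire,f,salary,,114846.00,
law clerk,law,f,hourly,35,,14.51
paramedic i/c,fire,f,salary,,91080.00,
police officer,police,f,salary,,96060.00,
sergeant,police,f,salary,,104628.00,
firefighter,fire,f,salary,,87006.00,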