MapReduce Bloom filter example, pattern and optimization with sample data

Bloom filtering is similar to generic filtering in that it looks at each record and decides whether to keep or remove it. However, two major differences set it apart from generic filtering. First, we want to filter the record based on a set membership operation against the hot values. For example: keep or throw away this record depending on whether the value in the user field is a member of a predetermined list of users. Second, the set membership is evaluated with a Bloom filter.

Using a Bloom filter to calculate set membership in this situation means that you will sometimes get a false positive. That is, a value will sometimes be reported as a member of the set when it is not. There are no false negatives, however: if the Bloom filter says a value is not in the set, we can guarantee that it is indeed not in the set of values.

A Bloom filter can be used if the answer to each of the points below is yes:
  • There is a predetermined set of items for the hot values.
  • Data can be separated into records, as in filtering.
  • A feature can be extracted from each record that could be in a set of hot values.
  • Some false positives are acceptable.

Steps in Bloom Filter

1. Train the Bloom filter from the list of values. This is done by loading the data from where it is stored and adding each item to the Bloom filter. The trained Bloom filter is stored in HDFS at a known location.
2. Do the actual filtering. When the map task starts, it loads the Bloom filter from the distributed cache. Then, in the map function, it iterates through the records and checks the Bloom filter for set membership in the hot values list. Each record is either forwarded or dropped based on the Bloom filter membership test.

Note: The Bloom filter needs to be retrained only when the data changes. Therefore, updating the Bloom filter in a lazy fashion is typically appropriate.

Bloom Filter Use Cases
  1. Prefiltering a data set for an expensive set membership check: Sometimes, checking whether some value is a member of a set is going to be expensive. A Bloom filter can cheaply discard most non-members before that check is made.
  2. Removing most of the non-watched values: The most straightforward use case is cleaning out values that aren’t hot.

Performance Note

The performance of this pattern is very similar to that of simple filtering. Loading the Bloom filter from the distributed cache is not expensive, since the file is relatively small. Checking a value against the Bloom filter is also a relatively cheap operation, as each test is executed in constant time.

Problem to Solve

Given a list of employee details, use a Bloom filter to filter out employees who are not part of the PUBLIC LIBRARY department hot list.

The sample input data used for training the Bloom filter is attached as bloom_filter_employee_data.txt

The sample input data for MapReduce processing is attached as employee_info.csv

Input Data sample for reference

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

A Bloom filter is trained on the bytes of the word. The important consequence is that the words “the” and “The” may look the same, but their bytes are different. Unless case sensitivity matters in your algorithm, it is best to trim the string and make it all lower case when training and testing the filter.
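To make the point about bytes concrete, here is a minimal sketch of such a normalization step. The class name KeyNormalizer and the normalize helper are hypothetical and are not part of the trainer or mapper code in this post; if you adopt something like it, the same helper would have to be applied both when training and when testing the filter.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

public class KeyNormalizer {

    // Hypothetical helper: trims and lower-cases a value before its bytes are hashed.
    public static byte[] normalize(String raw) {
        return raw.trim().toLowerCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] lower = "the".getBytes(StandardCharsets.UTF_8);
        byte[] upper = "The".getBytes(StandardCharsets.UTF_8);

        // The raw bytes differ, so an untouched filter would treat these as two different keys.
        System.out.println(Arrays.equals(lower, upper)); // false

        // After normalization both spellings map to the same bytes.
        System.out.println(Arrays.equals(normalize("the"), normalize(" The "))); // true
    }
}
```

Using an explicit charset and Locale.ROOT keeps the bytes identical across machines, which matters when the filter is trained on one machine and tested on another.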

Training Bloom Filter

The following variables are used in the explanation below (the matching names in the trainer code are given in parentheses):
i – The number of bits in the filter (vectorSize)
k – The number of members in the set (numMembers)
l – The desired false positive rate (falsePosRate)
m – The number of different hash functions used to map some element to one of the i bits with a uniform random distribution (nbHash)

A Bloom filter is represented by a continuous string of i bits initialized to zero. For each of the k elements, m hash function values are taken modulo i to obtain indices from zero to i – 1. The bits of the Bloom filter at the resulting indices are set to one. This operation is often called training a Bloom filter. As elements are added to the Bloom filter, some bits may already be set to one from previous elements in the set.

When testing whether a value is an element of the set, the same hash functions are used to check the bits of the array. If any one of the hashed bits is zero, the test returns no. If all the bits are turned on, the test returns maybe. If the value was used to train the filter, the m hashes would have set all of its bits to one, so a trained value never returns no. The result of the test cannot be a definitive yes because the bits may have been turned on by a combination of other elements. If the test returns maybe but should have been no, this is known as a false positive.

Thankfully, the false positive rate can be controlled if the number of members in the set, or at least an approximation of it, is known ahead of time.
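The training and testing steps described above can be sketched with a toy filter built on java.util.BitSet. This is only an illustration of the idea, not Hadoop's BloomFilter implementation: the class name ToyBloomFilter and the seeded FNV-style hash are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

public class ToyBloomFilter {
    private final BitSet bits;
    private final int numBits;   // i in the text
    private final int numHashes; // m in the text

    public ToyBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Assumption: a seeded FNV-1a style hash stands in for the m hash functions.
    private int index(byte[] data, int seed) {
        int h = 0x811C9DC5 ^ (seed * 0x9E3779B9);
        for (byte b : data) {
            h ^= (b & 0xFF);
            h *= 16777619;
        }
        return Math.floorMod(h, numBits); // index from 0 to i - 1
    }

    // Training: set the bit at each of the m hashed positions.
    public void add(String value) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        for (int seed = 0; seed < numHashes; seed++) {
            bits.set(index(data, seed));
        }
    }

    // Testing: any zero bit means "no"; all bits set means "maybe".
    public boolean mightContain(String value) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        for (int seed = 0; seed < numHashes; seed++) {
            if (!bits.get(index(data, seed))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ToyBloomFilter filter = new ToyBloomFilter(1024, 3);
        filter.add("public library");
        filter.add("fire");

        // A trained value always tests as "maybe" (true).
        System.out.println(filter.mightContain("public library"));
        // An untrained value usually tests as "no", but may be a false positive.
        System.out.println(filter.mightContain("police"));
    }
}
```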

In the code below, based on the sample data, we have around 32658 members in the set and we have set 0.2 as the acceptable false positive rate.

It can be very beneficial to know an approximation of the number of elements. If you know this ahead of time, a Bloom filter can be sized appropriately to have a hand-picked false positive rate.

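Formula for optimal size of bloom filter

OptimalBloomFilterSize = -(number of members in the set * ln(desired false positive rate)) / (ln(2))^2

Formula to get the optimalK

OptimalK = (OptimalBloomFilterSize * ln(2)) / number of members in the set

Here ln is the natural logarithm (Math.log in Java), and OptimalK is the optimal number of hash functions, called m in the variable list above.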
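As a worked example, the two formulas can be evaluated for the figures used in this post (32658 members, false positive rate 0.2). The class BloomSizing below is a standalone sketch written for this purpose; the expectedFalsePosRate method uses the standard approximation (1 - e^(-m*k/i))^m, which is not stated in the text above and is included only as a sanity check.

```java
public class BloomSizing {

    // i = -k * ln(l) / (ln 2)^2, using the symbols from the variable list.
    public static int optimalSize(int numMembers, double falsePosRate) {
        return (int) (-numMembers * Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    // m = (i / k) * ln 2, rounded to the nearest whole number of hash functions.
    public static int optimalHashCount(int numMembers, int numBits) {
        return (int) Math.round((double) numBits / numMembers * Math.log(2));
    }

    // Standard approximation of the resulting false positive rate: (1 - e^(-m*k/i))^m.
    public static double expectedFalsePosRate(int numMembers, int numBits, int numHashes) {
        return Math.pow(1 - Math.exp(-(double) numHashes * numMembers / numBits), numHashes);
    }

    public static void main(String[] args) {
        int numMembers = 32658;
        double falsePosRate = 0.2;

        int numBits = optimalSize(numMembers, falsePosRate);
        int numHashes = optimalHashCount(numMembers, numBits);

        System.out.println("bits in filter:      " + numBits);
        System.out.println("hash functions:      " + numHashes);
        System.out.println("expected false rate: "
                + expectedFalsePosRate(numMembers, numBits, numHashes));
    }
}
```

For these inputs the filter comes out at roughly 109,400 bits (under 14 KB) with 2 hash functions, and the estimated false positive rate lands close to the requested 0.2. Rounding the hash count to a whole number is why the estimate does not match the target exactly.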

We read the input file and store the trained Bloom filter file on the local file system (I am using Windows). Ideally the file should be read from and stored in HDFS using the Hadoop HDFS API; for simplicity, the HDFS file system code is not included here. This Bloom filter file can later be deserialized from HDFS or the local file system just as easily as it was written: open the file using the FileSystem object and pass the resulting stream to BloomFilter.readFields.


import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class DepartmentBloomFilterTrainer {

    /* Optimal number of bits for the given member count and false positive rate. */
    public static int getBloomFilterOptimalSize(int numElements, float falsePosRate) {
        return (int) (-numElements * (float) Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    /* Optimal number of hash functions for the given member count and vector size. */
    public static int getOptimalK(float numElements, float vectorSize) {
        return (int) Math.round(vectorSize * Math.log(2) / numElements);
    }

    public static void main(String[] args) throws IOException {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine.
         */
        args = new String[] { "32658", "0.2", "Replace this string with Input file location",
                "Replace this string with output path location where the bloom filter hot list data will be stored" };

        int numMembers = Integer.parseInt(args[0]);
        float falsePosRate = Float.parseFloat(args[1]);
        int vectorSize = getBloomFilterOptimalSize(numMembers, falsePosRate);
        int nbHash = getOptimalK(numMembers, vectorSize);
        BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        /* Read the training file line by line; the department is the fourth column. */
        List<String> fileLines = Files.readAllLines(Paths.get(args[2]));
        for (String lineData : fileLines) {
            String[] lineDataSplit = lineData.split(",", -1);
            /* Skip lines that do not have a department column. */
            if (lineDataSplit.length > 3) {
                String departmentName = lineDataSplit[3];
                filter.add(new Key(departmentName.getBytes()));
            }
        }

        /* Serialize the trained filter; try-with-resources flushes and closes the stream. */
        try (DataOutputStream dataOut = new DataOutputStream(new FileOutputStream(args[3]))) {
            filter.write(dataOut);
        }
    }
}

Mapper Code

In the setup method, the Bloom filter file is deserialized and loaded into the Bloom filter. In the map method, the department name is extracted from each input record and tested against the Bloom filter. If the department is a member, the entire record is output to the file system. Ideally, to load the Bloom filter hot words we should use DistributedCache, a Hadoop utility that ensures that a file in HDFS is present on the local file system of each task that requires it; for simplicity, I am loading it from my local file system. As we have trained the Bloom filter with the PUBLIC LIBRARY department, the output of the MapReduce program will contain only employee data relevant to the PUBLIC LIBRARY department, apart from any false positives.


import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class DepartmentBloomFilterMapper extends Mapper<Object, Text, Text, NullWritable> {

    private BloomFilter filter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        /* Deserialize the trained filter from the path set by the driver. */
        try (DataInputStream dataInputStream = new DataInputStream(
                new FileInputStream(context.getConfiguration().get("bloom_filter_file_location")))) {
            filter.readFields(dataInputStream);
        }
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();
        String[] field = data.split(",", -1);

        /* Only well-formed records with a non-empty department are tested. */
        if (field.length == 9 && field[3].length() > 0) {
            String department = field[3];
            /* Forward the whole record when the filter answers "maybe". */
            if (filter.membershipTest(new Key(department.getBytes()))) {
                context.write(value, NullWritable.get());
            }
        }
    }
}

Bloom filters can assist expensive operations by eliminating unnecessary ones. For example, a Bloom filter can be trained beforehand with the IDs of all users that have a salary of more than x, and then used as an initial test before querying the database to retrieve more information about each employee. By eliminating unnecessary queries, we can speed up processing time.
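The prefiltering idea in the paragraph above can be sketched as follows. Everything here is a stand-in chosen for illustration: the in-memory map plays the role of the database, the two hash positions are derived from String.hashCode rather than Hadoop's hashing, and the class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class PrefilterSketch {
    private static final int NUM_BITS = 4096;

    private final BitSet bits = new BitSet(NUM_BITS);
    // Hypothetical stand-in for the database; a real lookup would be a remote query.
    private final Map<String, String> database = new HashMap<>();
    private int expensiveLookups = 0;

    // Two illustrative bit positions per ID (an assumption, not Hadoop's hashing).
    private static int[] indices(String id) {
        int h1 = id.hashCode();
        int h2 = h1 * 0x9E3779B9 + 0x7F4A7C15;
        return new int[] { Math.floorMod(h1, NUM_BITS), Math.floorMod(h2, NUM_BITS) };
    }

    // Stores the record and trains the filter with its ID.
    public void register(String id, String details) {
        database.put(id, details);
        for (int idx : indices(id)) {
            bits.set(idx);
        }
    }

    private String expensiveLookup(String id) {
        expensiveLookups++;
        return database.get(id);
    }

    // Queries the "database" only when the filter answers "maybe".
    public String find(String id) {
        for (int idx : indices(id)) {
            if (!bits.get(idx)) {
                return null; // definite "no": the expensive query is skipped
            }
        }
        return expensiveLookup(id); // "maybe": confirm against the real data
    }

    public int getExpensiveLookups() {
        return expensiveLookups;
    }

    public static void main(String[] args) {
        PrefilterSketch sketch = new PrefilterSketch();
        sketch.register("E100", "edwards,tim p,lieutenant,fire");

        System.out.println(sketch.find("E100"));
        for (int i = 200; i < 210; i++) {
            sketch.find("E" + i); // unknown IDs are mostly rejected by the filter
        }
        System.out.println("expensive lookups: " + sketch.getExpensiveLookups());
    }
}
```

Because a false positive only costs one extra lookup that then returns nothing, the final answer stays correct; the filter affects how many queries are made, not what they return.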

Driver Code

Finally, we use the driver class to test that everything works as expected. The output will contain only the data of employees who belong to the PUBLIC LIBRARY department, which can be used for further analysis.


import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/* ConfigurationFactory is a helper class from this blog's own project. */
import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class BloomFilterDriver {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location",
                "Replace this string with Input file location of bloom filter hot list" };

        /* validate the arguments before any of them is used */
        if (args.length != 3) {
            System.err.println("Please specify the input path, output path and Bloom filter file location");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = ConfigurationFactory.getInstance();
        /* the mapper reads this property in setup() to locate the trained filter */
        conf.set("bloom_filter_file_location", args[2]);
        Job job = Job.getInstance(conf);
        job.setJarByClass(BloomFilterDriver.class);
        job.setJobName("Bloom_Filter_Department");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DepartmentBloomFilterMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

 
