The Jaccard similarity index, also called the Jaccard similarity coefficient, compares two data sets to see which members are shared and which are distinct. It is a measure of similarity that ranges from 0 to 1, often expressed as a percentage from 0 to 100: the higher the value, the more similar the two data sets are.
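Formally, for two sets A and B the coefficient is |A ∩ B| / |A ∪ B|. As a quick warm-up before the Spark version, below is a minimal, self-contained sketch that computes the coefficient for two small word sets with plain java.util.Set; the class name and sample words are made up purely for illustration.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class JaccardExample {
public static void main(String[] args) {
Set<String> a = new HashSet<>(Arrays.asList("the", "quick", "brown", "fox"));
Set<String> b = new HashSet<>(Arrays.asList("the", "lazy", "brown", "dog"));
// intersection: words present in both sets
Set<String> intersection = new HashSet<>(a);
intersection.retainAll(b); // {the, brown}
// union: words present in either set
Set<String> union = new HashSet<>(a);
union.addAll(b); // {the, quick, brown, fox, lazy, dog}
// |A ∩ B| / |A ∪ B| = 2 / 6 ≈ 0.33
System.out.println((double) intersection.size() / union.size());
}
}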
We will implement the brute-force alternative to the LSH algorithm, that is, a pair-wise calculation of Jaccard similarity over every pair of documents. Note that n files yield n(n-1)/2 distinct pairs, so the cost grows quadratically with the number of documents; seeing that first-hand should give you a greater appreciation of the LSH algorithm.
The input for this code is a list of files, each containing some text. The output will be as below:
[(FILE_1.txt-FILE_2.txt,0.88887), (FILE_1.txt-FILE_3.txt,0.989898988464), (FILE_2.txt-FILE_3.txt,0.88787428571428571), (FILE_1.txt-FILE_4.txt,0.8996552857142856), (FILE_1.txt-FILE_5.txt,0.889898745762711865), (FILE_2.txt-FILE_4.txt,0.89898966666666), (FILE_2.txt-FILE_5.txt,0.9898989047619047616), (FILE_3.txt-FILE_4.txt,0.7788545454545456), (FILE_3.txt-FILE_5.txt,0.9898989863013), (FILE_4.txt-FILE_5.txt,0.898989)]
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
public class SparkJacardsLSH {
private static final String FILE_URI = "{INPUT_PATH}/LSH_*.txt";
public static void main(String[] args) {
// initializing spark
SparkSession spark = SparkSession.builder()
.config("spark.master", "local[*]").getOrCreate();
@SuppressWarnings("resource")
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.setLogLevel("WARN");
// create a pair RDD of (file path, file content) from the input text files
JavaPairRDD<String, String> documents = sc.wholeTextFiles(FILE_URI);
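// keep only the file name (drop the directory portion of the path), paired with the content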
JavaRDD<Row> rddT = documents.map(tuple -> RowFactory.create(
tuple._1.substring(tuple._1().lastIndexOf("/") + 1), tuple._2));
StructField one = DataTypes.createStructField("fileName",
DataTypes.StringType, true);
StructField two = DataTypes.createStructField("content",
DataTypes.StringType, true);
StructType type = DataTypes.createStructType(
new ArrayList<StructField>(Arrays.asList(one, two)));
Dataset<Row> characteristicMatrix = spark.createDataFrame(rddT, type);
characteristicMatrix.show();
// cross join (cartesian product) of the characteristic matrix with itself
Dataset<Row> matrix = characteristicMatrix
.crossJoin(characteristicMatrix)
.toDF("one", "two", "three", "four");
/*
* Filter out about half of the rows. More specifically, the cross join
* produces combinations such as (file1, file2) and (file2, file1). As far
* as their Jaccard similarity is concerned, these two pairs are the same,
* so we add a filter that eliminates the repeated pairs (self-pairs such
* as (file1, file1) are dropped as well).
*/
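// coalesce(1) moves all rows into a single partition so the HashMap below sees every pair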
JavaRDD<Row> rdd = matrix.toJavaRDD().coalesce(1)
.mapPartitions(new FlatMapFunction<Iterator<Row>, Row>() {
private static final long serialVersionUID = 3232323;
public Iterator<Row> call(Iterator<Row> t)
throws Exception {
// keep exactly one row per unordered pair of distinct file names
Map<String, String> map = new java.util.HashMap<>();
List<Row> list = new ArrayList<>();
while (t.hasNext()) {
Row row = t.next();
String one = row.getString(0);
String two = row.getString(2);
String finalStr = null;
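// skip self-pairs, and build an order-independent key so that
// (file1, file2) and (file2, file1) collapse to the same entry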
if (!one.equals(two)) {
int i = one.compareTo(two);
if (i > 0) {
finalStr = two + one;
} else {
finalStr = one + two;
}
if (map.get(finalStr) == null) {
list.add(row);
}
map.put(finalStr, "test");
}
}
return list.iterator();
}
});
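// spread the de-duplicated pairs across 10 partitions so the similarity computations run in parallel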
JavaPairRDD<String, JacardTuple> pairRdd = rdd.repartition(10)
.mapToPair(new PairFunction<Row, String, JacardTuple>() {
private static final long serialVersionUID = 323232345;
public Tuple2<String, JacardTuple> call(Row t)
throws Exception {
// strip punctuation from both documents before splitting into words
String text = t.get(1).toString().replaceAll("\\p{P}", "");
String text2 = t.get(3).toString().replaceAll("\\p{P}", "");
String baseString = text2;
// progress/debug output showing which pair is being processed
System.out.println(t.getString(0) + "-" + t.getString(2));
// use the document with fewer words as the base for membership tests
if (text2.split(" ").length > text.split(" ").length) {
baseString = text;
}
// split holds the words of the other (longer) document
String[] split = null;
if (baseString.equals(text)) {
split = text2.split(" ");
} else {
split = text.split(" ");
}
double unique = 0; // words that appear in only one of the two documents
double union = 0; // words of the longer document also found in the shorter one
String[] baseStringArray = baseString.split(" ");
for (String string : split) {
if (Arrays.asList(baseStringArray).contains(string)) {
union++;
} else {
unique++;
}
}
// also count words of the shorter document that never occur in the longer one
for (String two : baseStringArray) {
if (!Arrays.asList(split).contains(two)) {
unique++;
}
}
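// union + unique approximates the size of the word union; the field named
// "union" in JacardTuple actually stores the match (intersection) count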
return new Tuple2<String, JacardTuple>(
t.getString(0) + "-" + t.getString(2),
new JacardTuple(union + unique, union));
}
});
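// collect() brings the (pair, similarity) results back to the driver for printing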
for (Tuple2<String, JacardTuple> tuple : pairRdd.collect()) {
System.out.println(tuple._1 + "," + tuple._2);
}
}
}
Below is the helper class used in the code above. It simply stores the two counts and prints their ratio, which is the Jaccard similarity.
import java.io.Serializable;
public class JacardTuple implements Serializable {
private static final long serialVersionUID = 44545;
private double count;
private double union;
public JacardTuple(double count, double union) {
super();
this.count = count;
this.union = union;
}
public JacardTuple() {
super();
}
public double getCount() {
return count;
}
public void setCount(double count) {
this.count = count;
}
public double getUnion() {
return union;
}
public void setUnion(double union) {
this.union = union;
}
public static long getSerialversionuid() {
return serialVersionUID;
}
@Override
public String toString() {
// union / count, i.e. (intersection size) / (union size): the Jaccard similarity
return String.valueOf(union / count);
}
}
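To try this end to end, package both classes into a jar and submit it with spark-submit. The jar name below is just a placeholder; the master is already set to local[*] inside the code, so no --master flag is needed:
spark-submit --class SparkJacardsLSH brute-force-jaccard.jar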