Spark example: Jaccard similarity and the LSH algorithm

The Jaccard similarity index, also called the Jaccard similarity coefficient, compares two data sets to see which members are shared and which are distinct. It is a measure of similarity that ranges from 0 to 100 percent: the higher the percentage, the more similar the two data sets are. Formally, for two sets A and B it is the size of the intersection divided by the size of the union, |A ∩ B| / |A ∪ B|.

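Here is a minimal, self-contained sketch of that set-based definition, treating each document as a set of words (the two sample sentences and the jaccard helper are made up for illustration):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardExample {

    // Jaccard similarity: size of the intersection divided by size of the union
    static double jaccard(Set<String> a, Set<String> b) {
        Set<String> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        Set<String> union = new HashSet<>(a);
        union.addAll(b);
        return (double) intersection.size() / union.size();
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Arrays.asList("the quick brown fox".split(" ")));
        Set<String> b = new HashSet<>(Arrays.asList("the quick red fox".split(" ")));
        // intersection = {the, quick, fox}, union = {the, quick, brown, red, fox}
        System.out.println(jaccard(a, b)); // 3 / 5 = 0.6, i.e. 60 percent similar
    }
}
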
We will implement the brute-force alternative to the LSH algorithm, that is, a pair-wise calculation of Jaccard similarity across all documents. Since n files produce n(n-1)/2 distinct pairs, this approach gets expensive quickly, which should give you more appreciation of the LSH algorithm.

The input for this code is a list of files containing some text. For five input files there are 5 × 4 / 2 = 10 distinct pairs, so the output will be as below:


[(FILE_1.txt-FILE_2.txt,0.88887), (FILE_1.txt-FILE_3.txt,0.989898988464), (FILE_2.txt-FILE_3.txt,0.88787428571428571), (FILE_1.txt-FILE_4.txt,0.8996552857142856), (FILE_1.txt-FILE_5.txt,0.889898745762711865), (FILE_2.txt-FILE_4.txt,0.89898966666666), (FILE_2.txt-FILE_5.txt,0.9898989047619047616), (FILE_3.txt-FILE_4.txt,0.7788545454545456), (FILE_3.txt-FILE_5.txt,0.9898989863013), (FILE_4.txt-FILE_5.txt,0.898989)]


import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkJacardsLSH {

    private static final String FILE_URI = "{INPUT_PATH}/LSH_*.txt";

    public static void main(String[] args) {

        // initializing Spark
        SparkSession spark = SparkSession.builder()
                .config("spark.master", "local[*]").getOrCreate();
        @SuppressWarnings("resource")
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        sc.setLogLevel("WARN");

        // create an RDD of (path, content) pairs from the text files
        JavaPairRDD<String, String> documents = sc.wholeTextFiles(FILE_URI);

        // keep only the file name, stripping the directory part of the path
        JavaRDD<Row> rddT = documents.map(tuple -> RowFactory.create(
                tuple._1().substring(tuple._1().lastIndexOf("/") + 1), tuple._2()));

        StructField one = DataTypes.createStructField("fileName",
                DataTypes.StringType, true);

        StructField two = DataTypes.createStructField("content",
                DataTypes.StringType, true);

        StructType type = DataTypes.createStructType(
                new ArrayList<StructField>(Arrays.asList(one, two)));

        Dataset<Row> characteristicMatrix = spark.createDataFrame(rddT, type);

        characteristicMatrix.show();

        // Cartesian product of the characteristic matrix with itself
        Dataset<Row> matrix = characteristicMatrix
                .crossJoin(characteristicMatrix)
                .toDF("one", "two", "three", "four");


        /*
         * Filter out about half of the pairs. More specifically, the cross
         * join produces combinations such as (file1, file2) and (file2, file1).
         * As far as their Jaccard similarity is concerned these two pairs are
         * the same, so a filter is needed to eliminate the repeated pairs, as
         * in this example.
         */

        JavaRDD<Row> rdd = matrix.toJavaRDD().coalesce(1)
                .mapPartitions(new FlatMapFunction<Iterator<Row>, Row>() {

                    private static final long serialVersionUID = 3232323;

                    @Override
                    public Iterator<Row> call(Iterator<Row> t) throws Exception {

                        // tracks which name pairs have already been seen
                        Map<String, String> map = new java.util.HashMap<>();
                        List<Row> list = new ArrayList<>();

                        while (t.hasNext()) {
                            Row row = t.next();

                            String one = row.getString(0);
                            String two = row.getString(2);

                            // skip self-pairs such as (file1, file1)
                            if (!one.equals(two)) {
                                // order the two names so that (file1, file2) and
                                // (file2, file1) produce the same key
                                String finalStr = one.compareTo(two) > 0
                                        ? two + one : one + two;

                                if (map.get(finalStr) == null) {
                                    list.add(row);
                                }
                                map.put(finalStr, "test");
                            }
                        }

                        return list.iterator();
                    }
                });

        JavaPairRDD<String, JacardTuple> pairRdd = rdd.repartition(10)
                .mapToPair(new PairFunction<Row, String, JacardTuple>() {

                    private static final long serialVersionUID = 323232345;

                    @Override
                    public Tuple2<String, JacardTuple> call(Row t) throws Exception {

                        // strip punctuation from both documents
                        String text = t.get(1).toString().replaceAll("\\p{P}", "");
                        String text2 = t.get(3).toString().replaceAll("\\p{P}", "");

                        // debug output: which pair is being processed
                        System.out.println(t.getString(0) + "-" + t.getString(2));

                        // use the document with fewer words as the base string
                        String baseString = text2;
                        if (text2.split(" ").length > text.split(" ").length) {
                            baseString = text;
                        }

                        // split holds the words of the other (longer) document
                        String[] split;
                        if (baseString.equals(text)) {
                            split = text2.split(" ");
                        } else {
                            split = text.split(" ");
                        }

                        // "union" counts words found in both documents (the
                        // intersection); "unique" counts words found in only
                        // one of them, so union + unique is the union size
                        double unique = 0;
                        double union = 0;

                        List<String> baseWords = Arrays.asList(baseString.split(" "));
                        List<String> splitWords = Arrays.asList(split);

                        for (String string : splitWords) {
                            if (baseWords.contains(string)) {
                                union++;
                            } else {
                                unique++;
                            }
                        }

                        for (String two : baseWords) {
                            if (!splitWords.contains(two)) {
                                unique++;
                            }
                        }

                        return new Tuple2<String, JacardTuple>(
                                t.getString(0) + "-" + t.getString(2),
                                new JacardTuple(union + unique, union));
                    }
                });

        // collect the results to the driver and print each pair with its score
        for (Tuple2<String, JacardTuple> tuple : pairRdd.collect()) {
            System.out.println(tuple._1 + "," + tuple._2);
        }
    }
}

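One caveat about the coalesce(1) step above: it funnels every row of the cross join through a single partition so that one HashMap can see all the pairs, which gives up Spark's parallelism on large inputs. Below is a sketch of a possible distributed alternative, assuming the same four-column matrix Dataset from the code above (the key format is my own choice for illustration):

// Build an order-independent key from the two file names, drop self-pairs,
// and keep one representative row per key with reduceByKey.
JavaRDD<Row> deduped = matrix.toJavaRDD()
        .filter(row -> !row.getString(0).equals(row.getString(2)))
        .mapToPair(row -> {
            String a = row.getString(0);
            String b = row.getString(2);
            String key = a.compareTo(b) < 0 ? a + "|" + b : b + "|" + a;
            return new Tuple2<>(key, row);
        })
        .reduceByKey((r1, r2) -> r1) // both rows describe the same pair
        .values();
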
Below is the helper class used in the above code.


import java.io.Serializable;

public class JacardTuple implements Serializable {

    private static final long serialVersionUID = 44545;

    // size of the union of the two documents' word lists
    private double count;

    // number of words the two documents share (the intersection)
    private double union;

    public JacardTuple(double count, double union) {
        this.count = count;
        this.union = union;
    }

    public JacardTuple() {
    }

    public double getCount() {
        return count;
    }

    public void setCount(double count) {
        this.count = count;
    }

    public double getUnion() {
        return union;
    }

    public void setUnion(double union) {
        this.union = union;
    }

    // Jaccard similarity = intersection size / union size
    @Override
    public String toString() {
        return String.valueOf(union / count);
    }
}
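
Finally, for contrast with the brute-force version: Spark ML ships an actual LSH implementation for Jaccard distance, MinHashLSH. Below is a rough sketch of how the same fileName/content DataFrame could be fed to it; the tokenizer and vectorizer choices here are my own illustrative assumptions, not part of the code above.

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// split the "content" column into words
Dataset<Row> words = new Tokenizer()
        .setInputCol("content").setOutputCol("words")
        .transform(characteristicMatrix);

// turn each document into a sparse binary term vector
CountVectorizerModel cvModel = new CountVectorizer()
        .setInputCol("words").setOutputCol("features").setBinary(true)
        .fit(words);
Dataset<Row> vectors = cvModel.transform(words);

// MinHash approximates Jaccard similarity with a handful of hash tables
MinHashLSHModel lsh = new MinHashLSH()
        .setNumHashTables(5)
        .setInputCol("features").setOutputCol("hashes")
        .fit(vectors);

// self-join keeping pairs whose estimated Jaccard distance is below 0.5;
// note that the reported "distCol" is a distance, i.e. 1 - similarity
lsh.approxSimilarityJoin(vectors, vectors, 0.5, "distCol")
        .select("datasetA.fileName", "datasetB.fileName", "distCol")
        .show();

Unlike the pair-wise version, approxSimilarityJoin only compares documents that hash into the same buckets, which is exactly the shortcut that makes LSH worth the approximation.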