The Jaccard similarity index (or Jaccard similarity coefficient) compares two datasets to see which data are shared and which are distinct. It is a measure of similarity for the two datasets, with a range from 0 to 100 percent. The higher the percentage, the more similar the two datasets are.
We will implement the brute-force alternative to the LSH algorithm — that is, a pair-wise calculation of Jaccard similarity. This will also give you more appreciation of the LSH algorithm.
The input for this code is a list of files containing some text. The output will be as below:
[(FILE_1.txt-FILE_2.txt,0.88887), (FILE_1.txt-FILE_3.txt,0.989898988464), (FILE_2.txt-FILE_3.txt,0.88787428571428571), (FILE_1.txt-FILE_4.txt,0.8996552857142856), (FILE_1.txt-FILE_5.txt,0.889898745762711865), (FILE_2.txt-FILE_4.txt,0.89898966666666), (FILE_2.txt-FILE_5.txt,0.9898989047619047616), (FILE_3.txt-FILE_4.txt,0.7788545454545456), (FILE_3.txt-FILE_5.txt,0.9898989863013), (FILE_4.txt-FILE_5.txt,0.898989)]
import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; public class SparkJacardsLSH { private static final String FILE_URI = "{INPUT_PATH}/LSH_*.txt"; public static void main(String[] args) { // initializing spark SparkSession spark = SparkSession.builder() .config("spark.master", "local[*]").getOrCreate(); @SuppressWarnings("resource") JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); sc.setLogLevel("WARN"); // create RDD by using text files JavaPairRDD<String, String> documents = sc.wholeTextFiles(FILE_URI); JavaRDD<Row> rddT = documents.map(tuple -> RowFactory.create( tuple._1.substring(tuple._1().lastIndexOf("/") + 1), tuple._2)); StructField one = DataTypes.createStructField("fileName", DataTypes.StringType, true); StructField two = DataTypes.createStructField("content", DataTypes.StringType, true); StructType type = DataTypes.createStructType( new ArrayList<StructField>(Arrays.asList(one, two))); Dataset<Row> characteristicMatrix = spark.createDataFrame(rddT, type); characteristicMatrix.show(); //cartesian product of the characteristic matrix Dataset<Row> matrix = characteristicMatrix .crossJoin(characteristicMatrix) .toDF("one", "two", "three", "four"); /* * filter out about half of it. More specifically, using cartesian will give * you combinations such as (file1, file2) and (file2, file1). 
However as * far as their Jaccard similarity is concerned, this two pairs are the * same. Therefore you will need to add a filter to eliminate the repeated * pairs such as in this example */ JavaRDD<Row> rdd = matrix.toJavaRDD().coalesce(1) .mapPartitions(new FlatMapFunction<Iterator<Row>, Row>() { private static final long serialVersionUID = 3232323; public Iterator<Row> call(Iterator<Row> t) throws Exception { // TODO Auto-generated method stub Map<String, String> map = new java.util.HashMap<>(); List<Row> list = new ArrayList<>(); while (t.hasNext()) { Row row = t.next(); String one = row.getString(0); String two = row.getString(2); String finalStr = null; if (!one.equals(two)) { int i = one.compareTo(two); if (i > 0) { finalStr = two + one; } else { finalStr = one + two; } if (map.get(finalStr) == null) { list.add(row); } map.put(finalStr, "test"); } } return list.iterator(); } }); JavaPairRDD<String, JacardTuple> pairRdd = rdd.repartition(10) .mapToPair(new PairFunction<Row, String, JacardTuple>() { /** * */ private static final long serialVersionUID = 323232345; public Tuple2<String, JacardTuple> call(Row t) throws Exception { String text = t.get(1).toString().replaceAll("\\p{P}",""); String text2 = t.get(3).toString().replaceAll("\\p{P}",""); String baseString = text2; System.out.println(t.getString(0)+"-"+t.getString(2)); if (text2.split(" ").length > text.split(" ").length) { baseString = text; } String[] split = null; if (baseString.equals(text)) { split = text2.split(" "); } else { split = text.split(" "); } double unique = 0; double union = 0; String[] baseStringArray=baseString.split(" "); for (String string : split) { if (Arrays.asList(baseStringArray).contains(string)) { union++; } else { unique++; } } for(String two: baseStringArray) { if (Arrays.asList(split).contains(two)) { } else { unique++; } } return new Tuple2<String, JacardTuple>( t.getString(0) + "-" + t.getString(2), new JacardTuple(union + unique, union)); } }); for (Tuple2<String, 
JacardTuple> tuple : pairRdd.collect()) { System.out.println(tuple._1 + "," + tuple._2); } } }
Below is a helper class used in the code above.
import java.io.Serializable;

/**
 * Value holder for one document pair's Jaccard similarity: {@code count}
 * is the size of the combined word collection (the denominator) and
 * {@code union} is the number of words the pair shares (the numerator),
 * so the similarity itself is {@code union / count} — which is exactly
 * what {@link #toString()} prints.
 */
public class JacardTuple implements Serializable {

    private static final long serialVersionUID = 44545;

    // Denominator of the similarity ratio.
    private double count;
    // Numerator of the similarity ratio.
    private double union;

    /** Creates an empty tuple; populate it through the setters. */
    public JacardTuple() {
    }

    /**
     * @param count size of the combined word collection
     * @param union number of words shared by both documents
     */
    public JacardTuple(double count, double union) {
        this.count = count;
        this.union = union;
    }

    public double getCount() {
        return count;
    }

    public void setCount(double count) {
        this.count = count;
    }

    public double getUnion() {
        return union;
    }

    public void setUnion(double union) {
        this.union = union;
    }

    public static long getSerialversionuid() {
        return serialVersionUID;
    }

    /** The similarity value itself, rendered exactly as Java prints a double. */
    @Override
    public String toString() {
        return Double.toString(union / count);
    }
}