In this article, we will implement fuzzy string matching on a Spark DataFrame using the Levenshtein distance algorithm. Fuzzy matching lets us find non-exact matches in data. Spark has built-in support for fuzzy matching when we only need a simple one-to-one comparison between two columns, via the Soundex and Levenshtein functions. Soundex is a phonetic algorithm based on how a word is pronounced, whereas the Levenshtein distance is the minimum number of single-character edits (insertions, deletions, or substitutions) required to change one word into the other; for example, turning "kitten" into "sitting" takes three edits.
Let's code the Soundex in Spark.
package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FuzzyBuiltIn extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val dataGenerator = new DataFrameGenerator
  val df1 = dataGenerator.getSample(sparkSession)

  val actualSoundexDF = df1
    .withColumn("word1_soundex_homophones", soundex(col("str1")))
    .withColumn("word2_soundex_homophones", soundex(col("str2")))

  actualSoundexDF.show()
}
Let's code the DataFrameGenerator used in the example above.
package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer

class DataFrameGenerator {

  // Sample employee data: the names in this DataFrame are matched against
  // the candidate names returned by getSampleDataFrame
  def getSampleEmployeeDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val sequenceOfOverview = ListBuffer[(String, String, String, String, String)]()
    sequenceOfOverview += Tuple5("adarsh", "2323", "20200901", "EMC", "India")
    sequenceOfOverview += Tuple5("Abigail", "455", "20200901", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Aaliyah", "232", "20200901", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Ariel", "3434", "20200901", "western digital", "India")
    sequenceOfOverview += Tuple5("Alina", "3434", "20200902", "EMC", "India")
    sequenceOfOverview += Tuple5("Addyson", "34354", "20200902", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Alayah", "5656", "20200902", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Aubrie", "56566", "20200902", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Bobby", "232", "20200903", "EMC", "India")
    sequenceOfOverview += Tuple5("Bethany", "7878", "20200903", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Beatrice", "2323", "20200903", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Bobby", "767", "20200903", "western Digital", "India")
    sequenceOfOverview += Tuple5("Cabernet", "836", "20200904", "EMC", "India")
    sequenceOfOverview += Tuple5("Cable", "746", "20200904", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Cachi", "546", "20200904", "western Digital", "India")
    sequenceOfOverview += Tuple5("Cache", "864", "20200904", "peerislands", "cayman islands")
    sequenceOfOverview.toDF("name", "id", "dob", "company", "location")
  }

  // Candidate data: deliberately contains near-misses of the employee names
  // (e.g. "adrsh" for "adarsh") to exercise the fuzzy matching
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val sequenceOfOverview = ListBuffer[(String, String, String, String, String)]()
    sequenceOfOverview += Tuple5("adrsh", "2323", "20200901", "western Digital", "India")
    sequenceOfOverview += Tuple5("Allison", "455", "20200901", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Anna", "232", "20200901", "western Digital", "India")
    sequenceOfOverview += Tuple5("Ariels", "3434", "20200901", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Alna", "3434", "20200902", "EMC", "India")
    sequenceOfOverview += Tuple5("Addyson", "34354", "20200902", "western Digital", "India")
    sequenceOfOverview += Tuple5("Alayah", "5656", "20200902", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("chethan", "56566", "20200902", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Bobbi", "232", "20200903", "EMC", "India")
    sequenceOfOverview += Tuple5("Blake", "7878", "20200903", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Beatricesa", "2323", "20200903", "western Digital", "India")
    sequenceOfOverview += Tuple5("Bella", "767", "20200903", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Cabernetes", "836", "20200904", "peerislands.io", "cayman islands")
    sequenceOfOverview += Tuple5("Cabriole", "746", "20200904", "peerislands", "cayman islands")
    sequenceOfOverview += Tuple5("Cachi", "546", "20200904", "BOSCH", "India")
    sequenceOfOverview += Tuple5("Caches", "864", "20200904", "British Telecom", "India")
    sequenceOfOverview.toDF("name", "id", "dob", "company", "location")
  }

  // Simple string pairs used by the built-in Soundex and Levenshtein examples
  def getSample(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val sequenceOfOverview = ListBuffer[(String, String)]()
    sequenceOfOverview += Tuple2("hi there", "hi there")
    sequenceOfOverview += Tuple2("hi there", "hi there is")
    sequenceOfOverview += Tuple2("cool", "fool")
    sequenceOfOverview += Tuple2("sum", "test")
    sequenceOfOverview.toDF("str1", "str2")
  }
}
Below is the output
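Rows whose Soundex codes are equal can then be treated as phonetic matches. A minimal follow-up sketch, assuming the column names from the example above (this filter is my own addition, not part of the original example):

val phoneticMatchesDF = actualSoundexDF
  .filter(col("word1_soundex_homophones") === col("word2_soundex_homophones"))
phoneticMatchesDF.show()

Note that Soundex always keeps the first letter of the word, so "cool" and "fool" can never match phonetically even though they differ by only a single edit.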
Let's code the Levenshtein distance in Spark.
package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FuzzyBuiltIn extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val dataGenerator = new DataFrameGenerator
  val df1 = dataGenerator.getSample(sparkSession)

  val actualLevenshteinDF = df1
    .withColumn("word1_levenshtein", levenshtein(col("str1"), col("str2")))

  actualLevenshteinDF.show()
}
Below is the output
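In practice the raw distance is usually converted into a match decision with a tolerance. A minimal sketch (the threshold of 2 edits is my own assumption, not part of the original example):

val flaggedDF = actualLevenshteinDF
  .withColumn("is_fuzzy_match", col("word1_levenshtein") <= 2)
flaggedDF.show()

With the sample data, "cool" vs "fool" (distance 1) would be flagged as a match, while "sum" vs "test" (distance 4) would not.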
This works well for one-to-one comparisons, but what if we have to compare a column against a list of values? In that scenario the built-in function no longer applies directly, so we have to provide our own implementation. Let's say the name column from df1 has to be matched against all the values in the name column of df2.
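Strictly speaking, one workaround that keeps the built-in function is a crossJoin against the candidate names, at the cost of materializing every name pair. A minimal sketch, assuming df1 and df2 from the generator above and that df2 is small (this crossJoin variant is my own illustration, not the approach taken below):

val candidates = df2.select(col("name").as("candidateName"))
val crossMatched = df1
  .crossJoin(candidates)
  .withColumn("distance", levenshtein(col("name"), col("candidateName")))
  .filter(col("distance") <= 2)

The UDF approach that follows instead packs all candidate names into a single string per row, avoiding the row explosion of the cross join.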
Let's first implement the Levenshtein distance algorithm.
package com.timepasstechies.blog.fuzzy

class LevenshteinDistance {

  def minimum(i1: Int, i2: Int, i3: Int) = math.min(math.min(i1, i2), i3)

  def apply(s1: String, s2: String) = {
    if (s1 != null && s1 != "" && s2 != null && s2 != "") {
      val str1 = s1.toLowerCase
      val str2 = s2.toLowerCase
      // dist(j)(i) holds the edit distance between the first i characters
      // of str1 and the first j characters of str2
      val dist = Array.tabulate(str2.length + 1, str1.length + 1) { (j, i) =>
        if (j == 0) i else if (i == 0) j else 0
      }
      for (j <- 1 to str2.length; i <- 1 to str1.length)
        dist(j)(i) =
          if (str2(j - 1) == str1(i - 1)) dist(j - 1)(i - 1)
          else
            minimum(
              dist(j - 1)(i) + 1,
              dist(j)(i - 1) + 1,
              dist(j - 1)(i - 1) + 1
            )
      dist(str2.length)(str1.length)
    } else 10 // sentinel distance for null/empty input, larger than the match threshold
  }

  // UDF body: the first pipe-separated token is the name to match and the
  // remaining tokens are the candidates; returns the first candidate within
  // 2 edits, or "NO_MATCH"
  def getLevenshteinDistance = (names: String) => {
    val levenshteinDistance = new LevenshteinDistance
    val employees = names.split("\\|")
    val employeeInData = employees(0)
    val matching = employees
      .slice(1, employees.length)
      .find(name => levenshteinDistance.apply(employeeInData, name) <= 2)
    matching.getOrElse("NO_MATCH")
  }
}
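As a quick sanity check, the class can be exercised as plain Scala before wiring it into Spark (the sample words here are my own, not from the article's data):

val ld = new LevenshteinDistance
println(ld.apply("kitten", "sitting")) // 3: two substitutions plus one insertion
println(ld.apply("adarsh", "adrsh"))   // 1: a single deletion, within the <= 2 threshold
println(ld.apply("", "anything"))      // 10: the sentinel value for empty input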
Let's write the driver code that uses the above algorithm.
package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val dataGenerator = new DataFrameGenerator
  val levenshteinDistance = new LevenshteinDistance
  val df1 = dataGenerator.getSampleEmployeeDataFrame(sparkSession)
  val df2 = dataGenerator.getSampleDataFrame(sparkSession)

  // collect every candidate name from df2 into a single pipe-delimited string
  val employeeName = df2
    .select("name")
    .collect()
    .map(row => row.getString(0))
    .toList
    .mkString("|")

  // prepend each df1 name to the candidate list so the UDF receives
  // "name|candidate1|candidate2|..."
  val combined =
    df1.withColumn("Names", concat(col("name"), lit("|"), lit(employeeName)))

  val levenshteinDistanceUdf = sparkSession.udf.register(
    "levenshteinDistance",
    levenshteinDistance.getLevenshteinDistance
  )

  val nameMatchedDf =
    combined.withColumn("matchingName", levenshteinDistanceUdf(col("Names")))

  nameMatchedDf.write
    .format("com.crealytics.spark.excel")
    .option("dataAddress", s"'data'!A1")
    .option("useHeader", "true")
    .option("header", "true")
    .mode(org.apache.spark.sql.SaveMode.Append)
    .save("out_path")
}
Below is the output
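Note that writing to Excel requires the third-party spark-excel library (com.crealytics) on the classpath, and "out_path" is a placeholder for a real output location. For a quick look at the result without that dependency, printing the DataFrame works just as well:

nameMatchedDf.show(truncate = false)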
That's a brief overview of how we can use the Soundex and Levenshtein fuzzy matching algorithms provided natively by the Spark API, and also how to register a UDF with a custom implementation of the Levenshtein algorithm for the use case where we want to compare a string against a list of strings.