spark dataframe fuzzy string matching

In this article we will implement fuzzy string matching on a Spark DataFrame using the Soundex and Levenshtein distance algorithms. Fuzzy matching lets us find non-exact matches in data. Spark has built-in support for fuzzy string matching when we only need a simple one-to-one comparison between two columns, through its Soundex and Levenshtein functions. Soundex is a phonetic algorithm based on how a word is pronounced, whereas the Levenshtein algorithm is based on the minimum number of single-character edits (insertions, deletions, or substitutions) required to change one word into the other.

Let's code the Soundex example in Spark

package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Demo driver for Spark's built-in `soundex` function.
 *
 * Loads the two-column sample (`str1`, `str2`) from `DataFrameGenerator`,
 * adds one Soundex column per input column and prints the resulting frame.
 */
object FuzzyBuiltIn {

  /**
   * Entry point.
   *
   * An explicit `main` is used rather than `extends App`: the `App` trait
   * defers the object's body via delayed initialization, and Spark's
   * documentation advises defining `main()` instead.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    try {
      val dataGenerator = new DataFrameGenerator
      val df1 = dataGenerator.getSample(sparkSession)

      val actualSoundexDF = df1
        .withColumn("word1_soundex_homophones", soundex(col("str1")))
        .withColumn("word2_soundex_homophones", soundex(col("str2")))

      actualSoundexDF.show()
    } finally {
      // Release the local Spark resources even if the job above fails.
      sparkSession.stop()
    }
  }
}

Let's code the DataFrameGenerator used in the above example

package com.timepasstechies.blog.fuzzy
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
/**
 * Builds the small, hard-coded sample DataFrames used by the fuzzy-matching
 * examples. Every column in every frame is a `String`.
 *
 * The rows are declared as immutable `Seq` literals; nothing is mutated after
 * construction, so no `var` or mutable buffer is needed.
 */
class DataFrameGenerator {

  /**
   * First employee sample (16 rows).
   *
   * @param sparkSession session whose implicits supply the `toDF` conversion
   * @return a DataFrame with columns `name`, `id`, `dob`, `company`, `location`
   */
  def getSampleEmployeeDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    val rows: Seq[(String, String, String, String, String)] = Seq(
      ("adarsh", "2323", "20200901", "EMC", "India"),
      ("Abigail", "455", "20200901", "peerislands.io", "cayman islands"),
      ("Aaliyah", "232", "20200901", "peerislands", "cayman islands"),
      ("Ariel", "3434", "20200901", "western digital", "India"),
      ("Alina", "3434", "20200902", "EMC", "India"),
      ("Addyson", "34354", "20200902", "peerislands.io", "cayman islands"),
      ("Alayah", "5656", "20200902", "peerislands", "cayman islands"),
      ("Aubrie", "56566", "20200902", "peerislands", "cayman islands"),
      ("Bobby", "232", "20200903", "EMC", "India"),
      ("Bethany", "7878", "20200903", "peerislands.io", "cayman islands"),
      ("Beatrice", "2323", "20200903", "peerislands.io", "cayman islands"),
      ("Bobby", "767", "20200903", "western Digital", "India"),
      ("Cabernet", "836", "20200904", "EMC", "India"),
      ("Cable", "746", "20200904", "peerislands.io", "cayman islands"),
      ("Cachi", "546", "20200904", "western Digital", "India"),
      ("Cache", "864", "20200904", "peerislands", "cayman islands")
    )

    rows.toDF("name", "id", "dob", "company", "location")
  }

  /**
   * Second employee sample (16 rows), whose names are partly near-variants of
   * the names in [[getSampleEmployeeDataFrame]] (e.g. "adrsh", "Ariels").
   *
   * @param sparkSession session whose implicits supply the `toDF` conversion
   * @return a DataFrame with columns `name`, `id`, `dob`, `company`, `location`
   */
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    val rows: Seq[(String, String, String, String, String)] = Seq(
      ("adrsh", "2323", "20200901", "western Digital", "India"),
      ("Allison", "455", "20200901", "peerislands.io", "cayman islands"),
      ("Anna", "232", "20200901", "western Digital", "India"),
      ("Ariels", "3434", "20200901", "peerislands", "cayman islands"),
      ("Alna", "3434", "20200902", "EMC", "India"),
      ("Addyson", "34354", "20200902", "western Digital", "India"),
      ("Alayah", "5656", "20200902", "peerislands.io", "cayman islands"),
      ("chethan", "56566", "20200902", "peerislands", "cayman islands"),
      ("Bobbi", "232", "20200903", "EMC", "India"),
      ("Blake", "7878", "20200903", "peerislands.io", "cayman islands"),
      ("Beatricesa", "2323", "20200903", "western Digital", "India"),
      ("Bella", "767", "20200903", "peerislands", "cayman islands"),
      ("Cabernetes", "836", "20200904", "peerislands.io", "cayman islands"),
      ("Cabriole", "746", "20200904", "peerislands", "cayman islands"),
      ("Cachi", "546", "20200904", "BOSCH", "India"),
      ("Caches", "864", "20200904", "British Telecom", "India")
    )

    rows.toDF("name", "id", "dob", "company", "location")
  }

  /**
   * Four string pairs for the one-to-one Soundex / Levenshtein examples.
   *
   * @param sparkSession session whose implicits supply the `toDF` conversion
   * @return a DataFrame with columns `str1` and `str2`
   */
  def getSample(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    val pairs: Seq[(String, String)] = Seq(
      ("hi there", "hi there"),
      ("hi there", "hi there is"),
      ("cool", "fool"),
      ("sum", "test")
    )

    pairs.toDF("str1", "str2")
  }
}


Below is the output

Let's code the Levenshtein distance example in Spark

package com.timepasstechies.blog.fuzzy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Demo driver for Spark's built-in `levenshtein` function.
 *
 * Loads the two-column sample (`str1`, `str2`) from `DataFrameGenerator`,
 * adds a column holding the edit distance between the two strings of each
 * row and prints the resulting frame.
 */
object FuzzyBuiltIn {

  /**
   * Entry point.
   *
   * An explicit `main` is used rather than `extends App`: the `App` trait
   * defers the object's body via delayed initialization, and Spark's
   * documentation advises defining `main()` instead.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    try {
      val dataGenerator = new DataFrameGenerator
      val df1 = dataGenerator.getSample(sparkSession)

      val actualLevenshteinDF = df1
        .withColumn("word1_levenshtein", levenshtein(col("str1"), col("str2")))

      actualLevenshteinDF.show()
    } finally {
      // Release the local Spark resources even if the job above fails.
      sparkSession.stop()
    }
  }
}

Below is the output

This works well for a one-to-one comparison, but what if we have to compare a column against a list of values? In this scenario we cannot use the built-in functions provided by Spark and need our own implementation. Let's consider the name column from the first DataFrame, which has to be matched against all the values in the name column of the second DataFrame.

Let's first implement the Levenshtein distance algorithm

package com.timepasstechies.blog.fuzzy

/**
 * Case-insensitive Levenshtein (edit) distance, plus a helper that matches one
 * name against a pipe-separated list of candidate names.
 */
class LevenshteinDistance {

  /** Smallest of three integers. */
  def minimum(i1: Int, i2: Int, i3: Int): Int = math.min(math.min(i1, i2), i3)

  /**
   * Edit distance between `s1` and `s2`, ignoring case.
   *
   * @param s1 first string; may be null
   * @param s2 second string; may be null
   * @return the number of single-character insertions, deletions or
   *         substitutions needed to turn one string into the other, or the
   *         fixed value 10 when either argument is null or empty. That value
   *         is above the threshold used by [[getLevenshteinDistance]], so
   *         missing values are never reported as a match.
   */
  def apply(s1: String, s2: String): Int =
    if (s1 == null || s1.isEmpty || s2 == null || s2.isEmpty) 10
    else {
      // Locale.ROOT keeps lower-casing independent of the JVM default locale
      // (e.g. the Turkish dotless-i mapping).
      val str1 = s1.toLowerCase(java.util.Locale.ROOT)
      val str2 = s2.toLowerCase(java.util.Locale.ROOT)

      // dist(j)(i) = distance between the first j chars of str2 and the first
      // i chars of str1; row 0 and column 0 are the distances from "".
      val dist = Array.tabulate(str2.length + 1, str1.length + 1) { (j, i) =>
        if (j == 0) i else if (i == 0) j else 0
      }

      for (j <- 1 to str2.length; i <- 1 to str1.length)
        dist(j)(i) =
          if (str2(j - 1) == str1(i - 1)) dist(j - 1)(i - 1)
          else
            minimum(
              dist(j - 1)(i) + 1,
              dist(j)(i - 1) + 1,
              dist(j - 1)(i - 1) + 1
            )

      dist(str2.length)(str1.length)
    }

  /**
   * Function that takes a string of the form `name|candidate1|candidate2|...`
   * and returns the first candidate whose distance to `name` is at most 2, or
   * `"NO_MATCH"` when there is none.
   *
   * A null input, or an input that yields no tokens after splitting (for
   * example `"|"`), also produces `"NO_MATCH"` instead of throwing.
   *
   * @return the matching function, suitable for registering as a UDF
   */
  def getLevenshteinDistance: String => String = (names: String) => {
    // A fresh instance is created inside the function so the closure does not
    // reference `this` -- presumably so it can be shipped as a Spark UDF
    // without this class having to be Serializable.
    val levenshteinDistance = new LevenshteinDistance

    val matching = for {
      raw <- Option(names)
      employees = raw.split("\\|")
      employeeInData <- employees.headOption
      candidate <- employees
        .drop(1)
        .find(name => levenshteinDistance(employeeInData, name) <= 2)
    } yield candidate

    matching.getOrElse("NO_MATCH")
  }

}

 

Let's code the driver that uses the above algorithm

package com.timepasstechies.blog.fuzzy
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Driver that matches every `name` in the first employee sample against all
 * names in the second sample using the custom `LevenshteinDistance` UDF, then
 * writes the result through the `com.crealytics.spark.excel` data source.
 */
object Gateway {

  /**
   * Entry point.
   *
   * An explicit `main` is used rather than `extends App`: the `App` trait
   * defers the object's body via delayed initialization, and Spark's
   * documentation advises defining `main()` instead.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    try {
      val dataGenerator = new DataFrameGenerator
      val levenshteinDistance = new LevenshteinDistance
      val df1 = dataGenerator.getSampleEmployeeDataFrame(sparkSession)
      val df2 = dataGenerator.getSampleDataFrame(sparkSession)

      // All candidate names are pulled to the driver and joined with '|',
      // the separator the UDF splits on.
      val employeeName = df2
        .select("name")
        .collect()
        .map(row => row.getString(0))
        .mkString("|")

      // "Names" = "<name from df1>|<every name from df2>"; the UDF treats the
      // first token as the value to match and the rest as candidates.
      val combined =
        df1.withColumn("Names", concat(col("name"), lit("|"), lit(employeeName)))

      val levenshteinDistanceUdf =
        sparkSession.udf.register(
          "levenshteinDistance",
          levenshteinDistance.getLevenshteinDistance
        )

      val nameMatchedDf =
        combined.withColumn("matchingName", levenshteinDistanceUdf(col("Names")))

      nameMatchedDf.write
        .format("com.crealytics.spark.excel")
        .option("dataAddress", "'data'!A1")
        .option("useHeader", "true")
        .option("header", "true")
        .mode(org.apache.spark.sql.SaveMode.Append)
        .save("out_path")
    } finally {
      // Release the local Spark resources even if the job above fails.
      sparkSession.stop()
    }
  }
}

Below is the output


Conclusion

That's a brief look at how we can use the Soundex and Levenshtein fuzzy matching algorithms provided natively by the Spark API, and also how to register a UDF with a custom implementation of the Levenshtein algorithm for the use case where we want to compare a string against a list of strings.

Leave a Reply

Your email address will not be published. Required fields are marked *