Delta Lake Databricks Spark merging data

In this article, I will illustrate how to insert/merge data into a Delta Lake table on Databricks. Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.

Let’s jump into the code

We can update or insert data that matches a predicate in a Delta table. For example, let's consider we are storing employee data with the below structure

struct(col("id"),col("name"),col("gender"),
col("profession"),col("age"))

Below is the code which returns a DataFrame with the above structure

package com.timepasstechies.blog.delta

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer

class Data {

  // Builds a small in-memory employee DataFrame used as the merge source
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {

    import sparkSession.implicits._
    val sequenceOfOverview =
      ListBuffer[(Integer, String, String, String, Integer)]()
    sequenceOfOverview += Tuple5(1, "mark", "male", "IT", 34)
    sequenceOfOverview += Tuple5(2, "steve", "male", "Automobile", 28)
    sequenceOfOverview += Tuple5(3, "stella", "female", "marketing", 23)
    sequenceOfOverview += Tuple5(4, "taylor", "male", "Professor", 43)
    val df1 =
      sequenceOfOverview.toDF("id", "name", "gender", "profession", "age")
    df1
  }

}

We can merge or insert data from a Spark DataFrame into a Delta Lake table using the merge operation.

In the below code we are merging the employee Delta Lake table with the DataFrame that we created above. Records are matched on the id column, and if the id does not exist in the Delta Lake table the record is inserted. The columnMapToUpdate map specifies which columns should be updated, and in the below example we are updating all the columns when the id matches the existing data. Also note that we could have used updateAll instead of updateExpr, since we are updating every column; a sketch of that variant is shown after the class below.

Automatic schema evolution is supported from Databricks Runtime 6.6 and above, and it can be enabled by setting the Spark configuration below.

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
package com.timepasstechies.blog.delta

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

class DeltaLake {

def mergeDeltaLakeData(deltaLakePath: String,
dataFrame: DataFrame,
sparkSession: SparkSession) = {

val present = FileSystem.isPathPresent(sparkSession, deltaLakePath)

if (!present) {
dataFrame.write
.partitionBy("profession")
.option("overwriteSchema", "true")
.format("delta")
.save(deltaLakePath)
} else {

val columnMapToUpdate = Map(
"id" -> "new.id",
"name" -> "new.name",
"gender" -> "new.gender",
"profession" -> "new.profession",
"age" -> "new.age"
)

// The dataframe should have same columns as in the target table, 
   otherwise the query throws an analysis error

DeltaTable
.forPath(sparkSession, deltaLakePath)
.as("old")
.merge(dataFrame.as("new"), "new.id = old.id")
.whenMatched
.updateExpr(columnMapToUpdate)
.whenNotMatched
.insertExpr(columnMapToUpdate + ("id" -> "new.id"))
.execute()
}

}

}
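As mentioned above, since every column is being updated and inserted, the same merge can be written more concisely with updateAll and insertAll, which copy all columns of the source row. Below is a minimal sketch of that variant, assuming the same sparkSession, dataFrame and deltaLakePath that are in scope inside the class above.

import io.delta.tables.DeltaTable

// Equivalent merge that updates or inserts every column without an explicit column map
DeltaTable
  .forPath(sparkSession, deltaLakePath)
  .as("old")
  .merge(dataFrame.as("new"), "new.id = old.id")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()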

Below is the utility object to check whether the Delta Lake path is already present

package com.timepasstechies.blog.delta

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object FileSystem {

  def isPathPresent(sparkSession: SparkSession, path: String): Boolean = {

    val fs = org.apache.hadoop.fs.FileSystem
      .get(sparkSession.sparkContext.hadoopConfiguration)

    fs.exists(new Path(path))
  }

}
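Note that this only checks that the path exists; it does not verify that the path actually holds a Delta table. If you want that stricter check, the Delta Lake API provides DeltaTable.isDeltaTable. Below is a minimal sketch of that alternative, assuming the same sparkSession and deltaLakePath as above.

import io.delta.tables.DeltaTable

// True only when the path exists and contains a valid Delta table
val isDeltaTable: Boolean = DeltaTable.isDeltaTable(sparkSession, deltaLakePath)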

Below is the gateway object to run the example

package com.timepasstechies.blog.delta

import org.apache.spark.sql.SparkSession

object Gateway extends App {

  val deltaLakePath =
    "/blob/test/deltalake/employee"
  val data = new Data()
  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()
  val dataFrame = data.getSampleDataFrame(sparkSession)
  new DeltaLake().mergeDeltaLakeData(deltaLakePath, dataFrame, sparkSession)

}
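On Databricks the Delta Lake runtime is preconfigured, so the session above is enough. When running this example outside Databricks with open-source Delta Lake, the delta-core dependency has to be on the classpath and the session usually needs the Delta SQL extension and catalog as well. Below is a minimal sketch of such a session, assuming Spark 3.x and a matching delta-core artifact.

import org.apache.spark.sql.SparkSession

// Local session configured for open-source Delta Lake
// (requires io.delta:delta-core_2.12 on the classpath)
val localSession: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("delta-merge-example")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()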

Delta Lake also has an auto optimize option which can be enabled using the Spark configuration below. If we enable this option, it reduces the number of small files written during individual writes from Spark to the Delta table.

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled","true")

We can also enable auto compaction with Delta Lake, which generates smaller files of around 128 MB, compared to OPTIMIZE which generates files of around 1 GB. To enable it we can use the below property

spark.conf.set("spark.databricks.delta.autoCompact.enabled","true")

Delta Lake also supports time travel, and you can query by either the timestamp or the version number. Below is an example using the version number, where we roll the table back to an earlier version of the data.

-- This query shows the timestamp and the version number along with other metadata

%sql describe history delta.`/blob/test/deltalake/employee`

-- Below is the query to roll back to an older version of the data

MERGE INTO delta.`/blob/test/deltalake/employee` target
USING delta.`/blob/test/deltalake/employee`
VERSION AS OF 1 source
ON source.id = target.id
WHEN MATCHED THEN UPDATE SET *
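The same snapshot can also be read from Scala with the DataFrame reader time travel options. Below is a minimal sketch that reads version 1 of the employee table; the timestampAsOf option can be used instead of versionAsOf to travel by timestamp.

// Read an older snapshot of the Delta table by version number
val employeeV1 = sparkSession.read
  .format("delta")
  .option("versionAsOf", 1)
  .load("/blob/test/deltalake/employee")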

That's a quick overview of how to insert/merge data in Delta Lake on Databricks with Spark as the data source.