spark example to replace a header delimiter

This article will illustrate an example of how we can replace a delimiter in a spark dataframe. Let’s start with loading a CSV file into dataframe.

object SparkDataframeRegex extends App {
 

  val spark = SparkSession.builder.master("local").getOrCreate()

  val df = spark.read
    .option("header", true)
    .option("delimiter", "|")
    .format("csv")
    .load("C:\\dataset\\movies_dataset\\input")

    df.show(false)

}

Now if we want to replace all the pipe delimited characters with a caret character using spark we would do something like below

object SparkDataframeRegex extends App {
 

  val spark = SparkSession.builder.master("local").getOrCreate()

  val df = spark.read
    .option("header", true)
    .option("delimiter", "|")
    .format("csv")
    .load("C:\\dataset\\movies_dataset\\input")

   val replacedDf = df.columns.foldLeft(df)(
    (agg, columnName) =>
      df.withColumn(columnName, regexp_replace(col(columnName), "\\|", "\\^"))
  )

  replacedDf.show()

}

The above example works if you want to replace a character in the column values, but it will not replace the header delimiter, as the delimiter information is lost once you load the CSV with header configuration set to true.

So to solve this use case we need a solution where we treat the dataframe rows like a line of String and do our replacement. Below is the solution (you can achieve these multiple ways this is one approach)

Approach – 1

object SparkDataframeRegex extends App {

  val spark = SparkSession.builder.master("local").getOrCreate()

/* Load the dataframe with a dummy delimiter which does not exist in the file as we get a dataframe with a single column */  

  val df = spark.read
    .option("header", true)
    .option("delimiter", "&")
    .format("csv")
    .load("C:\\dataset\\movies_dataset\\input")

/* Replace the header */

  val modifiedHeader =
    df.columns.mkString.replaceAll("\\|", "\\^")

/*Rename the column */

  val dfWithHeaderSubs =
    df.withColumnRenamed(df.columns.mkString("\\|"), modifiedHeader)

/*Rename the column values  */

  val subsDataFrame = dfWithHeaderSubs.select(
    regexp_replace(col(modifiedHeader), "\\|", "\\^").as(modifiedHeader)
  )

/* Split the modified header with the new delimiter and Iterate over the modified header and add a new column for each element and drop the modified header column */

  var index = -1
  val transformedDf = modifiedHeader
    .split("\\^")
    .foldLeft(subsDataFrame)((dfs, columnName) => {
      index = index + 1
      dfs.withColumn(
        columnName,
        split(col(modifiedHeader), "\\^").getItem(index)
      )
    })
    .drop(modifiedHeader)

  transformedDf.show(false)

}

Approach – 2

object SparkDataframeRegex extends App {

  val spark = SparkSession.builder.master("local").getOrCreate()


// Load the dataframe

  val df = spark.read
    .option("header", true)
    .option("delimiter", "|")
    .format("csv")
    .load("C:\\dataset\\movies_dataset\\input")

  import spark.implicits._

/* Get the header info */

  val headerInfo = df.schema.fieldNames

/* Convert the schema into a list and to a dataframe */

  val headerDf = List(df.schema.fieldNames.mkString("|")).toDS

/* union the headerDf with the actual dataframe with a new column called as value and set the column value by iterating over each column and setting the value if it's not null. Also, we are using backtick here to handle the dot in the column name. Finally drop all the existing columns other than the value column we created, which now has the data of all the columns separated by a pipe delimiter */

  val unionDf = headerDf.union(
    df.withColumn(
        "value",
        concat_ws(
          "|",
          df.schema.fieldNames
            .map(columnName => coalesce(col(s"`$columnName`"), lit(""))): _*
        )
      )
      .drop(headerInfo: _*)
      .as[String]
  )

/*Replace the Pipe delimiter with the caret delimiter for data and header */

  val replacedDF = unionDf.rdd
    .map(line => line.replaceAll("\\|", "\\^"))
    .toDS

/*Get the dataframe header */

  val headerRow = replacedDF.select(col("value")).first.getString(0)
  
  /*Split it using the new delimiter */

  val headerList = headerRow.split("\\^").toList

  val selectList = new ListBuffer[String]()


  for ((x, i) <- headerList.view.zipWithIndex) selectList += s"_tmp[$i] as `$x`"

  replacedDF
    .filter($"value" =!= headerRow)
    .withColumn("_tmp", split($"value", "\\^"))
    .selectExpr(selectList: _*)
    .show(false)

}

 

Leave a Reply

Your email address will not be published. Required fields are marked *