Scala code to zip files in Azure Blob Storage from a Databricks notebook

In this short article, we will write a Scala program that compresses files in Azure Blob Storage from a Databricks notebook. In the code below, input_path is the location in Azure Blob Storage under which all blobs are to be compressed, and output_path is the location in Blob Storage where the compressed file is written. zipFileName is the name to give the resulting zip file.

import org.apache.hadoop.fs.{FileSystem, Path}
import java.util.zip.{ZipEntry, ZipOutputStream}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Bundles every file found under a storage directory into a single zip archive.
  *
  * The object performs one zip run when it is initialised, using `input_path`,
  * `output_path` and `zipFileName` defined at the bottom of the body.
  */
object CompressFilesInAzureBlobStorage {

  val spark: SparkSession = SparkSession.builder().
              master("local").getOrCreate()

  /** Size in bytes of the buffer used to copy file contents into the archive. */
  private val BufferSize = 8192

  /** Copies one file into an already-open zip stream as a new entry.
    *
    * @param out      output location; not used by this method, kept so existing
    *                 callers keep compiling
    * @param fileName path of the file relative to `basePath`; a single leading
    *                 "/" is accepted and is not written into the entry name
    * @param basePath directory that `fileName` is relative to
    * @param fsn      file system used to open the source file
    * @param zip      archive stream that receives the entry; left open on return
    */
  def zipFile(out: String,
              fileName: String,
              basePath: String,
              fsn: FileSystem,
              zip: ZipOutputStream): Unit = {
    // Entry names inside a zip archive should be relative, so drop the leading slash.
    val entryName = fileName.stripPrefix("/")
    val baseDir = basePath.stripSuffix("/")
    // Open the source first: if it cannot be opened, no dangling entry is started.
    val in = fsn.open(new Path(s"$baseDir/$entryName"))
    try {
      zip.putNextEntry(new ZipEntry(entryName))
      val buf = new scala.Array[Byte](BufferSize)
      var len = in.read(buf)
      while (len >= 0) {
        zip.write(buf, 0, len)
        len = in.read(buf)
      }
      zip.closeEntry()
    } finally {
      // Release the input stream even when reading or writing fails.
      in.close()
    }
  }

  /** Recursively zips all files under `inputPath` into `outputPath/zipFileName`.
    *
    * @param inputPath   directory whose files (including nested ones) are archived
    * @param outputPath  directory in which the archive is created
    * @param zipFileName file name of the archive
    */
  def zip(inputPath: String, outputPath: String, zipFileName: String): Unit = {

    val fileSystem = FileSystem
                    .get(spark.sparkContext.hadoopConfiguration)

    // Compare scheme-less paths so the logic does not depend on a particular
    // URI scheme and never interprets the input path as a regular expression.
    val inputRoot = fileSystem.makeQualified(new Path(inputPath)).toUri.getPath
    val zipPath = new Path(s"$outputPath/$zipFileName")
    val zipLocation = fileSystem.makeQualified(zipPath).toUri.getPath

    // Gather the file names before the archive is created, and skip the archive
    // path itself, so the zip is never added to its own contents.
    val relativeNames = ListBuffer.empty[String]
    val listing = fileSystem.listFiles(new Path(inputPath), true)
    while (listing.hasNext()) {
      val filePath = listing.next().getPath().toUri.getPath
      if (filePath != zipLocation) {
        relativeNames += filePath.stripPrefix(inputRoot)
      }
    }

    val zip = new ZipOutputStream(fileSystem.create(zipPath))
    try {
      relativeNames.foreach { name =>
        zipFile(outputPath, name, inputPath, fileSystem, zip)
      }
    } finally {
      // Closing the zip stream also closes the underlying output stream.
      zip.close()
    }
  }

  val input_path = s"/mnt/blob/input_file_path"
  val output_path = s"/mnt/blob/output_file_path"
  val zipFileName = "zipFileName.zip"

  zip(input_path, output_path, zipFileName)

}

Leave a Reply

Your email address will not be published. Required fields are marked *