In this short article, we will write a Scala program in a Databricks notebook to compress files stored in Azure Blob Storage. In the code below, input_path refers to the Azure Blob Storage location under which all the blobs to be compressed live, output_path refers to the location in Blob Storage where the compressed archive is written, and zipFileName is the name of the resulting zip file.
import org.apache.hadoop.fs.{FileSystem, Path}
import java.util.zip.{ZipEntry, ZipOutputStream}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object CompressFilesInAzureBlobStorage {

  // On Databricks, getOrCreate() attaches to the notebook's existing
  // session, so there is no need to set a master explicitly.
  val spark = SparkSession.builder().getOrCreate()
  // Streams a single file under basePath into the open archive as one entry.
  def zipFile(fileName: String,
              basePath: String,
              fs: FileSystem,
              zip: ZipOutputStream): Unit = {
    zip.putNextEntry(new ZipEntry(fileName))
    val in = fs.open(new Path(basePath + fileName))
    // Copy in 1 KB chunks so a large blob is never held in memory at once.
    val buf = new Array[Byte](1024)
    var len = in.read(buf)
    while (len >= 0) {
      zip.write(buf, 0, len)
      len = in.read(buf)
    }
    in.close()
    zip.closeEntry()
  }
  // Recursively lists every file under inputPath and writes them all into
  // a single archive at outputPath/zipFileName.
  def zip(inputPath: String, outputPath: String, zipFileName: String): Unit = {
    val fileList = new ListBuffer[String]()
    val fileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val fileStatusListIterator = fileSystem.listFiles(new Path(inputPath), true)
    val outputStream = fileSystem.create(new Path(s"$outputPath/$zipFileName"))
    val zip = new ZipOutputStream(outputStream)
    // Collect each file's path relative to inputPath. On Databricks the
    // fully qualified path carries the dbfs: scheme, so strip "dbfs:" plus
    // the input prefix. Use replace (a literal substitution) rather than
    // replaceAll, which would interpret the prefix as a regular expression.
    while (fileStatusListIterator.hasNext) {
      val fileStatus = fileStatusListIterator.next()
      val relFilePath = fileStatus.getPath.toString.replace(s"dbfs:$inputPath", "")
      fileList += relFilePath
    }
    // foreach rather than map: zipFile is called purely for its side effect.
    fileList.foreach(name => zipFile(name, inputPath, fileSystem, zip))
    zip.close()
  }
  // Example invocation; this runs when the object is first referenced.
  val input_path = "/mnt/blob/input_file_path"
  val output_path = "/mnt/blob/output_file_path"
  val zipFileName = "zipFileName.zip"
  zip(input_path, output_path, zipFileName)
}
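After the object runs, it is worth confirming that the archive actually landed in the output location. The snippet below is a minimal sanity check, assuming the notebook's built-in dbutils handle is available and that /mnt/blob/output_file_path is the same mount used above; adjust the path and file name to match your setup.

// Minimal sanity check (assumes the Databricks dbutils handle and the
// /mnt/blob/output_file_path mount from the example above).
val written = dbutils.fs.ls("/mnt/blob/output_file_path")
  .find(_.name == "zipFileName.zip")
written match {
  case Some(f) => println(s"Created ${f.path} (${f.size} bytes)")
  case None    => println("Zip file was not created")
}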