In this short article, we will write a Scala program in a Databricks notebook that compresses files stored in Azure Blob Storage into a single zip archive. In the code below, input_path is the Azure Blob Storage location under which all the blobs to be compressed reside, output_path is the blob storage location where the compressed file is written, and zipFileName is the name to use for the resulting zip file.
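The paths used below assume the Azure Blob Storage container is already mounted under /mnt/blob. If it is not, a minimal mount sketch might look like the following; the account, container, secret scope, and key names are placeholders, not values from this article:

    // Hypothetical one-time mount of the blob container under /mnt/blob.
    // Replace <account>, <container>, <scope> and <key> with your own values.
    dbutils.fs.mount(
      source = "wasbs://<container>@<account>.blob.core.windows.net/",
      mountPoint = "/mnt/blob",
      extraConfigs = Map(
        "fs.azure.account.key.<account>.blob.core.windows.net" ->
          dbutils.secrets.get(scope = "<scope>", key = "<key>")
      )
    )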
    import org.apache.hadoop.fs.{FileSystem, Path}
    import java.util.zip.{ZipEntry, ZipOutputStream}
    import org.apache.spark.sql.SparkSession
    import scala.collection.mutable.ListBuffer

    object CompressFilesInAzureBlobStorage {

      // On Databricks, getOrCreate() returns the notebook's existing Spark session.
      val spark = SparkSession.builder().getOrCreate()

      // Copies a single file from the source location into the open zip stream.
      def zipFile(fileName: String, basePath: String, fsn: FileSystem, zip: ZipOutputStream): Unit = {
        zip.putNextEntry(new ZipEntry(fileName))
        val in = fsn.open(new Path(basePath + fileName))
        val buf = new Array[Byte](1024)
        var len = in.read(buf)
        while (len >= 0) {
          zip.write(buf, 0, len)
          len = in.read(buf)
        }
        in.close()
        zip.closeEntry()
      }

      // Recursively lists every file under inputPath and writes all of them into a
      // single zip archive at outputPath/zipFileName.
      def zip(inputPath: String, outputPath: String, zipFileName: String): Unit = {
        val fileList = new ListBuffer[String]()
        val fileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val fileStatusListIterator = fileSystem.listFiles(new Path(inputPath), true)
        val outputStream = fileSystem.create(new Path(s"$outputPath/$zipFileName"))
        val zip = new ZipOutputStream(outputStream)

        // Collect the paths of all files under inputPath, relative to inputPath.
        while (fileStatusListIterator.hasNext()) {
          val fileStatus = fileStatusListIterator.next()
          val relFilePath = fileStatus.getPath().toString.replaceAll(s"dbfs:$inputPath", "")
          fileList += relFilePath
        }

        // Add each file to the archive, then finish the zip stream.
        fileList.foreach(name => zipFile(name, inputPath, fileSystem, zip))
        zip.close()
      }

      val input_path = "/mnt/blob/input_file_path"
      val output_path = "/mnt/blob/output_file_path"
      val zipFileName = "zipFileName.zip"
      zip(input_path, output_path, zipFileName)
    }
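As a quick sanity check after the cell runs, dbutils (available in Databricks notebooks) can list the output location. The snippet below simply prints the paths it finds and assumes the same output_path used above:

    // Print the contents of the output directory to confirm the archive was written.
    dbutils.fs.ls("/mnt/blob/output_file_path").foreach(f => println(f.path))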