In this article, I will illustrate how to integrate Databricks Spark on Azure with Apache POI and crealytics/spark-excel. Apache POI is a Java library for reading and writing MS Excel files, and it can also read and write MS Word and MS PowerPoint files. crealytics/spark-excel is a library for querying Excel files with Apache Spark.
At the end of this article, we will have created a report like the one shown below.
To add the dependent library to an sbt project, we can use the artifact below:
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.13.5"
If you are using Maven, below is the dependency:
<dependency>
    <groupId>com.crealytics</groupId>
    <artifactId>spark-excel_2.12</artifactId>
    <version>0.13.5</version>
</dependency>
Let’s first create a basic Excel report with crealytics/spark-excel. Below is the code:
package com.timepasstechies.blog

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer

class SparkExcel {

  // Builds a small sample DataFrame of test-case results
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    var sequenceOfOverview = ListBuffer[(String, String, Integer)]()
    sequenceOfOverview += Tuple3("1", "isValidZipCode", 1)
    sequenceOfOverview += Tuple3("2", "isValidEmail", 0)
    sequenceOfOverview += Tuple3("3", "isValidPhoneNumber", -1)
    sequenceOfOverview += Tuple3("4", "isValidDob", 1)
    val df1 = sequenceOfOverview.toDF("test_case_number", "description", "result")
    df1
  }

  // Writes the DataFrame to an Excel sheet using the spark-excel data source
  def writeExcel(dataFrame: DataFrame, path: String): Unit = {
    val sheetName = "testcase-report"
    dataFrame.write
      .format("com.crealytics.spark.excel")
      .option("dataAddress", s"'$sheetName'!A1")
      .option("useHeader", "true")
      .option("header", "true")
      .mode(org.apache.spark.sql.SaveMode.Append)
      .save(path)
  }
}
Let’s call the above code to create the basic Excel report. If you are running it from a Databricks notebook, the SparkSession is created for you when you start a cluster with the Databricks runtime. The Spark session in Databricks is accessible through a variable called spark, and it encapsulates a SparkContext.
import org.apache.spark.sql.SparkSession

object Gateway extends App {

  val outputFileName = "\\dataset\\sampleOut\\report-1.xlsx"
  val sparkExcel = new SparkExcel()

  // Not required if running from Databricks; we can use the spark variable instead
  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val dataframe = sparkExcel.getSampleDataFrame(sparkSession)
  sparkExcel.writeExcel(dataframe, outputFileName)
}
The above code will create a report as shown below.
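To sanity-check the generated file, we can read it back with the same data source. Below is a minimal sketch, assuming the report was written to the path and sheet name used above; the option names follow the spark-excel 0.13.x reader API.

// Read the generated report back to verify its contents.
// Assumes the same outputFileName and sheet name used in writeExcel above.
val reportDf = sparkSession.read
  .format("com.crealytics.spark.excel")
  .option("dataAddress", "'testcase-report'!A1") // sheet name and start cell
  .option("header", "true")                      // first row contains column names
  .option("inferSchema", "true")                 // let spark-excel infer column types
  .load(outputFileName)

reportDf.show()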
The crealytics/spark-excel library does not support formatting the Excel file. For example, if we want to add conditional formatting with an icon set of 3 traffic lights (or any other icon set) to the Excel file, we need to use Apache POI. Since crealytics/spark-excel uses Apache POI internally to generate Excel files, we don't need to add any Apache POI library separately.
Since Apache POI is a Java library with no Scala API, we have to write this code in Java. Below is the code that formats the Excel file created above with conditional formatting of 3 traffic lights. If you are using Databricks, you cannot write Java code in a Databricks notebook; however, you can create a JAR, upload it into Databricks, and access the classes you’ve created.
The code below reads an Excel file from Azure Blob Storage, formats it by adding icon sets, and writes it back to Azure Blob Storage.
package com.timepasstechies.blog;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.ss.util.CellRangeAddress;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.apache.spark.sql.SparkSession;

public class ApacheExcelPOI {

    public void updateResultColumn(SparkSession sparkSession, String path) {
        try {
            System.out.println("inside poi for report update");

            // Open the workbook through the Hadoop FileSystem so the same code
            // works against Azure Blob Storage paths
            FileSystem fs = FileSystem.get(sparkSession.sparkContext().hadoopConfiguration());
            FSDataInputStream fsdi = fs.open(new Path(path));
            java.io.InputStream inputStream = fsdi;
            Workbook workbook = new XSSFWorkbook(inputStream);
            Sheet sheet = workbook.getSheetAt(0);

            // Create a conditional formatting rule with a 3 traffic lights icon set
            SheetConditionalFormatting sheetCF = sheet.getSheetConditionalFormatting();
            ConditionalFormattingRule rule =
                sheetCF.createConditionalFormattingRule(IconMultiStateFormatting.IconSet.GYR_3_TRAFFIC_LIGHTS);
            rule.getMultiStateFormatting().setIconOnly(true);

            IconMultiStateFormatting iconMultiStateFormatting = rule.getMultiStateFormatting();
            ConditionalFormattingThreshold[] thresholds = iconMultiStateFormatting.getThresholds();
            if (thresholds.length == 3) {
                for (int i = 0; i < 3; i++) {
                    ConditionalFormattingThreshold threshold = thresholds[i];
                    // to change thresholds, the below code can be modified
                    if (i == 0) {
                        threshold.setValue(0d);
                    } else if (i == 1) {
                        threshold.setRangeType(ConditionalFormattingThreshold.RangeType.NUMBER);
                        threshold.setValue(0d);
                    } else if (i == 2) {
                        threshold.setRangeType(ConditionalFormattingThreshold.RangeType.NUMBER);
                        threshold.setValue(1d);
                    }
                }
            }

            // Apply the rule to the target cell range (0-based row/column indexes)
            ConditionalFormattingRule[] cfRules = {rule};
            CellRangeAddress[] regions = {new CellRangeAddress(2, 100, 3, 3)};
            sheetCF.addConditionalFormatting(regions, cfRules);

            // Write the formatted workbook back to the same path
            FSDataOutputStream out = fs.create(new Path(path));
            workbook.write(out);
            out.close();
        } catch (Exception ex) {
            System.out.println("Error from poi update " + ex.getLocalizedMessage());
        }
    }
}
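Once this class is packaged into a JAR and attached to the cluster, it can be invoked from a Scala notebook cell using the built-in spark variable. Below is a minimal sketch; the wasbs:// path is a placeholder for your own blob storage container and account.

// Hypothetical notebook cell: call the uploaded POI class against a report
// already written to blob storage. Replace the container/account/path with your own.
val reportPath = "wasbs://<container>@<storage-account>.blob.core.windows.net/reports/report-1.xlsx"
new com.timepasstechies.blog.ApacheExcelPOI().updateResultColumn(spark, reportPath)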
Let’s integrate the above code with the spark-excel code we wrote earlier.
package com.timepasstechies.blog

import org.apache.spark.sql.SparkSession

object Gateway extends App {

  val outputFileName = "\\dataset\\sampleOut\\report-1.xlsx"
  val sparkExcel = new SparkExcel()

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val dataframe = sparkExcel.getSampleDataFrame(sparkSession)
  sparkExcel.writeExcel(dataframe, outputFileName)
  new ApacheExcelPOI().updateResultColumn(sparkSession, outputFileName)
}
If we run the above code with spark-core as a dependency in sbt, or in a Databricks notebook, we get the exception below, because Spark bundles an outdated version of commons-compress while POI needs a newer one.
java.lang.IllegalArgumentException: InputStream of class class org.apache.commons.compress.archivers.zip.ZipArchiveInputStream is not implementing InputStreamStatistics.
    at org.apache.poi.openxml4j.util.ZipArchiveThresholdInputStream.<init>(ZipArchiveThresholdInputStream.java:63)
    at org.apache.poi.openxml4j.opc.internal.ZipHelper.openZipStream(ZipHelper.java:180)
    at org.apache.poi.openxml4j.opc.ZipPackage.<init>(ZipPackage.java:104)
    at org.apache.poi.openxml4j.opc.OPCPackage.open(OPCPackage.java:298)
    at org.apache.poi.xssf.usermodel.XSSFWorkbookFactory.createWorkbook(XSSFWorkbookFactory.java:129)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.poi.ss.usermodel.WorkbookFactory.createWorkbook(WorkbookFactory.java:314)
    at org.apache.poi.ss.usermodel.WorkbookFactory.createXSSFWorkbook(WorkbookFactory.java:296)
    at org.apache.poi.ss.usermodel.WorkbookFactory.create(WorkbookFactory.java:214)
    at org.apache.poi.ss.usermodel.WorkbookFactory.create(WorkbookFactory.java:180)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$openWorkbook$2$$anonfun$apply$4.apply(ExcelRelation.scala:66)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$openWorkbook$2$$anonfun$apply$4.apply(ExcelRelation.scala:66)
    at scala.Option.fold(Option.scala:158)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$openWorkbook$2.apply(ExcelRelation.scala:66)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$openWorkbook$2.apply(ExcelRelation.scala:66)
    at scala.Option.getOrElse(Option.scala:121)
    at com.crealytics.spark.excel.ExcelRelation.openWorkbook(ExcelRelation.scala:64)
    at com.crealytics.spark.excel.ExcelRelation.excerpt$lzycompute(ExcelRelation.scala:71)
    at com.crealytics.spark.excel.ExcelRelation.excerpt(ExcelRelation.scala:70)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$inferSchema$1.apply(ExcelRelation.scala:264)
    at com.crealytics.spark.excel.ExcelRelation$$anonfun$inferSchema$1.apply(ExcelRelation.scala:263)
    at scala.Option.getOrElse(Option.scala:121)
    at com.crealytics.spark.excel.ExcelRelation.inferSchema(ExcelRelation.scala:263)
    at com.crealytics.spark.excel.ExcelRelation.<init>(ExcelRelation.scala:91)
    at com.crealytics.spark.excel.DefaultSource.createRelation(DefaultSource.scala:39)
    at com.crealytics.spark.excel.DefaultSource.createRelation(DefaultSource.scala:14)
    at com.crealytics.spark.excel.DefaultSource.createRelation(DefaultSource.scala:8)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)
If we are bundling spark-core as part of our own application, we can solve this by using the latest version of commons-compress and shading it in sbt or Maven, for example:
shadedDeps ++= Seq(
  "org.apache.commons" ^ "commons-compress" ^ "1.18"
)

shadeRenames ++= Seq(
  "org.apache.commons.compress.**" -> "shadeio.commons.compress.@1"
)
If we are using spark-submit to launch Spark jobs, we can also use the --driver-class-path parameter to put the latest version of commons-compress on the classpath, as in the sketch below.
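A minimal spark-submit invocation might look like the following; the jar names and paths are placeholders for your own build output and a downloaded commons-compress jar.

spark-submit \
  --class com.timepasstechies.blog.Gateway \
  --master local[*] \
  --driver-class-path /path/to/commons-compress-1.20.jar \
  /path/to/your-application.jar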
Solving it in a Databricks notebook
In Databricks, the Spark core libraries are part of the Databricks runtime, so we cannot use shading here. The newer commons-compress jar needs to be added to the system classpath; adding it to the application classpath with your uber jar will not help, because the copy bundled with Spark will still be used, so that approach does not solve the problem.
The workaround in Databricks is to use an init script, which is a shell script that runs during the startup of each cluster node before the Apache Spark driver or worker JVM starts. Using such a script, you can place the jar into the system classpath directory, where it will be picked up by Spark.
Below is the init script that can be used in Databricks to resolve the exception “InputStream of class class org.apache.commons.compress.archivers.zip.ZipArchiveInputStream is not implementing InputStreamStatistics”. It needs to be added to the cluster using the Init Scripts tab in the Advanced Options section of the Databricks UI.
#!/bin/bash
wget --quiet -O /mnt/custom/jars/commons-compress-1.20.jar https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar
If we run the code with the fix discussed above, it creates a report like the one below.
That’s a quick overview of how we can use spark-excel and Apache POI with Databricks on Azure to create and format Excel reports.