In this article, I will illustrate how to convert nested JSON to CSV in Apache Spark. Spark's CSV data source does not support nested JSON out of the box, because it cannot figure out how to map a complex JSON structure onto the flat, columnar CSV format.
When Spark writes a JSON-derived dataset to CSV, it can only map top-level columns of simple types; any field nested below the first level causes the write to fail.
Let's take an example and convert the JSON below to CSV:
{"name":"adarsh", "gender":"male", "age":"20", "address": {"area":"richmond", "city":"bangalore" } }
Below is the Spark code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NestedJsonToCsv {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .master("local")
                .appName("convertNestedJsonToCsv")
                .getOrCreate();

        // Read the nested JSON and try to write it straight out as CSV
        Dataset<Row> dataset = session.read().json("Input_Location");
        dataset.write().format("csv").save("Output_Location");
    }
}
Running the above code results in the error below:
Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support struct<area:string,city:string> data type.
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:127)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:98)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.verifySchema(CSVUtils.scala:131)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:142)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at com.blog.driver.NestedJsonToCsv.main(NestedJsonToCsv.java:20)
We can work around this by flattening the JSON schema, converting the nested StructType into a flat list of columns. The dataset we loaded above has the following schema:
StructType(StructField(address,StructType(StructField(area,StringType,true), StructField(city,StringType,true)),true), StructField(age,StringType,true), StructField(gender,StringType,true), StructField(name,StringType,true))
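For readability, the same schema rendered as a tree (roughly what dataset.printSchema() would print for this sample record) looks like this:

root
 |-- address: struct (nullable = true)
 |    |-- area: string (nullable = true)
 |    |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)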
If we flatten that schema into the form below, we will be able to convert the nested JSON to CSV:
StructType(StructField(age,StringType,true), StructField(gender,StringType,true), StructField(name,StringType,true), StructField(address.area,StringType,true), StructField(address.city,StringType,true))
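To make the target concrete: for this one fixed schema, the flattened column list is equivalent to a hand-written select that uses dotted paths for the nested fields. A minimal sketch, assuming the dataset loaded in the driver above and the placeholder output path from the first example:

// assumes: import static org.apache.spark.sql.functions.col;
Dataset<Row> flat = dataset.select(
        col("age"),
        col("gender"),
        col("name"),
        col("address.area").alias("address.area"),   // dotted path reads into the address struct,
        col("address.city").alias("address.city"));  // alias keeps the dotted name as the CSV header
flat.write().option("header", "true").format("csv").save("Output_Location");

Hard-coding columns like this only works for a known schema, which is why the next step derives the column list from the schema itself.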
Below is the code that builds this flat column list from the schema instead of hard-coding it. It handles one level of nesting, but the same approach can be extended to multiple levels (a recursive sketch follows the class below).
import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CsvOutPutFormatPreprocessor<Row> {

    public Column[] flattenNestedStructure(Dataset<Row> dataset) {
        StructType type = dataset.schema();
        System.out.println(dataset.schema());

        List<String> list = new ArrayList<String>();
        List<String> listOfSimpleType = new ArrayList<String>();

        for (StructField structField : type.fields()) {
            if (structField.dataType().toString().startsWith("StructType")) {
                // Nested struct: extract the child field names from the type's string form
                String prefix = structField.name();
                Matcher match = Pattern.compile("(\\w+\\(([A-Z_a-z_0-9]+),\\w+,\\w+\\))+")
                        .matcher(structField.dataType().toString());
                while (match.find()) {
                    list.add(prefix + "." + match.group(2));
                }
            } else {
                // Simple top-level column: keep it as-is
                listOfSimpleType.add(structField.name());
            }
        }

        int i = 0;
        Column[] column = new Column[list.size() + listOfSimpleType.size()];
        for (String columnName : listOfSimpleType) {
            column[i] = col(columnName);
            i++;
        }
        for (String column_name : list) {
            // The alias keeps the dotted name (e.g. address.area) as the CSV header
            column[i] = col(column_name).alias(column_name);
            i++;
        }
        return column;
    }
}
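The string-matching approach above only looks one level deep. As a hypothetical extension (not part of the original code), a recursive variant that walks the StructType directly would handle arbitrary nesting depth; the class and method names here are illustrative:

import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RecursiveSchemaFlattener {

    // Returns one Column per leaf field, named by its dotted path (e.g. address.area)
    public static Column[] flatten(StructType schema) {
        List<Column> columns = new ArrayList<Column>();
        collect(schema, "", columns);
        return columns.toArray(new Column[0]);
    }

    private static void collect(StructType schema, String prefix, List<Column> columns) {
        for (StructField field : schema.fields()) {
            String name = prefix.isEmpty() ? field.name() : prefix + "." + field.name();
            if (field.dataType() instanceof StructType) {
                // Recurse into nested structs, carrying the dotted prefix along
                collect((StructType) field.dataType(), name, columns);
            } else {
                // col("a.b.c") resolves the nested field; the alias keeps the dotted header
                columns.add(col(name).alias(name));
            }
        }
    }
}

In the driver, dataset.select(RecursiveSchemaFlattener.flatten(dataset.schema())) could then stand in for the flattenNestedStructure call when the input has more than one level of nesting.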
Let's integrate the above code into our Spark driver:
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class NestedJsonToCsv {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .master("local")
                .appName("convertNestedJsonToCsv")
                .getOrCreate();

        Dataset<Row> dataset = session.read().json("C:\\dataset\\json\\nested-str.json");

        // Build the flattened column list and select it before writing to CSV
        CsvOutPutFormatPreprocessor<Row> csvOutPutFormatPreprocessor = new CsvOutPutFormatPreprocessor<Row>();
        Column[] flattened_column = csvOutPutFormatPreprocessor.flattenNestedStructure(dataset);

        dataset.select(flattened_column)
                .write()
                .mode(SaveMode.Overwrite)
                .option("header", "true")
                .format("csv")
                .save("C:\\dataset\\output\\sample");
    }
}
Let's run the above code, which converts the nested JSON into CSV format.
Below is the output:
age,gender,name,address.area,address.city
20,male,adarsh,richmond,bangalore