In this article, I will illustrate how to merge two dataframes with different schemas. Spark provides the APIs below for this, but they come with a constraint: a union can only be performed on dataframes that have the same number of columns, and the columns are matched by position rather than by name.
public Dataset<T> unionAll(Dataset<T> other)
Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.

public Dataset<T> union(Dataset<T> other)
Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.
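For reference, here is a minimal sketch of a union that satisfies this constraint. The class name and the inline JSON records are made up for illustration, and reading JSON from a Dataset<String> assumes Spark 2.2 or later; the point is simply that both inputs resolve to the same columns, so the union succeeds.

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnionSameSchemaExample {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local").getOrCreate();

        // Two tiny dataframes built from in-memory JSON strings; both resolve to
        // the same two columns (age, name), so the column counts match.
        Dataset<Row> df1 = session.read().json(
                session.createDataset(Arrays.asList("{\"name\":\"keerti\",\"age\":\"20\"}"), Encoders.STRING()));
        Dataset<Row> df2 = session.read().json(
                session.createDataset(Arrays.asList("{\"name\":\"rahul\",\"age\":\"20\"}"), Encoders.STRING()));

        // union keeps duplicate rows (UNION ALL semantics) and matches columns by position.
        df1.union(df2).show();
    }
}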
Let’s look at an example. Below are the input JSON files we want to merge.
{"name":"keerti","gender":"Female","age":"20","area":"brigade", "city":"bangalore","profession":"housekeeping"} {"name":"rahul","gender":"Male","age":"20","area":"brigade", "city":"bangalore","profession":"housekeeping"}
{"name":"keerti","gender":"Female","age":"20","profession":"housekeeping", "area":"brigade","city":"bangalore","pincode":"560102"} {"name":"rahul","gender":"Male","age":"20","profession":"housekeeping", "area":"brigade","city":"bangalore","pincode":"560102"}
As we can see, the two JSON files have different schemas: the second file has an extra pincode column. Let’s do a union on these two dataframes and see the result.
package com.wdc.ddp.utility;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeDataFrameWithDifferentSchema {
    public static void main(String[] args) throws AnalysisException {
        // Decrypt the S3 credentials from the object-store configuration file.
        ConfigFile aws_credential = new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String access_key_amazon = CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        String secret_key_amazon = CipherText.decrypt(aws_credential.getString("secretKey.amazon"));

        SparkSession session = SparkSession.builder().master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
                .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.executor.heartbeatInterval", "500s")
                .getOrCreate();

        // Read each JSON file into its own dataframe and print the inferred schemas.
        Dataset<Row> dataset1 = session.read().json("s3a://${bucket-name}/${key-prefix}/one.json");
        System.out.println(dataset1.schema());
        Dataset<Row> dataset2 = session.read().json("s3a://${bucket-name}/${key-prefix}/two.json");
        System.out.println(dataset2.schema());

        // Fails: the two dataframes do not have the same number of columns.
        dataset1.union(dataset2).show();
    }
}
The above code throws an org.apache.spark.sql.AnalysisException, shown below, because the dataframes we are trying to merge have different schemas.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 7 columns.
We can fix this by creating a single dataframe from a list of paths, instead of creating separate dataframes and then unioning them. Below is the code for the same.
package com.wdc.ddp.utility;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.wdc.ddp.utility.CipherText;
import com.wdc.ddp.utility.ConfigFile;
import com.wdc.ddp.utility.FileType;

import scala.collection.JavaConverters;

public class MergeDataFrameWithDifferentSchema {
    public static void main(String[] args) throws AnalysisException {
        // Decrypt the S3 credentials from the object-store configuration file.
        ConfigFile aws_credential = new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String access_key_amazon = CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        String secret_key_amazon = CipherText.decrypt(aws_credential.getString("secretKey.amazon"));

        SparkSession session = SparkSession.builder().master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
                .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.executor.heartbeatInterval", "500s")
                .getOrCreate();

        // Collect both paths and convert the Java list into a Scala Seq,
        // which is what DataFrameReader.json(paths) expects.
        List<String> convert = new ArrayList<String>();
        convert.add("s3a://${bucket-name}/${key-prefix}/one.json");
        convert.add("s3a://${bucket-name}/${key-prefix}/two.json");
        scala.collection.Seq<String> paths =
                JavaConverters.asScalaIteratorConverter(convert.iterator()).asScala().toSeq();

        // A single read over both files infers one merged schema;
        // records that are missing a column get null for it.
        Dataset<Row> dataset3 = session.read().json(paths);
        dataset3.show();
    }
}
Below is the output:
+---+-------+---------+------+------+-------+------------+
|age|   area|     city|gender|  name|pincode|  profession|
+---+-------+---------+------+------+-------+------------+
| 20|brigade|bangalore|Female|keerti| 560102|housekeeping|
| 20|brigade|bangalore|  Male| rahul| 560102|housekeeping|
| 20|brigade|bangalore|Female|keerti|   null|housekeeping|
| 20|brigade|bangalore|  Male| rahul|   null|housekeeping|
+---+-------+---------+------+------+-------+------------+
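As a side note, the JavaConverters round-trip above is only needed to build a Scala Seq from Java. In Spark 2.x, DataFrameReader.json also exposes a varargs overload, so the same merged read can be written more directly from Java. This is a minimal sketch under that assumption, reusing the session built in the listing above:

// Assumes Spark 2.x, where DataFrameReader.json(String... paths) is callable from Java.
// "session" is the SparkSession configured in the previous listing.
Dataset<Row> merged = session.read().json(
        "s3a://${bucket-name}/${key-prefix}/one.json",
        "s3a://${bucket-name}/${key-prefix}/two.json");
merged.show(); // same merged output as above, with null where pincode is absent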
Can we do the same using PySpark?