In this article I will illustrate how to merge two DataFrames with different schemas. Spark provides the APIs below for this purpose, but they come with a constraint: a union can only be performed on DataFrames with the same number of columns.
public Dataset<T> unionAll(Dataset<T> other)
Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.
public Dataset<T> union(Dataset<T> other)
Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.
Let’s look at an example. Below are the two input JSON files we want to merge. First, one.json:
{"name":"keerti","gender":"Female","age":"20","area":"brigade",
"city":"bangalore","profession":"housekeeping"}
{"name":"rahul","gender":"Male","age":"20","area":"brigade",
"city":"bangalore","profession":"housekeeping"}
{"name":"keerti","gender":"Female","age":"20","profession":"housekeeping",
"area":"brigade","city":"bangalore","pincode":"560102"}
{"name":"rahul","gender":"Male","age":"20","profession":"housekeeping",
"area":"brigade","city":"bangalore","pincode":"560102"}
As we can see, the two JSON files have different schemas. Let’s do a union on these two DataFrames and see the result.
package com.wdc.ddp.utility;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeDataFrameWithDifferentSchema {
    public static void main(String[] args) throws AnalysisException {
        // Load and decrypt the S3 credentials from the property file
        ConfigFile aws_credential =
            new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String access_key_amazon =
            CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        String secret_key_amazon =
            CipherText.decrypt(aws_credential.getString("secretKey.amazon"));
        // Build a local SparkSession configured for S3A access
        SparkSession session = SparkSession.builder().master("local")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
            .config("fs.s3a.connection.ssl.enabled", "false")
            .config("spark.network.timeout", "600s")
            .config("spark.executor.heartbeatInterval", "500s")
            .getOrCreate();
        // Read each JSON file into its own DataFrame and print its schema
        Dataset<Row> dataset1 = session.read().json("s3a://${bucket-name}/${key-prefix}/one.json");
        System.out.println(dataset1.schema());
        Dataset<Row> dataset2 = session.read().json("s3a://${bucket-name}/${key-prefix}/two.json");
        System.out.println(dataset2.schema());
        // This union fails because the two schemas differ
        dataset1.union(dataset2).show();
    }
}
The above code throws an org.apache.spark.sql.AnalysisException as below, since the DataFrames we are trying to merge have different schemas.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 7 columns.
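As an aside, if you are running Spark 3.1 or later, Dataset.unionByName accepts an allowMissingColumns flag that resolves columns by name and fills columns missing on either side with null, which would also handle this case. A minimal sketch, reusing the two DataFrames from the code above:

// Spark 3.1+ only: align columns by name, filling the column that is
// missing on one side (here, pincode) with null
Dataset<Row> merged = dataset1.unionByName(dataset2, true);
merged.show();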
We can fix this by creating a single DataFrame from a list of paths, instead of creating separate DataFrames and then performing a union on them. Below is the code for the same.
package com.wdc.ddp.utility;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConverters;

public class MergeDataFrameWithDifferentSchema {
    public static void main(String[] args) throws AnalysisException {
        // Load and decrypt the S3 credentials from the property file
        ConfigFile aws_credential =
            new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String access_key_amazon =
            CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        String secret_key_amazon =
            CipherText.decrypt(aws_credential.getString("secretKey.amazon"));
        // Build a local SparkSession configured for S3A access
        SparkSession session = SparkSession.builder().master("local")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
            .config("fs.s3a.connection.ssl.enabled", "false")
            .config("spark.network.timeout", "600s")
            .config("spark.executor.heartbeatInterval", "500s")
            .getOrCreate();
        // Collect both paths and convert the Java list to a Scala Seq,
        // which is what this json(...) overload expects
        List<String> convert = new ArrayList<String>();
        convert.add("s3a://${bucket-name}/${key-prefix}/one.json");
        convert.add("s3a://${bucket-name}/${key-prefix}/two.json");
        scala.collection.Seq<String> paths =
            JavaConverters.asScalaIteratorConverter(convert.iterator()).asScala().toSeq();
        // A single read over both paths lets Spark infer one merged schema
        Dataset<Row> dataset3 = session.read().json(paths);
        dataset3.show();
    }
}
Below is the output. Spark infers a single unified schema across both files, filling the missing pincode column with null for the rows that came from one.json:
+---+-------+---------+------+------+-------+------------+
|age|   area|     city|gender|  name|pincode|  profession|
+---+-------+---------+------+------+-------+------------+
| 20|brigade|bangalore|Female|keerti| 560102|housekeeping|
| 20|brigade|bangalore|  Male| rahul| 560102|housekeeping|
| 20|brigade|bangalore|Female|keerti|   null|housekeeping|
| 20|brigade|bangalore|  Male| rahul|   null|housekeeping|
+---+-------+---------+------+------+-------+------------+
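To double-check the merged result, you can also print the inferred schema; a quick sketch, reusing dataset3 from the code above:

// The unified schema now contains all seven columns, including
// pincode, which is null for records read from one.json
dataset3.printSchema();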
Can we do the same using PySpark?