Spark: merge two DataFrames with different columns or schema

In this article I will illustrate how to merge two dataframes with different schemas. Spark provides the APIs below for merging dataframes, but they come with a constraint: a union can only be performed on dataframes that have the same number of columns.


public Dataset<T> unionAll(Dataset<T> other)

Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.

public Dataset<T> union(Dataset<T> other)

Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.

Let’s look at an example. Below are the two input JSON files we want to merge:


{"name":"keerti","gender":"Female","age":"20","area":"brigade","city":"bangalore","profession":"housekeeping"}
{"name":"rahul","gender":"Male","age":"20","area":"brigade","city":"bangalore","profession":"housekeeping"}


{"name":"keerti","gender":"Female","age":"20","profession":"housekeeping","area":"brigade","city":"bangalore","pincode":"560102"}
{"name":"rahul","gender":"Male","age":"20","profession":"housekeeping","area":"brigade","city":"bangalore","pincode":"560102"}

As we can see, the two JSON files have different schemas: the second file has an extra pincode field. Let's do a union on these two dataframes and see the result.


import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Demonstrates that {@code Dataset.union} fails when the two inputs do not
 * have the same number of columns.
 *
 * <p>The two JSON inputs are read into separate dataframes, each schema is
 * printed, and the dataframes are then unioned. With the sample files shown
 * in this article the union throws an {@link AnalysisException}, which is
 * deliberately left to propagate out of {@code main}.
 */
public class MergeDataFrameWithDifferentSchema {

    /**
     * Reads {@code one.json} and {@code two.json} from S3 and attempts a union.
     *
     * @param args unused
     * @throws AnalysisException when Spark rejects the union of the two dataframes
     */
    public static void main(String[] args) throws AnalysisException {

        // S3 credentials come from the project's credentials config and are
        // passed through CipherText.decrypt before being handed to Spark.
        ConfigFile awsCredential = new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String accessKey = CipherText.decrypt(awsCredential.getString("accessKey.amazon"));
        String secretKey = CipherText.decrypt(awsCredential.getString("secretKey.amazon"));

        SparkSession session = SparkSession.builder()
                .master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", accessKey)
                .config("spark.hadoop.fs.s3a.secret.key", secretKey)
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.executor.heartbeatInterval", "500s")
                .getOrCreate();

        // The union below is expected to throw, so stop the session in a
        // finally block; otherwise it is never stopped on the failure path.
        try {
            Dataset<Row> dataset1 = session.read().json("s3a://${bucket-name}/${key-prefix}/one.json");
            System.out.println(dataset1.schema());

            Dataset<Row> dataset2 = session.read().json("s3a://${bucket-name}/${key-prefix}/two.json");
            System.out.println(dataset2.schema());

            // Fails here: the two dataframes have a different number of columns.
            dataset1.union(dataset2).show();
        } finally {
            session.stop();
        }
    }
}

The above code throws the org.apache.spark.sql.AnalysisException shown below, because the dataframes we are trying to merge have different schemas.


Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 7 columns.

We can fix this by reading both paths into a single dataframe, instead of creating a separate dataframe for each file and then doing a union on them. Spark then infers one schema that covers the columns of both files. Below is the code for this approach:


package com.wdc.ddp.utility;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.wdc.ddp.utility.CipherText;
import com.wdc.ddp.utility.ConfigFile;
import com.wdc.ddp.utility.FileType;

import scala.collection.JavaConverters;

/**
 * Merges JSON files with different schemas by reading all of them in a
 * single {@code DataFrameReader.json} call rather than unioning separately
 * loaded dataframes.
 *
 * <p>With the sample files shown in this article the resulting dataframe
 * has the columns of both inputs, and {@code pincode} is {@code null} for
 * rows that come from the file without that field.
 */
public class MergeDataFrameWithDifferentSchema {

    /**
     * Reads {@code one.json} and {@code two.json} from S3 into one dataframe
     * and prints it.
     *
     * @param args unused
     * @throws AnalysisException declared for Spark analysis failures
     */
    public static void main(String[] args) throws AnalysisException {

        // S3 credentials come from the project's credentials config and are
        // passed through CipherText.decrypt before being handed to Spark.
        ConfigFile awsCredential = new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        String accessKey = CipherText.decrypt(awsCredential.getString("accessKey.amazon"));
        String secretKey = CipherText.decrypt(awsCredential.getString("secretKey.amazon"));

        SparkSession session = SparkSession.builder()
                .master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", accessKey)
                .config("spark.hadoop.fs.s3a.secret.key", secretKey)
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.executor.heartbeatInterval", "500s")
                .getOrCreate();

        try {
            // DataFrameReader.json has a Java-friendly varargs overload, so
            // the paths can be passed directly without a Scala Seq conversion.
            String[] paths = {
                "s3a://${bucket-name}/${key-prefix}/one.json",
                "s3a://${bucket-name}/${key-prefix}/two.json"
            };

            Dataset<Row> dataset3 = session.read().json(paths);

            dataset3.show();
        } finally {
            // Stop the session whether or not the read succeeds.
            session.stop();
        }
    }

}

Below is the output. Rows that come from the file without a pincode field show null in the pincode column:


+---+-------+---------+------+------+-------+------------+
|age| area| city|gender| name|pincode| profession|
+---+-------+---------+------+------+-------+------------+
| 20|brigade|bangalore|Female|keerti| 560102|housekeeping|
| 20|brigade|bangalore| Male| rahul| 560102|housekeeping|
| 20|brigade|bangalore|Female|keerti| null|housekeeping|
| 20|brigade|bangalore| Male| rahul| null|housekeeping|
+---+-------+---------+------+------+-------+------------+

Leave a Reply

Your email address will not be published. Required fields are marked *