Finding difference between two dataframes at column level in spark

Here we want to find the difference between two dataframes at a column level . We can use the dataframe1.except(dataframe2) but the comparison happens at a row level and not at specific column level. So here we will use the substractByKey function available on javapairrdd by converting the dataframe into rdd key value pair. We also save the schema of the dataframe so that we can apply the same when converting the javapairrdd back to the dataframe.

Below is the code

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

public class DataframeSubstractByColumn {

public static void main(String[] args) {

SparkSession session = SparkSession.builder().appName("test").master("local").getOrCreate();

session.sparkContext().setLogLevel("ERROR");

Dataset<Row> first = session.read().csv("C:\\codebase\\scala-project\\inputdata\\diff\\movie_set_one")
.toDF("id", "name", "genre");

Dataset<Row> second = session.read().csv("C:\\codebase\\scala-project\\inputdata\\diff\\movie_set_two")
.toDF("id", "name", "genre");

// This will show all the rows which are present in the first dataset
// but not present in the second dataset. But the comparison is at row
// level and not at column level.

first.except(second).show();

StructType one = first.schema();

JavaPairRDD<String, Row> pair1 = first.toJavaRDD().mapToPair(new PairFunction<Row, String, Row>() {

public Tuple2<String, Row> call(Row row) {
return new Tuple2<String, Row>(row.getString(1), row);
}

});

JavaPairRDD<String, Row> pair2 = second.toJavaRDD().mapToPair(new PairFunction<Row, String, Row>() {

public Tuple2<String, Row> call(Row row) {
return new Tuple2<String, Row>(row.getString(1), row);
}

});

JavaPairRDD<String, Row> subs = pair1.subtractByKey(pair2);

JavaRDD<Row> rdd = subs.values();

Dataset<Row> diff = session.createDataFrame(rdd, one);

diff.show();

}

}