Spark: Converting RDD into Datasets and DataFrames – Tutorial 16

There are two ways to convert an RDD into a Dataset or DataFrame.

1. Inferring the Schema Using Reflection

Here Spark uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

In Java

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain Map field(s). Nested JavaBeans and List or Array fields are supported though. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields.
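
The examples that follow refer to a Movie class that is not shown in the listing. A minimal sketch of such a JavaBean, assuming the three fields (name, rating, timestamp) used throughout this tutorial, could look like the class below; note that the bean encoder also needs a public no-argument constructor and setters so it can recreate Movie objects from rows.


import java.io.Serializable;

// A minimal sketch of the Movie JavaBean assumed by the examples in this tutorial.
// Field names and types are inferred from how the class is used in the code below.
public class Movie implements Serializable {

    private String name;
    private double rating;
    private String timestamp;

    // Public no-argument constructor so the bean encoder can instantiate the class
    public Movie() {
    }

    public Movie(String name, double rating, String timestamp) {
        this.name = name;
        this.rating = rating;
        this.timestamp = timestamp;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public double getRating() { return rating; }
    public void setRating(double rating) { this.rating = rating; }

    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
}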


import static org.apache.spark.sql.functions.col;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RddToDS {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
                .getOrCreate();

        // Bean encoder used later to map rows back into typed Movie objects
        Encoder<Movie> movieEncoder = Encoders.bean(Movie.class);

        // Build an RDD of Movie beans from the comma-separated input file
        JavaRDD<Movie> rdd = session.read().textFile("C:\\codebase\\scala-project\\inputdata\\movies_data_2").javaRDD()
                .map(new Function<String, Movie>() {

                    @Override
                    public Movie call(String line) throws Exception {
                        String[] data = line.split(",");
                        return new Movie(data[0], Double.parseDouble(data[1]), data[2]);
                    }
                });

        // Spark infers the schema from the Movie bean through reflection
        Dataset<Row> data = session.createDataFrame(rdd, Movie.class);

        // select returns a new Dataset containing only the name and rating columns
        data.select(col("name"), col("rating"));

        data.createOrReplaceTempView("TestView");

        Dataset<Row> dataframe = session.sql("select * from TestView where rating > 4");

        // Convert the untyped Dataset<Row> back into a typed Dataset<Movie>
        Dataset<Movie> movieDs = dataframe.map(new MapFunction<Row, Movie>() {

            @Override
            public Movie call(Row row) throws Exception {
                return new Movie(row.getString(0), row.getDouble(1), row.getAs("timestamp"));
            }
        }, movieEncoder);

        movieDs.show();
    }
}

 

In Scala

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Case class that defines the schema; assumed to match the three input columns
case class Movie(name: String, rating: Double, timestamp: String)

object RDDToDf extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  // Build an RDD of Movie case class instances and convert it to a DataFrame
  val dataframe = session.sparkContext.textFile("C:\\codebase\\scala-project\\inputdata\\movies_data_2").map { x =>
    val data = x.split(",")
    Movie(data(0), data(1).toDouble, data(2))
  }.toDF()

  dataframe.createOrReplaceTempView("temp_table")

  val highest_rated = session.sql("select * from temp_table where rating > 4")

  highest_rated.select(col("name")).show()
}

2. Programmatically Specifying the Schema

When JavaBean classes or case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> or DataFrame can be created programmatically with three steps.

1. Create an RDD of Rows from the original RDD.
2. Create the schema represented by a StructType matching the structure of Rows in the RDD.
3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
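
Before walking through the full Java and Scala examples, here is a compact, self-contained sketch of these three steps. The class name is illustrative and the schema is hard-coded; it assumes the same comma-separated movie file used elsewhere in this tutorial.


import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ThreeStepSketch {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("SchemaSketch").master("local").getOrCreate();

        JavaRDD<String> lines = session.read()
                .textFile("C:\\codebase\\scala-project\\inputdata\\movies_data_2").javaRDD();

        // 1. Create an RDD of Rows from the original RDD
        JavaRDD<Row> rows = lines.map(line -> {
            String[] parts = line.split(",");
            return RowFactory.create(parts[0], Double.parseDouble(parts[1]), parts[2]);
        });

        // 2. Create the schema represented by a StructType
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("rating", DataTypes.DoubleType, true),
                DataTypes.createStructField("timestamp", DataTypes.StringType, true)));

        // 3. Apply the schema to the RDD of Rows
        Dataset<Row> df = session.createDataFrame(rows, schema);
        df.show();
    }
}


The fuller examples below do the same thing, but they derive the StructType from a schema string instead of hard-coding it.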

In Java

Here we first load a JavaRDD of strings and then define a schema string in which the name and timestamp fields are strings and the rating field is a double. We then build a List of StructField objects, adding one entry per field with the DataTypes.createStructField method, and pass this list to DataTypes.createStructType to get a StructType object. Once the StructType is ready, we convert the JavaRDD of strings into a JavaRDD of Row using RowFactory.create inside the map method. Finally we pass the StructType object and the JavaRDD of Row into session.createDataFrame to get the DataFrame, i.e. a Dataset&lt;Row&gt;.

 


import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RddToDSSchema {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
                .getOrCreate();

        JavaRDD<String> rdd = session.read().textFile("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
                .toJavaRDD();

        Encoder<Movie> movieEncoder = Encoders.bean(Movie.class);

        // The record structure is encoded in a string of fieldName:fieldType pairs
        String schema = "name:string,rating:double,timestamp:string";

        // Build a StructField for every field described in the schema string
        List<StructField> fields = new ArrayList<StructField>();

        for (String fieldDef : schema.split(",")) {

            String[] value = fieldDef.split(":");

            StructField field = null;

            if (value[1].equalsIgnoreCase("string")) {
                field = DataTypes.createStructField(value[0], DataTypes.StringType, true);
            } else if (value[1].equalsIgnoreCase("double")) {
                field = DataTypes.createStructField(value[0], DataTypes.DoubleType, true);
            }

            fields.add(field);
        }

        StructType type = DataTypes.createStructType(fields);

        // Convert the JavaRDD<String> into a JavaRDD<Row>
        JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() {

            @Override
            public Row call(String line) throws Exception {
                String[] data = line.split(",");
                return RowFactory.create(data[0], Double.parseDouble(data[1]), data[2]);
            }
        });

        // Apply the schema to the RDD of Rows
        Dataset<Row> dataframe = session.createDataFrame(rowRdd, type);

        dataframe.createOrReplaceTempView("test_table");

        Dataset<Row> sqlRow = session.sql("select * from test_table");

        // We can also convert the DataFrame into a typed Dataset if required
        Dataset<Movie> dataset = sqlRow.map(new MapFunction<Row, Movie>() {

            @Override
            public Movie call(Row row) throws Exception {
                return new Movie(row.getString(0), row.getDouble(1), row.getString(2));
            }
        }, movieEncoder);

        dataset.show();
    }
}

In Scala

As in Java, the first step in Scala is to create an RDD of strings from the input data. We then define a schema string and use it to build an Array[StructField], constructing a StructField for each entry inside Scala's map method; since map returns an array of whatever the function produces, here we get an Array[StructField]. We pass this array to the StructType constructor to get a StructType object. Next we convert the RDD of strings into an RDD of Row using Spark's map method. Finally we pass the RDD of Row and the StructType object into session.createDataFrame to get the DataFrame.

 


import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object DatasetsSchemaExample extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  val rdd = session.sparkContext.textFile("C:\\codebase\\scala-project\\inputdata\\movies_data_2")

  // The record structure is encoded in a string of fieldName:fieldType pairs
  val schema = "name:string,rating:double,timestamp:string"

  // Build an Array[StructField], picking the data type from the schema string
  val fields = schema.split(",").map { x =>
    val Array(name, dataType) = x.split(":")
    if (dataType.equalsIgnoreCase("double")) {
      StructField(name, DataTypes.DoubleType, nullable = true)
    } else {
      StructField(name, DataTypes.StringType, nullable = true)
    }
  }

  val schemaType = StructType(fields)

  // Convert the RDD of strings into an RDD of Rows that matches the schema
  val rowRdd = rdd.map { x =>
    val data = x.split(",")
    Row(data(0), data(1).toDouble, data(2))
  }

  val df = session.createDataFrame(rowRdd, schemaType)

  df.createOrReplaceTempView("test")

  session.sql("select * from test").show()
}