Datasets and DataFrames in Spark with Examples – Tutorial 15

DataFrame is an immutable distributed collection of data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. Designed to make processing large data sets easier, DataFrame lets developers impose a structure onto a distributed collection of data, providing a higher-level abstraction; it offers a domain-specific language API to manipulate distributed data and makes Spark accessible to a wider audience beyond specialized data engineers.

Dataset takes on two distinct API characteristics: a strongly-typed API and an untyped API. Conceptually, consider a DataFrame as an alias for a collection of generic objects, Dataset[Row], where a Row is a generic untyped JVM object. A Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.
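
For instance, in this minimal Scala sketch (the Movie case class and the inline row are assumptions based on the sample data used later), the same data can be held as an untyped Dataset[Row] or as a strongly-typed Dataset[Movie]:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Assumed case class mirroring the sample data: movie name, rating, timestamp
case class Movie(name: String, rating: Double, timestamp: String)

object TypedVsUntyped extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  // Untyped API: a DataFrame is just Dataset[Row]; columns are resolved by name at runtime
  val dataframe: DataFrame = Seq(("Bat Man", 5.0, "978298709")).toDF("name", "rating", "timestamp")

  // Strongly-typed API: every element is a Movie JVM object
  val dataset: Dataset[Movie] = dataframe.as[Movie]

  dataframe.filter(dataframe("rating") > 4).show() // column checked only when the job runs
  dataset.filter(_.rating > 4).show()              // field name and type checked by the compiler
}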

As a Spark developer, you benefit from the unified DataFrame and Dataset APIs in Spark 2.0 in a number of ways. Consider static typing and runtime safety as a spectrum, with SQL the least restrictive and Dataset the most restrictive. For instance, in a Spark SQL string query you won't catch a syntax error until runtime (which could be costly), whereas in DataFrames and Datasets you can catch such errors at compile time (which saves developer time and cost). That is, if you invoke a function on a DataFrame that is not part of the API, the compiler will catch it. However, it won't detect a non-existent column name until runtime.

At the far end of the spectrum is Dataset, the most restrictive. Since Dataset operations are expressed as lambda functions over typed JVM objects, any mismatch of typed parameters is detected at compile time. Analysis errors are caught at compile time as well when using Datasets, saving developer time and cost.
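
The following Scala sketch (the Movie case class and column names are assumptions based on the sample data) makes the spectrum concrete: a misspelled column in a SQL string compiles fine and fails only at runtime with an AnalysisException, while the same typo in a typed lambda is rejected by the compiler:

import org.apache.spark.sql.{AnalysisException, SparkSession}

case class Movie(name: String, rating: Double, timestamp: String)

object ErrorSpectrum extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  val dataset = Seq(Movie("Bat Man", 5.0, "978298709"), Movie("Spider Man", 4.0, "978301398")).toDS()
  dataset.createOrReplaceTempView("movie_table")

  // SQL string: the misspelled column is only detected when the query is analyzed at runtime
  try {
    session.sql("select ratng from movie_table").show()
  } catch {
    case e: AnalysisException => println("caught at runtime: " + e.getMessage)
  }

  // Untyped API: dataset.toDF().select("ratng") would also compile and fail only at runtime

  // Typed API: dataset.filter(_.ratng > 4) does not even compile
  dataset.filter(_.rating > 4).show()
}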

DataFrames, as collections of Dataset[Row], render a structured custom view onto your semi-structured data. Although structure may limit control over what your Spark program can do with the data, it introduces rich semantics and an easy set of domain-specific operations that can be expressed as high-level constructs. Most computations can be accomplished with the Dataset's high-level APIs: for example, it is much simpler to perform agg, select, sum, avg, map, filter, or groupBy operations on a typed Dataset object than by working with an RDD's raw data fields.
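
As a rough comparison (a Scala sketch with a few sample rows inlined as an assumption), computing the average rating per movie with an RDD means tracking tuple positions and sum/count pairs by hand, while the DataFrame/Dataset API expresses it directly:

import org.apache.spark.sql.SparkSession

object HighLevelOps extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  val rows = Seq(("Spider Man", 4.0), ("Bat Man", 5.0), ("Bat Man", 4.0))

  // RDD version: positional fields and manual sum/count bookkeeping
  val rddAvg = session.sparkContext.parallelize(rows)
    .mapValues(rating => (rating, 1))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues { case (sum, count) => sum / count }
  rddAvg.collect().foreach(println)

  // DataFrame/Dataset version: named columns and a built-in aggregate
  rows.toDF("movie_name", "rating")
    .groupBy("movie_name")
    .avg("rating")
    .show()
}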

In Spark 2.0, DataFrames are just Datasets of Rows in the Scala and Java APIs. Operations on them are also referred to as untyped transformations, in contrast to the typed transformations that come with strongly-typed Scala/Java Datasets.

The DataFrame and Dataset APIs are built on top of the Spark SQL engine, which uses Catalyst to generate an optimized logical and physical query plan. Spark, acting as a compiler, understands your Dataset-typed JVM objects and maps them to Tungsten's internal memory representation using Encoders. As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that executes at superior speed.
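
You can inspect the mapping an encoder derives without running a job. A small Scala sketch (the Movie case class is an assumption matching the sample data; the printed output shown in the comment is approximate):

import org.apache.spark.sql.Encoders

// Assumed case class mirroring the sample data
case class Movie(name: String, rating: Double, timestamp: String)

object EncoderSchema extends App {

  // The encoder derives the columnar schema Tungsten uses for Movie objects
  val encoder = Encoders.product[Movie]

  println(encoder.schema)
  // prints something like:
  // StructType(StructField(name,StringType,true), StructField(rating,DoubleType,false), StructField(timestamp,StringType,true))
}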

Like RDDs, DataFrames and Datasets represent distributed collections, with additional schema information not found in RDDs. This additional schema information is used to provide a more efficient storage layer (Tungsten), and in the optimizer (Catalyst) to perform additional optimizations. Beyond schema information, the operations performed on Datasets and DataFrames are such that the optimizer can inspect the logical meaning rather than arbitrary functions. DataFrames are Datasets of a special Row object, which doesn’t provide any compile-time type checking. The strongly typed Dataset API shines especially for use with more RDD-like functional operations. Compared to working with RDDs, DataFrames allow Spark’s optimizer to better understand our code and our data, which allows for a new class of optimizations.
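
Because the operations carry logical meaning, you can ask Spark to print the plans Catalyst derives for a query. A small Scala sketch (the inline rows are an assumption):

import org.apache.spark.sql.SparkSession

object ExplainPlan extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  val dataframe = Seq(("Bat Man", 5.0), ("Spider Man", 4.0)).toDF("movie_name", "rating")

  // explain(true) prints the parsed, analyzed and optimized logical plans
  // plus the physical plan Catalyst generates for this query
  dataframe.filter($"rating" > 4).groupBy("movie_name").avg("rating").explain(true)
}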

Below is the sample data we will use (movie name, rating, timestamp):


Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
Bat Man,4,978299620

Let's look at some examples of DataFrames.

The entry point to all functionality in Spark is the SparkSession class.

In Java


SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
.getOrCreate();

In Scala


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources. Let's take an example of creating a DataFrame from a CSV file; a short sketch of creating one from an existing RDD follows it.

In Java


SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
.getOrCreate();

Dataset<Row> dataframe = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

System.out.println(dataframe.schema());

dataframe.show();

In Scala


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

var dataframe = session.read.csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")

dataframe.show
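
As mentioned above, a DataFrame can also be created from an existing RDD. A minimal Scala sketch (the Movie case class and the inline row are assumptions):

import org.apache.spark.sql.SparkSession

case class Movie(name: String, rating: Double, timestamp: String)

object FromRdd extends App {

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  // An existing RDD of case class objects can be turned into a DataFrame directly
  val rdd = session.sparkContext.parallelize(Seq(Movie("Bat Man", 5.0, "978298709")))

  val dataframe = rdd.toDF()

  dataframe.show()
}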


Let's see an example of renaming columns, changing a column type, grouping by movie name, and then calculating the average rating of each movie. We also use the sql function on SparkSession, which lets applications run SQL queries programmatically and returns the result as a Dataset<Row>; for that, we register the DataFrame as a temporary view.

In Java


import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveContextExample {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
                .getOrCreate();

        Dataset<Row> dataframe = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

        System.out.println(dataframe.schema());

        // Rename the default _c0/_c1/_c2 columns to meaningful names
        dataframe = dataframe.withColumnRenamed("_c0", "movie_name");
        dataframe = dataframe.withColumnRenamed("_c1", "rating");
        dataframe = dataframe.withColumnRenamed("_c2", "timestamp");

        // Cast rating from string to double so it can be aggregated
        dataframe = dataframe.withColumn("rating", col("rating").cast("double"));

        dataframe.select("movie_name").show();

        dataframe.select(col("movie_name"), col("rating"), col("rating").plus(1)).show();

        dataframe.groupBy("movie_name").avg("rating").show();

        // Register as a temporary view so it can be queried with SQL
        dataframe.createOrReplaceTempView("movie_table");

        session.sql("select * from movie_table").show();
    }
}

In Scala


object Frame extends App {

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  var dataframe = session.read.csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")

  // Rename the default _c0/_c1/_c2 columns to meaningful names
  dataframe = dataframe.withColumnRenamed("_c0", "movie_name")
  dataframe = dataframe.withColumnRenamed("_c1", "rating")
  dataframe = dataframe.withColumnRenamed("_c2", "timestamp")

  dataframe.select("movie_name").show()

  dataframe.select(col("movie_name"), col("rating")).show()

  // Cast rating from string to double so it can be aggregated
  dataframe = dataframe.withColumn("rating", col("rating").cast("double"))

  dataframe.select(col("movie_name"), col("rating"), col("rating").plus(1)).show()

  // Returns a boolean column indicating which rows have rating > 4
  dataframe.select(col("rating") > 4).show()

  import session.implicits._

  // $"rating" is the implicits-provided shorthand for col("rating")
  dataframe.filter($"rating" > 4).show()

  dataframe.groupBy("movie_name").avg("rating").show()

  // Register as a temporary view so it can be queried with SQL
  dataframe.createOrReplaceTempView("movie_table")

  session.sql("select * from movie_table").show()
}

Datasets

Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize objects for processing or transmission over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are generated dynamically as code and use a format that allows Spark to perform many operations like filtering, sorting, and hashing without deserializing the bytes back into an object.
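
One way to see the difference (a small Scala sketch; the Movie case class is an assumption and the printed output in the comments is approximate): a generated product encoder exposes the object's fields as columns Spark can operate on directly, while a Kryo-based encoder stores each object as a single opaque binary value.

import org.apache.spark.sql.Encoders

case class Movie(name: String, rating: Double, timestamp: String)

object EncoderVsKryo extends App {

  // Generated encoder: fields become columns, so filtering and sorting can work on the encoded form
  println(Encoders.product[Movie].schema)
  // e.g. StructType(StructField(name,StringType,true), StructField(rating,DoubleType,false), ...)

  // Kryo-based encoder: the whole object is a single binary column, opaque to Spark
  println(Encoders.kryo[Movie].schema)
  // e.g. StructType(StructField(value,BinaryType,true))
}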

Let's take a few examples.

In Java

In Java, we need to encode the bean if we want to use it as the element type of a Dataset. Below we want to use the Movie object, so we call Encoders.bean(Movie.class) to get the required encoder, which we then pass to the createDataset method. We also convert the DataFrame into a Dataset using the as method, passing movie_encoder. Encoders for most common types are provided in the Encoders class, for example Encoders.INT().


import static org.apache.spark.sql.functions.col;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSet {

    public static void main(String[] args) {

        // Movie is assumed to be a JavaBean with name (String), rating (double) and
        // timestamp (String) fields plus getters/setters and this constructor
        Movie movie1 = new Movie("Autograph", 4.5, "23232");
        Movie movie2 = new Movie("Mugulu Nage", 4.8, "43232");

        // Bean encoder that tells Spark how to map Movie objects to its internal format
        Encoder<Movie> movie_encoder = Encoders.bean(Movie.class);

        SparkSession session = SparkSession.builder().appName("Test").config("key", "value").master("local")
                .getOrCreate();

        Dataset<Movie> movieDataSet = session.createDataset(Arrays.asList(movie1, movie2), movie_encoder);

        movieDataSet.show();

        Dataset<Row> dataframe = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

        dataframe = dataframe.withColumnRenamed("_c0", "name");
        dataframe = dataframe.withColumnRenamed("_c1", "rating");
        dataframe = dataframe.withColumnRenamed("_c2", "timestamp");

        // Cast rating to double so the column types line up with the Movie bean
        dataframe = dataframe.withColumn("rating", col("rating").cast("double"));

        // Convert the untyped DataFrame into a typed Dataset<Movie>
        Dataset<Movie> movie_set = dataframe.as(movie_encoder);

        movie_set.show();

        // Encoders for common types are available on the Encoders class
        Dataset<Integer> intdata = session.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());

        intdata.show();
    }
}

In Scala

In Scala, encoders are created for case classes, and we need to import the implicits with import session.implicits._; encoders for most common types are provided automatically by that import. Here we use the toDS method to get a Dataset from a local collection. We can also convert a DataFrame into a Dataset using the as[Movie] method on the DataFrame.
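
The example below assumes a Movie case class whose fields match the renamed CSV columns; a minimal sketch:

// Assumed definition, matching the renamed columns used below
case class Movie(name: String, rating: Double, timestamp: String)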


object Datasets extends App {

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val session = SparkSession.builder().appName("Test").master("local").getOrCreate()

  import session.implicits._

  // Build a Dataset[Movie] from a local collection using the implicits-provided toDS
  val dataset_movie = Vector(Movie("test", 4.5, "11212")).toDS()

  dataset_movie.show()

  var dataframe = session.read.csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")

  // Rename the default columns and cast rating so the schema matches the Movie case class
  dataframe = dataframe.withColumnRenamed("_c0", "name")
  dataframe = dataframe.withColumnRenamed("_c1", "rating")
  dataframe = dataframe.withColumnRenamed("_c2", "timestamp")
  dataframe = dataframe.withColumn("rating", col("rating").cast("double"))

  // Convert the untyped DataFrame into a typed Dataset[Movie]
  val dataset = dataframe.as[Movie]

  dataset.show()

  // A single column can also be converted into a Dataset of a primitive type
  val doubledataframe = dataframe.select(col("rating")).as[Double]

  doubledataframe.show()
}