spark dataset api with examples – tutorial 20

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

Operations available on Datasets are divided into transformations and actions. Transformations are the ones that produce new Datasets, and actions are the ones that trigger computation and return results. Example transformations include map, filter, select, and aggregate (groupBy). Example actions count, show, or writing data out to file systems.

To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain specific type T to Spark’s internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has much lower memory footprint as well as are optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.

There are typically two ways to create a Dataset. The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession.


val people = spark.read.parquet("...").as[Person] // Scala
Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java


val names = people.map(_.name) // in Scala; names is a Dataset[String]
Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // in Java 8

Dataset operations can also be untyped, through various domain-specific-language (DSL) functions defined in: Dataset (this class), Column, and functions.

To select a column from the Dataset, use apply method in Scala and col in Java.

val ageCol = people(“age”) // in Scala
Column ageCol = people.col(“age”); // in Java

Note that the Column type can also be manipulated through its various functions.


// The following creates a new column that increases everybody's age by 10.
people("age") + 10 // in Scala
people.col("age").plus(10); // in Java

sortWithinPartitions

Returns a new Dataset with each partition sorted by the given expressions.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> dataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

Dataset<Movie> dataset3 = dataset2.as(encoder_movie);

dataset3.sortWithinPartitions("rating").show();

sort

Returns a new Dataset sorted by the specified column, all in ascending order.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> dataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

Dataset<Movie> dataset3 = dataset2.as(encoder_movie);

dataset3.sort("name").show();

dataset3.sort(col("rating").desc()).show();

dataset3.sort(col("rating").asc()).show();

orderBy

Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.


dataset3.orderBy(col("rating").desc());

select

Selects a set of column based expressions.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> dataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

selectExpr

Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Dataset<Row> dataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

dataset.selectExpr("name","rating as movie_rating").show();

groupBy

Groups the Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> dataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

dataset.selectExpr("name","rating as movie_rating").show();

Dataset<Movie> dataset3 = dataset2.as(encoder_movie);

dataset3.groupBy(col("name")).avg("rating").show();

rollup

Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.

Below is the input data


Spider Man,2,9783
Spider Man,3,9784



SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> smallDataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

Dataset<Movie> dataset3 = dataset2.as(encoder_movie);

dataset3.rollup(col("name"),col("timestamp")).avg("rating").show(1000);


Output


+----------+---------+-----------+
| name|timestamp|avg(rating)|
+----------+---------+-----------+
|Spider Man| null| 2.5|
|Spider Man| 9783| 2.0|
| null| null| 2.5|
|Spider Man| 9784| 3.0|
+----------+---------+-----------+

Cube

Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.


dataset3.cube(col("name"),col("timestamp")).avg("rating").show(1000);

Output


+----------+---------+-----------+
| name|timestamp|avg(rating)|
+----------+---------+-----------+
|Spider Man| null| 2.5|
|Spider Man| 9783| 2.0|
| null| null| 2.5|
| null| 9784| 3.0|
| null| 9783| 2.0|
|Spider Man| 9784| 3.0|
+----------+---------+-----------+

agg

Aggregates on the entire Dataset without groups.


dataset3.agg(max(col("rating")), avg(col("rating"))).show();

output


+-----------+-----------+
|max(rating)|avg(rating)|
+-----------+-----------+
| 3.0| 2.5|
+-----------+-----------+

drop

Returns a new Dataset with columns dropped. This is a no-op if schema doesn’t contain column name(s).


dataset2.drop(col("timestamp")).show();

dropDuplicates

Returns a new Dataset with duplicate rows removed, considering only the subset of columns.

input data


Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> smallDataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name",
"rating", "timestamp");

Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

dataset2.show();

dataset2.dropDuplicates().show();

dataset2.dropDuplicates("name").show();

output


+----------+------+---------+
| name|rating|timestamp|
+----------+------+---------+
|Spider Man| 2.0| 9783|
|Spider Man| 3.0| 9784|
|Spider Man| 3.0| 9784|
+----------+------+---------+

+----------+------+---------+
| name|rating|timestamp|
+----------+------+---------+
|Spider Man| 3.0| 9784|
|Spider Man| 2.0| 9783|
+----------+------+---------+

+----------+------+---------+
| name|rating|timestamp|
+----------+------+---------+
|Spider Man| 2.0| 9783|
+----------+------+---------+

describe

Computes statistics for numeric and string columns, including count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

dataset2.describe("rating").show();

Output


+-------+------------------+
|summary| rating|
+-------+------------------+
| count| 3|
| mean|2.6666666666666665|
| stddev|0.5773502691896258|
| min| 2.0|
| max| 3.0|
+-------+------------------+

repartition

Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

dataset2.repartition(2, col("name"));

schema

Returns the schema of this Dataset.

printSchema

Prints the schema to the console in a nice tree format.

explain

Prints the plans (logical and physical) to the console for debugging purposes.

dtypes

Returns all column names and their data types as an array.


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

Tuple2<String, String>[] tup=dataset2.dtypes();

for (Tuple2<String, String> tuple2 : tup) {

System.out.println(tuple2._1);
System.out.println(tuple2._2);

}

output


name
StringType
rating
DoubleType
timestamp
StringType

columns

Returns all column names as an array.

isLocal

Returns true if the collect and take methods can be run locally (without any Spark executors).

isStreaming

Returns true if this Dataset contains one or more sources that continuously return data as it arrives. A Dataset that reads data from a streaming source must be executed as a StreamingQuery using the start() method in DataStreamWriter.

checkpoint

Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.

withWatermark

Defines an event time watermark for this Dataset. A watermark tracks a point in time before which we assume no more late data is going to arrive.

na

Dropping rows containing any null values.

input data


Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Spider Man,,
,,


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

dataset2.na().drop().show();

output


+----------+------+---------+
| name|rating|timestamp|
+----------+------+---------+
|Spider Man| 2.0| 9783|
|Spider Man| 3.0| 9784|
|Spider Man| 3.0| 9784|
+----------+------+---------+

join

joins two dataset/dataframe and supports inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti joins.

below is the left dataset


Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Captain America,3,9784
Don,5,3434

Right dataset


1,Spider Man,Animation
2,Captain America,Adventure
3,Bat Man,Horror

Code


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name1",
"rating", "timestamp");

Dataset<Row> movieDataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_info").toDF("id",
"name2", "genre");


//join returns a dataset of row type i,e Dataset<Row>

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"inner").show();

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"left_outer").show();

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"right_outer").show();

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"full_outer").show();

//Equivalent to left_outer

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"left").show();

//Equivalent to right_outer

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"right").show();

//Returns matching data but only columns/data from the left dataset will be returned

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"left_semi").show();

//Returns non matching data but only columns/data from the left dataset will be returned

ratingDatastet.join(movieDataset,col("name1").equalTo(col("name2")),"left_anti").show();

output


+---------------+------+---------+---+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+---+---------------+---------+
| Spider Man| 2| 9783| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
|Captain America| 3| 9784| 2|Captain America|Adventure|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+----+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+----+---------------+---------+
| Spider Man| 2| 9783| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
|Captain America| 3| 9784| 2|Captain America|Adventure|
| Don| 5| 3434|null| null| null|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+---+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+---+---------------+---------+
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 2| 9783| 1| Spider Man|Animation|
|Captain America| 3| 9784| 2|Captain America|Adventure|
| null| null| null| 3| Bat Man| Horror|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+----+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+----+---------------+---------+
| Don| 5| 3434|null| null| null|
| null| null| null| 3| Bat Man| Horror|
|Captain America| 3| 9784| 2|Captain America|Adventure|
| Spider Man| 2| 9783| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+----+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+----+---------------+---------+
| Spider Man| 2| 9783| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
|Captain America| 3| 9784| 2|Captain America|Adventure|
| Don| 5| 3434|null| null| null|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+---+---------------+---------+
| name1|rating|timestamp| id| name2| genre|
+---------------+------+---------+---+---------------+---------+
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 3| 9784| 1| Spider Man|Animation|
| Spider Man| 2| 9783| 1| Spider Man|Animation|
|Captain America| 3| 9784| 2|Captain America|Adventure|
| null| null| null| 3| Bat Man| Horror|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+
| name1|rating|timestamp|
+---------------+------+---------+
| Spider Man| 2| 9783|
| Spider Man| 3| 9784|
| Spider Man| 3| 9784|
|Captain America| 3| 9784|
+---------------+------+---------+

+-----+------+---------+
|name1|rating|timestamp|
+-----+------+---------+
| Don| 5| 3434|
+-----+------+---------+

 

crossJoin

Explicit cartesian join with another DataFrame.

joinWith

Joins this Dataset returning a Tuple2 for each pair where condition evaluates to true.
This is similar to the relation join function with one important difference in the result schema. Since joinWith preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names _1 and _2.

This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
.master("local").getOrCreate();

Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);

Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name1",
"rating", "timestamp");

Dataset<Row> movieDataset = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_info").toDF("id",
"name2", "genre");


Dataset<Tuple2<Row, Row>> ds=ratingDatastet.joinWith(movieDataset,col("name1").equalTo(col("name2")),"inner");

ds.show();

output


+--------------------+--------------------+
| _1| _2|
+--------------------+--------------------+
| [Spider Man,2,9783]|[1,Spider Man,Animation|
| [Spider Man,3,9784]|[1,Spider Man,Animation|
| [Spider Man,3,9784]|[1,Spider Man,Animation|
|[Captain America,3,9784]|[2,Captain America,Adventure|
+--------------------+--------------------+

sortWithinPartitions

Returns a new Dataset with each partition sorted by the given expressions. The below code sorts the data based on the name column and a secondary sorting based on the rating column.


Dataset<Row> movieDataset2 = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

movieDataset2.sortWithinPartitions(col("name"), col("rating")).show();

// The below code sorts the name column in ascending order and the rating column in the descending order.

movieDataset2.sortWithinPartitions(col("name").asc(), col("rating").desc()).show();

sort

Returns a new Dataset sorted by the specified column, all in ascending order.


Dataset<Row> movieDataset2 = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");

movieDataset2.sort(col("name").asc(), col("rating").desc()).show();

orderBy

Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.


Dataset<Row> movieDataset2 = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2").toDF("name",
"rating", "timestamp");
movieDataset2.orderBy(col("name").asc(), col("rating").desc()).show();

apply

Selects column based on the column name and return it as a Column.


Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name1",
"rating", "timestamp");


ratingDatastet.select(ratingDatastet.apply("rating"),ratingDatastet.apply("rating").plus(1)).show();

col

Selects column based on the column name and return it as a Column.

as

public Dataset<T> as(String alias)

Returns a new Dataset with an alias set.

select

public Dataset<Row> select(scala.collection.Seq<Column> cols)

Selects a set of column based expressions.

selectExpr

Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.


Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name",
"rating", "timestamp");

ratingDatastet.selectExpr("name","rating as movie_rating").show();

filter

Filters rows using the given condition.


Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name",
"rating", "timestamp");


ratingDatastet.filter(col("rating").gt(4)).show();

where

Filters rows using the given condition. This is an alias for filter.


Dataset<Row> ratingDatastet = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_input").toDF("name",
"rating", "timestamp");


ratingDatastet.where(col("rating").gt(4)).show();

limit

public Dataset<T> limit(int n)

Returns a new Dataset by taking the first n rows. The difference between this function and head is that head is an action and returns an array while limit returns a new Dataset.

unionAll

public Dataset<T> unionAll(Dataset<T> other)

Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.

union

public Dataset<T> union(Dataset<T> other)

Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.

intersect

public Dataset<T> intersect(Dataset<T> other)

Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is equivalent to INTERSECT in SQL.Equality checking is performed directly on the encoded representation of the data and thus is not affected by a custom equals function defined on T.

except

public Dataset<T> except(Dataset<T> other)

Returns a new Dataset containing rows in this Dataset but not in another Dataset. This is equivalent to EXCEPT in SQL.

sample

public Dataset<T> sample(boolean withReplacement,double fraction,long seed)

Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.

withReplacement – Sample with replacement or not.
fraction – Fraction of rows to generate.
seed – Seed for sampling.

withColumn

public Dataset<Row> withColumn(String colName,Column col)

Returns a new Dataset by adding a column or replacing the existing column that has the same name.

withColumnRenamed

public Dataset<Row> withColumnRenamed(String existingName, String newName)

Returns a new Dataset with a column renamed. This is a no-op if schema doesn’t contain existingName.


dataframe = dataframe.withColumnRenamed("_c0", "movie_name");

dataframe = dataframe.withColumnRenamed("_c1", "rating");

dataframe = dataframe.withColumnRenamed("_c2", "timestamp");

dataframe = dataframe.withColumn("rating", col("rating").cast("double"));

drop

public Dataset<Row> drop(String colName)

Returns a new Dataset with a column dropped. This is a no-op if schema doesn’t contain column name.

head

public Object head(int n)

Returns the first n rows.

first

public T first()

Returns the first row. Alias for head().

distinct

public Dataset<T> distinct()

Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates.

persist

public Dataset<T> persist()

Persist this Dataset with the default storage level (MEMORY_AND_DISK).

cache

public Dataset<T> cache()

Persist this Dataset with the default storage level (MEMORY_AND_DISK).

rdd

public RDD<T> rdd()

Represents the content of the Dataset as an RDD of T.

toJavaRDD

public JavaRDD<T> toJavaRDD()

Returns the content of the Dataset as a JavaRDD of Ts.

createTempView

public void createTempView(String viewName) throws AnalysisException

Creates a local temporary view using the given name. The lifetime of this temporary view is tied to the SparkSession that was used to create this Dataset.

createGlobalTempView

public void createGlobalTempView(String viewName) throws AnalysisException

Creates a global temporary view using the given name. The lifetime of this temporary view is tied to this Spark application.

write

public DataFrameWriter<T> write()

Interface for saving the content of the non-streaming Dataset out into external storage.

writeStream

public DataStreamWriter<T> writeStream()

Interface for saving the content of the streaming Dataset out into external storage.

toJSON

public Dataset<String> toJSON()

Returns the content of the Dataset as a Dataset of JSON strings.

RelationalGroupedDataset API

A set of methods for aggregations on a DataFrame, created by Dataset.groupBy.

The main method is the agg function, which has multiple variants. This class also contains convenience some first order statistics such as mean, sum for convenience.

agg

public Dataset<Row> agg(Column expr,Column… exprs)

Compute aggregates by specifying a series of aggregate columns. Note that this function by default retains the grouping columns in its output. To not retain grouping columns, set spark.sql.retainGroupColumns to false.


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

RelationalGroupedDataset groupedData=dataset2.groupBy(col("name"));

groupedData.agg(avg("rating"), max("rating"),max("timestamp")).show();

 

mean

Compute the average value for each numeric columns for each group. This is an alias for avg. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the average values for them.

 


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));


RelationalGroupedDataset groupedData=dataset2.groupBy(col("name"));

groupedData.mean("rating").show();

max

public Dataset<Row> max(String… colNames)

Compute the max value for each numeric columns for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the max values for them.

avg

public Dataset<Row> avg(String… colNames)

Compute the mean value for each numeric columns for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the mean values for them.

min

public Dataset<Row> min(String… colNames)

Compute the min value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the min values for them.

sum

public Dataset<Row> sum(String… colNames)

Compute the sum for each numeric columns for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the sum for them.

pivot

public RelationalGroupedDataset pivot(String pivotColumn)

Pivots a column of the current DataFrame and perform the specified aggregation. There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.

Input Data


Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Captain America,3,9784
Don,5,3434

Code


Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));

RelationalGroupedDataset groupedData=dataset2.groupBy(col("name"));

groupedData.pivot("rating").avg("rating").show();

 

output


 

+---------------+----+----+----+
| name| 2.0| 3.0| 5.0|
+---------------+----+----+----+
| Don|null|null| 5.0|
|Captain America|null| 3.0|null|
| Spider Man| 2.0| 3.0|null|
+---------------+----+----+----+