A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.
Operations available on Datasets are divided into transformations and actions. Transformations produce new Datasets, while actions trigger computation and return results. Example transformations include map, filter, select, and aggregate (groupBy). Example actions include count, show, and writing data out to file systems.
To support domain-specific objects efficiently, an Encoder is required. The encoder maps the domain-specific type T to Spark's internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder tells Spark to generate code at runtime that serializes the Person object into a binary structure. This binary structure often has a much lower memory footprint and is optimized for efficient data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.
There are typically two ways to create a Dataset. The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession.
val people = spark.read.parquet("...").as[Person] // Scala
Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
Datasets can also be created through transformations available on existing Datasets:
val names = people.map(_.name) // in Scala; names is a Dataset[String]
Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING()); // in Java 8
Dataset operations can also be untyped, through various domain-specific-language (DSL) functions defined in: Dataset (this class), Column, and functions.
To select a column from the Dataset, use the apply method in Scala and col in Java.
val ageCol = people("age") // in Scala
Column ageCol = people.col("age"); // in Java
Note that the Column type can also be manipulated through its various functions.
// The following creates a new column that increases everybody's age by 10.
people("age") + 10 // in Scala
people.col("age").plus(10); // in Java
sortWithinPartitions
Returns a new Dataset with each partition sorted by the given expressions.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);
Dataset<Row> dataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
Dataset<Movie> dataset3 = dataset2.as(encoder_movie);
dataset3.sortWithinPartitions("rating").show();
sort
Returns a new Dataset sorted by the specified column, all in ascending order.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);
Dataset<Row> dataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
Dataset<Movie> dataset3 = dataset2.as(encoder_movie);
dataset3.sort("name").show();
dataset3.sort(col("rating").desc()).show();
dataset3.sort(col("rating").asc()).show();
orderBy
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
dataset3.orderBy(col("rating").desc()).show();
select
Selects a set of column-based expressions.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Dataset<Row> dataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
selectExpr
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Dataset<Row> dataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
dataset.selectExpr("name", "rating as movie_rating").show();
groupBy
Groups the Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);
Dataset<Row> dataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = dataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
Dataset<Movie> dataset3 = dataset2.as(encoder_movie);
dataset3.groupBy(col("name")).avg("rating").show();
rollup
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
Below is the input data
Spider Man,2,9783
Spider Man,3,9784
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Encoder<Movie> encoder_movie = Encoders.bean(Movie.class);
Dataset<Row> smallDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
Dataset<Movie> dataset3 = dataset2.as(encoder_movie);
dataset3.rollup(col("name"), col("timestamp")).avg("rating").show(1000);
Output
+----------+---------+-----------+
|      name|timestamp|avg(rating)|
+----------+---------+-----------+
|Spider Man|     null|        2.5|
|Spider Man|     9783|        2.0|
|      null|     null|        2.5|
|Spider Man|     9784|        3.0|
+----------+---------+-----------+
cube
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
dataset3.cube(col("name"),col("timestamp")).avg("rating").show(1000);
Output
+----------+---------+-----------+
|      name|timestamp|avg(rating)|
+----------+---------+-----------+
|Spider Man|     null|        2.5|
|Spider Man|     9783|        2.0|
|      null|     null|        2.5|
|      null|     9784|        3.0|
|      null|     9783|        2.0|
|Spider Man|     9784|        3.0|
+----------+---------+-----------+
agg
Aggregates on the entire Dataset without groups.
dataset3.agg(max(col("rating")), avg(col("rating"))).show();
Output
+-----------+-----------+
|max(rating)|avg(rating)|
+-----------+-----------+
|        3.0|        2.5|
+-----------+-----------+
drop
Returns a new Dataset with columns dropped. This is a no-op if schema doesn’t contain column name(s).
dataset2.drop(col("timestamp")).show();
dropDuplicates
Returns a new Dataset with duplicate rows removed, optionally considering only a subset of columns.
Input data
Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Dataset<Row> smallDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name", "rating", "timestamp");
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
dataset2.show();
dataset2.dropDuplicates().show();
dataset2.dropDuplicates("name").show();
Output
+----------+------+---------+
|      name|rating|timestamp|
+----------+------+---------+
|Spider Man|   2.0|     9783|
|Spider Man|   3.0|     9784|
|Spider Man|   3.0|     9784|
+----------+------+---------+

+----------+------+---------+
|      name|rating|timestamp|
+----------+------+---------+
|Spider Man|   3.0|     9784|
|Spider Man|   2.0|     9783|
+----------+------+---------+

+----------+------+---------+
|      name|rating|timestamp|
+----------+------+---------+
|Spider Man|   2.0|     9783|
+----------+------+---------+
describe
Computes statistics for numeric and string columns, including count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
dataset2.describe("rating").show();
Output
+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|                 3|
|   mean|2.6666666666666665|
| stddev|0.5773502691896258|
|    min|               2.0|
|    max|               3.0|
+-------+------------------+
repartition
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
dataset2.repartition(2, col("name"));
schema
Returns the schema of this Dataset.
printSchema
Prints the schema to the console in a nice tree format.
explain
Prints the plans (logical and physical) to the console for debugging purposes.
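For instance, a minimal sketch of all three inspection calls, reusing the dataset2 variable from the examples above:
// Returns the schema as a StructType (org.apache.spark.sql.types.StructType)
StructType schema = dataset2.schema();
// Prints the schema as a tree, e.g. root |-- name: string (nullable = true) ...
dataset2.printSchema();
// explain(true) prints the logical plans in addition to the physical plan
dataset2.explain(true);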
dtypes
Returns all column names and their data types as an array.
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
Tuple2<String, String>[] tup = dataset2.dtypes();
for (Tuple2<String, String> tuple2 : tup) {
    System.out.println(tuple2._1);
    System.out.println(tuple2._2);
}
Output
name
StringType
rating
DoubleType
timestamp
StringType
columns
Returns all column names as an array.
isLocal
Returns true if the collect and take methods can be run locally (without any Spark executors).
isStreaming
Returns true if this Dataset contains one or more sources that continuously return data as it arrives. A Dataset that reads data from a streaming source must be executed as a StreamingQuery using the start() method in DataStreamWriter.
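A hedged sketch of both checks, reusing the session variable; the socket source host and port are placeholder values, and the example assumes import java.util.Arrays:
// A Dataset built from a local collection can be collected without executors
Dataset<String> local = session.createDataset(Arrays.asList("a", "b"), Encoders.STRING());
System.out.println(local.isLocal()); // true
// A Dataset backed by a streaming source reports isStreaming() == true
Dataset<Row> stream = session.readStream().format("socket")
        .option("host", "localhost").option("port", "9999").load();
System.out.println(stream.isStreaming()); // true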
checkpoint
Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.
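A minimal sketch, reusing dataset3 from the earlier examples; the checkpoint directory path is a placeholder:
// The checkpoint directory must be set once before checkpointing
session.sparkContext().setCheckpointDir("C:\\codebase\\scala-project\\checkpoint");
// Eagerly materializes dataset3 and truncates its lineage
Dataset<Movie> checkpointed = dataset3.checkpoint();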
withWatermark
Defines an event time watermark for this Dataset. A watermark tracks a point in time before which we assume no more late data is going to arrive.
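A hedged sketch; streamingDataset and its eventTime column are assumptions standing in for a real streaming Dataset:
// Records arriving more than 10 minutes behind the max observed event time
// may be dropped from streaming aggregations
Dataset<Row> withWatermark = streamingDataset.withWatermark("eventTime", "10 minutes");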
na
Returns a DataFrameNaFunctions for working with missing data. The example below drops rows that contain any null values.
Input data
Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Spider Man,,
,,
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
dataset2.na().drop().show();
Output
+----------+------+---------+
|      name|rating|timestamp|
+----------+------+---------+
|Spider Man|   2.0|     9783|
|Spider Man|   3.0|     9784|
|Spider Man|   3.0|     9784|
+----------+------+---------+
join
Joins two Datasets/DataFrames. The supported join types are inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
Below is the left dataset
Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Captain America,3,9784
Don,5,3434
Right dataset
1,Spider Man,Animation
2,Captain America,Adventure
3,Bat Man,Horror
Code
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name1", "rating", "timestamp");
Dataset<Row> movieDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_info")
        .toDF("id", "name2", "genre");
// join returns an untyped result, i.e. Dataset<Row>
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "inner").show();
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "left_outer").show();
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "right_outer").show();
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "full_outer").show();
// Equivalent to left_outer
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "left").show();
// Equivalent to right_outer
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "right").show();
// Returns matching rows, but only the columns of the left dataset
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "left_semi").show();
// Returns non-matching rows, but only the columns of the left dataset
ratingDataset.join(movieDataset, col("name1").equalTo(col("name2")), "left_anti").show();
Output
+---------------+------+---------+---+---------------+---------+
|          name1|rating|timestamp| id|          name2|    genre|
+---------------+------+---------+---+---------------+---------+
|     Spider Man|     2|     9783|  1|     Spider Man|Animation|
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|Captain America|     3|     9784|  2|Captain America|Adventure|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+----+---------------+---------+
|          name1|rating|timestamp|  id|          name2|    genre|
+---------------+------+---------+----+---------------+---------+
|     Spider Man|     2|     9783|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
|Captain America|     3|     9784|   2|Captain America|Adventure|
|            Don|     5|     3434|null|           null|     null|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+---+---------------+---------+
|          name1|rating|timestamp| id|          name2|    genre|
+---------------+------+---------+---+---------------+---------+
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|     Spider Man|     2|     9783|  1|     Spider Man|Animation|
|Captain America|     3|     9784|  2|Captain America|Adventure|
|           null|  null|     null|  3|        Bat Man|   Horror|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+----+---------------+---------+
|          name1|rating|timestamp|  id|          name2|    genre|
+---------------+------+---------+----+---------------+---------+
|            Don|     5|     3434|null|           null|     null|
|           null|  null|     null|   3|        Bat Man|   Horror|
|Captain America|     3|     9784|   2|Captain America|Adventure|
|     Spider Man|     2|     9783|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+----+---------------+---------+
|          name1|rating|timestamp|  id|          name2|    genre|
+---------------+------+---------+----+---------------+---------+
|     Spider Man|     2|     9783|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
|     Spider Man|     3|     9784|   1|     Spider Man|Animation|
|Captain America|     3|     9784|   2|Captain America|Adventure|
|            Don|     5|     3434|null|           null|     null|
+---------------+------+---------+----+---------------+---------+

+---------------+------+---------+---+---------------+---------+
|          name1|rating|timestamp| id|          name2|    genre|
+---------------+------+---------+---+---------------+---------+
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|     Spider Man|     3|     9784|  1|     Spider Man|Animation|
|     Spider Man|     2|     9783|  1|     Spider Man|Animation|
|Captain America|     3|     9784|  2|Captain America|Adventure|
|           null|  null|     null|  3|        Bat Man|   Horror|
+---------------+------+---------+---+---------------+---------+

+---------------+------+---------+
|          name1|rating|timestamp|
+---------------+------+---------+
|     Spider Man|     2|     9783|
|     Spider Man|     3|     9784|
|     Spider Man|     3|     9784|
|Captain America|     3|     9784|
+---------------+------+---------+

+-----+------+---------+
|name1|rating|timestamp|
+-----+------+---------+
|  Don|     5|     3434|
+-----+------+---------+
crossJoin
Explicit cartesian join with another DataFrame.
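A minimal sketch, reusing the two Datasets from the join example above; every left row is paired with every right row:
// 5 rating rows x 3 movie rows = 15 output rows
ratingDataset.crossJoin(movieDataset).show();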
joinWith
Joins this Dataset returning a Tuple2 for each pair where condition evaluates to true.
This is similar to the relation join function with one important difference in the result schema. Since joinWith preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names _1 and _2.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
SparkSession session = SparkSession.builder().appName("Test")
        .config("spark.sql.sources.default", "json")
        .master("local").getOrCreate();
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name1", "rating", "timestamp");
Dataset<Row> movieDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_info")
        .toDF("id", "name2", "genre");
Dataset<Tuple2<Row, Row>> ds = ratingDataset.joinWith(movieDataset, col("name1").equalTo(col("name2")), "inner");
ds.show();
Output
+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
| [Spider Man,2,9783]|[1,Spider Man,Ani...|
| [Spider Man,3,9784]|[1,Spider Man,Ani...|
| [Spider Man,3,9784]|[1,Spider Man,Ani...|
|[Captain America,...|[2,Captain Americ...|
+--------------------+--------------------+
sortWithinPartitions
Returns a new Dataset with each partition sorted by the given expressions. The code below sorts the data by the name column, with secondary sorting by the rating column.
Dataset<Row> movieDataset2 = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
movieDataset2.sortWithinPartitions(col("name"), col("rating")).show();
// The below sorts the name column in ascending order and the rating column in descending order.
movieDataset2.sortWithinPartitions(col("name").asc(), col("rating").desc()).show();
sort
Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<Row> movieDataset2 = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
movieDataset2.sort(col("name").asc(), col("rating").desc()).show();
orderBy
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
Dataset<Row> movieDataset2 = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2")
        .toDF("name", "rating", "timestamp");
movieDataset2.orderBy(col("name").asc(), col("rating").desc()).show();
apply
Selects a column based on the column name and returns it as a Column.
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name1", "rating", "timestamp");
ratingDataset.select(ratingDataset.apply("rating"), ratingDataset.apply("rating").plus(1)).show();
col
Selects a column based on the column name and returns it as a Column.
as
public Dataset<T> as(String alias)
Returns a new Dataset with an alias set.
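A short sketch; aliasing both sides makes column references unambiguous, e.g. in a self-join (the alias names are arbitrary):
Dataset<Row> left = ratingDataset.as("l");
Dataset<Row> right = ratingDataset.as("r");
left.join(right, col("l.name1").equalTo(col("r.name1"))).show();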
select
public Dataset<Row> select(scala.collection.Seq<Column> cols)
Selects a set of column-based expressions.
selectExpr
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name", "rating", "timestamp");
ratingDataset.selectExpr("name", "rating as movie_rating").show();
filter
Filters rows using the given condition.
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name", "rating", "timestamp");
ratingDataset.filter(col("rating").gt(4)).show();
where
Filters rows using the given condition. This is an alias for filter.
Dataset<Row> ratingDataset = session.read()
        .csv("C:\\codebase\\scala-project\\inputdata\\movies_input")
        .toDF("name", "rating", "timestamp");
ratingDataset.where(col("rating").gt(4)).show();
limit
public Dataset<T> limit(int n)
Returns a new Dataset by taking the first n rows. The difference between this function and head is that head is an action and returns an array while limit returns a new Dataset.
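For example, reusing ratingDataset from the earlier examples:
// Returns a new Dataset with at most 5 rows; show() triggers the computation
ratingDataset.limit(5).show();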
unionAll
public Dataset<T> unionAll(Dataset<T> other)
Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL.
union
public Dataset<T> union(Dataset<T> other)
Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL. To perform a SQL-style set union that removes duplicates, follow this call with distinct.
intersect
public Dataset<T> intersect(Dataset<T> other)
Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is equivalent to INTERSECT in SQL. Equality checking is performed directly on the encoded representation of the data and thus is not affected by a custom equals function defined on T.
except
public Dataset<T> except(Dataset<T> other)
Returns a new Dataset containing rows in this Dataset but not in another Dataset. This is equivalent to EXCEPT in SQL.
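A combined sketch of the set operations, assuming ds1 and ds2 are two Datasets with identical schemas (the names are placeholders):
Dataset<Row> all = ds1.union(ds2);                // keeps duplicates, like UNION ALL
Dataset<Row> deduped = ds1.union(ds2).distinct(); // SQL-style UNION
Dataset<Row> common = ds1.intersect(ds2);         // rows present in both
Dataset<Row> leftOnly = ds1.except(ds2);          // rows in ds1 but not in ds2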
sample
public Dataset<T> sample(boolean withReplacement,double fraction,long seed)
Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.
withReplacement – Sample with replacement or not.
fraction – Fraction of rows to generate.
seed – Seed for sampling.
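For instance, drawing roughly half of the rows without replacement, with a fixed seed for reproducibility:
ratingDataset.sample(false, 0.5, 42L).show();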
withColumn
public Dataset<Row> withColumn(String colName,Column col)
Returns a new Dataset by adding a column or replacing the existing column that has the same name.
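A short sketch, reusing ratingDataset; the derived column name is an assumption:
// Adds a new column derived from an existing one
ratingDataset.withColumn("rating_double", col("rating").cast("double")).show();
// Passing an existing name replaces that column instead of adding one
ratingDataset.withColumn("rating", col("rating").cast("double")).show();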
withColumnRenamed
public Dataset<Row> withColumnRenamed(String existingName, String newName)
Returns a new Dataset with a column renamed. This is a no-op if schema doesn’t contain existingName.
dataframe = dataframe.withColumnRenamed("_c0", "movie_name");
dataframe = dataframe.withColumnRenamed("_c1", "rating");
dataframe = dataframe.withColumnRenamed("_c2", "timestamp");
dataframe = dataframe.withColumn("rating", col("rating").cast("double"));
drop
public Dataset<Row> drop(String colName)
Returns a new Dataset with a column dropped. This is a no-op if schema doesn’t contain column name.
head
public Object head(int n)
Returns the first n rows.
first
public T first()
Returns the first row. Alias for head().
distinct
public Dataset<T> distinct()
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates.
persist
public Dataset<T> persist()
Persist this Dataset with the default storage level (MEMORY_AND_DISK).
cache
public Dataset<T> cache()
Persist this Dataset with the default storage level (MEMORY_AND_DISK).
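A minimal sketch; the first action materializes the cache, and later actions reuse it:
ratingDataset.cache();     // same default storage level as persist()
ratingDataset.count();     // first action computes and populates the cache
ratingDataset.show();      // served from the cached data
ratingDataset.unpersist(); // frees the cached data when no longer needed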
rdd
public RDD<T> rdd()
Represents the content of the Dataset as an RDD of T.
toJavaRDD
public JavaRDD<T> toJavaRDD()
Returns the content of the Dataset as a JavaRDD of Ts.
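A short sketch dropping down to the RDD API; each element is a Row (requires org.apache.spark.api.java.JavaRDD):
JavaRDD<Row> rows = ratingDataset.toJavaRDD();
rows.map(row -> row.getString(0)).collect().forEach(System.out::println);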
createTempView
public void createTempView(String viewName) throws AnalysisException
Creates a local temporary view using the given name. The lifetime of this temporary view is tied to the SparkSession that was used to create this Dataset.
createGlobalTempView
public void createGlobalTempView(String viewName) throws AnalysisException
Creates a global temporary view using the given name. The lifetime of this temporary view is tied to this Spark application.
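A sketch of both view flavors; the view names are arbitrary. Note that global temporary views are resolved through the system database global_temp:
ratingDataset.createTempView("ratings");
session.sql("SELECT name, rating FROM ratings").show();
ratingDataset.createGlobalTempView("ratings_global");
session.sql("SELECT * FROM global_temp.ratings_global").show();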
write
public DataFrameWriter<T> write()
Interface for saving the content of the non-streaming Dataset out into external storage.
writeStream
public DataStreamWriter<T> writeStream()
Interface for saving the content of the streaming Dataset out into external storage.
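Hedged sketches of both writers; the output path and the streamingDataset variable are placeholders:
// Batch write: saves the Dataset as JSON files under the given directory
ratingDataset.write().mode("overwrite").json("C:\\codebase\\scala-project\\output\\ratings_json");
// Streaming write: only valid on a Dataset where isStreaming() returns true
streamingDataset.writeStream().format("console").outputMode("append").start();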
toJSON
public Dataset<String> toJSON()
Returns the content of the Dataset as a Dataset of JSON strings.
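For example, reusing ratingDataset:
Dataset<String> json = ratingDataset.toJSON();
// Each row becomes one JSON string, e.g. {"name":"Spider Man","rating":"2","timestamp":"9783"}
json.show(false);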
RelationalGroupedDataset API
A set of methods for aggregations on a DataFrame, created by Dataset.groupBy.
The main method is the agg function, which has multiple variants. This class also contains some convenience methods for first-order statistics such as mean and sum.
agg
public Dataset<Row> agg(Column expr, Column... exprs)
Compute aggregates by specifying a series of aggregate columns. Note that this function by default retains the grouping columns in its output. To not retain grouping columns, set spark.sql.retainGroupColumns to false.
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
RelationalGroupedDataset groupedData = dataset2.groupBy(col("name"));
groupedData.agg(avg("rating"), max("rating"), max("timestamp")).show();
mean
Compute the average value for each numeric column for each group. This is an alias for avg. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the average values for them.
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
RelationalGroupedDataset groupedData = dataset2.groupBy(col("name"));
groupedData.mean("rating").show();
max
public Dataset<Row> max(String... colNames)
Compute the max value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the max values for them.
avg
public Dataset<Row> avg(String... colNames)
Compute the mean value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the mean values for them.
min
public Dataset<Row> min(String... colNames)
Compute the min value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the min values for them.
sum
public Dataset<Row> sum(String... colNames)
Compute the sum for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the sum for them.
pivot
public RelationalGroupedDataset pivot(String pivotColumn)
Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of the pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.
Input data
Spider Man,2,9783
Spider Man,3,9784
Spider Man,3,9784
Captain America,3,9784
Don,5,3434
Code
Dataset<Row> dataset2 = smallDataset.select(col("name"), col("rating").cast("double"), col("timestamp"));
RelationalGroupedDataset groupedData = dataset2.groupBy(col("name"));
groupedData.pivot("rating").avg("rating").show();
Output
+---------------+----+----+----+
|           name| 2.0| 3.0| 5.0|
+---------------+----+----+----+
|            Don|null|null| 5.0|
|Captain America|null| 3.0|null|
|     Spider Man| 2.0| 3.0|null|
+---------------+----+----+----+
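As noted above, the more efficient pivot variant takes the distinct values explicitly, so Spark can skip the extra pass that computes them. A minimal sketch (requires import java.util.Arrays):
groupedData.pivot("rating", Arrays.<Object>asList(2.0, 3.0, 5.0)).avg("rating").show();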