Spark DataFrame and Dataset Loading and Saving Data, Spark SQL Performance Tuning – Tutorial 19

Parquet is the default data source for all operations unless otherwise configured by spark.sql.sources.default.

We can use the method below to save data in the Parquet format.


dataset.write().save("C:\\codebase\\scala-project\\inputdata\\output\\data");

We can also manually specify the data source that will be used, along with any extra options we would like to pass to it. Data sources are specified by their fully qualified name (for example, org.apache.spark.sql.parquet), but for built-in sources you can also use their short names such as json, parquet, jdbc, orc, libsvm, csv and text.
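As a quick sketch (the file path and the header/inferSchema settings here are hypothetical and chosen only to illustrate passing options), the source format and its options can be supplied through format() and option() on the reader:

Dataset<Row> csvData = session.read().format("csv")
        .option("header", "true")      // assume these hypothetical files have a header row
        .option("inferSchema", "true") // let Spark infer column types instead of treating everything as string
        .load("C:\\codebase\\scala-project\\inputdata\\some_csv_with_header");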

DataFrames loaded from any data source type can be converted into other formats using the code below.


Dataset<Row> dataframe= session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

dataframe.write().format("json").save("C:\\codebase\\scala-project\\inputdata\\output\\data");

Run SQL on files directly

Instead of using the read API to load a file into a DataFrame and then querying it, you can also query the file directly with SQL.


SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "parquet").master("local")
.getOrCreate();

Dataset<Row> dataset = session.sql("select * from parquet.`C:\\codebase\\scala-project\\inputdata\\output\\data\\part-r-00000-204e2538-b186-4ee1-a831-d56dff8a920a.snappy.parquet`");

dataset.show();

Save Modes

Save operations can optionally take a SaveMode, which specifies how to handle existing data if it is already present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out.


/*
* When saving a DataFrame to a data source, if data/table already
* exists, contents of the DataFrame are expected to be appended to
* existing data.
*/
dataframe.write().mode(SaveMode.Append).save("C:\\codebase\\scala-project\\inputdata\\output\\data");

/*
* When saving a DataFrame to a data source, if data already exists, an
* exception is expected to be thrown.
*/

dataframe.write().mode(SaveMode.ErrorIfExists).save("C:\\codebase\\scala-project\\inputdata\\output\\data");

/*
* Ignore mode means that when saving a DataFrame to a data source, if
* data already exists, the save operation is expected to not save the
* contents of the DataFrame and to not change the existing data. This
* is similar to a CREATE TABLE IF NOT EXISTS in SQL.
*/
dataframe.write().mode(SaveMode.Ignore).save("C:\\codebase\\scala-project\\inputdata\\output\\data");

/*
* Overwrite mode means that when saving a DataFrame to a data source,
* if data/table already exists, existing data is expected to be
* overwritten by the contents of the DataFrame.
*/
dataframe.write().mode(SaveMode.Overwrite).save("C:\\codebase\\scala-project\\inputdata\\output\\data");

Saving to Persistent Tables

DataFrames can also be saved as persistent tables into Hive metastore using the saveAsTable command. Notice that an existing Hive deployment is not necessary to use this feature. Spark will create a default local Hive metastore (using Derby) for you. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the path option, e.g. df.write().option("path", "/data/output").saveAsTable("t"). When the table is dropped, the custom table path will not be removed and the table data will still be there. If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.

Below is an example


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SaveLoad2 {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "csv")
                .master("local").getOrCreate();

        Dataset<Row> dataframe = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

        dataframe.write().option("path", "C:\\codebase\\scala-project\\inputdata\\output\\data").mode(SaveMode.Append)
                .saveAsTable("Test_Table");
    }

}
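Once the program above has run, a DataFrame for the persisted table can be obtained back by calling the table method on the SparkSession, as mentioned earlier. A minimal sketch, assuming the same session and the Test_Table name used above:

// Look up the persistent table in the metastore by name
Dataset<Row> persisted = session.table("Test_Table");

persisted.show();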

Bucketing, Sorting and Partitioning

By default, a simple query in Hive scans the whole Hive table. This slows down performance when querying a large table. The issue can be resolved by creating Hive partitions, which is very similar to partitioning in an RDBMS. In Hive, each partition corresponds to a predefined partition column (or columns) and is stored as a subdirectory under the table's directory in HDFS. When the table is queried, only the required partitions are read, so query I/O and time are greatly reduced.

Besides partitioning, bucketing is another technique to cluster datasets into more manageable parts and optimize query performance. Unlike a partition, a bucket corresponds to segments of files in HDFS. For example, let's say we have a baseline_table that uses the datestamp as the top-level partition. If there is a further request to use the neid as a second level of partitioning, it leads to many deep, small partitions and directories. Instead, we can bucket baseline_table using neid as the bucket column. The value of this column is hashed into a user-defined number of buckets, so records with the same neid are always stored in the same bucket, that is, the same segment of files. By using buckets, Hive can easily and efficiently do sampling and map-side joins, as the data belonging to the same key is available in the same file.
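As a sketch of the scenario described above (baselineDF, the datestamp and neid columns, and the bucket count of 16 are hypothetical and only follow the example in the paragraph), partitioning and bucketing can be combined in a single write:

// Partition by datestamp (one subdirectory per value) and hash neid into 16 buckets,
// so rows with the same neid always land in the same bucket files
baselineDF.write().partitionBy("datestamp").bucketBy(16, "neid").sortBy("neid")
        .option("path", "C:\\codebase\\scala-project\\inputdata\\output\\baseline")
        .mode(SaveMode.Append).saveAsTable("baseline_table");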

Below are examples using the movies data.


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SaveLoad2 {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
                .master("local").getOrCreate();

        Dataset<Row> dataframe = session.read().csv("C:\\codebase\\scala-project\\inputdata\\movies_data_2");

        // The CSV files have no header, so rename the generated _c0/_c1/_c2 columns
        dataframe = dataframe.withColumnRenamed("_c0", "name");
        dataframe = dataframe.withColumnRenamed("_c1", "rating");
        dataframe = dataframe.withColumnRenamed("_c2", "timestamp");

        // Plain persistent table backed by the given path
        dataframe.write().option("path", "C:\\codebase\\scala-project\\inputdata\\output\\data")
                .mode(SaveMode.Append).saveAsTable("Test_Table");

        // Bucket the data into 7 buckets by name, sorting each bucket by rating;
        // a separate table name and path are used so the layouts do not conflict
        dataframe.write().bucketBy(7, "name").sortBy("rating")
                .option("path", "C:\\codebase\\scala-project\\inputdata\\output\\data_bucketed")
                .mode(SaveMode.Append).saveAsTable("Test_Table_Bucketed");

        // Partition the output by name, creating one subdirectory per distinct value
        dataframe.write().partitionBy("name")
                .option("path", "C:\\codebase\\scala-project\\inputdata\\output\\data_partitioned")
                .mode(SaveMode.Append).saveAsTable("Test_Table_Partitioned");
    }

}


Parquet Files

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.


Dataset<Row> movieDF = session.read().json("examples/src/main/resources/movie.json");

// DataFrames can be saved as Parquet files, maintaining the schema information

movieDF.write().parquet("movies.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame

Dataset<Row> parquetFileDF = session.read().parquet("movies.parquet");

Hive Tables

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml files in the conf/ directory. When working with Hive, one must instantiate a SparkSession with Hive support, which includes connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.


String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark Hive Example")
        .config("spark.sql.warehouse.dir", warehouseLocation)
        .enableHiveSupport()
        .getOrCreate();

spark.sql("SELECT * FROM baseline_table").show();

Specifying storage format for Hive tables

When you create a Hive table, you need to define how the table should read and write data from and to the file system, i.e. the input format and the output format. You also need to define how the table should deserialize data to rows, or serialize rows to data, i.e. the serde. The following options control this; an example is shown after the option descriptions below.

fileFormat – A fileFormat is a kind of package of storage format specifications, including "serde", "input format" and "output format". Currently the supported fileFormats are: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.

inputFormat/outputFormat – These two options specify the name of the corresponding InputFormat and OutputFormat classes as a string literal, e.g. org.apache.hadoop.hive.ql.io.orc.OrcInputFormat. They must be specified as a pair, and you cannot use them if you have already specified the fileFormat option.

serde – This option specifies the name of a serde class. When the fileFormat option is specified, do not specify this option if the given fileFormat already includes the serde information. Currently sequencefile, textfile and rcfile do not include the serde information, so you can use this option with these three fileFormats.

fieldDelim/escapeDelim/collectionDelim/mapkeyDelim/lineDelim – These options can only be used with the 'textfile' fileFormat. They define how to read delimited files into rows.
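As a sketch of how these options can be supplied (the table name and column are hypothetical), one way is the OPTIONS clause of a CREATE TABLE statement with the hive source, using the Hive-enabled session created above:

// Create a Hive table whose storage format is Parquet
spark.sql("CREATE TABLE src(id INT) USING hive OPTIONS(fileFormat 'parquet')");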

Performance Tuning

Caching Data In Memory

Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call spark.catalog.uncacheTable("tableName") to remove the table from memory.
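A minimal sketch in Java, assuming a SparkSession named session and the Test_Table with name and rating columns from the earlier examples:

// Cache the table in the in-memory columnar format
session.catalog().cacheTable("Test_Table");

// Queries on the cached table now scan only the required columns in memory
session.sql("select name, rating from Test_Table").show();

// Remove the cached data once it is no longer needed
session.catalog().uncacheTable("Test_Table");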

Configuration of in-memory caching can be done using the setConf method on SparkSession or by running SET key=value commands using SQL; a short sketch is given at the end of this section.

spark.sql.inMemoryColumnarStorage.compressed – When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data.

spark.sql.inMemoryColumnarStorage.batchSize – Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.

The following options can also be used to tune the performance of query execution.

spark.sql.files.maxPartitionBytes – The maximum number of bytes to pack into a single partition when reading files. Defaults to 128 MB.

spark.sql.files.openCostInBytes – The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition.

spark.sql.broadcastTimeout – Timeout in seconds for the broadcast wait time in broadcast joins.

spark.sql.autoBroadcastJoinThreshold – Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.

spark.sql.shuffle.partitions – Configures the number of partitions to use when shuffling data for joins or aggregations.
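As a sketch of applying these settings at runtime (the values below are arbitrary and chosen only for illustration), they can be set programmatically on the SparkSession or through SQL:

// Set tuning options programmatically
session.conf().set("spark.sql.shuffle.partitions", "100");
session.conf().set("spark.sql.autoBroadcastJoinThreshold", String.valueOf(10 * 1024 * 1024));

// The same options can also be set with a SQL command
session.sql("SET spark.sql.inMemoryColumnarStorage.compressed=true");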