Spark: copy files to S3 using the Hadoop FileSystem API

In this article I will illustrate how to copy raw files to and from S3 using Spark. Spark does not support copying raw files out of the box, so we will use the Hadoop FileSystem API.

If you are using Spark 2.0.0 or above, make sure to set mapreduce.fileoutputcommitter.algorithm.version to 2, so that output files are moved to the final destination directly from the executors. The FileOutputCommitter has two methods, commitTask and commitJob. Spark 2.0 and above uses Apache Hadoop 2, which uses the value of mapreduce.fileoutputcommitter.algorithm.version to control how commitTask and commitJob work. In Hadoop 2, the default value of mapreduce.fileoutputcommitter.algorithm.version is 1. With this version, commitTask moves data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination. Because the driver does the work of commitJob, this operation can take a long time on cloud storage. With algorithm version 2, commitTask moves a task's output directly to the final destination, so commitJob becomes very cheap.
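The property can also be supplied at submit time instead of in code. A minimal sketch, assuming a spark-submit based deployment (the spark.hadoop. prefix is what copies the value into the Hadoop configuration):

spark-submit --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 ...

In the code below the same property is set programmatically on the SparkContext's Hadoop configuration.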

Below is the code for the same


import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;

public class RawDataCopyFromSpark {

    public static void main(String[] args) throws AnalysisException, IOException {

        // Placeholders: replace with your actual AWS credentials
        String access_key_amazon = "accessKey.amazon";
        String secret_key_amazon = "secretKey.amazon";

        SparkSession session = SparkSession.builder().master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
                .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
                .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.executor.heartbeatInterval", "500s")
                .getOrCreate();

        // Commit algorithm version 2: task output is moved directly to the final destination
        session.sparkContext().hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2");

        // Placeholder paths: a local path or an s3a:// URI
        Path srcPath = new Path("Source_File_Location");
        FileSystem srcFs = FileSystem.get(srcPath.toUri(), session.sparkContext().hadoopConfiguration());

        Path dstPath = new Path("Target_Location");
        FileSystem dstFs = FileSystem.get(dstPath.toUri(), session.sparkContext().hadoopConfiguration());

        // deleteSource = false, overwrite = false
        FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, false, session.sparkContext().hadoopConfiguration());

        session.stop();
    }
}
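
The FileUtil.copy call above runs on the driver, which is fine for a single file but can become a bottleneck when copying many files. If you need to copy a list of files in parallel from the executors, one option is to parallelize the paths and run the same Hadoop API call inside a foreach. Below is a minimal sketch, not the approach from the listing above: the bucket names and file paths are placeholders, and it assumes the executors can resolve the S3A filesystem and credentials on their own (for example through instance profiles or core-site.xml), since the Hadoop Configuration object cannot be serialized from the driver.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ParallelRawDataCopy {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().master("local").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

        // Placeholder list of source files; in practice this could come from FileSystem.listStatus()
        List<String> files = Arrays.asList(
                "s3a://source-bucket/raw/file1.csv",
                "s3a://source-bucket/raw/file2.csv");

        String target = "s3a://target-bucket/raw/";

        jsc.parallelize(files).foreach(src -> {
            // Each executor builds its own Hadoop configuration; S3A settings and credentials
            // are assumed to be available on the executor (instance profile, env vars or core-site.xml)
            Configuration conf = new Configuration();
            Path srcPath = new Path(src);
            Path dstPath = new Path(target + srcPath.getName());
            FileSystem srcFs = FileSystem.get(srcPath.toUri(), conf);
            FileSystem dstFs = FileSystem.get(dstPath.toUri(), conf);
            // deleteSource = false, overwrite = true
            FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, conf);
        });

        session.stop();
    }
}

In both listings the two boolean arguments to FileUtil.copy are deleteSource and overwrite; set them according to whether the source should be removed after the copy and whether existing files at the target may be replaced.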