Spark: reading and writing data from S3

In this article I will demonstrate how to read and write data from S3 using Spark. Create a Maven project in Eclipse and add the below content to the pom.xml file, which will pull in all the required libraries.


<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
    <artifactId>timepasstechies</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.327</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>

    </dependencies>
</project>

Below is the code to read data from S3 and write the result back to S3 using Spark.


import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class ReadWriteSparkS3 {

    // Builds the list of hourly input prefixes input/2018-12-12/00 to input/2018-12-12/03
    public static List<String> getStaticPath() {

        String base = "input/2018-12-12/";
        List<String> listOfFolders = new ArrayList<String>();
        listOfFolders.add(base + "00");
        for (int i = 1; i < 4; i++) {
            listOfFolders.add(base + String.format("%02d", i));
        }

        return listOfFolders;
    }

    public static void main(String[] args) {

        List<String> listOfFolders = getStaticPath();

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test")
                .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .set("spark.speculation", "false");

        @SuppressWarnings("resource")
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // One partition per prefix: the first mapPartitions lists the object keys under
        // each prefix and the second downloads every object and splits it into lines
        JavaRDD<String> javaRdd2 = jsc.parallelize(listOfFolders, listOfFolders.size())
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 54545;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {
                        List<String> list = new ArrayList<String>();
                        BasicAWSCredentials awsCreds = new BasicAWSCredentials("accessKey", "secretKey");
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                                .withRegion(Regions.AP_SOUTH_1)
                                .withForceGlobalBucketAccessEnabled(true).build();
                        ListObjectsRequest request = new ListObjectsRequest();
                        request.setBucketName("newbucketadarshnow");
                        List<String> objectList = new ArrayList<String>();

                        // Collect the object keys under every prefix assigned to this partition
                        while (t.hasNext()) {
                            request.setPrefix(t.next());
                            ObjectListing objectListing = s3.listObjects(request);
                            List<S3ObjectSummary> summaries = objectListing.getObjectSummaries();
                            for (S3ObjectSummary summary : summaries) {
                                objectList.add(summary.getKey());
                            }
                        }

                        list.addAll(objectList);
                        return list.iterator();
                    }
                }).mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 1232323;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {

                        List<String> list = new ArrayList<String>();

                        BasicAWSCredentials awsCreds = new BasicAWSCredentials("accessKey", "secretKey");
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                                .withRegion(Regions.AP_SOUTH_1)
                                .withForceGlobalBucketAccessEnabled(true).build();

                        // Read each object (skipping folder placeholder keys) and split its content into lines
                        while (t.hasNext()) {
                            String fileName = t.next();
                            if (!fileName.endsWith("/")) {
                                StringWriter writer = new StringWriter();
                                IOUtils.copy(s3.getObject("newbucketadarshnow", fileName).getObjectContent(), writer);
                                list.addAll(Arrays.asList(writer.toString().split("\n")));
                            }
                        }

                        return list.iterator();
                    }
                });

        // Write the collected lines back to S3 through the s3a filesystem
        javaRdd2.saveAsTextFile("s3a://accessKey:secretKey@newbucketadarshnow/output");

    }

}
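
The saveAsTextFile call above embeds the access key and secret key directly in the s3a URI. As an alternative, here is a minimal sketch (the class name S3ACredentialConfig and the output_test prefix are just examples, but fs.s3a.access.key and fs.s3a.secret.key are the standard Hadoop S3A credential properties) that sets the credentials on the Hadoop configuration of the Spark context so they never appear in the path.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class S3ACredentialConfig {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("s3a-credentials");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // fs.s3a.access.key and fs.s3a.secret.key are the standard Hadoop S3A
        // credential properties, so the keys stay out of the output URI
        jsc.hadoopConfiguration().set("fs.s3a.access.key", "accessKey");
        jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secretKey");

        // Minimal write to verify the configuration; output_test is just an example prefix
        jsc.parallelize(Arrays.asList("line1", "line2"))
                .saveAsTextFile("s3a://newbucketadarshnow/output_test");

        jsc.stop();
    }
}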

We are setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2 so that commitTask moves the data generated by a task directly to the final destination and commitJob is basically a no-op. With version 1, commitTask moves the data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination. Because the driver does the work of commitJob, this operation can take a long time on S3, which can be really inefficient.

We have also disabled Spark speculative execution so that tasks running slowly in a stage are not re-launched. The write operation to S3 from Spark can be very slow, and with speculative execution enabled we can see a lot of tasks getting relaunched as the output data size increases.
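
Putting these two points together, below is a minimal sketch of a SparkConf prepared for writing to S3 (the class and application names are just examples), with the reasoning above captured as comments. These are the same two set calls chained in the job earlier, shown on their own.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class S3WriteConf {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("s3-write-conf")
                // Version 2: commitTask moves task output directly to the final
                // destination and commitJob becomes almost a no-op, avoiding the
                // slow driver-side move of the job temporary directory on S3
                .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                // Slow S3 writes look like straggler tasks; disabling speculation
                // prevents them from being re-launched
                .set("spark.speculation", "false");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... build the RDD and call saveAsTextFile as in the example above ...
        jsc.stop();
    }
}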