Spark: reading many small files from S3 in Java

If we use Spark's textFile method to read the input data, Spark makes many recursive calls to the S3 list() API. This can become very expensive for directories with a large number of files, because S3 is an object store, not a file system, and listing objects is slow. In this scenario the list() calls dominate the overall processing time, which is not desirable.
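For example, the naive version is a single textFile call with a wildcard over the whole day (a one-line sketch; jsc is the JavaSparkContext created in the full example below, and the bucket and path match that example):

JavaRDD<String> lines = jsc.textFile("s3n://newbucketadarshnow/input/2018-12-12/*/*");

Here the driver has to list every object under each matching prefix before it can even plan the job, and on S3 that listing step can dominate the runtime when there are many small files.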

A better approach is to generate a list of static partitions beforehand and distribute the file reads in Spark. Let's say we have a directory structure input/2018-12-12/00, input/2018-12-12/01, input/2018-12-12/02 and so on up to 23, holding a day's worth of data. We can distribute the file reads in this case by building that list of static partitions and spreading the work across executors with Spark's parallelize method.

In the code below we first generate the list of static paths, pass that list to parallelize, and then use mapPartitions to spread the listing of files across 24 parallel tasks.

We use the mapPartitions function, which operates on a whole partition of data at a time. Working with data on a per-partition basis lets us avoid redoing setup work (such as creating an S3 client) for every single record.

Spark executors write the result of a job to a temporary directory, and once all the executors have finished, the output is renamed into place. This is fine on a normal filesystem, where a rename is a cheap metadata operation, but on an object store like S3 a rename is really a copy followed by a delete. The default file committer first moves each completed task's output into a job-specific folder and then moves the files from the job-specific folder to the final destination, so on S3 the data ends up being copied twice. By using the s3committer from Netflix we get a committer based on S3 multipart uploads, which writes all of the data to local disk and writes nothing to S3 until the commit phase starts (a minimal sketch of the multipart mechanism it relies on appears after the example code below).


import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class ReadS3Data {

    // Build the 24 hourly prefixes for one day of data: input/2018-12-12/00 .. input/2018-12-12/23
    public static List<String> getStaticPath() {
        String base = "input/2018-12-12/";
        List<String> listOfFolders = new ArrayList<String>();
        for (int i = 0; i < 24; i++) {
            listOfFolders.add(base + String.format("%02d", i));
        }
        return listOfFolders;
    }

    public static void main(String[] args) {

        List<String> listOfFolders = getStaticPath();

        SparkConf conf = new SparkConf().setMaster("local").setAppName("read_s3_distribute")
                .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .set("spark.speculation", "false");

        @SuppressWarnings("resource")
        JavaSparkContext jsc = new JavaSparkContext(conf);

        jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "accessKey");
        jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "secretKey");

        // One partition per hourly prefix, so the listing below runs as 24 parallel tasks.
        JavaRDD<String> javaRdd2 = jsc.parallelize(listOfFolders, listOfFolders.size())
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 54545;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {
                        // One S3 client per partition instead of one per record.
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
                        ListObjectsRequest request = new ListObjectsRequest();
                        request.setBucketName("newbucketadarshnow");
                        List<String> objectList = new ArrayList<String>();

                        while (t.hasNext()) {
                            request.setPrefix(t.next());
                            ObjectListing objectListing = s3.listObjects(request);
                            for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
                                objectList.add(summary.getKey());
                            }
                            // listObjects returns at most 1000 keys per call, so page through the rest.
                            while (objectListing.isTruncated()) {
                                objectListing = s3.listNextBatchOfObjects(objectListing);
                                for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
                                    objectList.add(summary.getKey());
                                }
                            }
                        }
                        return objectList.iterator();
                    }
                }).mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 1232323;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {
                        List<String> list = new ArrayList<String>();
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();

                        while (t.hasNext()) {
                            String fileName = t.next();
                            // Skip folder placeholder keys and split each object's content into lines.
                            if (!fileName.endsWith("/")) {
                                StringWriter writer = new StringWriter();
                                try (S3Object object = s3.getObject("newbucketadarshnow", fileName)) {
                                    IOUtils.copy(object.getObjectContent(), writer, StandardCharsets.UTF_8);
                                }
                                list.addAll(Arrays.asList(writer.toString().split("\n")));
                            }
                        }
                        return list.iterator();
                    }
                });

        javaRdd2.saveAsTextFile("s3n://newbucketadarshnow/result22");
    }

}
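
To see why a multipart-upload based committer avoids the extra copies, the minimal sketch below uses the plain AWS SDK multipart API directly. The bucket, key, and local file path are placeholders, and this is only an illustration of the mechanism, not the s3committer's actual code: parts are uploaded from locally staged data, but the object only becomes visible in the bucket when completeMultipartUpload is called, which is the step the committer defers until the job commits.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class MultipartUploadSketch {

    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
        String bucket = "newbucketadarshnow";     // placeholder bucket
        String key = "result22/part-00000";       // placeholder destination key

        // 1. Start a multipart upload; nothing is visible in the bucket yet.
        String uploadId = s3.initiateMultipartUpload(
                new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

        // 2. Upload the locally staged task output as one part (a real job may upload many parts).
        File staged = new File("/tmp/task-output-00000");   // placeholder locally staged output
        List<PartETag> partETags = new ArrayList<PartETag>();
        partETags.add(s3.uploadPart(new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(1)
                .withFile(staged)
                .withPartSize(staged.length()))
                .getPartETag());

        // 3. Only this "commit" step makes the object appear in S3.
        s3.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
    }
}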

Another approach, if we are running on Amazon EMR, is to use s3distcp to copy the files from S3 into HDFS, using its groupBy option to merge a large number of small files, and then process the data from HDFS.
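
As a rough sketch of that approach, using the same bucket and layout as the example above (the HDFS destination, regex, and target size are illustrative placeholders), an EMR step along these lines copies the hourly folders to HDFS and concatenates small files into larger ones before Spark reads them:

s3-dist-cp --src s3://newbucketadarshnow/input/2018-12-12/ \
           --dest hdfs:///data/2018-12-12/ \
           --groupBy '.*/(\d{2})/.*' \
           --targetSize 128

Files whose names produce the same capture group in the groupBy regex are concatenated into a single output file, and targetSize (in MiB) controls roughly how large the merged files become.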

Below is the GitHub link for the s3committer:

https://github.com/rdblue/s3committer