Spark using a custom OutputCommitter like the S3 committer from Netflix

In this article I will demonstrate how to write our own custom output format and custom committer in Spark. I will be using the s3committer from Netflix, which uses the multipart upload API of Amazon S3 to write data into S3.

Spark executors write the result of a job to a temporary directory, and once all the executors have finished processing, a rename is performed. This approach is fine on a normal file system, where a rename is a metadata operation, but in an object store like S3 it is inefficient because a rename is really a copy followed by a delete. The default file committer first moves each completed task's output into a job-specific folder and then moves the files from the job-specific folder to the final destination, so on S3 the data ends up being copied twice. The s3committer from Netflix avoids this: its multipart upload committer writes all of the data to local disks, and nothing is written to S3 until the commit phase starts.
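
To see why a rename is so expensive on S3, here is a minimal sketch (not part of the job code, using the same AWS SDK client that the driver below uses, with placeholder bucket and key names): since S3 has no rename API, "moving" an object has to be expressed as a server-side copy followed by a delete of the source, one pair of calls per object.

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class RenameIsCopyDelete {
    public static void main(String[] args) {
        // Placeholder bucket and keys, for illustration only.
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.AP_SOUTH_1)
                .build();
        // There is no rename on S3: "renaming" means copying the object to the
        // new key and then deleting the old one.
        s3.copyObject("my-bucket", "tmp/part-00000", "my-bucket", "final/part-00000");
        s3.deleteObject("my-bucket", "tmp/part-00000");
    }
}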

Below is the GitHub link for the s3committer:

https://github.com/rdblue/s3committer

Let's code the CustomOutputFormat for Spark, which extends TextOutputFormat and overrides the getOutputCommitter method. The getOutputCommitter method returns an instance of S3DirectoryOutputCommitter, the open-source committer from Netflix that is optimised for writing data to S3.


import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.netflix.bdp.s3.S3DirectoryOutputCommitter;

public class CustomOutputFormat extends TextOutputFormat<Text, Text> {

    private FileOutputCommitter committer = null;

    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // Lazily create a single S3DirectoryOutputCommitter for this task attempt
        // instead of the default FileOutputCommitter.
        if (committer == null) {
            Path output = getOutputPath(context);
            committer = new S3DirectoryOutputCommitter(output, context);
        }
        return committer;
    }
}
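
The same repository also ships an S3PartitionedOutputCommitter for partitioned output layouts. As a hedged sketch (assuming it exposes the same (Path, TaskAttemptContext) constructor as the directory committer used above; check the s3committer sources for the exact signature), an output format wired to it would look almost identical:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.netflix.bdp.s3.S3PartitionedOutputCommitter;

public class PartitionedOutputFormat extends TextOutputFormat<Text, Text> {

    private OutputCommitter committer = null;

    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        if (committer == null) {
            Path output = getOutputPath(context);
            // Assumes the same constructor shape as S3DirectoryOutputCommitter.
            committer = new S3PartitionedOutputCommitter(output, context);
        }
        return committer;
    }
}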

Now let's write the Spark driver that uses the above custom output format along with the custom s3committer. We use the saveAsNewAPIHadoopFile method to write the data into S3; it outputs the RDD to any Hadoop-supported file system using a new Hadoop API OutputFormat (mapreduce.OutputFormat) for the key and value types K and V of the RDD.


import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import scala.Tuple2;

public class UsingCustomS3Committer {

    // Builds the list of hourly input prefixes: input/2018-12-12/00 .. 03.
    public static List<String> getStaticPath() {
        String base = "input/2018-12-12/";
        List<String> listOfFolders = new ArrayList<String>();
        for (int i = 0; i < 4; i++) {
            listOfFolders.add(base + String.format("%02d", i));
        }
        return listOfFolders;
    }

    public static void main(String[] args) {

        List<String> listOfFolders = getStaticPath();

        SparkConf conf = new SparkConf().setMaster("local").setAppName("s3committer")
                .set("spark.hadoop.fs.s3a.access.key", "Access_Key")
                .set("spark.hadoop.fs.s3a.secret.key", "Secret_Key");

        @SuppressWarnings("resource")
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // First pass: list the object keys under each input prefix.
        JavaRDD<String> javaRdd2 = jsc.parallelize(listOfFolders, listOfFolders.size())
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 54545;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {
                        BasicAWSCredentials awsCreds = new BasicAWSCredentials("Access_Key", "Secret_Key");
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                                .withRegion(Regions.AP_SOUTH_1)
                                .withForceGlobalBucketAccessEnabled(true).build();
                        ListObjectsRequest request = new ListObjectsRequest();
                        request.setBucketName("newbucketadarshnow");
                        List<String> objectList = new ArrayList<String>();

                        while (t.hasNext()) {
                            request.setPrefix(t.next());
                            ObjectListing objectListing = s3.listObjects(request);
                            for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
                                objectList.add(summary.getKey());
                            }
                        }
                        return objectList.iterator();
                    }
                })
                // Second pass: download each object and emit its lines.
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

                    private static final long serialVersionUID = 1232323;

                    @Override
                    public Iterator<String> call(Iterator<String> t) throws Exception {
                        List<String> list = new ArrayList<String>();
                        BasicAWSCredentials awsCreds = new BasicAWSCredentials("Access_Key", "Secret_Key");
                        final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                                .withRegion(Regions.AP_SOUTH_1)
                                .withForceGlobalBucketAccessEnabled(true).build();

                        while (t.hasNext()) {
                            String fileName = t.next();
                            // Skip "directory" placeholder keys, read real objects line by line.
                            if (!fileName.endsWith("/")) {
                                StringWriter writer = new StringWriter();
                                IOUtils.copy(s3.getObject("newbucketadarshnow", fileName).getObjectContent(), writer);
                                list.addAll(Arrays.asList(writer.toString().split("\n")));
                            }
                        }
                        return list.iterator();
                    }
                });

        // Convert to (key, value) pairs expected by the Text-based output format.
        JavaPairRDD<Text, Text> pair = javaRdd2.mapToPair(new PairFunction<String, Text, Text>() {

            @Override
            public Tuple2<Text, Text> call(String t) throws Exception {
                return new Tuple2<Text, Text>(new Text(), new Text(t));
            }
        });

        // Write through the custom output format, which wires in the S3 committer.
        pair.saveAsNewAPIHadoopFile("s3a://newbucketadarshnow/targetOutput", Text.class, Text.class,
                CustomOutputFormat.class, jsc.hadoopConfiguration());
    }
}
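
Once the job finishes, a quick sanity check is to list the destination prefix with the same AWS SDK client used above. This is just a small helper sketch; the bucket and prefix names match the example and should be adapted to your own setup.

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class VerifyCommittedOutput {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.AP_SOUTH_1)
                .build();
        // List everything the committer published under the target prefix.
        ListObjectsRequest request = new ListObjectsRequest()
                .withBucketName("newbucketadarshnow")
                .withPrefix("targetOutput/");
        for (S3ObjectSummary summary : s3.listObjects(request).getObjectSummaries()) {
            System.out.println(summary.getKey() + " (" + summary.getSize() + " bytes)");
        }
    }
}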