S3 Select example with Spark and the Java AWS SDK

S3 Select enables applications to retrieve only a subset of data from an object by using simple SQL expressions. By using S3 Select to retrieve only the data your application needs, you can achieve drastic performance improvements. S3 Select essentially pushes the work of filtering data from an object down to the storage layer, which drastically reduces the amount of data transferred across the network.
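
For example, an expression like the one below (an illustration against the sample movie data shown later in this post; it assumes the header row is used for column names) asks S3 to return a single column of only the matching rows instead of the whole object:

select s.director from S3Object s where s.genre = 'Action'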

Let's create a Maven project with the below pom file to import all the required dependencies.


<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>S3Select</groupId>
    <artifactId>S3Select</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>S3Select</name>
    <description>S3Select with spark</description>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- Uncomment if Hive support is needed:
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        -->

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.9.8</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.7.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven shade plug-in that creates an uber JAR at the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
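
Because the shade plug-in is bound to the package phase, building the project produces a single uber JAR with all of these dependencies bundled:

mvn clean package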

Below is the sample data we want to process.


name,genre,director,producer
Spider Man,Action,Jon Watts,Sony Pictures
Iron Man,Science Fiction,Cameron,Apple
Super Man,Fantasy,Hudson,Pixar

Let's say we are interested in just the director of each movie. In this scenario the query filters out more than 75 percent of the original data, which is exactly the kind of use case where S3 Select is useful.
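
Given the sample data above, a query that selects only the third column returns roughly:

director
Jon Watts
Cameron
Hudson

(The header value director shows up because the request in the example below does not configure header handling, so the first row is treated as data; a header-aware variant is sketched after the SDK example.)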

Below is an example of using S3 Select with the Java AWS SDK.


import static com.amazonaws.util.IOUtils.copy;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CSVInput;
import com.amazonaws.services.s3.model.CSVOutput;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.ExpressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentEvent;
import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;

public class S3Select {

    private static final String BUCKET_NAME = "Bucket_Name";
    private static final String CSV_OBJECT_KEY = "Object_Key";
    private static final String S3_SELECT_RESULTS_PATH = "OUTPUT_PATH";
    // s._3 is a positional reference to the third CSV column (director in our
    // sample data), since this request does not configure header handling.
    private static final String QUERY = "select s._3 from S3Object s";

    public static void main(String[] args) throws Exception {

        // Replace the placeholders with your own credentials. (In the original
        // project these were loaded and decrypted via project-specific helpers.)
        AWSCredentials credentials = new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY");
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProtocol(Protocol.HTTP);
        AmazonS3 s3Client = new AmazonS3Client(credentials, clientConfig);

        SelectObjectContentRequest request = generateBaseCSVRequest(BUCKET_NAME, CSV_OBJECT_KEY, QUERY);
        final AtomicBoolean isResultComplete = new AtomicBoolean(false);

        try (OutputStream fileOutputStream = new FileOutputStream(new File(S3_SELECT_RESULTS_PATH));
                SelectObjectContentResult result = s3Client.selectObjectContent(request)) {
            InputStream resultInputStream = result.getPayload()
                    .getRecordsInputStream(new SelectObjectContentEventVisitor() {

                        @Override
                        public void visit(SelectObjectContentEvent.StatsEvent event) {
                            System.out.println("Received Stats, Bytes Scanned: " + event.getDetails().getBytesScanned()
                                    + " Bytes Processed: " + event.getDetails().getBytesProcessed());
                        }

                        @Override
                        public void visit(SelectObjectContentEvent.EndEvent event) {
                            isResultComplete.set(true);
                            System.out.println("Received End Event. Result is complete.");
                        }
                    });

            // Stream the selected records straight into the local results file.
            copy(resultInputStream, fileOutputStream);
        }

        // Without the End Event the result stream may have been truncated.
        if (!isResultComplete.get()) {
            throw new Exception("S3 Select request was incomplete as End Event was not received.");
        }
    }

    private static SelectObjectContentRequest generateBaseCSVRequest(String bucket, String key, String query) {
        SelectObjectContentRequest request = new SelectObjectContentRequest();
        request.setBucketName(bucket);
        request.setKey(key);
        request.setExpression(query);
        request.setExpressionType(ExpressionType.SQL);

        // The source object is plain, uncompressed CSV.
        InputSerialization inputSerialization = new InputSerialization();
        inputSerialization.setCsv(new CSVInput());
        inputSerialization.setCompressionType(CompressionType.NONE);
        request.setInputSerialization(inputSerialization);

        // Results come back as CSV as well.
        OutputSerialization outputSerialization = new OutputSerialization();
        outputSerialization.setCsv(new CSVOutput());
        request.setOutputSerialization(outputSerialization);

        return request;
    }
}
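
As a side note, you can let S3 Select consume the header row so that the query references columns by name instead of by position. Below is a minimal sketch of such a request builder, assuming the additional import com.amazonaws.services.s3.model.FileHeaderInfo; it is a variant of the method above, not part of the original example.

private static SelectObjectContentRequest generateHeaderAwareCSVRequest(String bucket, String key) {
    SelectObjectContentRequest request = new SelectObjectContentRequest();
    request.setBucketName(bucket);
    request.setKey(key);
    // With FileHeaderInfo.USE, the first row supplies column names,
    // so the expression can say s.director instead of s._3.
    request.setExpression("select s.director from S3Object s");
    request.setExpressionType(ExpressionType.SQL);

    CSVInput csvInput = new CSVInput();
    csvInput.setFileHeaderInfo(FileHeaderInfo.USE);
    InputSerialization inputSerialization = new InputSerialization();
    inputSerialization.setCsv(csvInput);
    inputSerialization.setCompressionType(CompressionType.NONE);
    request.setInputSerialization(inputSerialization);

    OutputSerialization outputSerialization = new OutputSerialization();
    outputSerialization.setCsv(new CSVOutput());
    request.setOutputSerialization(outputSerialization);

    return request;
}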

We can use the same feature with Spark as well to optimize data processing, on Amazon EMR release version 5.17.0 and later. S3 Select is supported with CSV and JSON files; use the s3selectCSV and s3selectJSON values to specify the data format.

Below is the code.


import org.apache.spark.sql.SparkSession;

public class SparkS3Select {

    public static void main(String[] args) {

        // The access/secret key values and the bucket/key paths below are
        // placeholders; replace them with your own.
        SparkSession session = SparkSession.builder()
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", "access_key_amazon")
                .config("spark.hadoop.fs.s3a.secret.key", "secret_key_amazon")
                .config("spark.speculation", "false")
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.sql.codegen.wholeStage", "false")
                .config("spark.executor.heartbeatInterval", "500s")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("mapreduce.fileoutputcommitter.algorithm.version", "2")
                .config("fs.s3a.connection.establish.timeout", "501000")
                .config("fs.s3a.connection.timeout", "501000")
                .getOrCreate();

        // For a CSV file: the s3selectCSV format pushes the column selection
        // down to S3 Select, so only the director column crosses the network.
        session.read().format("s3selectCSV").load("s3a://bucket/key")
                .select(org.apache.spark.sql.functions.col("director"))
                .write().csv("s3a://bucket/output-key");

        // For a JSON file
        session.read().format("s3selectJSON").load("s3a://bucket/key")
                .select(org.apache.spark.sql.functions.col("director"))
                .write().csv("s3a://bucket/output-key");

        session.stop();
    }
}
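
To run the Spark job on a cluster, build the uber JAR as shown earlier and hand it to spark-submit; a sketch, assuming the class name above and the artifact coordinates from the pom:

spark-submit --class SparkS3Select target/S3Select-0.0.1-SNAPSHOT.jar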