Submit a Spark job programmatically using SparkLauncher

In this article I will illustrate how to submit a Spark job programmatically using SparkLauncher. Let us take a use case where we subscribe to a stream of records from a Kafka topic, where each record is an object store file location. For each record we will launch a Spark job to process the data inside that folder.

To run the example below you need a running Kafka installation as a prerequisite. Below is the link to the Kafka quick start guide.

https://kafka.apache.org/quickstart
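
The examples below publish to a topic named process_s3_data. If your broker is not configured to auto-create topics, you can create it up front. The sketch below uses Kafka's AdminClient (available in kafka-clients 0.11 and later) and assumes a single quick-start broker on localhost:9092; the class name CreateTopic and the partition/replication settings are just illustrative.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and replication factor 1 are enough for a local quick-start broker
            NewTopic topic = new NewTopic("process_s3_data", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}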

Let us start with the DirectoryMonitor code, which monitors a directory; once a new file is pushed to the directory, it hands the file to the KafkaProducer. I will be using the org.apache.commons.io.monitor package of the Apache Commons IO library, which provides components for monitoring file system events such as file create, update and delete.


import java.io.File;
import java.io.IOException;

import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;

import com.blog.util.*;

public class DirectoryMonitor {

    private String sourceDirectory = new ConfigFile(Constants.CONFIG_TRAVEL, FileType.property)
            .getString("producer.source");

    private String pollingInterval = new ConfigFile(Constants.CONFIG_TRAVEL, FileType.property)
            .getString("polling.interval.directory");

    public String getSourceDirectory() {
        return sourceDirectory;
    }

    public void setSourceDirectory(String sourceDirectory) {
        this.sourceDirectory = sourceDirectory;
    }

    public void startMonitor() throws Exception {

        final long polling = Long.parseLong(pollingInterval);
        File folder = new File(getSourceDirectory());
        if (!folder.exists()) {
            throw new RuntimeException("Directory not found: " + folder);
        }

        FileAlterationObserver observer = new FileAlterationObserver(folder);
        // The polling interval is configured in seconds; FileAlterationMonitor expects milliseconds
        FileAlterationMonitor monitor = new FileAlterationMonitor(polling * 1000);

        FileAlterationListener listener = new FileAlterationListenerAdaptor() {

            @Override
            public void onFileCreate(File file) {
                try {
                    System.out.println("File created: " + file.getCanonicalPath());
                    // Push each line of the new file to the Kafka topic
                    KafkaProducer.pushMessage(KafkaProducerSingleton.getInstance(), file);
                    System.out.println("File pushed to Kafka: " + file.getCanonicalPath());
                } catch (IOException e) {
                    System.out.println("Exception while processing created file: " + e.getMessage());
                }
            }

            @Override
            public void onFileDelete(File file) {
                // No action required on delete
            }
        };

        observer.addListener(listener);
        monitor.addObserver(observer);
        monitor.start();
    }

}
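
To start the monitor, a small driver class is enough. The sketch below is illustrative; the class name MonitorMain is not part of the original code.

public class MonitorMain {

    public static void main(String[] args) throws Exception {
        // Starts polling the configured directory; the monitor keeps running
        // in the background until the JVM exits.
        DirectoryMonitor monitor = new DirectoryMonitor();
        monitor.startMonitor();
    }
}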

In the above code we are using KafkaProducerSingleton, which returns a KafkaProducer built with the configured properties. Below is the KafkaProducerSingleton class.


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class KafkaProducerSingleton {

    private KafkaProducerSingleton() {
    }

    // A single shared producer instance built from the configured properties
    private static final Producer<String, String> producer =
            new KafkaProducer<String, String>(KafkaProducerProperties.getInstance());

    public static Producer<String, String> getInstance() {
        return producer;
    }

}

KafkaProducerProperties is the class where we configure the Kafka producer properties described below.

bootstrap.servers – A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers.

acks – The number of acknowledgments the producer requires the leader to have received before considering a request complete.

retries – Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.

batch.size – The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

linger.ms – The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together.

buffer.memory – The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.

key.serializer – Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.

value.serializer – Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.


import java.util.Properties;

public class KafkaProducerProperties {

    private KafkaProducerProperties() {
    }

    private static final Properties props = new Properties();

    static {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 500000);
        props.put("linger.ms", 500);
        props.put("buffer.memory", 5000000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The producer is typed Producer<String, String>, so the value serializer must also
        // be a StringSerializer (a ByteArraySerializer would fail at send time).
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static Properties getInstance() {
        return props;
    }

}
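
As a side note, the same configuration can be written with the constants from org.apache.kafka.clients.producer.ProducerConfig, which avoids typos in the property keys. Below is a sketch of an equivalent class using those constants; the class name KafkaProducerPropertiesAlt is just for illustration.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerPropertiesAlt {

    // Same settings as above, expressed with ProducerConfig constants
    static final Properties props = new Properties();

    static {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 500000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 5000000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    }
}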

Let us code the KafkaProducer which pushes each record into the Kafka topic process_s3_data.


import java.io.File;
import java.io.FileReader;
import java.io.IOException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import au.com.bytecode.opencsv.CSVReader;

public class KafkaProducer {

    public static void pushMessage(Producer<String, String> producer, File source) throws IOException {

        String emptyValue = "";
        // Each line of the CSV file carries an object store location in the first column;
        // the location is sent as the record key and the value is left empty.
        try (CSVReader reader = new CSVReader(new FileReader(source))) {
            String[] nextLine;
            while ((nextLine = reader.readNext()) != null) {
                System.out.println(nextLine[0]);
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("process_s3_data", nextLine[0], emptyValue);
                producer.send(record);
            }
        }
    }

}
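
Because send() is asynchronous and acks is set to 0, delivery problems are easy to miss. If you want feedback from the client, send() also accepts a callback. The sketch below is a hypothetical variant of pushMessage for a single location; note that with acks=0 the broker does not acknowledge writes, so the callback mainly surfaces client-side failures.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerWithCallback {

    public static void pushMessage(Producer<String, String> producer, String location) {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("process_s3_data", location, "");
        // The callback fires once the send completes or fails, so errors are no longer
        // silent even though send() itself returns immediately.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("Failed to send " + location + ": " + exception.getMessage());
                } else {
                    System.out.println("Sent " + location + " to partition " + metadata.partition());
                }
            }
        });
    }
}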

Let us code the Spark consumer client that consumes records from the Kafka cluster. In SparkDstreamConsumer we consume each record from the process_s3_data topic, which is a folder location in the object store, and launch a Spark job to process it.


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.blog.driver.SparkSubmitLauncher;
import com.blog.util.ConfigFile;
import com.blog.util.Constants;
import com.blog.util.FileType;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkDstreamConsumer {

    public static void main(String[] args) throws InterruptedException {

        ConfigFile conf = new ConfigFile(Constants.CONFIG_TRAVEL, FileType.property);
        String broker = conf.getString("metadata.broker.list");

        SparkConf sparkConf = new SparkConf().setAppName("travel").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jstream = new JavaStreamingContext(jsc, new Duration(5000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topicName = Collections.singleton("process_s3_data");

        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jstream,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topicName);

        directKafkaStream.foreachRDD(rdd -> {
            // Each record key is an object store folder location; launch one Spark job per location
            for (Tuple2<String, byte[]> t : rdd.collect()) {
                SparkSubmitLauncher sparkSubmitLauncher = new SparkSubmitLauncher();
                System.out.println(t._1);
                sparkSubmitLauncher.launchSparkJob(new String[] { t._1 });
            }
        });

        jstream.start();
        jstream.awaitTermination();
    }

}

Below is the SparkSubmitLauncher code which takes an object store location as input and launches a Spark job using SparkLauncher.


import java.io.File;
import java.io.IOException;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SparkSubmitLauncher {

    public void launchSparkJob(String[] args) {

        System.out.println("Inside Spark launch: " + args[0]);

        SparkLauncher sparkLauncher = new SparkLauncher()
                .setSparkHome("spark_installation-location")
                .setAppResource("jar_file_location_spark")
                .setMainClass("com.blog.driver.ReadWriteS3Avro")
                .setAppName("SparkLauncherTest" + " " + args[0])
                .setMaster("local");

        // Enables verbose reporting for SparkSubmit.
        sparkLauncher.setVerbose(true);

        // Distinguishes where the driver process runs. In "cluster" mode, the framework
        // launches the driver inside of the cluster. In "client" mode, the submitter
        // launches the driver outside of the cluster.
        sparkLauncher.setDeployMode("client");

        // Sets a Spark property. Expects keys starting with the spark. prefix.
        sparkLauncher.setConf("spark.executor.memory", "1g");
        sparkLauncher.setConf("spark.driver.memory", "1g");

        // Sets the working directory of spark-submit.
        sparkLauncher.directory(new File("C:\\output\\temp"));

        // Adds command line arguments for the Spark application.
        sparkLauncher.addAppArgs(args[0]);

        // Redirects standard output to the specified file.
        sparkLauncher.redirectOutput(new File("C:\\output\\redirectOut"));

        // Redirects error output to the specified file.
        sparkLauncher.redirectError(new File("C:\\output\\redirectError"));

        try {
            // startApplication() returns a SparkAppHandle that can be used to monitor and
            // control the application; launch() is the lower-level alternative that only
            // returns the spark-submit child Process.
            SparkAppHandle appHandle = sparkLauncher.startApplication();

            // The handle reports UNKNOWN until the launched application connects back.
            // A state can be "final", in which case it will not change after it is reached
            // and means the application is not running anymore. Here we simply poll until
            // the launcher has connected and reports a state other than UNKNOWN.
            while (appHandle.getState().equals(SparkAppHandle.State.UNKNOWN)) {
                try {
                    System.out.println(appHandle.getState().toString());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
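
Instead of polling the handle in a loop, startApplication() also accepts SparkAppHandle.Listener instances that are notified whenever the application's state or info changes. The sketch below is an alternative monitoring approach, not part of the original code; the CountDownLatch is only needed if you want to block until the job reaches a final state.

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SparkSubmitListenerExample {

    public static void launch(SparkLauncher sparkLauncher) throws IOException, InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        sparkLauncher.startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                System.out.println("State changed to " + handle.getState());
                // Final states (FINISHED, FAILED, KILLED, LOST) will not change again
                if (handle.getState().isFinal()) {
                    done.countDown();
                }
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
                System.out.println("App id: " + handle.getAppId());
            }
        });

        // Block until the application reaches a final state
        done.await();
    }
}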

Below is the ReadWriteS3Avro class which reads the Avro data from the input location in S3 passed in by the SparkLauncher and aggregates it using the Spark DataFrame API.


import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadWriteS3Avro {

    public static void main(String[] args) {

        // Configure the S3A filesystem so the job can read directly from the object store
        SparkSession spark = SparkSession.builder().master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("fs.s3a.endpoint", "endpoint_host")
                .config("spark.hadoop.fs.s3a.access.key", "access_key")
                .config("spark.hadoop.fs.s3a.secret.key", "secret_key")
                .config("fs.s3a.connection.ssl.enabled", "false")
                .getOrCreate();

        // args[0] is the object store folder location passed in by SparkLauncher
        Dataset<Row> dataset = spark.read().format("com.databricks.spark.avro").load(args[0]);

        // Aggregate the rating column and write the result out as CSV
        dataset.agg(max(col("rating")), avg(col("rating"))).write()
                .csv("output_location" + System.currentTimeMillis());
    }

}
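
A note on the format name: com.databricks.spark.avro is the external Databricks spark-avro package used with Spark 2.3 and earlier. On Spark 2.4 or later, Avro support ships as the built-in spark-avro module (org.apache.spark:spark-avro) and the short format name is used instead, so the load line above would become:

// Spark 2.4+ with the spark-avro module on the classpath
Dataset<Row> dataset = spark.read().format("avro").load(args[0]);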

Below are the utility classes used in the code.


/**
 * The Class Constants has a list of constants.
 */
public class Constants {

    public static final String CONFIG = "conf.properties";

    public static final String CONFIG_TRAVEL = "conf_travel.properties";

    public Constants() {
        super();
    }

}


public enum FileType {

    property, script

}


import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ConfigFile {

    private String fileName;

    private SequencedProperties properties;

    private FileType fileType;

    private String fileContent;

    private static Logger logger = Logger.getLogger(ConfigFile.class);

    public ConfigFile(String fileName, FileType fileType) {
        this.fileName = fileName;
        this.properties = new SequencedProperties();
        this.fileType = fileType;
        loadFile();
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(SequencedProperties properties) {
        this.properties = properties;
    }

    public FileType getFileType() {
        return fileType;
    }

    public void setFileType(FileType fileType) {
        this.fileType = fileType;
    }

    public String getFileContent() {
        return fileContent;
    }

    public void setFileContent(String fileContent) {
        this.fileContent = fileContent;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    private void loadFile() {
        // Load the file from the classpath either as properties or as raw script content
        InputStream in = getClass().getClassLoader().getResourceAsStream(getFileName());
        try {
            if (this.getFileType() == FileType.property) {
                this.getProperties().load(in);
            } else if (this.getFileType() == FileType.script) {
                StringWriter writer = new StringWriter();
                IOUtils.copy(in, writer);
                fileContent = writer.toString();
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public Properties getProperty() {
        return properties;
    }

    public String getString(String key) {
        return properties.getProperty(key);
    }

}
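
ConfigFile depends on a SequencedProperties class whose source is not shown in this article. As a minimal, hypothetical sketch, assuming it is nothing more than a Properties subclass that preserves the insertion order of keys, it could look like this:

import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

/**
 * Hypothetical stand-in for the SequencedProperties used by ConfigFile:
 * a Properties subclass that remembers the order in which keys were added.
 */
public class SequencedProperties extends Properties {

    private static final long serialVersionUID = 1L;

    // Keeps keys in insertion order
    private final Set<Object> keyOrder = new LinkedHashSet<Object>();

    @Override
    public synchronized Object put(Object key, Object value) {
        keyOrder.add(key);
        return super.put(key, value);
    }

    @Override
    public synchronized Object remove(Object key) {
        keyOrder.remove(key);
        return super.remove(key);
    }

    @Override
    public synchronized Enumeration<Object> keys() {
        return Collections.enumeration(keyOrder);
    }
}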



To run the above code, create a file with S3 folder locations as below and copy the file into the directory you are monitoring with the DirectoryMonitor class.


s3a://bucketname/input/loc1
s3a://bucketname/input/loc2
s3a://bucketname/input/loc3

Below are the properties from which we get the location of the directory being monitored and the polling frequency.


//Snippet from DirectoryMonitor class

private String sourceDirectory = new ConfigFile(Constants.CONFIG_TRAVEL, FileType.property).getString("producer.source");

private String pollingInterval = new ConfigFile(Constants.CONFIG_TRAVEL, FileType.property).getString("polling.interval.directory");

I have configured the producer.source and polling.interval.directory properties in a property file as below.


producer.source={}\\s3_location\\moniter
polling.interval.directory=2
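
Note that SparkDstreamConsumer also reads metadata.broker.list from the same configuration file, although that key is not shown above. A complete conf_travel.properties for this walkthrough could therefore look like the sketch below; the broker address is an assumption matching the localhost:9092 setup used in KafkaProducerProperties.

producer.source={}\\s3_location\\moniter
polling.interval.directory=2
metadata.broker.list=localhost:9092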