Running a Spark Job Using the Mesosphere REST API

In this article I will illustrate how to run a Spark job using the Mesosphere REST API. The prerequisite for this approach is a Spark service already running in Mesosphere.

For more information on installing the Spark service on Mesosphere DC/OS, see the link below:

https://docs.d2iq.com/mesosphere/dcos/services/spark/2.6.0-2.3.2/install/

To submit Spark jobs to the Spark service installed on Mesosphere, we have to POST a JSON payload in the following format:


{
    "action": "CreateSubmissionRequest",
    "appArgs": [
        "arg0",
        "arg1"
    ],
    "appResource": "https://artifactory-url/jar_file_where_spark_driver_class_is_present.jar",
    "clientSparkVersion": "2.3.0",
    "environmentVariables": {
        "key": "key",
        "value": "value"
    },
    "mainClass": "com.blog.time.pass.Driver",
    "sparkProperties": {
        "spark.app.name": "test_app_name",
        "spark.driver.cores": "2",
        "spark.driver.memory": "2G",
        "spark.executor.memory": "5G",
        "spark.executor.cores": "5",
        "spark.cores.max": "15",
        "spark.locality.wait": "2s",
        "spark.jars": "https://artifactory-url/jar_file_where_spark_driver_class_is_present.jar",
        "spark.master": "mesos-ssl://mesosphere-edge-url/service/{sandbox_name}/spark",
        "spark.mesos.driver.labels": "DCOS_SPACE:/{sandbox_name}/spark",
        "spark.mesos.executor.docker.forcePullImage": "true",
        "spark.mesos.executor.docker.image": "mesosphere/spark:2.6.0-2.3.2-hadoop-2.7",
        "spark.mesos.task.labels": "DCOS_SPACE:/{sandbox_name}/spark",
        "spark.ssl.noCertVerification": "true",
        "spark.submit.deployMode": "cluster"
    }
}
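
This payload is POSTed to the Spark dispatcher's submission endpoint exposed through the Mesosphere edge; in the code below the exact URL comes from a Util.getSparkServiceEndpointUrl helper, and for a standard Spark dispatcher it typically ends in /v1/submissions/create. On success the dispatcher answers with a CreateSubmissionResponse along these lines (the submissionId below is purely illustrative):

{
    "action": "CreateSubmissionResponse",
    "serverSparkVersion": "2.3.0",
    "submissionId": "driver-20190101123456-0001",
    "success": true
}

The submissionId can later be used against the same dispatcher API to query the status of the driver or to kill it.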

Before we can run the Spark job via the REST API in Mesosphere, we need to add an Authorization header carrying a token to the request.

To use token-based authentication, we first have to obtain the token from the edge by posting the JSON below to https://mesosphere-edge-url/acs/api/v1/auth/login:


{
    "uid": "your_username",
    "password": "your_password"
}

We get the token back in the response; it then needs to be embedded into the POST request that runs the Spark job in Mesosphere.
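
As a reference, here is a minimal sketch of that token exchange in plain Java. It assumes the login endpoint shown above, uses Jackson (already in play for the @JsonProperty annotations on the model classes below) to build the request body and read the token field back, and the class and method names here are mine, not part of the final implementation:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;

public class TokenFetchSketch {

    // Sketch only: POST {"uid":...,"password":...} to the login endpoint and
    // pull the "token" field out of the JSON response.
    public static String fetchAuthToken(String edgeUrl, String uid, String password) throws Exception {
        URL url = new URL(edgeUrl + "/acs/api/v1/auth/login");
        HttpURLConnection con = (HttpURLConnection) url.openConnection();
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "application/json");
        con.setDoOutput(true);

        ObjectMapper mapper = new ObjectMapper();
        String body = mapper.createObjectNode()
                .put("uid", uid)
                .put("password", password)
                .toString();
        try (OutputStream os = con.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // The login endpoint responds with a body such as {"token":"..."}.
        return mapper.readTree(con.getInputStream()).get("token").asText();
    }
}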

Let's code the same in Java using the Spring Framework.

Note: I am not including all the required dependent classes here, as the focus is on showing how to run Spark on Mesosphere using the REST API.

MesosSparkDelagator

This delegator should be called with a RequestObject that carries all the parameters your program needs to configure and run the job.

The delegator supports two types of authentication: basic and token-based. In our implementation, basic authentication abstracts away the step of obtaining the token from the username and password.

submitSparkJobToMesos limits the resources each job can use; in the implementation below, any resource value the caller leaves empty falls back to the corresponding threshold value, which is read from the MESOSPHERE_CONFIG property file.


@Component
public class MesosSparkDelagator extends InitSparkProperties {

    private static final Logger LOGGER = Logger.getLogger(MesosSparkDelagator.class);

    @Autowired
    private ModelToJsonMapping modelToJsonMapping;

    @Autowired
    @Qualifier("sparkBasicAuthRestClient")
    private RestClient<CreateSubmissionRequest> sparkSubmissionrestClient;

    @Autowired
    @Qualifier("sparkTokenAuthRestClient")
    private RestClient<CreateSubmissionRequest> sparkTokenBasedSubmissionrestClient;

    public RestClient<CreateSubmissionRequest> getSparkTokenBasedSubmissionrestClient() {
        return sparkTokenBasedSubmissionrestClient;
    }

    public void setSparkTokenBasedSubmissionrestClient(
            RestClient<CreateSubmissionRequest> sparkTokenBasedSubmissionrestClient) {
        this.sparkTokenBasedSubmissionrestClient = sparkTokenBasedSubmissionrestClient;
    }

    public RestClient<CreateSubmissionRequest> getSparkSubmissionrestClient() {
        return sparkSubmissionrestClient;
    }

    public void setSparkSubmissionrestClient(RestClient<CreateSubmissionRequest> sparkSubmissionrestClient) {
        this.sparkSubmissionrestClient = sparkSubmissionrestClient;
    }

    public ModelToJsonMapping getModelToJsonMapping() {
        return modelToJsonMapping;
    }

    public void setModelToJsonMapping(ModelToJsonMapping modelToJsonMapping) {
        this.modelToJsonMapping = modelToJsonMapping;
    }

    public void submitSparkJobToMesos(RequestObject requestObject) {

        LOGGER.info("submitting the spark job to mesos");
        ConfigFile mesos_config = new ConfigFile(Constants.MESOSPHERE_CONFIG, FileType.property);
        CreateSubmissionRequest createSubmissionRequest = createSubmitProperties(requestObject);
        createSubmissionRequest.setAppArgs(Arrays.asList(requestObject.getAppArgs()));
        createSubmissionRequest.setAppResource(requestObject.getAppResource());
        createSubmissionRequest.setMainClass(requestObject.getDriverClass());
        SparkProperties sparkProperties = createSubmissionRequest.getSparkProperties();

        // Threshold values from MESOSPHERE_CONFIG; any resource the caller
        // leaves empty falls back to these.
        String max_driver_cores = mesos_config.getString("spark.driver.cores");
        String max_driver_memory = mesos_config.getString("spark.driver.memory");
        String max_executor_memory = mesos_config.getString("spark.executor.memory");
        String max_executor_cores = mesos_config.getString("spark.executor.cores");
        String max_cores_per_job = mesos_config.getString("spark.cores.max");

        String request_driver_cores = Util.isNullOrEmpty(requestObject.getSparkProperties().getSpark_driver_cores())
                ? max_driver_cores
                : requestObject.getSparkProperties().getSpark_driver_cores();
        String request_driver_memory = Util.isNullOrEmpty(requestObject.getSparkProperties().getSpark_driver_memory())
                ? max_driver_memory
                : requestObject.getSparkProperties().getSpark_driver_memory();
        String request_executor_memory = Util.isNullOrEmpty(requestObject.getSparkProperties().getSpark_executor_memory())
                ? max_executor_memory
                : requestObject.getSparkProperties().getSpark_executor_memory();
        String request_executor_cores = Util.isNullOrEmpty(requestObject.getSparkProperties().getSpark_executor_cores())
                ? max_executor_cores
                : requestObject.getSparkProperties().getSpark_executor_cores();
        String request_max_cores_per_job = Util.isNullOrEmpty(requestObject.getSparkProperties().getSpark_cores_max())
                ? max_cores_per_job
                : requestObject.getSparkProperties().getSpark_cores_max();

        sparkProperties.setSpark_driver_cores(request_driver_cores);
        sparkProperties.setSpark_driver_memory(request_driver_memory);
        sparkProperties.setSpark_executor_memory(request_executor_memory);
        sparkProperties.setSpark_executor_cores(request_executor_cores);
        sparkProperties.setSpark_cores_max(request_max_cores_per_job);
        sparkProperties.setSpark_app_name(requestObject.getAppName());
        sparkProperties.setSpark_jars(requestObject.getAppResource());
        sparkProperties.setSpark_locality_wait(mesos_config.getString("spark.locality.wait"));
        createSubmissionRequest.setSparkProperties(sparkProperties);

        String createSubmissionRequestJson = getModelToJsonMapping().getJson(createSubmissionRequest);
        LOGGER.info("Posting json to mesos: " + createSubmissionRequestJson);
        String endpoint = Util.getSparkServiceEndpointUrl(requestObject);

        if (requestObject.getAuthType().equals(AuthType.BASIC)) {
            getSparkSubmissionrestClient().postRequest(createSubmissionRequestJson, requestObject, endpoint);
        } else {
            getSparkTokenBasedSubmissionrestClient().postRequest(createSubmissionRequestJson, requestObject, endpoint);
        }
    }

}
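
For completeness, invoking the delegator might look like the sketch below. RequestObject is one of the dependent classes I am not showing, so the setter names here are assumptions mirroring the getters used above:

@Autowired
private MesosSparkDelagator mesosSparkDelagator;

public void runJob() {
    RequestObject request = new RequestObject();
    request.setAppName("test_app_name");
    request.setAppArgs(new String[] { "arg0", "arg1" });
    request.setAppResource("https://artifactory-url/jar_file_where_spark_driver_class_is_present.jar");
    request.setDriverClass("com.blog.time.pass.Driver");
    request.setSparkServiceName("spark");
    request.setEdgeUrl("https://mesosphere-edge-url");
    request.setAuthType(AuthType.BASIC);
    // Leave the SparkProperties fields empty to fall back to the MESOSPHERE_CONFIG thresholds.
    request.setSparkProperties(new SparkProperties());
    mesosSparkDelagator.submitSparkJobToMesos(request);
}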

InitSparkProperties initializes some of the Spark properties needed to submit the job to Mesosphere.


public class InitSparkProperties {

    public CreateSubmissionRequest createSubmitProperties(RequestObject requestObject) {
        ConfigFile spark_mesos_config = new ConfigFile(Constants.MESOSPHERE_CONFIG, FileType.property);
        String serviceName = requestObject.getSparkServiceName();
        String mesosphere_url = requestObject.getEdgeUrl();
        CreateSubmissionRequest createSubmissionRequest = new CreateSubmissionRequest();
        createSubmissionRequest.setAction("CreateSubmissionRequest");
        createSubmissionRequest.setClientSparkVersion(
                requestObject.getClientSparkVersion() == null ? spark_mesos_config.getString("spark.default.version")
                        : requestObject.getClientSparkVersion());
        createSubmissionRequest.setEnvironmentVariables(new EnvironmentVariables("key", "value"));
        SparkProperties sparkProperties = new SparkProperties();
        sparkProperties.setSpark_app_name(requestObject.getAppName());
        sparkProperties.setSpark_master(Constants.SPARK_MASTER_PREFIX
                + mesosphere_url.replaceAll(Constants.HTTPS_PROTOCOL, Constants.EMPTY_STRING)
                + Constants.SERVICE_URL_BUILDER_STRING + serviceName);
        sparkProperties.setSpark_mesos_driver_labels(Constants.DCOS_SPACE + serviceName);
        sparkProperties.setSpark_mesos_executor_docker_forcePullImage(
                spark_mesos_config.getString("spark.mesos.executor.docker.forcePullImage"));
        sparkProperties.setSpark_mesos_executor_docker_image(
                spark_mesos_config.getString("spark.mesos.executor.docker.image"));
        sparkProperties.setSpark_mesos_task_labels(Constants.DCOS_SPACE + serviceName);
        sparkProperties.setSpark_ssl_noCertVerification("true");
        sparkProperties.setSpark_submit_deployMode(spark_mesos_config.getString("spark.submit.deployMode"));
        createSubmissionRequest.setSparkProperties(sparkProperties);
        return createSubmissionRequest;
    }

}
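
Note how the spark.master value gets assembled here: assuming Constants.SPARK_MASTER_PREFIX is "mesos-ssl://", Constants.HTTPS_PROTOCOL is "https://" and Constants.SERVICE_URL_BUILDER_STRING is "/service/", an edge URL of https://mesosphere-edge-url and a service name of {sandbox_name}/spark produce

mesos-ssl://mesosphere-edge-url/service/{sandbox_name}/spark

which is exactly the spark.master value in the JSON payload at the top of the article.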

Below are some of the model classes used in the code above.


public class CreateSubmissionRequest implements Model, Serializable {

    private static final long serialVersionUID = 43434;
    public String action;
    public List<String> appArgs;
    public String appResource;
    public String clientSparkVersion;
    public EnvironmentVariables environmentVariables;
    public String mainClass;
    public SparkProperties sparkProperties;

    public CreateSubmissionRequest() {
        super();
    }

    public CreateSubmissionRequest(String action, List<String> appArgs, String appResource, String clientSparkVersion,
            EnvironmentVariables environmentVariables, String mainClass, SparkProperties sparkProperties) {
        super();
        this.action = action;
        this.appArgs = appArgs;
        this.appResource = appResource;
        this.clientSparkVersion = clientSparkVersion;
        this.environmentVariables = environmentVariables;
        this.mainClass = mainClass;
        this.sparkProperties = sparkProperties;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getAppResource() {
        return appResource;
    }

    public void setAppResource(String appResource) {
        this.appResource = appResource;
    }

    public List<String> getAppArgs() {
        return appArgs;
    }

    public void setAppArgs(List<String> appArgs) {
        this.appArgs = appArgs;
    }

    public String getClientSparkVersion() {
        return clientSparkVersion;
    }

    public void setClientSparkVersion(String clientSparkVersion) {
        this.clientSparkVersion = clientSparkVersion;
    }

    public EnvironmentVariables getEnvironmentVariables() {
        return environmentVariables;
    }

    public void setEnvironmentVariables(EnvironmentVariables environmentVariables) {
        this.environmentVariables = environmentVariables;
    }

    public String getMainClass() {
        return mainClass;
    }

    public void setMainClass(String mainClass) {
        this.mainClass = mainClass;
    }

    public SparkProperties getSparkProperties() {
        return sparkProperties;
    }

    public void setSparkProperties(SparkProperties sparkProperties) {
        this.sparkProperties = sparkProperties;
    }

}


public class SparkProperties implements Model {

    public String spark_app_name;
    public String spark_driver_cores;
    public String spark_driver_memory;
    public String spark_executor_memory;
    public String spark_executor_cores;
    public String spark_cores_max;
    public String spark_locality_wait;
    public String spark_jars;
    public String spark_master;
    public String spark_mesos_driver_labels;
    public String spark_mesos_executor_docker_forcePullImage;
    public String spark_mesos_executor_docker_image;
    public String spark_mesos_task_labels;
    public String spark_ssl_noCertVerification;
    public String spark_submit_deployMode;

    @JsonProperty("spark.executor.cores")
    public String getSpark_executor_cores() {
        return spark_executor_cores;
    }

    public void setSpark_executor_cores(String spark_executor_cores) {
        this.spark_executor_cores = spark_executor_cores;
    }

    @JsonProperty("spark.locality.wait")
    public String getSpark_locality_wait() {
        return spark_locality_wait;
    }

    public void setSpark_locality_wait(String spark_locality_wait) {
        this.spark_locality_wait = spark_locality_wait;
    }

    @JsonProperty("spark.cores.max")
    public String getSpark_cores_max() {
        return spark_cores_max;
    }

    public void setSpark_cores_max(String spark_cores_max) {
        this.spark_cores_max = spark_cores_max;
    }

    @JsonProperty("spark.app.name")
    public String getSpark_app_name() {
        return spark_app_name;
    }

    public void setSpark_app_name(String spark_app_name) {
        this.spark_app_name = spark_app_name;
    }

    @JsonProperty("spark.driver.cores")
    public String getSpark_driver_cores() {
        return spark_driver_cores;
    }

    public void setSpark_driver_cores(String spark_driver_cores) {
        this.spark_driver_cores = spark_driver_cores;
    }

    @JsonProperty("spark.driver.memory")
    public String getSpark_driver_memory() {
        return spark_driver_memory;
    }

    public void setSpark_driver_memory(String spark_driver_memory) {
        this.spark_driver_memory = spark_driver_memory;
    }

    @JsonProperty("spark.executor.memory")
    public String getSpark_executor_memory() {
        return spark_executor_memory;
    }

    public void setSpark_executor_memory(String spark_executor_memory) {
        this.spark_executor_memory = spark_executor_memory;
    }

    @JsonProperty("spark.jars")
    public String getSpark_jars() {
        return spark_jars;
    }

    public void setSpark_jars(String spark_jars) {
        this.spark_jars = spark_jars;
    }

    @JsonProperty("spark.master")
    public String getSpark_master() {
        return spark_master;
    }

    public void setSpark_master(String spark_master) {
        this.spark_master = spark_master;
    }

    @JsonProperty("spark.mesos.driver.labels")
    public String getSpark_mesos_driver_labels() {
        return spark_mesos_driver_labels;
    }

    public void setSpark_mesos_driver_labels(String spark_mesos_driver_labels) {
        this.spark_mesos_driver_labels = spark_mesos_driver_labels;
    }

    @JsonProperty("spark.mesos.executor.docker.forcePullImage")
    public String getSpark_mesos_executor_docker_forcePullImage() {
        return spark_mesos_executor_docker_forcePullImage;
    }

    public void setSpark_mesos_executor_docker_forcePullImage(String spark_mesos_executor_docker_forcePullImage) {
        this.spark_mesos_executor_docker_forcePullImage = spark_mesos_executor_docker_forcePullImage;
    }

    @JsonProperty("spark.mesos.executor.docker.image")
    public String getSpark_mesos_executor_docker_image() {
        return spark_mesos_executor_docker_image;
    }

    public void setSpark_mesos_executor_docker_image(String spark_mesos_executor_docker_image) {
        this.spark_mesos_executor_docker_image = spark_mesos_executor_docker_image;
    }

    @JsonProperty("spark.mesos.task.labels")
    public String getSpark_mesos_task_labels() {
        return spark_mesos_task_labels;
    }

    public void setSpark_mesos_task_labels(String spark_mesos_task_labels) {
        this.spark_mesos_task_labels = spark_mesos_task_labels;
    }

    @JsonProperty("spark.ssl.noCertVerification")
    public String getSpark_ssl_noCertVerification() {
        return spark_ssl_noCertVerification;
    }

    public void setSpark_ssl_noCertVerification(String spark_ssl_noCertVerification) {
        this.spark_ssl_noCertVerification = spark_ssl_noCertVerification;
    }

    @JsonProperty("spark.submit.deployMode")
    public String getSpark_submit_deployMode() {
        return spark_submit_deployMode;
    }

    public void setSpark_submit_deployMode(String spark_submit_deployMode) {
        this.spark_submit_deployMode = spark_submit_deployMode;
    }

}


public class EnvironmentVariables {

    public String key;
    public String value;

    public EnvironmentVariables(String key, String value) {
        super();
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

}

Let's build a simple REST client to post the request to the Spark service in Mesosphere.


public abstract class RestClient<T extends Model> {

    private static final Logger LOGGER = Logger.getLogger(RestClient.class);

    public RestClient() {
        super();
    }

    public abstract HttpURLConnection enrichPostRequest(HttpURLConnection postConnection, RequestObject requestObject);

    public String getRequest(RequestObject requestObject) {

        StringBuilder response = null;
        try {
            URL obj = new URL(Util.getSparkServiceEndpointUrl(requestObject));
            HttpURLConnection con = (HttpURLConnection) obj.openConnection();
            con.setRequestMethod("GET");
            int responseCode = con.getResponseCode();
            LOGGER.info("GET Response Code :: " + responseCode);

            if (responseCode == HttpURLConnection.HTTP_OK) { // success
                BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
                String inputLine;
                response = new StringBuilder();

                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                in.close();
                LOGGER.info(response.toString());
            } else {
                LOGGER.error("GET request failed with response code " + responseCode);
                response = new StringBuilder(String.valueOf(responseCode));
            }
        } catch (IOException e) {
            LOGGER.error(e.getLocalizedMessage());
        }
        // Guard against an IOException leaving response null.
        return response == null ? "" : response.toString();
    }

    public String postRequest(String json, RequestObject requestObject, String endpoint) {

        StringBuilder response = null;

        try {
            URL obj = new URL(endpoint);
            HttpURLConnection postConnection = (HttpURLConnection) obj.openConnection();
            postConnection.setRequestMethod("POST");
            postConnection.setRequestProperty("Content-Type", "application/json");
            postConnection.setDoOutput(true);
            // Let the concrete subclass attach its Authorization header.
            this.enrichPostRequest(postConnection, requestObject);
            OutputStream os = postConnection.getOutputStream();
            os.write(json.getBytes());
            os.flush();
            os.close();
            int responseCode = postConnection.getResponseCode();
            LOGGER.info("POST Response Code : " + responseCode);
            if (responseCode == HttpURLConnection.HTTP_OK) { // success
                BufferedReader in = new BufferedReader(new InputStreamReader(postConnection.getInputStream()));
                String inputLine;
                response = new StringBuilder();
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                in.close();
            } else {
                LOGGER.error("POST to endpoint was not successful: " + endpoint);
            }
        } catch (IOException e) {
            LOGGER.error(e.getLocalizedMessage());
        }
        // Guard against an IOException leaving response null.
        return response == null ? "" : response.toString();
    }

}


@Component
public class SparkTokenAuthRestClient extends RestClient<CreateSubmissionRequest> {

    private static final Logger LOGGER = Logger.getLogger(SparkTokenAuthRestClient.class);

    @Override
    public HttpURLConnection enrichPostRequest(HttpURLConnection postConnection, RequestObject requestObject) {
        LOGGER.info("Enriching post request");
        postConnection.setRequestProperty("Authorization", "token=" + requestObject.getToken());
        return postConnection;
    }

}
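
The token=<auth-token> form of the Authorization header is the convention DC/OS uses for its token-based authentication, which is why both clients set the header to token= followed by the raw token rather than using the more common Bearer scheme.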


@Component
public class SparkBasicAuthRestClient extends RestClient<CreateSubmissionRequest> {

    private static final Logger LOGGER = Logger.getLogger(SparkBasicAuthRestClient.class);

    @Autowired
    public MesosAuthBuilder mesosAuthBuilder;

    @Override
    public HttpURLConnection enrichPostRequest(HttpURLConnection postConnection, RequestObject requestObject) {
        LOGGER.info("Adding authorization header");
        // Exchange the username/password on the request for a token, then attach it.
        String token = mesosAuthBuilder.getToken(requestObject);
        postConnection.setRequestProperty("Authorization", "token=" + token);
        return postConnection;
    }

}
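
MesosAuthBuilder, which SparkBasicAuthRestClient uses to turn the username and password into a token, is another dependent class I am not showing in full. A minimal sketch, assuming RequestObject exposes the edge URL and credentials (getUid and getPassword are assumed getters) and reusing the token-exchange logic sketched earlier in the article, could look like this:

@Component
public class MesosAuthBuilder {

    // Sketch only: exchange the credentials on the RequestObject for a DC/OS
    // auth token via the /acs/api/v1/auth/login endpoint shown earlier.
    public String getToken(RequestObject requestObject) {
        try {
            return TokenFetchSketch.fetchAuthToken(
                    requestObject.getEdgeUrl(),
                    requestObject.getUid(),       // assumed getter
                    requestObject.getPassword()); // assumed getter
        } catch (Exception e) {
            throw new RuntimeException("Unable to obtain auth token from the edge", e);
        }
    }

}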