oozie spark action workflow example

Lets create oozie workflow with spark action for creating a inverted index use case. Inverted index pattern is used to generate an index from a data set to allow for faster searches or data enrichment capabilities.It is often convenient to index large data sets on keywords, so that searches can trace terms back to records that contain specific values. While building an inverted index does require extra processing up front, taking the time to do so can greatly reduce the amount of time it takes to find something. Search engines build indexes to improve search performance. Imagine entering a keyword and letting the engine crawl the Internet and build a list of pages to return to you. Such a query would take an extremely long amount of time to complete. By building an inverted index, the search engine knows all the web pages related to a keyword ahead of time and these results are simply displayed to the user. These indexes are often ingested into a database for fast query responses.

Problem to Solve

Given a employees information documents create a inverted index for department name based on the first name.

Build an inverted index as Name ->Department Name.

we will be creating inverted index as below so that it will be faster to search employee details based on the department.

LETRICH POLICE
DELVALLE STREETS & SAN,POLICE
JOSEPH FIRE,HEALTH,AVIATION,GENERAL SERVICES,STREETS & SAN,OEMC,LAW
BAYLIAN POLICE
ZHEN PUBLIC LIBRARY
KUBIAK POLICE,FIRE,WATER MGMNT

Here is a sample input data attached

employee_info_1

Below is the sample input for reference


First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

1. Inverted index code in spark

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class InvertedIndex {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("pattern");

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD<String> rdd = jsc.textFile(args[0]);

JavaPairRDD<String, String> pair = rdd.mapToPair(new PairFunction<String, String, String>() {

@Override
public Tuple2<String, String> call(String value) throws Exception {
String data = value.toString();
String[] field = data.split(",", -1);

return new Tuple2<String, String>(field[0], field[3]);
}
});

JavaPairRDD<String, String> output = pair.reduceByKey(new Function2<String, String, String>() {

@Override
public String call(String arg0, String arg1) throws Exception {

if (!arg0.contains(arg1)) {
arg0 = arg0 +","+ arg1;
}

return arg0;
}
});

output.saveAsTextFile(args[1]);

}

}

Create a jar of the above code and name the jar as spark_test.jar and add the jar inside the lib folder inside the workflow base location.

2.Co-ordinator xml file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow. A coordinator runs periodically from the start time until the end time.


<coordinator-app
name="${coord_name}"
frequency="${coord:days(1)}"
start="${coord_start_time}"
end="${coord_end_time}"
timezone="BST"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
<controls>
<timeout>10080</timeout>
</controls>

<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
<configuration>

<property>
<name>wf_exec_datetime</name>
<value>${coord:nominalTime()}</value>
</property>
<property>
<name>wf_date</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0,'DAY'), "yyyy-MM-dd")}</value>
</property>
<property>
<name>YEAR</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'yyyy')}</value>
</property>
<property>
<name>MONTH</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'MM')}</value>
</property>
<property>
<name>DAY</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'dd')}</value>
</property>
<property>
<name>HOUR</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'HH')}</value>
</property>
</configuration>
</workflow>
<sla:info>
<sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
<sla:should-end>${360 * MINUTES}</sla:should-end>
<sla:alert-events>end_miss</sla:alert-events>
<sla:alert-contact>${wf_notification_email}</sla:alert-contact>
<sla:notification-msg>PLease check spark workflow test job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
<sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
</sla:info>
</action>
</coordinator-app>

3. Oozie workflow xml – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.


<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">

<global>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
</configuration>
</global>

<credentials>
<credential name="hcat" type="hcat">
<property>
<name>hcat.metastore.uri</name>
<value>thrift://thrift_server</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hcat metastore principal</value>
</property>
</credential>
</credentials>

<start to="sparkTest" />
<action name="sparkTest" cred="hcat">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>hive-config.xml</job-xml>
<master>yarn</master>
<mode>cluster</mode>
<name>${wf_name}</name>
<class>com.design.pattern.spark.InvertedIndex</class>
<jar>${nameNode}/${sparkJar}</jar>
<spark-opts>--conf spark.yarn.queue=${wf_hadoop_instance_queue}</spark-opts>
<arg>${nameNode}/${InputPath}</arg>
<arg>${nameNode}/${OutputPath}</arg>
</spark>
<ok to="Email_success" />
<error to="Email_failure" />
</action>

<action name="Email_success">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS:spark
</subject>
<body>Hi,
This is
auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com
</body>
</email>
<ok to="End" />
<error to="Kill" />
</action>
<action name="Email_failure">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAILURE: Spark
</subject>
<body>Hi,
The has failed.
This is
auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com
</body>
</email>
<ok to="Kill" />
<error to="Kill" />
</action>

<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>
</workflow-app>

4. coordinator property file to pass configuration – coordinator.properties

The property file where the configuration parameters are passed from.


wf_hadoop_instance=Queue_Name_To_USe
wf_hadoop_instance_queue=Queue_Name_To_USe
nameNode=hdfs://nameservice
jobTracker=jobTracker_To_Use

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflowPath
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=SPARK_WORKFLOW
coord_start_time=2017-12-06T02:00Z
coord_end_time=2027-04-24T10:00Z

wf_name=SPARK_WORKFLOW
wf_workflow_path=/user/${wf_hadoop_instance}/workflowPath
wf_notification_email=mail id seperated by comma for notification
jarfile=spark_test.jar
InputPath=user/${wf_hadoop_instance}/input_path
OutputPath=user/${wf_hadoop_instance}/output_path
sparkJar=/user/${wf_hadoop_instance}/workflowPath/lib/spark_test.jar

5. Sample hive-config.xml file

<?xml version="1.0" encoding="UTF-8"?>

<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://thrift_server_url:port</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.warehouse.subdir.inherit.perms</name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>1073741824</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>999</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>zookeeper server list</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>zookeeper client port</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hbase zookeeper server list</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>hbase zookeeper client port</value>
</property>
<property>
<name>hive.zookeeper.namespace</name>
<value>zookeeper namespace</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>
<!--'hive.server2.authentication', originally set to 'kerberos' (non-final), is overridden below by a safety valve-->
<property>
<name>hive.metastore.kerberos.principal</name>
<value>kerberos principal</value>
</property>
<property>
<name>hive.server2.authentication.kerberos.principal</name>
<value>kerberos principal</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>kerberos</value>
</property>
</configuration>

6. running the coordinator job

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run

1 thought on “oozie spark action workflow example”

Comments are closed.