Let's create an Oozie workflow with a Spark action for an inverted index use case. The inverted index pattern generates an index from a data set to allow for faster searches or data enrichment. It is often convenient to index large data sets on keywords, so that searches can trace terms back to the records that contain specific values. While building an inverted index does require extra processing up front, taking the time to do so can greatly reduce the amount of time it takes to find something. Search engines build indexes to improve search performance. Imagine entering a keyword and letting the engine crawl the Internet and build a list of pages to return to you on the fly; such a query would take an extremely long time to complete. By building an inverted index ahead of time, the search engine already knows all the web pages related to a keyword, and those results are simply displayed to the user. These indexes are often ingested into a database for fast query responses.
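To make the pattern concrete before moving to Spark, here is a minimal plain-Java sketch of the idea. The class name, document ids and sample terms are illustrative only; the point is that inverting the documents into a term -> documents map turns a search into a single map lookup instead of a scan over every document.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class InvertedIndexSketch {
    public static void main(String[] args) {
        // Documents keyed by an id; the contents are illustrative only.
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("doc1", "police fire");
        docs.put("doc2", "fire law");

        // Inverted index: term -> set of document ids containing that term.
        Map<String, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String term : doc.getValue().split("\\s+")) {
                index.computeIfAbsent(term, k -> new TreeSet<>()).add(doc.getKey());
            }
        }

        // A lookup is now a single map access instead of a scan over every document.
        System.out.println(index.get("fire")); // prints [doc1, doc2]
    }
}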
Problem to Solve
Given an employees information document, create an inverted index of department names keyed by employee first name.
Build the inverted index as First Name -> Department Names.
We will build the inverted index shown below, so that the departments an employee has worked in can be looked up quickly by first name.
LETRICH POLICE
DELVALLE STREETS & SAN,POLICE
JOSEPH FIRE,HEALTH,AVIATION,GENERAL SERVICES,STREETS & SAN,OEMC,LAW
BAYLIAN POLICE
ZHEN PUBLIC LIBRARY
KUBIAK POLICE,FIRE,WATER MGMNT
Below is a sample of the input data for reference:
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
1. Inverted index code in Spark
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class InvertedIndex {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("pattern");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the input CSV file; each line is one employee record.
        JavaRDD<String> rdd = jsc.textFile(args[0]);

        // Map each record to a (first name, department) pair.
        JavaPairRDD<String, String> pair = rdd.mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String value) throws Exception {
                String[] field = value.split(",", -1);
                return new Tuple2<String, String>(field[0], field[3]);
            }
        });

        // Reduce by first name, appending each department only if it is not already present.
        JavaPairRDD<String, String> output = pair.reduceByKey(new Function2<String, String, String>() {
            @Override
            public String call(String arg0, String arg1) throws Exception {
                if (!arg0.contains(arg1)) {
                    arg0 = arg0 + "," + arg1;
                }
                return arg0;
            }
        });

        // Write the inverted index to the output path.
        output.saveAsTextFile(args[1]);
    }
}
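One thing to watch out for: the sample input above starts with a header row, which the code as written would index under the key First Name. If that matters for your data, a small filter on the input RDD can drop it before the mapToPair step. The sketch below is one possible guard, and it assumes the header row always begins with the literal text "First Name"; the pairs would then be built from withoutHeader instead of rdd.

// Additional import needed for the filter function:
import org.apache.spark.api.java.function.Function;

// ...inside main(), immediately after jsc.textFile(args[0]):
JavaRDD<String> withoutHeader = rdd.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String line) throws Exception {
        // Assumption: the header row always starts with "First Name".
        return !line.startsWith("First Name");
    }
});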
Create a jar of the above code, name it spark_test.jar, and place it in the lib folder under the workflow base location (this matches the sparkJar path configured in the property file below).
2. Coordinator xml file – coordinator.xml
An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow. A coordinator runs periodically from the start time until the end time.
<coordinator-app name="${coord_name}" frequency="${coord:days(1)}" start="${coord_start_time}" end="${coord_end_time}" timezone="BST"
    xmlns="uri:oozie:coordinator:0.4" xmlns:sla="uri:oozie:sla:0.2">
    <controls>
        <timeout>10080</timeout>
    </controls>
    <action>
        <workflow>
            <app-path>${wf_workflow_path}</app-path>
            <configuration>
                <property>
                    <name>wf_exec_datetime</name>
                    <value>${coord:nominalTime()}</value>
                </property>
                <property>
                    <name>wf_date</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), "yyyy-MM-dd")}</value>
                </property>
                <property>
                    <name>YEAR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'yyyy')}</value>
                </property>
                <property>
                    <name>MONTH</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'MM')}</value>
                </property>
                <property>
                    <name>DAY</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'dd')}</value>
                </property>
                <property>
                    <name>HOUR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'HH')}</value>
                </property>
            </configuration>
        </workflow>
        <sla:info>
            <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
            <sla:should-end>${360 * MINUTES}</sla:should-end>
            <sla:alert-events>end_miss</sla:alert-events>
            <sla:alert-contact>${wf_notification_email}</sla:alert-contact>
            <sla:notification-msg>Please check that the spark workflow test job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
            <sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
        </sla:info>
    </action>
</coordinator-app>
3. Oozie workflow xml – workflow.xml
An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
    <global>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${wf_hadoop_instance_queue}</value>
            </property>
        </configuration>
    </global>
    <credentials>
        <credential name="hcat" type="hcat">
            <property>
                <name>hcat.metastore.uri</name>
                <value>thrift://thrift_server</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>hcat metastore principal</value>
            </property>
        </credential>
    </credentials>
    <start to="sparkTest" />
    <action name="sparkTest" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <job-xml>hive-config.xml</job-xml>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>${wf_name}</name>
            <class>com.design.pattern.spark.InvertedIndex</class>
            <jar>${nameNode}/${sparkJar}</jar>
            <spark-opts>--conf spark.yarn.queue=${wf_hadoop_instance_queue}</spark-opts>
            <arg>${nameNode}/${InputPath}</arg>
            <arg>${nameNode}/${OutputPath}</arg>
        </spark>
        <ok to="Email_success" />
        <error to="Email_failure" />
    </action>
    <action name="Email_success">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>SUCCESS: spark</subject>
            <body>Hi,

This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com</body>
        </email>
        <ok to="End" />
        <error to="Kill" />
    </action>
    <action name="Email_failure">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>FAILURE: Spark</subject>
            <body>Hi,

The workflow has failed. This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com</body>
        </email>
        <ok to="Kill" />
        <error to="Kill" />
    </action>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="End" />
</workflow-app>
4. Coordinator property file to pass configuration – coordinator.properties
The property file from which the configuration parameters are passed to the coordinator and workflow.
wf_hadoop_instance=Queue_Name_To_Use
wf_hadoop_instance_queue=Queue_Name_To_Use
nameNode=hdfs://nameservice
jobTracker=jobTracker_To_Use
oozie.coord.application.path=/user/${wf_hadoop_instance}/workflowPath
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog
coord_name=SPARK_WORKFLOW
coord_start_time=2017-12-06T02:00Z
coord_end_time=2027-04-24T10:00Z
wf_name=SPARK_WORKFLOW
wf_workflow_path=/user/${wf_hadoop_instance}/workflowPath
wf_notification_email=mail ids separated by comma for notification
jarfile=spark_test.jar
InputPath=user/${wf_hadoop_instance}/input_path
OutputPath=user/${wf_hadoop_instance}/output_path
sparkJar=/user/${wf_hadoop_instance}/workflowPath/lib/spark_test.jar
5. Sample hive-config.xml file
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://thrift_server_url:port</value>
    </property>
    <property>
        <name>hive.metastore.client.socket.timeout</name>
        <value>300</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.warehouse.subdir.inherit.perms</name>
        <value>true</value>
    </property>
    <property>
        <name>mapred.reduce.tasks</name>
        <value>-1</value>
    </property>
    <property>
        <name>hive.exec.reducers.bytes.per.reducer</name>
        <value>1073741824</value>
    </property>
    <property>
        <name>hive.exec.copyfile.maxsize</name>
        <value>33554432</value>
    </property>
    <property>
        <name>hive.exec.reducers.max</name>
        <value>999</value>
    </property>
    <property>
        <name>hive.metastore.execute.setugi</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.support.concurrency</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.zookeeper.quorum</name>
        <value>zookeeper server list</value>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <value>zookeeper client port</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hbase zookeeper server list</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>hbase zookeeper client port</value>
    </property>
    <property>
        <name>hive.zookeeper.namespace</name>
        <value>zookeeper namespace</value>
    </property>
    <property>
        <name>hive.cluster.delegation.token.store.class</name>
        <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
    </property>
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.sasl.enabled</name>
        <value>true</value>
    </property>
    <!-- 'hive.server2.authentication', originally set to 'kerberos' (non-final), is overridden below by a safety valve -->
    <property>
        <name>hive.metastore.kerberos.principal</name>
        <value>kerberos principal</value>
    </property>
    <property>
        <name>hive.server2.authentication.kerberos.principal</name>
        <value>kerberos principal</value>
    </property>
    <property>
        <name>hive.server2.authentication</name>
        <value>kerberos</value>
    </property>
</configuration>
6. Running the coordinator job
oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties
oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
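The first command performs a dry run that validates the coordinator definition without submitting it; the second submits the coordinator and prints a job id, which can later be inspected with the Oozie CLI (for example, oozie job -info followed by the job id) or through the Oozie web console.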