Oozie workflow example for Java action with end-to-end configuration

The Java action is a great way to run custom Java code on the Hadoop cluster. It executes the public static void main(String[] args) method of the specified main class. Although it is technically considered a non-Hadoop action, it runs as a single-mapper launcher job, which means it executes on an arbitrary Hadoop worker node.

While it is not recommended, the Java action can be used to run Hadoop MapReduce jobs, because MapReduce jobs are nothing but Java programs after all. The main class invoked can be a Hadoop MapReduce driver and can call Hadoop APIs to run a MapReduce job. In that mode, Hadoop spawns the mappers and reducers as required and runs them on the cluster. This approach is not ideal because Oozie does not know about or manage the MapReduce job spawned by the Java action, whereas it does manage the job run by the <map-reduce> action we saw in the previous section.

There are distinct advantages to running a MapReduce job through the dedicated <map-reduce> action instead of wrapping it in a <java> action. Because Oozie knows that a <map-reduce> action runs a Hadoop job, it provides easy access to the Hadoop counters of that job; it is much harder to save and access those counters when the job is launched from a <java> action. In addition, the launcher map task that starts a <map-reduce> action completes immediately and Oozie then manages the MapReduce job directly, which frees up a Hadoop slot that would otherwise remain occupied by the launcher task in the case of a <java> action.
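For example, the counters of a <map-reduce> action can be read directly from a later node using Oozie's hadoop:counters() EL function. A minimal sketch, assuming a <map-reduce> action node named mr-node earlier in the workflow (the node names here are illustrative):

[xml]
<decision name="check-counters">
    <switch>
        <!-- proceed only if the MapReduce job actually read some input -->
        <case to="next-step">
            ${hadoop:counters('mr-node')['FileSystemCounters']['FILE_BYTES_READ'] gt 0}
        </case>
        <default to="fail"/>
    </switch>
</decision>
[/xml]

There is no equivalent one-liner for a job spawned from inside a <java> action; the main class would have to collect the counters itself and hand them back through <capture-output>.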

The Java action is made up of the following elements:

1. job-tracker (required)
2. name-node (required)
3. prepare
4. configuration
5. main-class (required)
6. java-opts
7. arg
8. file
9. archive
10. capture-output

The action needs to know the JobTracker and the NameNode of the underlying Hadoop cluster where Oozie has to run the Java action. The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions, or to create directories required by the action. This delete helps make the action repeatable and enables retries after failure.

Oozie also supports the <file> and <archive> elements for actions that need them. This is the native, Hadoop way of packaging libraries, archives, scripts, and other data files that jobs need, and Oozie provides the syntax to handle them.

The key driver for this action is the Java main class to be run, plus any arguments and/or JVM options it requires. These are captured in the <main-class>, <arg>, and <java-opts> elements, respectively. Each <arg> element corresponds to one argument and is passed to the main class by Oozie in the order specified in the workflow XML. The <capture-output> element, if present, can be used to pass output back to the Oozie context, as shown in the sketch below.
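Putting these elements together, a skeleton <java> action looks roughly like this (a sketch only; the action name, main class, paths, and queue are placeholders, not part of the housekeeping example below):

[xml]
<action name="my-java-action">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <!-- make the action repeatable by clearing the old output -->
            <delete path="${nameNode}/user/${wf:user()}/output"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>com.example.MyMainClass</main-class>
        <java-opts>-Xmx1024m</java-opts>
        <arg>${nameNode}/user/${wf:user()}/input</arg>
        <arg>${nameNode}/user/${wf:user()}/output</arg>
        <file>lib/my-app.jar#my-app.jar</file>
        <capture-output/>
    </java>
    <ok to="next-action"/>
    <error to="fail"/>
</action>
[/xml]

When <capture-output/> is present, the main class is expected to write java.util.Properties to the local file named by the oozie.action.output.properties system property; downstream nodes can then read those values with ${wf:actionData('my-java-action')['someKey']}.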

Let's use the Java action for housekeeping purposes.

1. Java class

A simple Java class for housekeeping files in HDFS and Hive. The dependent helper classes are omitted for simplicity. Create a jar of the class below using Ant or Maven and add it inside a folder called lib.


import java.util.ArrayList;
import java.util.List;

public class CleanUp {

    public static void main(String[] args) throws Exception {

        /*
         * Expected arguments:
         * <arg>Namenode loc</arg> <arg>File location</arg>
         * <arg>retention period</arg> <arg>hiveorfile</arg>
         */

        HdfsFile hdfsFile = new HdfsFile();
        final String date = Util.getDate();

        List<String> files = new ArrayList<String>();
        List<String> scratchFiles = new ArrayList<String>();

        if (HouseKeepingType.file.toString().equals(args[3])) {
            // Plain HDFS files: pick up the directories older than the retention period
            HaasHdfsFileFilter haasHdfsFileFilter = new HaasHdfsFileFilter(date, Integer.parseInt(args[2]));
            files = hdfsFile.filterDirectories(haasHdfsFileFilter, args[1], args[0]);

        } else if (HouseKeepingType.hive.toString().equals(args[3])) {
            System.out.println("hive type selected");
            HiveHdfsFilter hiveHdfsFilter = new HiveHdfsFilter(date, Integer.parseInt(args[2]));
            files = hdfsFile.filterDirectories(hiveHdfsFilter, args[1], args[0]);

            // Hive also leaves scratch directories behind; clean them up as well
            System.out.println("deleting scratch hive directories");
            HiveScratchFilter hiveScratchFilter = new HiveScratchFilter();
            scratchFiles = hdfsFile.filterScratchDir(hiveScratchFilter, args[1], args[0]);
            hdfsFile.deleteScratchFiles(scratchFiles, args[0]);

        } else {
            throw new Exception("Invalid house keeping type specified");
        }

        // Safety check: refuse to delete when more directories match than the hard-coded maximum (2)
        if (null != files && files.size() > 2) {
            throw new Exception("More directories than the max allowed. Please check the configuration");
        } else {
            hdfsFile.deleteFiles(files, args[0], 2);
        }
    }
}

2. Coordinator xml file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start time and a frequency parameter, and it starts a workflow run only when all the necessary input data is available; otherwise the run is delayed until the data arrives. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow, and it runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process the input data, which on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job will be delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator time frequency.
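The housekeeping coordinator below is purely time-triggered, so no input dependency is declared. For a data-triggered coordinator, the input would be declared as a dataset and the workflow gated on it through input-events; a rough sketch (the dataset name, URI template, and done-flag are illustrative, not part of this example):

[xml]
<coordinator-app name="data-triggered-coord" frequency="${coord:days(1)}"
                 start="${coord_start_time}" end="${coord_end_time}"
                 timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
    <datasets>
        <dataset name="daily-input" frequency="${coord:days(1)}"
                 initial-instance="${coord_start_time}" timezone="UTC">
            <uri-template>${nameNode}/data/input/${YEAR}${MONTH}${DAY}</uri-template>
            <done-flag>_SUCCESS</done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="input" dataset="daily-input">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${wf_workflow_path}</app-path>
        </workflow>
    </action>
</coordinator-app>
[/xml]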

[xml]

<coordinator-app
    name="${coord_name}"
    frequency="${coord:days(1)}"
    start="${coord_start_time}"
    end="${coord_end_time}"
    timezone="IST"
    xmlns="uri:oozie:coordinator:0.4"
    xmlns:sla="uri:oozie:sla:0.2">
    <action>
        <workflow>
            <app-path>${wf_workflow_path}</app-path>
            <configuration>
                <property>
                    <name>wf_exec_datetime</name>
                    <value>${coord:nominalTime()}</value>
                </property>
            </configuration>
        </workflow>
        <sla:info>
            <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
            <sla:should-end>${60 * MINUTES}</sla:should-end>
            <sla:alert-events>end_miss</sla:alert-events>
            <sla:alert-contact>${wf_notification_email}</sla:alert-contact>
            <sla:notification-msg>please check if HOUSE_KEEPING job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
            <sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
        </sla:info>
    </action>
</coordinator-app>

[/xml]

3. Oozie workflow xml – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependencies, where each action is typically a Hadoop job such as a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs, like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions; an action does not start until the previous action in the workflow ends.

Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
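For illustration, fork/join and decision nodes look roughly like this (node names and the size check are placeholders; the housekeeping workflow below needs only start, end, and kill):

[xml]
<fork name="fork-node">
    <path start="first-action"/>
    <path start="second-action"/>
</fork>
<!-- both first-action and second-action transition to the join on success -->
<join name="join-node" to="decision-node"/>

<decision name="decision-node">
    <switch>
        <case to="small-input-path">${fs:fileSize(inputDir) lt 1 * GB}</case>
        <default to="large-input-path"/>
    </switch>
</decision>
[/xml]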

The workflow below has two Java actions: one to delete files from HDFS and one to delete files from the Hive tables. There is also an email action to notify the support team whether the job succeeded or failed on a day-to-day basis.

[xml]

<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
    <global>
        <job-xml>hive-config.xml</job-xml>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${wf_hadoop_instance}</value>
            </property>
        </configuration>
    </global>

    <start to="purge-files"/>

    <action name="purge-files">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>com.haas.CleanUp</main-class>
            <arg>${nameNode}</arg>
            <arg>${nameNode}/user/${wf_hadoop_instance}/data/file</arg>
            <arg>${wf_file_retention_period_in_days}</arg>
            <arg>file</arg>
            <file>lib/cleanup.jar#cleanup.jar</file>
        </java>
        <ok to="purge-hive-files"/>
        <error to="failed-notification-email"/>
    </action>

    <action name="purge-hive-files">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>com.haas.CleanUp</main-class>
            <arg>${nameNode}</arg>
            <arg>${nameNode}/user/${wf_hadoop_instance}/hive/testTable</arg>
            <arg>${wf_db_file_retention_period_in_days}</arg>
            <arg>hive</arg>
            <file>lib/cleanup.jar#cleanup.jar</file>
        </java>
        <ok to="success-notification-email"/>
        <error to="failed-notification-email"/>
    </action>

    <action name="success-notification-email">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>SUCCESS : [${wf_hadoop_instance}] ${wf:name()} is successful for Run Date Time :- ${wf_exec_datetime}</subject>
            <body>Hi,

The workflow ${wf:name()} has successfully completed its run. Run Date Time :- ${wf_exec_datetime}
This is an auto-generated email. Please do not reply to this email.
            </body>
        </email>
        <ok to="End"/>
        <error to="failed-notification-email"/>
    </action>

    <action name="failed-notification-email">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>FAIL: [${wf_hadoop_instance}] ${wf:name()} failed for Run Date Time :- ${wf_exec_datetime}</subject>
            <body>Hi,

Workflow : ${wf:name()}
Failed Node : ${wf:lastErrorNode()}
Error Message : ${wf:errorMessage(wf:lastErrorNode())}
Run Date Time :- ${wf_exec_datetime}
This is an auto-generated email. Please do not reply to this email.
            </body>
        </email>
        <ok to="Kill"/>
        <error to="Kill"/>
    </action>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="End"/>
</workflow-app>

[/xml]

4. Coordinator property file to pass configuration – coordinator.properties

Finally, the property file from which the configuration parameters are passed.


wf_hadoop_instance=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflows/house_keeping
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=DAILY_HOUSE_KEEPING
coord_start_time=2016-05-30T00:00Z
coord_end_time=2027-02-04T00:00Z

wf_name=DAILY_HOUSE_KEEPING
wf_workflow_path=/user/${wf_hadoop_instance}/workflows/house_keeping
[email protected]
wf_file_retention_period_in_days=3
wf_db_file_retention_period_in_days=365
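
Before submission, the coordinator, workflow, Hive configuration, and jar need to be uploaded to the application path on HDFS. With the properties above, the expected layout is roughly:

/user/HAAS_QUEUE/workflows/house_keeping/coordinator.xml
/user/HAAS_QUEUE/workflows/house_keeping/workflow.xml
/user/HAAS_QUEUE/workflows/house_keeping/hive-config.xml
/user/HAAS_QUEUE/workflows/house_keeping/lib/cleanup.jar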

5. Running the coordinator job

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
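
The first command is a dry run that validates the definition without submitting it; the second actually submits the coordinator. Once it is running, the job can be monitored or stopped with the standard Oozie CLI, substituting the coordinator job id printed by the run command:

oozie job -oozie http://oozie_host:port/oozie -info <coordinator_job_id>

oozie job -oozie http://oozie_host:port/oozie -kill <coordinator_job_id>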