Oozie workflow example for a shell action with end-to-end configuration

Oozie provides a convenient way to run any shell command. This could be a Unix command, a Perl or Python script, or even a Java program invoked through the Unix shell. The shell command runs on an arbitrary Hadoop cluster node, and the commands being run have to be available locally on that node. There are some limitations: interactive commands are not supported, and neither is sudo or running as another user. Also, because the shell command can run on any Hadoop node, you need to be aware of the path of the binary on those nodes.

The elements that make up this action are as follows:

1. job-tracker (required)
2. name-node (required)
3. prepare
4. job-xml
5. configuration
6. exec (required)
7. argument
8. env-var
9. file
10. archive
11. capture-output

The <exec> element holds the actual shell command, with the arguments passed in through <argument> elements. If the executable is a script instead of a standard Unix command, the script needs to be copied to the workflow root directory on HDFS and defined via the <file> element as always. The <shell> action also supports an <env-var> element that contains a Unix environment variable, defined using the standard Unix syntax like PATH=$PATH:my_path. Finally, the <capture-output> element tells Oozie to capture the command's standard output (written as key=value lines) and make it available to downstream actions through the wf:actionData() EL function.
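For illustration only, here is a minimal sketch of a <shell> action that wires these elements together; the script name my_script.sh, the MY_VAR environment variable, and the transition targets are placeholders, not part of the example built later in this post:

[xml]
<action name="shell-demo">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>my_script.sh</exec>
<argument>my_input_argument</argument>
<env-var>MY_VAR=my_value</env-var>
<file>my_script.sh#my_script.sh</file>
<capture-output/>
</shell>
<ok to="next-action"/>
<error to="fail"/>
</action>
[/xml]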

1. Coordinator XML file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start time and a frequency parameter, and it starts a workflow only when all the necessary input data is available; if the input data is not available, the workflow run is delayed until it becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow, and it runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks whether the required input data is available. When it is, a workflow is started to process it and, on completion, produces the corresponding output data. This repeats at every tick of the frequency until the coordinator's end time; any run whose input data is missing is delayed until the data arrives. Normally, both the input and output data of a workflow execution are aligned with the coordinator frequency.
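The coordinator used in this post (shown next) is purely time-driven, but purely as an illustration of the data-availability mechanism described above, a data-triggered coordinator would declare a dataset and an input event roughly like this; the dataset name, URI template, and done-flag below are hypothetical:

[xml]
<coordinator-app name="data-triggered-example" frequency="${coord:days(1)}"
start="2016-02-15T00:00Z" end="2027-02-04T00:00Z" timezone="UTC"
xmlns="uri:oozie:coordinator:0.4">
<datasets>
<dataset name="daily_feed" frequency="${coord:days(1)}"
initial-instance="2016-02-15T00:00Z" timezone="UTC">
<uri-template>${nameNode}/data/feed/${YEAR}/${MONTH}/${DAY}</uri-template>
<done-flag>_SUCCESS</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="feed_ready" dataset="daily_feed">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
</workflow>
</action>
</coordinator-app>
[/xml]

The actual coordinator.xml for this housekeeping job is time-based only: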

[xml]
<coordinator-app
name="${coord_name}"
frequency="${coord:months(1)}"
start="${coord_start_time}"
end="${coord_end_time}"
timezone="IST"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
<configuration>
<property>
<name>wf_exec_datetime</name>
<value>${coord:nominalTime()}</value>
</property>
</configuration>
</workflow>
<sla:info>
<sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
<sla:should-end>${15 * MINUTES}</sla:should-end>
<sla:alert-events>end_miss</sla:alert-events>
<sla:alert-contact>${wf_notification_email}</sla:alert-contact>
<sla:notification-msg>Check if HOUSE_KEEPING job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
<sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
</sla:info>
</action>
</coordinator-app>
[/xml]

2. Oozie workflow XML – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependencies, where each action is typically a Hadoop job such as a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs, like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions; an action does not start until the previous action in the workflow ends.

Control nodes manage the execution flow of actions. The start and end control nodes define the start and end of a workflow, the fork and join control nodes allow actions to run in parallel, and the decision control node is like a switch/case statement that selects a particular execution path within the workflow using information from the job itself.
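The workflow used in this post is a straight chain of actions, but purely as an illustration, fork/join and decision nodes look roughly like the sketch below; the action names and the inputDir property are placeholders:

[xml]
<fork name="fork-node">
<path start="action-a"/>
<path start="action-b"/>
</fork>
<!-- action-a and action-b run in parallel; both must transition to the join -->
<join name="join-node" to="decision-node"/>
<decision name="decision-node">
<switch>
<case to="small-input-path">${fs:fileSize(inputDir) lt 1 * GB}</case>
<default to="large-input-path"/>
</switch>
</decision>
[/xml]

The workflow.xml for this example is a simple linear chain: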

[xml]

<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
<global>
<job-xml>hive-config.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance}</value>
</property>
</configuration>
</global>
<credentials>
<credential name='hcat' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/_HOST@DOMAIN</value>
</property>
</credential>
</credentials>
<start to="purge-data"/>

<action name="purge-data">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>bash</exec>
<argument>purge_file.sh</argument>
<argument>${nameNode}/user/${wf_hadoop_instance}/file/data/*</argument>
<argument>${wf_file_retention_period_in_days}</argument>
<file>purge_file.sh#purge_file.sh</file>
<capture-output/>
</shell>
<ok to="success-notification-email"/>
<error to="failed-notification-email"/>
</action>

<action name="success-notification-email">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS : [${wf_hadoop_instance}] ${wf:name()} is successful for Run Date Time :- ${wf_exec_datetime}</subject>
<body>Hi,

This is auto-generated email. Please do not reply to this email.
</body>
</email>
<ok to="End"/>
<error to="failed-notification-email"/>
</action>
<action name="failed-notification-email">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAIL: [${wf_hadoop_instance}] ${wf:name()} is failed for Run Date Time :- ${wf_exec_datetime}</subject>
<body>Hi,

Workflow : ${wf:name()}
Failed Node : ${wf:lastErrorNode()}
Error Message : ${wf:errorMessage(wf:lastErrorNode())}

This is auto-generated email. Please do not reply to this email.
</body>
</email>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>
</workflow-app>

[/xml]
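One thing to note about the workflow above: it defines an hcat credential, but the shell action never references it. If an action actually had to authenticate against a Kerberized Hive metastore, it would point at the credential by name through the cred attribute, for example:

[xml]
<action name="purge-data" cred="hcat">
...
</action>
[/xml]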

3. Shell file – purge_file.sh

This shell script deletes files older than the configured retention period. It takes two arguments, the HDFS path to purge and the retention period in days, both of which are passed in from the workflow's <argument> elements.

#!/bin/bash
# purge_file.sh <hdfs_path> <retention_period_in_days>
# Deletes files under the given HDFS path that are older than the retention period.

if [ -z "$1" ] || [ -z "$2" ]; then
  echo 'Missing arguments! HDFS purge path and file retention period (in days) must be specified.'
  exit 1
fi

now=$(date +%s)
hadoop fs -ls -R "$1" | while read -r f; do
  # Column 6 of the listing is the modification date, column 8 is the file path.
  dir_date=$(echo "$f" | awk '{print $6}')
  file=$(echo "$f" | awk '{print $8}')
  # Age of the entry in days.
  difference=$(( ( now - $(date -d "$dir_date" +%s) ) / (24 * 60 * 60) ))
  if [ "$difference" -gt "$2" ]; then
    hadoop fs -rm -r -skipTrash "$file"
  fi
done

4. Coordinator property file to pass configuration – coordinator.properties

Finally, the property file that supplies all the configuration parameters referenced above.

wf_hadoop_instance=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflow/house_keeping
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=HOUSE_KEEPING
coord_start_time=2016-02-15T00:00Z
coord_end_time=2027-02-04T00:00Z

wf_name=HOUSE_KEEPING
wf_workflow_path=/user/${wf_hadoop_instance}/workflow/house_keeping
[email protected]
wf_file_retention_period_in_days=20
wf_db_file_retention_period_in_days=30

5. Running the coordinator job

First, do a dry run, which validates the coordinator definition against the properties without actually submitting the job:

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

Then submit and start the coordinator:

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run