oozie workflow example for hdfs file system action with end to end configuration

Users can run HDFS commands using Oozie's FS action. Not all HDFS commands are supported, but the following common operations are allowed: delete, mkdir, move, chmod, touchz and chgrp. FS action commands are executed by the Oozie server itself rather than by a launcher job. This is something to keep in mind, because a long-running, resource-intensive FS action can affect the performance of the Oozie server and impact other Oozie applications. This is also the reason why heavier HDFS commands such as copy are not supported through this action. The elements that make up the FS action are as follows (a short sample action follows the list):

1. name-node (required)
2. job-xml
3. configuration
4. delete
5. mkdir
6. move
7. chmod
8. chgrp

Depending on the operation, Oozie checks that source paths exist and that target paths do not, which reduces the chance of the HDFS commands failing at runtime.
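
To make the supported operations concrete, here is a minimal sketch of a standalone FS action. The node names (clean_dirs, next-node, fail) and all the paths are placeholders for illustration only and are not part of the workflow we build later in this post.

[xml]

<action name="clean_dirs">
<fs>
<!-- remove yesterday's output directory if it is still there -->
<delete path="${nameNode}/user/${wf_hadoop_instance}/staging/output"/>
<!-- recreate it empty -->
<mkdir path="${nameNode}/user/${wf_hadoop_instance}/staging/output"/>
<!-- open it up for the processing group -->
<chmod path="${nameNode}/user/${wf_hadoop_instance}/staging/output" permissions="775" dir-files="false"/>
<chgrp path="${nameNode}/user/${wf_hadoop_instance}/staging/output" group="hadoop" dir-files="false"/>
<!-- archive the previous run's input directory -->
<move source="${nameNode}/user/${wf_hadoop_instance}/staging/input" target="${nameNode}/user/${wf_hadoop_instance}/staging/input_old"/>
<!-- drop a zero-byte marker file -->
<touchz path="${nameNode}/user/${wf_hadoop_instance}/staging/ready.ctl"/>
</fs>
<ok to="next-node"/>
<error to="fail"/>
</action>

[/xml]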

1. Co-ordinator xml file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start time and a frequency parameter, and it starts the workflow only when all the necessary input data becomes available; if the input data is not yet available, the workflow run is delayed until it is. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow, and it runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process the input data, which on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job will be delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator time frequency.

[xml]

<coordinator-app
name="${coord_name}"
frequency="${coord:days(1)}"
start="${coord_start_time}"
end="${coord_end_time}"
timezone="BST"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
<configuration>
<property>
<name>wf_exec_datetime</name>
<value>${coord:nominalTime()}</value>
</property>
<property>
<name>wf_date</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(),-1,'DAY'), "yyyy-MM-dd")}</value>
</property>
</configuration>
</workflow>
<sla:info>
<sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
<sla:should-end>${60 * MINUTES}</sla:should-end>
<sla:alert-events>end_miss</sla:alert-events>
<sla:alert-contact>${wf_notification_email}</sla:alert-contact>
<sla:notification-msg>please check if FILE_INGESTION job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
<sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
</sla:info>
</action>
</coordinator-app>

[/xml]
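
The coordinator above is purely time-triggered. If you want the wait-for-input behaviour described earlier, you can declare a dataset and an input event inside the coordinator-app, just before the action element. The snippet below is only a hedged sketch; the dataset name, URI template and done-flag are assumptions and are not part of this example's configuration.

[xml]

<!-- goes inside <coordinator-app>, before <action>: the coordinator will
wait until the day's input directory contains the _SUCCESS done-flag -->
<datasets>
<dataset name="raw_files" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="UTC">
<uri-template>${nameNode}/user/${wf_hadoop_instance}/raw/${YEAR}-${MONTH}-${DAY}</uri-template>
<done-flag>_SUCCESS</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="raw_input" dataset="raw_files">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>

[/xml]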

2. Oozie workflow xml – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
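
For completeness, this is what a fork/join pair looks like; the node names used here (parallel_load, load_a, load_b, joining, next_action) are placeholders and do not appear in the workflow we build below.

[xml]

<!-- fork starts both actions in parallel; the flow continues past the
join only after both branches have completed successfully -->
<fork name="parallel_load">
<path start="load_a"/>
<path start="load_b"/>
</fork>
<!-- the load_a and load_b action definitions go here, each ending
with <ok to="joining"/> -->
<join name="joining" to="next_action"/>

[/xml]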

Here we copy all the files from a Hive table so that a batch job in the next stage of the pipeline can process them, and once all the files have been copied into the target location we use the FS action to create a success marker file. The copy is done with a Java action, so the class is packaged into a jar (built with Ant or Maven) and placed inside a folder called lib next to the workflow.xml. The wf_hourstamps and wf_minfiles configuration properties are passed to the Java action: the first selects the hour folders to copy from, and the second is the minimum number of files below which the dataset is considered incomplete.

[xml]

<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
<global>
<job-xml>hive-config.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
</configuration>
</global>
<credentials>
<credential name='hcat' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/_HOST@DOMAIN</value>
</property>
</credential>
</credentials>
<start to="files_ingestion"/>

<action name="files_ingestion">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.blog.haas.pig.dataloader.CopyFiles</main-class>
<arg>${nameNode}</arg>
<arg>datestamp=${wf_date}</arg>
<arg>${wf_hourstamps}</arg>
<arg>${nameNode}/user/${wf_hadoop_instance}/hive_warehouse/Test_Table</arg>
<arg>${nameNode}/user/${wf_hadoop_instance}/data/filecopy/${wf_date}</arg>
<arg>${wf_minfiles}</arg>
<file>lib/copyFiles.jar#copyFiles.jar</file>
</java>
<ok to="fs-create-completion-file"/>
<error to="Email_failure"/>
</action>

<action name="fs-create-completion-file">
<fs>
<touchz path="${nameNode}/user/${wf_hadoop_instance}/data/filecopy/${wf_date}/done.ctl" />
</fs>
<ok to="Email_success" />
<error to="Email_failure" />
</action>

<action name="Email_success">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS: ${wf:id()}</subject>
<body>Hi,

This is an auto-generated email. Please do not reply to this email.

Thanks,
Timepass Techies.
</body>
</email>
<ok to="End" />
<error to="Kill" />
</action>
<action name="Email_failure">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAILURE: ${wf:id()}</subject>
<body>Hi,

The workflow failed for workflow ID : ${wf:id()}
This is an auto-generated email. Please do not reply to this email.

Thanks,
Timepass Techies.
</body>
</email>
<ok to="Kill" />
<error to="Kill" />
</action>

<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>

</workflow-app>

[/xml]
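
One thing worth pointing out: the hcat credential is declared in the workflow above, but no action actually references it. If the Java action needs to talk to a kerberized Hive metastore, the credential has to be attached to the action through its cred attribute, roughly as sketched below (illustrative only, reusing the credential name from the workflow above).

[xml]

<!-- attach the declared hcat credential to the action; only needed on a
secured (Kerberos) cluster where the action talks to the Hive metastore -->
<action name="files_ingestion" cred="hcat">
<java>
...
</java>
<ok to="fs-create-completion-file"/>
<error to="Email_failure"/>
</action>

[/xml]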

3. coordinator property file to pass configuration – coordinator.properties

Finally, the properties file from which the configuration parameters are passed.

wf_hadoop_instance=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflows/files_ingestion
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=FILE_INGESTION
coord_start_time=2017-04-24T09:00Z
coord_end_time=2017-04-30T23:00Z

wf_name=FILE_INGESTION
wf_workflow_path=/user/${wf_hadoop_instance}/workflows/files_ingestion
[email protected]
wf_hourstamps=10
wf_minfiles=2

4. running the coordinator job

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
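
The first command only validates the coordinator definition (-dryrun) without submitting it; the second submits and starts it. After submission you can check on the job with the standard Oozie CLI, replacing <coordinator-job-id> with the ID returned by the run command:

oozie job -oozie http://oozie_host:port/oozie -info <coordinator-job-id>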
