Oozie workflow example for Sqoop action with end-to-end configuration

Apache Sqoop is a Hadoop tool for importing and exporting data between relational databases (MySQL, Oracle, etc.) and Hadoop clusters. Sqoop commands are structured around connecting to and importing or exporting data from various relational databases, and Sqoop typically uses JDBC to talk to these external database systems. Oozie's sqoop action lets users run Sqoop jobs as part of a workflow. The following elements are part of the Sqoop action:

1. job-tracker (required)
2. name-node (required)
3. prepare
4. job-xml
5. configuration
6. command (required if arg is not used)
7. arg (required if command is not used)
8. file
9. archive

The action needs to know the JobTracker and the NameNode of the underlying Hadoop cluster where Oozie has to run the sqoop action. The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions, or to create directories required for the action. This delete helps make the action repeatable and enables retries after failure.
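For example, a minimal <prepare> section (using the same target directory as the import snippet further down) that deletes the output path so the action can be rerun safely:

[xml]
<prepare>
    <delete path="hdfs://localhost:8020/user/joe/sqoop_tbl"/>
</prepare>
[/xml]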

The <job-xml> element or the <configuration> section can be used to capture all of the Hadoop job configuration properties. A whole site XML file can be passed in through <job-xml> and reused in its entirety (the way hive-site.xml is passed for a Hive action), while individual properties can be set inline through <configuration>, so no additional special files are needed for the Sqoop action.
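As a sketch, an inline <configuration> section inside the sqoop action could carry a Hadoop property such as the queue name (the same property is set globally in the workflow further down):

[xml]
<configuration>
    <property>
        <name>mapred.job.queue.name</name>
        <value>${queueName}</value>
    </property>
</configuration>
[/xml]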

Oozie also supports the <file> and <archive> elements for actions that need them. This is the native Hadoop way of packaging libraries, archives, scripts, and other data files that jobs need, and Oozie provides the syntax to handle them.
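For instance, a Sqoop export to Oracle needs the Oracle JDBC driver on the classpath of the launched job. One option is to ship it with the action via <file> (the jar name and lib path below are assumptions; placing the jar in the workflow's lib/ directory or the Oozie sharelib works as well):

[xml]
<file>${nameNode}/user/${queueName}/lib/ojdbc6.jar#ojdbc6.jar</file>
[/xml]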

The arguments to Sqoop are sent either through the <command> element in one line or broken down into many <arg> elements.

<action name="mySqoopAction">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <command>import --connect jdbc:hsqldb:file:db.hsqldb --table test_table --target-dir hdfs://localhost:8020/user/joe/sqoop_tbl -m 1</command>
    </sqoop>
</action>
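The same import can instead be broken down into <arg> elements, one element per command-line token, which avoids any ambiguity around whitespace inside argument values:

[xml]
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
    <arg>import</arg>
    <arg>--connect</arg>
    <arg>jdbc:hsqldb:file:db.hsqldb</arg>
    <arg>--table</arg>
    <arg>test_table</arg>
    <arg>--target-dir</arg>
    <arg>hdfs://localhost:8020/user/joe/sqoop_tbl</arg>
    <arg>-m</arg>
    <arg>1</arg>
</sqoop>
[/xml]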

Let's look at an example of exporting data from a Hive table into an Oracle table.

1. Oozie workflow XML – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
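The workflow below uses only the start, end, and kill control nodes. As a quick sketch of the fork/join construct mentioned above (the action names here are illustrative, not part of this example), two exports could be run in parallel like this:

[xml]
<fork name="fork_exports">
    <path start="export_table_a"/>
    <path start="export_table_b"/>
</fork>
<!-- both export actions transition to the join when they succeed -->
<join name="join_exports" to="Email_success"/>
[/xml]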

[xml]

<workflow-app name="HDFS_TO_ORACLE" xmlns="uri:oozie:workflow:0.4">
    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
    </global>

    <start to="SQOOP_EXPORT_TO_ORACLE" />

    <action name="SQOOP_EXPORT_TO_ORACLE">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>export</arg>
            <arg>-Dmapred.child.java.opts=-Xmx4096m</arg>
            <arg>--connect</arg>
            <arg>${oracle_database}</arg>
            <arg>--table</arg>
            <arg>${oracle_table}</arg>
            <arg>--columns</arg>
            <arg>${oracle_columns}</arg>
            <arg>--export-dir</arg>
            <arg>${RESULTS_FOLDER}</arg>
            <arg>--input-fields-terminated-by</arg>
            <arg>,</arg>
            <arg>--input-lines-terminated-by</arg>
            <arg>\n</arg>
            <arg>--username</arg>
            <arg>${oracle_username}</arg>
            <arg>--password</arg>
            <arg>${oracle_password}</arg>
            <arg>-m</arg>
            <arg>${num_mappers}</arg>
        </sqoop>
        <ok to="Email_success" />
        <error to="Email_failure" />
    </action>

    <action name="Email_success">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${success_emails}</to>
            <subject>SUCCESS: Database update: ${wf:id()}</subject>
            <body>Hi,

Database update succeeded for workflow ID: ${wf:id()}

This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com
            </body>
        </email>
        <ok to="end" />
        <error to="kill" />
    </action>

    <action name="Email_failure">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${failure_emails}</to>
            <subject>FAILURE: Database update: ${wf:id()}</subject>
            <body>Hi,

Database update failed for workflow ID: ${wf:id()}

This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechies.com
            </body>
        </email>
        <ok to="end" />
        <error to="kill" />
    </action>

    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end" />
</workflow-app>

[/xml]

2. Coordinator XML file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow. A coordinator runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process the input data, which on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job will be delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator time frequency.
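The coordinator in this example is purely time-triggered. As a hedged sketch of how a data dependency could be declared instead (the dataset name, URI template, and done flag below are assumptions), a <datasets>/<input-events> pair would make each run wait for a _SUCCESS marker in a daily directory:

[xml]
<datasets>
    <dataset name="sqoop_input" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="IST">
        <uri-template>${nameNode}/user/${queueName}/sqoop_results_oracle_db/${YEAR}${MONTH}${DAY}</uri-template>
        <done-flag>_SUCCESS</done-flag>
    </dataset>
</datasets>
<input-events>
    <data-in name="input" dataset="sqoop_input">
        <instance>${coord:current(0)}</instance>
    </data-in>
</input-events>
[/xml]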

[xml]

<coordinator-app
    name="${coord_name}"
    frequency="${coord:days(1)}"
    start="${coord_start_time}"
    end="${coord_end_time}"
    timezone="IST"
    xmlns="uri:oozie:coordinator:0.4"
    xmlns:sla="uri:oozie:sla:0.2">
    <action>
        <workflow>
            <app-path>${wf_workflow_path}</app-path>
            <configuration>
                <property>
                    <name>wf_exec_datetime</name>
                    <value>${coord:nominalTime()}</value>
                </property>
            </configuration>
        </workflow>
        <sla:info>
            <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
            <sla:should-end>${60 * MINUTES}</sla:should-end>
            <sla:alert-events>end_miss</sla:alert-events>
            <sla:alert-contact>${wf_notification_email}</sla:alert-contact>
            <sla:notification-msg>please check if Sqoop job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
            <sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
        </sla:info>
    </action>
</coordinator-app>

[/xml]

3. Coordinator property file to pass configuration – coordinator.properties

Finally, the property file from which the configuration parameters are passed. The property names here line up with the variables referenced in workflow.xml and coordinator.xml above.


# cluster endpoints and queue
wf_hadoop_instance=HAAS_QUEUE
queueName=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

# application paths and notification addresses
oozie.use.system.libpath=true
oozie.coord.application.path=/user/${queueName}/workflows/sqoop_export_oracle
wf_workflow_path=${nameNode}/user/${queueName}/workflows/sqoop_export_oracle
[email protected]
[email protected]
[email protected]

# coordinator schedule
coord_name=sqoop_export_oracle
coord_start_time=2017-02-05T04:00Z
coord_end_time=2026-01-06T23:30Z

num_mappers=10
RESULTS_FOLDER=${nameNode}/user/${queueName}/sqoop_results_oracle_db

oracle_columns=ID,DEVICE_ID,SUBELEMENT_ID,SERVICE_ID

oracle_database=jdbc:oracle:thin:@host:port/database
oracle_table=TEST.SERVICE_INFO
oracle_username=username
oracle_password=password

4. Running the coordinator job

First validate the coordinator definition with a dry run, then submit it:

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
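Once the coordinator is submitted, the job ID printed by the run command can be used with the standard Oozie CLI subcommands to check progress or stop the job (the ID below is a placeholder):

oozie job -oozie http://oozie_host:port/oozie -info <coordinator_job_id>

oozie job -oozie http://oozie_host:port/oozie -kill <coordinator_job_id>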