Oozie workflow example for Pig action with end-to-end configuration

Oozie's Pig action runs a Pig job in Hadoop. The Pig action requires you to bundle the Pig script with all the necessary parameters. The action needs to know the JobTracker and the NameNode of the underlying Hadoop cluster where Oozie has to run the Pig action. Here's the full list of XML elements:

1. job-tracker (required)
2. name-node (required)
3. prepare
4. job-xml
5. configuration
6. script (required)
7. param
8. argument
9. file
10. archive

The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions, or to create directories required for the action. This delete helps make the action repeatable and enables retries after failure. Without this cleanup, retries of Hadoop jobs will fail because Hadoop checks for nonexistence of the output directories and tries to create them for the job. So deleting them before running the action is a common use case for this element. Using <prepare> to create directories is also supported, though less common than the delete.
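For example, a <prepare> block inside a Pig action could look like the following sketch; the delete and mkdir paths here are hypothetical and would be adjusted to your own output locations.

[xml]

<pig>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <prepare>
        <!-- remove the previous run's output so a retry starts from a clean state (hypothetical path) -->
        <delete path="${nameNode}/user/${wf_hadoop_instance}/data/output/${wf_date}"/>
        <!-- create a scratch directory the script expects (hypothetical path) -->
        <mkdir path="${nameNode}/user/${wf_hadoop_instance}/data/tmp/${wf_date}"/>
    </prepare>
    <script>port_load.pig</script>
</pig>

[/xml]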

Oozie also supports the <file> and <archive> elements for actions that need them. This is the native Hadoop way of packaging libraries, archives, scripts, and other data files that jobs need, and Oozie provides the syntax to handle them. Users can specify symbolic links to files and archives using the # symbol in the workflow; the link names themselves cannot contain slashes (/). Oozie creates these symlinks in the workflow root directory, and other files in the application can refer to and access them using relative paths. The <file> and <archive> elements make files and archives available to the MapReduce jobs launched by the action. If the specified path is relative, the file or archive is assumed to be inside the application directory, in the corresponding subpath. If the path is absolute, the file or archive is expected at that absolute path. Files specified with the <file> element will be symbolic links in the home directory of the task.
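As a quick sketch, that syntax looks like the lines below; the udfs.zip archive is just a made-up placeholder to show the form.

[xml]

<!-- ships the jar with the job and exposes it through a symlink named pigdataloader.jar -->
<file>lib/pigdataloader.jar#pigdataloader.jar</file>
<!-- ships an archive; it is unpacked and made available under the symlink udfs -->
<archive>lib/udfs.zip#udfs</archive>

[/xml]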

<script> points to the Pig script to run, and the <argument> elements pass parameters to the script. There are multiple ways to use UDFs and custom JARs in Pig through Oozie: the UDF code can be distributed via the <archive> and <file> elements, as always, but the easiest and most straightforward approach is to copy the jar file to the lib/ subdirectory under the workflow root directory on HDFS.
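Assuming that convention, the workflow application directory on HDFS for this example would look roughly like the listing below (a sketch put together from the files used later in this post; HAAS_QUEUE is the value of wf_hadoop_instance from the coordinator.properties file shown later).

/user/HAAS_QUEUE/workflows/wf_port_load/coordinator.xml
/user/HAAS_QUEUE/workflows/wf_port_load/workflow.xml
/user/HAAS_QUEUE/workflows/wf_port_load/port_load.pig
/user/HAAS_QUEUE/workflows/wf_port_load/hive-config.xml
/user/HAAS_QUEUE/workflows/wf_port_load/lib/pigdataloader.jar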

Let's say we want to load the port details data into a Hive table, using HCatStorer to store the data into a Hive partition; a sketch of what the target table might look like follows the sample input below.

Sample Input Data

Test_Device_Id_234,/shelf=0/slot=4/port=6,BBGtest110,07/06/2009 13:15,2017-03-02
Test_Device_Id_345,/shelf=0/slot=5/port=0,BBGTest_Device_Id050,07/06/2009 13:15,2017-03-02
Test_Device_Id_765,/shelf=0/slot=5/port=1,BBG199999999,12/26/2009 10:24,2017-03-02
Test_Device_Id_234,/shelf=0/slot=5/port=24,BBGTest_Device_Id0524,08/19/2009 06:44,2017-03-02
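The original post does not show the target Hive table, but based on the Pig schema below and the datestamp partition written by HCatStorer, a minimal DDL sketch could look like this (the table name is taken from the properties file; the storage format and everything else are assumptions):

-- hypothetical DDL for the HCatStorer target table
CREATE TABLE IF NOT EXISTS HAAS_QUEUE.Port_Table (
  element_id     STRING,
  sub_element_id STRING,
  service_id     STRING,
  update_date    STRING
)
PARTITIONED BY (datestamp STRING)
STORED AS ORC;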

1. Pig Script

We are using a custom loader to load the data; for more information on writing a Pig custom load function, check this link: pig-example-implement-custom-load-function. Create a jar for the custom loader class using Ant or Maven and add it to a folder called lib inside the workflow directory.

-- register the custom loader jar shipped in the workflow's lib/ directory
register pigdataloader.jar;
EXEC;
-- load the raw port data with the custom loader; $wf_input_path is passed in from the workflow
raw_data = load '$wf_input_path' USING com.bt.haas.pig.dataloader.PortsCustomLoader() AS (element_id:chararray, sub_element_id:chararray, service_id:chararray, update_date:chararray);
-- write into the Hive table partition datestamp=$wf_date via HCatStorer
store raw_data into '$wf_port_table_name' USING org.apache.hive.hcatalog.pig.HCatStorer('datestamp=${wf_date}');
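If you want to sanity-check the script outside Oozie first, it can be run from a client machine that has Pig and HCatalog set up; a sketch (the concrete parameter values below are just examples) might be:

pig -useHCatalog \
  -param wf_input_path=/user/HAAS_QUEUE/data/file/2017-03-02 \
  -param wf_port_table_name=HAAS_QUEUE.Port_Table \
  -param wf_date=2017-03-02 \
  -f port_load.pig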

2. Coordinator XML file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start time and a frequency parameter, and it starts the workflow only when all the necessary input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow, and it runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process it, which, on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job is delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator frequency.

[xml]

<coordinator-app
    name="${coord_name}"
    frequency="${coord:days(1)}"
    start="${coord_start_time}"
    end="${coord_end_time}"
    timezone="IST"
    xmlns="uri:oozie:coordinator:0.4"
    xmlns:sla="uri:oozie:sla:0.2">
    <controls>
        <timeout>10080</timeout>
    </controls>
    <datasets>
        <dataset name="dindc1" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="BST">
            <uri-template>${nameNode}/user/${wf_hadoop_instance}/data/file/${YEAR}-${MONTH}-${DAY}</uri-template>
            <done-flag>done.ctl</done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="eindc1" dataset="dindc1">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${wf_workflow_path}</app-path>
            <configuration>
                <property>
                    <name>wf_input_path</name>
                    <value>${coord:dataIn('eindc1')}</value>
                </property>
                <property>
                    <name>wf_exec_datetime</name>
                    <value>${coord:nominalTime()}</value>
                </property>
                <property>
                    <name>wf_date</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'yyyy-MM-dd')}</value>
                </property>
                <property>
                    <name>YEAR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'yyyy')}</value>
                </property>
                <property>
                    <name>MONTH</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'MM')}</value>
                </property>
                <property>
                    <name>DAY</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'dd')}</value>
                </property>
                <property>
                    <name>HOUR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), 'HH')}</value>
                </property>
            </configuration>
        </workflow>
        <sla:info>
            <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
            <sla:should-end>${360 * MINUTES}</sla:should-end>
            <sla:alert-events>end_miss</sla:alert-events>
            <sla:alert-contact>${wf_notification_email}</sla:alert-contact>
            <sla:notification-msg>Please check if the DAILY_PORT_LOAD job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
            <sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
        </sla:info>
    </action>
</coordinator-app>

[/xml]
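To make the dataset resolution concrete: for a nominal time of, say, 2017-03-02T06:00Z, coord:current(0) refers to that day's dataset instance, so wf_input_path (via coord:dataIn('eindc1')) resolves to something like the path below, using the nameNode value from the properties file, and the coordinator will not trigger the workflow until the done-flag file done.ctl appears in that directory. wf_date for the same run resolves to 2017-03-02.

hdfs://nameservice/user/HAAS_QUEUE/data/file/2017-03-02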

3. Oozie workflow XML – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependencies, where each action typically is a Hadoop job such as a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs, such as a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.

We are using a Pig action here to load the ports data, and we also have email actions that notify the support team whether the job succeeded or failed on a day-to-day basis.

[xml]

<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
    <global>
        <job-xml>hive-config.xml</job-xml>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${wf_hadoop_instance}</value>
            </property>
        </configuration>
    </global>
    <credentials>
        <credential name='hcat' type='hcat'>
            <property>
                <name>hcat.metastore.uri</name>
                <value>thrift://host:port</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>hive/_HOST@DOMAIN</value>
            </property>
        </credential>
    </credentials>
    <start to="port-load"/>
    <action name="port-load" cred="hcat">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>port_load.pig</script>
            <argument>-param</argument>
            <argument>wf_port_table_name=${wf_port_table_name}</argument>
            <argument>-param</argument>
            <argument>wf_input_path="${wf_input_path}"</argument>
            <argument>-param</argument>
            <argument>wf_date=${wf_date}</argument>
            <file>lib/pigdataloader.jar#pigdataloader.jar</file>
        </pig>
        <ok to="createSuccessMarkerFile"/>
        <error to="failed-notification-email"/>
    </action>

    <action name="createSuccessMarkerFile">
        <fs>
            <mkdir path='${wf_input_path}/complete'/>
        </fs>
        <ok to="success-notification-email"/>
        <error to="failed-notification-email"/>
    </action>

    <action name="success-notification-email">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>SUCCESS : [${wf_hadoop_instance}] ${wf:name()} is successful</subject>
            <body>Hi,

The workflow ${wf:name()} has successfully completed its run. Run Date Time :- ${wf_exec_datetime}

This is an auto-generated email. Please do not reply to this email.
            </body>
        </email>
        <ok to="End"/>
        <error to="failed-notification-email"/>
    </action>
    <action name="failed-notification-email">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${wf_notification_email}</to>
            <subject>FAIL: [${wf_hadoop_instance}] ${wf:name()} failed for Run Date Time :- ${wf_exec_datetime}</subject>
            <body>Hi,

Workflow : ${wf:name()}
Failed Node : ${wf:lastErrorNode()}
Error Message : ${wf:errorMessage(wf:lastErrorNode())}

This is an auto-generated email. Please do not reply to this email.
            </body>
        </email>
        <ok to="Kill"/>
        <error to="Kill"/>
    </action>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="End"/>
</workflow-app>

[/xml]
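Before submitting anything, the workflow definition can be checked against the Oozie XML schema from the CLI. On older Oozie releases the validate sub-command works on a local file without the -oozie option, while newer releases expect the server URL as shown here:

oozie validate -oozie http://oozie_host:port/oozie workflow.xml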

4. Hive config file with the metastore and security settings needed by the HCatalog-enabled Pig action – hive-config.xml

[xml]

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://host:port</value>
    </property>
    <property>
        <name>hive.metastore.client.socket.timeout</name>
        <value>300</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.warehouse.subdir.inherit.perms</name>
        <value>true</value>
    </property>
    <property>
        <name>mapred.reduce.tasks</name>
        <value>-1</value>
    </property>
    <property>
        <name>hive.exec.reducers.bytes.per.reducer</name>
        <value>1073741824</value>
    </property>
    <property>
        <name>hive.exec.copyfile.maxsize</name>
        <value>33554432</value>
    </property>
    <property>
        <name>hive.exec.reducers.max</name>
        <value>999</value>
    </property>
    <property>
        <name>hive.metastore.execute.setugi</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.support.concurrency</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.zookeeper.quorum</name>
        <description>Zookeeper quorum used by Hive's Table Lock Manager</description>
        <value>substitute your actual ZooKeeper node names</value>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <description>The port at which the clients will connect.</description>
        <value>2000</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>Zookeeper quorum used by hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2002</value>
    </property>
    <property>
        <name>hive.zookeeper.namespace</name>
        <value>hive_zookeeper_namespace_hive2</value>
    </property>
    <property>
        <name>hive.cluster.delegation.token.store.class</name>
        <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
    </property>
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.sasl.enabled</name>
        <value>true</value>
    </property>
    <!--'hive.server2.authentication', originally set to 'kerberos' (non-final), is overridden below by a safety valve-->
    <property>
        <name>hive.metastore.kerberos.principal</name>
        <value>hive/DOMAIN</value>
    </property>
    <property>
        <name>hive.server2.authentication.kerberos.principal</name>
        <value>hive/DOMAIN</value>
    </property>
    <property>
        <name>hive.server2.authentication</name>
        <value>kerberos</value>
    </property>
</configuration>

[/xml]

5. Coordinator property file to pass configuration – coordinator.properties

Finally, the property file that supplies the configuration parameters referenced above; a deployment sketch follows it.


wf_hadoop_instance=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflows/wf_port_load
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=DAILY_PORT_LOAD
coord_start_time=2016-08-09T06:00Z
coord_end_time=2027-02-04T00:00Z

wf_name=DAILY_YUKON_PORTSMAP_LOAD
wf_workflow_path=/user/${wf_hadoop_instance}/workflows/wf_port_load
wf_notification_email=user@domain
wf_port_table_name=${wf_hadoop_instance}.Port_Table
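With the script, XML files, and properties in place, the application directory is pushed to HDFS before submission. A sketch of that deployment, using the paths from the properties above, could be:

hdfs dfs -mkdir -p /user/HAAS_QUEUE/workflows/wf_port_load/lib
hdfs dfs -put workflow.xml coordinator.xml port_load.pig hive-config.xml /user/HAAS_QUEUE/workflows/wf_port_load/
hdfs dfs -put pigdataloader.jar /user/HAAS_QUEUE/workflows/wf_port_load/lib/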

6. Running the coordinator job

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
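Once the coordinator is running, its materialized actions and the workflows they launch can be checked from the same CLI; the job ID below is just a placeholder for the ID returned by the run command.

oozie jobs -oozie http://oozie_host:port/oozie -jobtype coordinator -filter name=DAILY_PORT_LOAD

oozie job -oozie http://oozie_host:port/oozie -info 0000123-170302123456789-oozie-oozi-C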