Oozie workflow example for a Hive action with end-to-end configuration

Hive actions run a Hive query on the cluster. The Hive query and the required configuration, libraries, and code for user-defined functions have to be packaged as part of the workflow bundle and deployed to HDFS. The action needs to know the JobTracker and the NameNode of the underlying Hadoop cluster on which Oozie runs the Hive action. The Hive workflow action supports the following elements:

1. job-tracker (required)
2. name-node (required)
3. prepare
4. job-xml
5. configuration
6. script (required)
7. param
8. argument
9. file
10. archive

Hive requires certain key configuration properties, like the location of its metastore (hive.metastore.uris), which are typically part of hive-site.xml. These properties have to be passed in as configuration to Oozie’s Hive action. The <script> element points to the actual Hive script to be run, and the <param> elements pass parameters to that script. Hive supports variable substitution, so a parameter passed as NAME=value can be referenced in the script as ${NAME}.
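
To make the relationship between <script>, <param>, and variable substitution concrete, here is a minimal sketch of a Hive action. The action name, the script name example.hql, the parameter TARGET_DATE, the table some_table, and the transition targets are all hypothetical and not part of the example built later in this post.

```xml
<!-- Sketch only: all names in this fragment are illustrative. -->
<action name="hive-example">
    <hive xmlns="uri:oozie:hive-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <job-xml>hive-site.xml</job-xml>
        <script>example.hql</script>
        <!-- Each param is NAME=VALUE; the script reads it as ${NAME}. -->
        <param>TARGET_DATE=${wf_date}</param>
    </hive>
    <ok to="end"/>
    <error to="fail"/>
</action>
<!-- Inside example.hql the parameter could then be used like this:
     SELECT * FROM some_table WHERE datestamp = '${TARGET_DATE}'; -->
```

Oozie resolves ${wf_date} from the job properties before the action starts, and Hive substitutes ${TARGET_DATE} when it runs the script.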

The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions or to create some directories required for the action. This delete helps make the action repeatable and enables retries after failure. Without this cleanup, retries of Hadoop jobs will fail because Hadoop checks for nonexistence of the output directories and tries to create them for the job. So deleting them before running the action is a common use case for this element. Using <prepare> to create directories is also supported, but not as common as the delete in usage.
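
As a rough illustration of the cleanup described above, a <prepare> block could look like the following. The paths are made up for this sketch. Inside an action, <prepare> comes after <name-node> and before <job-xml>.

```xml
<!-- Sketch only: both paths are hypothetical. -->
<prepare>
    <!-- Remove output left by an earlier attempt so a retry starts clean. -->
    <delete path="${nameNode}/user/${wf:user()}/example-output/datestamp=${wf_date}"/>
    <!-- Less common: create a directory the action expects to exist. -->
    <mkdir path="${nameNode}/user/${wf:user()}/example-staging"/>
</prepare>
```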

The <job-xml> element or the <configuration> section can be used to capture all of the Hadoop job configuration properties. For the Hive action we will use the <job-xml> tag to pass the Hive site configuration (named hive-config.xml in this example). This way, the file is reused in its entirety and no additional configuration settings or special files are necessary.
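
The two mechanisms can also be combined. The fragment below is a sketch of the relevant part of a Hive action, assuming a hypothetical job property called queueName. Properties given inline in <configuration> override values of the same name loaded from the <job-xml> file.

```xml
<!-- Fragment of a hive action; queueName is an assumed job property. -->
<job-xml>hive-site.xml</job-xml>
<configuration>
    <!-- Inline properties take precedence over values from the job-xml file. -->
    <property>
        <name>mapred.job.queue.name</name>
        <value>${queueName}</value>
    </property>
</configuration>
```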

Oozie also supports the <file> and <archive> elements for actions that need them. This is the native Hadoop way of packaging libraries, archives, scripts, and other data files that jobs need, and Oozie provides the syntax to handle them. Users can specify symbolic links to files and archives using the # symbol in the workflow. The links themselves can’t have slashes (/) in them. Oozie creates these symlinks in the workflow root directory, and other files in the application can refer to and access them using relative paths. The <file> and <archive> elements make files and archives available to MapReduce jobs. If the specified path is relative, the file or archive is assumed to be within the application directory, in the corresponding sub-path. If the path is absolute, the file or archive is expected at the given absolute path. Files specified with the <file> element appear as symbolic links in the home directory of the task.
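
A short sketch of the path and symlink rules just described. All file names and HDFS paths here are invented for illustration.

```xml
<!-- Relative path: resolved inside the workflow application directory.
     The part after # is the symlink name and must not contain a slash. -->
<file>lib/my-udfs.jar#udfs.jar</file>
<!-- Absolute path: the file must exist at exactly this HDFS location. -->
<file>/user/shared/lookup/codes.txt#codes.txt</file>
<!-- Archives are unpacked and exposed under the symlink name. -->
<archive>/user/shared/dictionaries.tar.gz#dicts</archive>
```

A task could then open codes.txt or dicts/ by relative path from its working directory.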

Let's say we want to process two datasets, one for FTH (Fibre to the home) data and one for FTC (Fibre to the cabinet) data. We will define a coordinator that runs once a day and waits for both datasets before it starts the workflow. Once the data has been processed by the MapReduce jobs, a Hive action adds the partition information to the Hive metastore.

1. Mapper Code

We skip the mapper implementation because the focus here is the end-to-end Oozie configuration. Build a jar of the mapper class below using Ant or Maven and place it in a folder called lib.


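import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// RegExpPatternFactory, HdfsFileSystem, and Initializer are project-specific helper classes (not shown).
public class CalibrationMapper extends Mapper<Object, Text, NullWritable, Text> {

    private static final RegExpPatternFactory regExpPatternFactory = RegExpPatternFactory.getRegExpPatternFactory();
    private static String[] lookUpData;

    @Override
    protected void setup(Context context) throws IOException {
        // Path of the lookup table, passed in through the job configuration.
        String lookUpParameter = context.getConfiguration().get("lookup_table_path");
        String lookUpcontent = HdfsFileSystem.getFileContent(lookUpParameter);
        lookUpData = lookUpcontent.split("\n");
        Initializer.intializeLookUpData(lookUpData);
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Mapper logic intentionally omitted.
    }

}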

2. Coordinator XML file – coordinator.xml

An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow. A coordinator runs periodically from the start time until the end time.

Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process the input data, which on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job will be delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator time frequency.



<coordinator-app
name="${coord_name}"
frequency="${coord:days(1)}"
start="${coord_start_time}"
end="${coord_end_time}"
timezone="BST"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
<controls>
<timeout>10080</timeout>
</controls>
<datasets>
<dataset name="dindc1" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="BST">
<uri-template>${nameNode}/user/${wf_hadoop_instance}/ftc-data/datestamp=${YEAR}-${MONTH}-${DAY}</uri-template>
<done-flag>done.ctl</done-flag>
</dataset>
<dataset name="dindc2" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="BST">
<uri-template>${nameNode}/user/${wf_hadoop_instance}/fth-data/datestamp=${YEAR}-${MONTH}-${DAY}</uri-template>
<done-flag>done.ctl</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="eindc1" dataset="dindc1">
<instance>${coord:current(-1)}</instance>
</data-in>
<data-in name="eindc2" dataset="dindc2">
<instance>${coord:current(-1)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
<configuration>
<property>
<name>wf_input_path</name>
<value>${coord:dataIn('eindc1')}</value>
</property>
<property>
<name>wf_input_path_2</name>
<value>${coord:dataIn('eindc2')}</value>
</property>
<property>
<name>wf_output_path</name>
<value>${nameNode}/${wf_output_mapreduce}</value>
</property>
<property>
<name>wf_exec_datetime</name>
<value>${coord:nominalTime()}</value>
</property>
<property>
<name>wf_date</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1,'DAY'), "yyyy-MM-dd")}</value>
</property>
<property>
<name>YEAR</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy')}</value>
</property>
<property>
<name>MONTH</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'MM')}</value>
</property>
<property>
<name>DAY</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'dd')}</value>
</property>
</configuration>
</workflow>
<sla:info>
<sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
<sla:should-end>${120 * MINUTES}</sla:should-end>
<sla:alert-events>end_miss</sla:alert-events>
<sla:alert-contact>${wf_notification_email}</sla:alert-contact>
<sla:notification-msg>Please check if Daily_Ftc_Fth_Data_Processor job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
<sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
</sla:info>
</action>
</coordinator-app>

 

3. Oozie workflow XML – workflow.xml

An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
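
The control nodes mentioned above can be sketched as follows. This is not the workflow used in this post: the node names, the inputDir property, and the two omitted actions job-a and job-b are assumptions made only to show how decision, fork, and join fit together.

```xml
<!-- Sketch only: names are hypothetical and the two actions are left out,
     so this fragment is not a complete, deployable workflow. -->
<workflow-app name="control-node-sketch" xmlns="uri:oozie:workflow:0.5">
    <start to="check-input"/>
    <!-- decision: behaves like a switch/case over EL predicates. -->
    <decision name="check-input">
        <switch>
            <case to="split">${fs:exists(inputDir)}</case>
            <default to="end"/>
        </switch>
    </decision>
    <!-- fork: starts job-a and job-b in parallel. -->
    <fork name="split">
        <path start="job-a"/>
        <path start="job-b"/>
    </fork>
    <!-- The job-a and job-b actions would go here, each ending with ok to="merge". -->
    <!-- join: waits for every forked path before moving on. -->
    <join name="merge" to="end"/>
    <end name="end"/>
</workflow-app>
```

The workflow in this post runs its actions one after another instead, which keeps the failure handling simple.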

We have two MapReduce actions that process the data and write the output into the Hive table locations, and two Hive actions that add the partitions to the Hive metastore. We also have email actions that notify the support team each day whether the job succeeded or failed.


<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
<global>
<job-xml>hive-config.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
</configuration>
</global>
<credentials>
<credential name='hcat' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/_HOST@DOMAIN</value>
</property>
</credential>
</credentials>
<start to="calibrate_FTC" />
<action name="calibrate_FTC" cred="hcat">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${wf_output_path_ftc}/datestamp=${wf_date}" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.blog.haas.mapreduce.CalibrationMapper</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.NullWritable</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${nameNode}/user/${wf_hadoop_instance}/ftc-data/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${wf_output_path_ftc}/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>0</value>
</property>
</configuration>
<file>lib/calibrate.jar#calibrate.jar</file>
</map-reduce>
<ok to="calibrate_fth" />
<error to="Email_failure" />
</action>

<action name="calibrate_fth" cred="hcat">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${wf_output_path_fth}/datestamp=${wf_date}" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.blog.haas.mapreduce.CalibrationMapperFTH</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.NullWritable</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${nameNode}/user/${wf_hadoop_instance}/fth-data/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${wf_output_path_fth}/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>0</value>
</property>
</configuration>
<file>lib/calibrateFth.jar#calibrateFth.jar</file>
</map-reduce>
<ok to="add-partition-ftc" />
<error to="Email_failure" />
</action>

<action name="add-partition-ftc" cred="hcat">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>hive-config.xml</job-xml>
<script>add_partitions_ftc.hql</script>
<param>wf_hadoop_instance=${wf_hadoop_instance}</param>
<param>wf_date=${wf_date}</param>
<file>hive-config.xml#hive-config.xml</file>
</hive>
<ok to="add-partition-fth" />
<error to="Email_failure" />
</action>

<action name="add-partition-fth" cred="hcat">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>hive-config.xml</job-xml>
<script>add_partitions_fth.hql</script>
<param>wf_hadoop_instance=${wf_hadoop_instance}</param>
<param>wf_date=${wf_date}</param>
<file>hive-config.xml#hive-config.xml</file>
</hive>
<ok to="Email_success" />
<error to="Email_failure" />
</action>

<action name="Email_success">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS: Data processing FTC and FTH successful: ${wf:id()}</subject>
<body>Hi,
This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechie Team
</body>
</email>
<ok to="End" />
<error to="Kill" />
</action>
<action name="Email_failure">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAILURE: Data processing FTC and FTH failed: ${wf:id()}</subject>
<body>Hi,

Data processing FTC and FTH failed for workflow ID : ${wf:id()}
This is an auto-generated email. Please do not reply to this email.

Thanks,
TimePassTechie Team
</body>
</email>
<ok to="Kill" />
<error to="Kill" />
</action>

<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>
</workflow-app>

4. HQL file used in the Hive action – add_partitions_ftc.hql

This HQL file adds a partition for the data generated by the FTC MapReduce job, so that the partition information is updated in the Hive metastore.

USE ${wf_hadoop_instance};
alter table calibrate_ftc add IF NOT EXISTS partition (datestamp='${wf_date}');

5. HQL file used in the Hive action – add_partitions_fth.hql

This HQL file adds a partition for the data generated by the FTH MapReduce job, so that the partition information is updated in the Hive metastore.

USE ${wf_hadoop_instance};
alter table calibrate_fth add IF NOT EXISTS partition (datestamp='${wf_date}');

6. Hive config file with the authentication and authorization settings required by the Hive action – hive-config.xml


<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.warehouse.subdir.inherit.perms</name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>1073741824</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>999</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<description>Zookeeper quorum used by Hive's Table Lock Manager</description>
<value>substitute your actual ZooKeeper node names</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<description>The port at which the clients will connect.</description>
<value>2000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>Zookeeper quorum used by hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2002</value>
</property>
<property>
<name>hive.zookeeper.namespace</name>
<value>hive_zookeeper_namespace_hive2</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>
<!--'hive.server2.authentication', originally set to 'kerberos' (non-final), is overridden below by a safety valve-->
<property>
<name>hive.metastore.kerberos.principal</name>
<value>hive/DOMAIN</value>
</property>
<property>
<name>hive.server2.authentication.kerberos.principal</name>
<value>hive/DOMAIN</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>kerberos</value>
</property>
</configuration>

7. Coordinator property file to pass configuration – coordinator.properties

Finally, the property file that supplies the configuration parameters.


wf_hadoop_instance=HAAS_QUEUE
wf_hadoop_instance_queue=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM

oozie.coord.application.path=/user/${wf_hadoop_instance}/workflows/calibrate
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog

coord_name=DAILY_CALIBRATE
coord_start_time=2017-04-24T12:00Z
coord_end_time=2020-04-25T23:00Z

wf_name=DAILY_CALIBRATE
wf_workflow_path=/user/${wf_hadoop_instance}/workflows/calibrate
wf_notification_email=testmail@google.com

8. Running the coordinator job

oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties

oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
