The following is the ordered sequence of XML elements for a map-reduce action, and you must specify them in this order when writing the action definition in your workflow. Elements can be omitted, but if present, they should appear in this sequence (a minimal skeleton follows the list below). The action needs to know the JobTracker and the NameNode of the underlying Hadoop cluster where Oozie has to run the MapReduce job. The supported tags are:
1. job-tracker (required)
2. name-node (required)
3. prepare
4. streaming or pipes
5. job-xml
6. configuration
7. file
8. archive
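A minimal sketch of a <map-reduce> action showing this ordering (the queue, paths, and file names here are placeholders, not the ones used in the full example later in this post):
[xml]
<action name="my-mapreduce-action">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${nameNode}/user/${user}/output"/>
        </prepare>
        <job-xml>job-defaults.xml</job-xml>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <file>lib/my-job.jar#my-job.jar</file>
        <archive>lib/dependencies.tar.gz#deps</archive>
    </map-reduce>
    <ok to="next-action"/>
    <error to="fail"/>
</action>
[/xml]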
The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions, or to create directories required by the action. This delete helps make the action repeatable and enables retries after failure. Without this cleanup, retries of Hadoop jobs fail because Hadoop checks for the nonexistence of the output directories and tries to create them for the job, so deleting them before running the action is a common use case for this element. Using <prepare> to create directories is also supported, but it is less common than the delete.
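A short sketch of a <prepare> block that deletes the previous run's output directory and creates a staging directory (both paths are hypothetical):
[xml]
<prepare>
    <delete path="${nameNode}/user/${wf_user}/output/datestamp=${wf_date}"/>
    <mkdir path="${nameNode}/user/${wf_user}/staging/datestamp=${wf_date}"/>
</prepare>
[/xml]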
The <job-xml> element and/or the <configuration> section can be used to capture all of the Hadoop job configuration properties. The worker code for the MapReduce action is specified as part of this configuration using the mapred.mapper.class and mapred.reducer.class properties, which name the actual Java classes to be run as the map and reduce tasks of this action. Note that the example workflow later in this post enables the new MapReduce API (mapred.mapper.new-api and mapred.reducer.new-api set to true) and therefore uses the new-API class property instead.
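For reference, a configuration fragment wiring in the old-API mapper and reducer classes might look like the following sketch (the class names are placeholders, not the classes used later in this post):
[xml]
<configuration>
    <property>
        <name>mapred.mapper.class</name>
        <value>com.example.MyMapper</value>
    </property>
    <property>
        <name>mapred.reducer.class</name>
        <value>com.example.MyReducer</value>
    </property>
</configuration>
[/xml]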
Oozie also supports the <file> and <archive> elements for actions that need them. This is the native Hadoop way of packaging libraries, archives, scripts, and other data files that jobs need, and Oozie provides the syntax to handle them. Users can specify symbolic links to files and archives using the # symbol in the workflow; the link names themselves can't have slashes (/) in them. Oozie creates these symlinks in the workflow root directory, and other files in the application can refer to and access them using relative paths. The <file> and <archive> elements make files and archives available to the MapReduce job. If the specified path is relative, the file or archive is assumed to be within the application directory, in the corresponding sub-path. If the path is absolute, the file or archive is expected at the given absolute path. Files specified with the <file> element become symbolic links in the home directory of the task.
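For example, a job jar and a tarball could be shipped with the action and then referenced through their symlink names (the tarball here is illustrative; only jars are shipped in the example later in this post):
[xml]
<file>lib/calibrate.jar#calibrate.jar</file>
<archive>lib/lookup-data.tar.gz#lookup-data</archive>
[/xml]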
Streaming and pipes are special kinds of MapReduce jobs, and this action supports both. They are both mechanisms that Hadoop supports to help run non-Java code as MapReduce jobs. This is to help users who might have to port existing code written in other languages like Python or C++ to Hadoop’s MapReduce framework in Java. Also, some users might just prefer other programming languages.
Depending on whether you want to execute streaming or pipes, you can have either of those elements or neither, but you cannot specify both <streaming> and <pipes> in a single <map-reduce> action. If present, they require subelements specific to those execution modes in the workflow XML: mapper, reducer, record-reader, record-reader-mapping, and env for streaming; map, reduce, inputformat, partitioner, writer, and program for pipes. A streaming sketch follows.
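Here is a hedged sketch of the <streaming> element inside a <map-reduce> action, using simple shell commands as the mapper and reducer (the commands are placeholders; streaming is not used in the example later in this post):
[xml]
<streaming>
    <mapper>/bin/cat</mapper>
    <reducer>/usr/bin/wc</reducer>
</streaming>
[/xml]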
Let's take an end-to-end example. I will not go into the details of the mapper and reducer code, as the focus is on using the Oozie workflow engine.
1. Mapper Code
We are skipping the mapper implementation as we are focusing on the end-to-end Oozie configuration. Create a jar of the mapper class below using Ant or Maven and add it to a folder called lib.
[java]
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CalibrationMapper extends Mapper<Object, Text, NullWritable, Text> {

    // RegExpPatternFactory, HdfsFileSystem and Initializer are project-specific helper classes.
    private static final RegExpPatternFactory regExpPatternFactory = RegExpPatternFactory.getRegExpPatternFactory();
    private static String[] lookUpData;

    @Override
    protected void setup(Context context) throws IOException {
        // Load the lookup table from the HDFS path passed in through the job configuration.
        String lookUpParameter = context.getConfiguration().get("lookup_table_path");
        String lookUpcontent = HdfsFileSystem.getFileContent(lookUpParameter);
        lookUpData = lookUpcontent.split("\n");
        Initializer.intializeLookUpData(lookUpData);
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Mapper logic omitted; the focus of this post is the Oozie configuration.
    }
}
[/java]
2. co-ordinator xml file – coordinator.xml
An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available. A coordinator is defined by a start and end time, a frequency, input and output data, and a workflow. A coordinator runs periodically from the start time until the end time.
Beginning at the start time, the coordinator job checks if the required input data is available. When the input data becomes available, a workflow is started to process the input data, which on completion, produces the corresponding output data. This process is repeated at every tick of the frequency until the end time of the coordinator job. If the input data is not available for a workflow run, the execution of the workflow job will be delayed until the input data becomes available. Normally, both the input and output data used for a workflow execution are aligned with the coordinator time frequency.
Let's say we want to process two datasets, one for FTH (fibre to the home) data and one for FTC (fibre to the cabinet) data. We will define a coordinator, with a frequency of once a day, that waits for these two datasets before it starts the workflow.
[xml]
<coordinator-app
name="${coord_name}"
frequency="${coord:days(1)}"
start="${coord_start_time}"
end="${coord_end_time}"
timezone="BST"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
<controls>
<timeout>10080</timeout>
</controls>
<datasets>
<dataset name="dindc1" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="BST">
<uri-template>${nameNode}/user/${wf_hadoop_instance}/ftc-data/datestamp=${YEAR}-${MONTH}-${DAY}</uri-template>
<done-flag>done.ctl</done-flag>
</dataset>
<dataset name="dindc2" frequency="${coord:days(1)}" initial-instance="${coord_start_time}" timezone="BST">
<uri-template>${nameNode}/user/${wf_hadoop_instance}/fth-data/datestamp=${YEAR}-${MONTH}-${DAY}</uri-template>
<done-flag>done.ctl</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="eindc1" dataset="dindc1">
<instance>${coord:current(-1)}</instance>
</data-in>
<data-in name="eindc2" dataset="dindc2">
<instance>${coord:current(-1)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${wf_workflow_path}</app-path>
<configuration>
<property>
<name>wf_input_path</name>
<value>${coord:dataIn('eindc1')}</value>
</property>
<property>
<name>wf_input_path_2</name>
<value>${coord:dataIn('eindc2')}</value>
</property>
<property>
<name>wf_output_path</name>
<value>${nameNode}/${wf_output_mapreduce}</value>
</property>
<property>
<name>wf_exec_datetime</name>
<value>${coord:nominalTime()}</value>
</property>
<property>
<name>wf_date</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd')}</value>
</property>
<property>
<name>YEAR</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy')}</value>
</property>
<property>
<name>MONTH</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'MM')}</value>
</property>
<property>
<name>DAY</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'dd')}</value>
</property>
</configuration>
</workflow>
<sla:info>
<sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
<sla:should-end>${120 * MINUTES}</sla:should-end>
<sla:alert-events>end_miss</sla:alert-events>
<sla:alert-contact>${wf_notification_email}</sla:alert-contact>
<sla:notification-msg>Please check if the Daily_Ftc_Fth_Data_Processor job for ${coord:nominalTime()} is running properly!</sla:notification-msg>
<sla:upstream-apps>${wf_hadoop_instance}</sla:upstream-apps>
</sla:info>
</action>
</coordinator-app>
[/xml]
3. Oozie workflow xml – workflow.xml
An Oozie workflow is a multistage Hadoop job. A workflow is a collection of action and control nodes arranged in a directed acyclic graph (DAG) that captures control dependency where each action typically is a Hadoop job like a MapReduce, Pig, Hive, Sqoop, or Hadoop DistCp job. There can also be actions that are not Hadoop jobs like a Java application, a shell script, or an email notification. The order of the nodes in the workflow determines the execution order of these actions. An action does not start until the previous action in the workflow ends. Control nodes in a workflow are used to manage the execution flow of actions. The start and end control nodes define the start and end of a workflow. The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
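As an illustration of the fork and join control nodes: the workflow below runs its two MapReduce actions one after the other, but two independent actions like these could also be run in parallel along the following lines (a sketch only; the node names fork-calibrate and join-calibrate are made up, and each forked action would need its <ok> transition pointed at the join):
[xml]
<start to="fork-calibrate"/>
<fork name="fork-calibrate">
    <path start="calibrate_FTC"/>
    <path start="calibrate_fth"/>
</fork>
<!-- both forked actions transition to the join on success -->
<join name="join-calibrate" to="add-partition-ftc"/>
[/xml]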
We have two MapReduce actions that process the data and write their output into the Hive table locations, and two Hive actions that add the corresponding partitions to the Hive metastore. We also have email actions that notify the support team whether the job succeeded or failed each day.
[xml]
<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
<global>
<job-xml>hive-config.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
</configuration>
</global>
<credentials>
<credential name='hcat' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/_HOST@DOMAIN</value>
</property>
</credential>
</credentials>
<start to="calibrate_FTC" />
<action name="calibrate_FTC" cred="hcat">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${wf_output_path_ftc}/datestamp=${wf_date}" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.blog.haas.mapreduce.CalibrationMapper</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.NullWritable</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${nameNode}/user/${wf_hadoop_instance}/ftc-data/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${wf_output_path_ftc}/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>0</value>
</property>
</configuration>
<file>lib/calibrate.jar#calibrate.jar</file>
</map-reduce>
<ok to="calibrate_fth" />
<error to="Email_failure" />
</action>
<action name="calibrate_fth" cred="hcat">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${wf_output_path_fth}/datestamp=${wf_date}" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.blog.haas.mapreduce.CalibrationMapperFTH</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.NullWritable</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${nameNode}/user/${wf_hadoop_instance}/fth-data/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${wf_output_path_fth}/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>0</value>
</property>
</configuration>
<file>lib/calibrateFth.jar#calibrateFth.jar</file>
</map-reduce>
<ok to="add-partition-ftc" />
<error to="Email_failure" />
</action>
<action name="add-partition-ftc" cred="hcat">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>hive-config.xml</job-xml>
<script>add_partitions_ftc.hql</script>
<param>wf_hadoop_instance=${wf_hadoop_instance}</param>
<param>wf_date=${wf_date}</param>
<file>hive-config.xml#hive-config.xml</file>
</hive>
<ok to="add-partition-fth" />
<error to="Email_failure" />
</action>
<action name="add-partition-fth" cred="hcat">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>hive-config.xml</job-xml>
<script>add_partitions_fth.hql</script>
<param>wf_hadoop_instance=${wf_hadoop_instance}</param>
<param>wf_date=${wf_date}</param>
<file>hive-config.xml#hive-config.xml</file>
</hive>
<ok to="Email_success" />
<error to="Email_failure" />
</action>
<action name="Email_success">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS: Data processing FTC and FTH successful: ${wf:id()}</subject>
<body>Hi,
This is an auto-generated email. Please do not reply to this email.
Thanks,
TimePassTechie Team
</body>
</email>
<ok to="End" />
<error to="Kill" />
</action>
<action name="Email_failure">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAILURE: Data processing FTC and FTH failed: ${wf:id()}</subject>
<body>Hi,
Data processing FTC and FTH failed for workflow ID: ${wf:id()}
This is an auto-generated email. Please do not reply to this email.
Thanks,
TimePassTechie Team
</body>
</email>
<ok to="Kill" />
<error to="Kill" />
</action>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>
</workflow-app>
[/xml]
4. Hql file used in hive action – add_partitions_ftc.hql
HQL file to add a partition for the FTC data generated by the MapReduce job so that the partition information gets updated in the Hive metastore.
[sql]
USE ${wf_hadoop_instance};
alter table calibrate_ftc add IF NOT EXISTS partition (datestamp='${wf_date}');
[/sql]
5. Hql file used in hive action – add_partitions_fth.hql
HQL file to add a partition for the FTH data generated by the MapReduce job so that the partition information gets updated in the Hive metastore.
[sql]
USE ${wf_hadoop_instance};
alter table calibrate_fth add IF NOT EXISTS partition (datestamp='${wf_date}');
[/sql]
6. Hive config file for authentication and authorization required for hive action – hive-config.xml
[xml]
<?xml version="1.0" encoding="UTF-8"?>
<!-- Autogenerated by Cloudera Manager -->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.warehouse.subdir.inherit.perms</name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>1073741824</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>999</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<description>Zookeeper quorum used by Hive’s Table Lock Manager</description>
<value>substitute your actual ZooKeeper node names</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<description>The port at which the clients will connect.</description>
<value>2000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>Zookeeper quorum used by hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2002</value>
</property>
<property>
<name>hive.zookeeper.namespace</name>
<value>hive_zookeeper_namespace_hive2</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>
<!-- 'hive.server2.authentication', originally set to 'kerberos' (non-final), is overridden below by a safety valve -->
<property>
<name>hive.metastore.kerberos.principal</name>
<value>hive/DOMAIN</value>
</property>
<property>
<name>hive.server2.authentication.kerberos.principal</name>
<value>hive/DOMAIN</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>kerberos</value>
</property>
</configuration>
[/xml]
7. Coordinator property file to pass configuration – coordinator.properties
Finally, the property file from which the configuration parameters are passed.
wf_hadoop_instance=HAAS_QUEUE
wf_hadoop_instance_queue=HAAS_QUEUE
nameNode=hdfs://nameservice
jobTracker=yarnRM
oozie.coord.application.path=/user/${wf_hadoop_instance}/workflows/calibrate
oozie.use.system.libpath=true
oozie.action.sharelib.for.pig=pig,hcatalog
coord_name=DAILY_CALIBRATE
coord_start_time=2017-04-24T12:00Z
coord_end_time=2020-04-25T23:00Z
wf_name=DAILY_CALIBRATE
wf_workflow_path=/user/${wf_hadoop_instance}/workflows/calibrate
wf_notification_email=[email protected]
8. running the coordinator job
oozie job -oozie http://oozie_host:port/oozie -dryrun -config coordinator.properties
oozie job -oozie http://oozie_host:port/oozie -config coordinator.properties -run
How do you add partitioner class information to an Oozie workflow?
Adding the property mapred.partitioner.class with the value PonRankPartitioner to the Oozie workflow did not invoke the partitioner at all. However, when running the same code with the hadoop jar command from the CLI, the partitioner is invoked. Any idea why the partitioner is not invoked?
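One thing worth checking (an assumption, not something verified against this job): the workflow above enables the new MapReduce API through mapred.mapper.new-api and mapred.reducer.new-api, and with the new API the partitioner is normally configured through the mapreduce.job.partitioner.class property (Hadoop 2) rather than the old-API mapred.partitioner.class key, using the fully qualified class name. A hedged sketch:
[xml]
<property>
    <name>mapreduce.job.partitioner.class</name>
    <!-- the package is assumed here to match the mapper class used earlier -->
    <value>com.blog.haas.mapreduce.PonRankPartitioner</value>
</property>
[/xml]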