Oozie workflow example: using MultipleInputs and OrcInputFormat to process data in different mappers and join the datasets in the reducer

This post explains how to use MultipleInputs to process linelength and speeddata from ems. The two input formats used here are OrcInputFormat and TextInputFormat. The linelength data is read from one location and the speeddata from another. The linelength data is in text format, so it is read with TextInputFormat, while the speeddata is in ORC format, so it is read with OrcInputFormat. Each dataset is handled by its own mapper, and the output of both mappers is passed to a single reducer, which joins the two datasets.

We will use DelegatingInputFormat and DelegatingMapper. The property mapreduce.input.multipleinputs.dir.formats maps each input path to its input format, and mapreduce.input.multipleinputs.dir.mappers maps each input path to its mapper.
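The two properties above share one layout: a comma-separated list of entries, where each entry is a path and a class name separated by a semicolon. The following is a minimal plain-Java sketch of that layout, not Hadoop code. The class `MultipleInputsConfig` and the paths in `main` are hypothetical and only serve as illustration. As far as I can tell, Hadoop splits the value on `,` and `;` without trimming, so the sketch does the same, which is why each value is best kept on a single line without stray whitespace.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Hadoop): builds and parses values laid out
// as "path;className,path;className", the layout used by the two
// mapreduce.input.multipleinputs.dir.* properties.
final class MultipleInputsConfig {

    // Joins each path/class pair with ';' and separates the pairs with ','.
    static String encode(Map<String, String> pathToClass) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : pathToClass.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(';').append(e.getValue());
        }
        return sb.toString();
    }

    // Splits on ',' and then on ';'. Nothing is trimmed, so a line break
    // inside the value ends up inside the path or the class name.
    static Map<String, String> decode(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String mapping : value.split(",")) {
            String[] parts = mapping.split(";");
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed example paths; the real ones come from the workflow parameters.
        Map<String, String> formats = new LinkedHashMap<>();
        formats.put("/data/dsl/20240101", "org.apache.orc.mapreduce.OrcInputFormat");
        formats.put("/data/speed/datestamp=20240101",
                "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        System.out.println(encode(formats));
    }
}
```

The same layout applies to the mappers property, with mapper class names in place of input format class names.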

The mapper and reducer implementations are not included here, because they are written the same way as in any other MapReduce job.
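For readers who have not written a reduce-side join before, the general pattern can be sketched in plain Java with no Hadoop dependency. This is not the code of the classes named in the workflow. It assumes that each mapper emits the join key and prefixes its value with a tag naming its source, and that the reducer performs an inner join. The class `TaggedJoin`, the tags `D|` and `S|`, and the tab-separated output are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a reduce-side join. The join method stands in for
// one reduce() call, which receives every value that shares a join key.
final class TaggedJoin {

    // Assumed tags that the two mappers would prepend to their output values.
    static final String DSL_TAG = "D|";
    static final String SPEED_TAG = "S|";

    static List<String> join(String key, List<String> taggedValues) {
        List<String> dslRecords = new ArrayList<>();
        List<String> speedRecords = new ArrayList<>();

        // Separate the values by source using the tag each mapper added.
        for (String value : taggedValues) {
            if (value.startsWith(DSL_TAG)) {
                dslRecords.add(value.substring(DSL_TAG.length()));
            } else if (value.startsWith(SPEED_TAG)) {
                speedRecords.add(value.substring(SPEED_TAG.length()));
            }
        }

        // Inner join: emit one row per pair; a key seen on only one side
        // produces no output.
        List<String> joined = new ArrayList<>();
        for (String dsl : dslRecords) {
            for (String speed : speedRecords) {
                joined.add(key + "\t" + dsl + "\t" + speed);
            }
        }
        return joined;
    }
}
```

In a real job the same logic would sit inside the reducer's `reduce` method, with the tagged values arriving as the iterable of values for one key.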

Oozie workflow xml – workflow.xml
[xml]

<workflow-app name="${wf_name}" xmlns="uri:oozie:workflow:0.5">
<global>
<job-xml>hive-config.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
</configuration>
</global>
<credentials>
<credential name='hcat' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>thrift://host:port</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/_HOST@DOMAIN</value>
</property>
</credential>
</credentials>
<start to="process_data" />

<action name="process_data" retry-max="2" retry-interval="5" cred="hcat">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${wf_output_mapreduce}/datestamp=${wf_date}" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${wf_hadoop_instance_queue}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>

<property>
<name>mapreduce.inputformat.class</name>
<value>org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>org.apache.hadoop.mapreduce.lib.input.DelegatingMapper</value>
</property>

<property>
<name>mapreduce.input.multipleinputs.dir.formats</name>
<value>${wf_dsl_input_path}/${wf_date};org.apache.orc.mapreduce.OrcInputFormat,${wf_speed_input_path}/datestamp=${wf_date};org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
</property>
<property>
<name>mapreduce.input.multipleinputs.dir.mappers</name>
<value>${wf_dsl_input_path}/${wf_date};com.blog.DsllDataMapper,${wf_speed_input_path}/datestamp=${wf_date};com.blog.SpeedDataMapper</value>
</property>

<property>
<name>mapreduce.reduce.class</name>
<value>com.blog.DslSpeedJoinReducer</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${wf_output_mapreduce}/datestamp=${wf_date}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>20</value>
</property>
</configuration>
<file>lib/Validator.jar#Validator.jar</file>
<file>lib/orc-core-1.1.0.jar#orc-core-1.1.0.jar</file>
<file>lib/orc-mapreduce-1.1.0.jar#orc-mapreduce-1.1.0.jar</file>
<file>lib/hive-storage-api-2.2.1.jar#hive-storage-api-2.2.1.jar</file>
<file>lib/aircompressor-0.6.jar#aircompressor-0.6.jar</file>
</map-reduce>
<ok to="Email_success" />
<error to="Email_failure" />
</action>

<action name="Email_success">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>SUCCESS : [${wf_hadoop_instance}] ${wf:name()} is successful for Run Date Time :- ${wf_exec_datetime}:${wf:id()}</subject>
<body>Hi,

This is an auto-generated email. Please do not reply to this email.
</body>
</email>
<ok to="End" />
<error to="Kill" />
</action>

<action name="Email_failure">
<email xmlns="uri:oozie:email-action:0.1">
<to>${wf_notification_email}</to>
<subject>FAIL: [${wf_hadoop_instance}] ${wf:name()} has failed for Run Date Time :- ${wf_exec_datetime} ${wf:id()}</subject>
<body>Hi,

Workflow : ${wf:name()}
Failed Node : ${wf:lastErrorNode()}
Error Message : ${wf:errorMessage(wf:lastErrorNode())}
The workflow ${wf:name()} has failed to complete. Run Date Time :- ${wf_exec_datetime}
This is an auto-generated email. Please do not reply to this email.
</body>
</email>
<ok to="Kill" />
<error to="Kill" />
</action>

<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name="End"/>

</workflow-app>
[/xml]

 

 
