Problem to solve – Load the collector data from mysql database and process it using mapreduce framework. We have a collector database which has 5 collector table from c1_metric to c5_metric .We want to process these data with 5 mapper instances and process it for further analyses.
Prerequisite
1. A running instance of mysql server should be available . We are using table name c1_metric,c2_metric,c3_metric,c4_metric,c5_metric from which input will be taken.
2.As we are fetching the data from the mysql database we need to add mysql driver into our project.Below maven dependency can be used if we are building the project on maven.
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.6</version> </dependency>
The external source input pattern doesn’t load data from HDFS, but instead from some system outside of Hadoop, such as an SQL database or a web service.This pattern can be used to load data in parallel from a source that is not part of your MapReduce framework.
There are a few reasons why you might want to analyze the data directly from the source instead of staging it first in hdfs. It may be faster to load the data from outside of Hadoop without having to stage it into files first. For example, dumping a database to the file system is
likely to be an expensive operation, and taking it from the database directly ensures that the MapReduce job has the most up-to-date data available.
In a MapReduce approach, the data is loaded in parallel rather than in a serial fashion. The caveat to this is that the source needs to have well-defined boundaries on which data is read in parallel in order to scale. For example, in the case of a sharded databases, each map task can be assigned a shard to load from the a table, thus allowing for very quick parallel loads of data without requiring a database scan.
Structure
1. The InputFormat creates all the InputSplit objects, which may be based on a custom object. An input split is a chunk of logical input, and that largely depends on the format in which it will be reading data. In this pattern, the input is not from a file-based input but an external source. The input could be from a series of SQL tables or a number of distributed services spread through the cluster. As long as the input can be read in parallel, this is a good fit for MapReduce.
2. The InputSplit contains all the knowledge of where the sources are and how much of each source is going to be read. The framework uses the location information to help determine where to assign the map task. A custom InputSplit must also implement the Writable interface, because the framework uses the methods of this interface to transmit the input split information to a TaskTracker.
3.The RecordReader uses the job configuration provided and InputSplit information to read key/value pairs. The implementation of this class depends on the data source being read.
InputSplit code
MySqlInputSplit extends the inputsplit base class and implements the writable interface. We store the username,password,driver,url and table name in the inputsplit which decides what table to process for a particular split. The input split implements the Writable interface, so that it is serializable by the framework, and includes a default constructor in order for the framework to create a new instance via reflection. We return the location via the getLocations method, in the hopes that the JobTracker will assign each map task to a TaskTracker that is hosting the data.
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; public class MySqlInputSplit extends InputSplit implements Writable { public String tableName; public String username; public String password; public String driver; public String url; public MySqlInputSplit() { } public String getTableName() { return tableName; } public String getUsername() { return username; } public String getPassword() { return password; } public String getDriver() { return driver; } public String getUrl() { return url; } public MySqlInputSplit(String username, String password, String driver, String url, String tableName) { super(); this.username = username; this.password = password; this.driver = driver; this.url = url; this.tableName = tableName; } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(username); out.writeUTF(password); out.writeUTF(driver); out.writeUTF(url); out.writeUTF(tableName); } @Override public void readFields(DataInput in) throws IOException { this.username = in.readUTF(); this.password = in.readUTF(); this.driver = in.readUTF(); this.url = in.readUTF(); this.tableName = in.readUTF(); } @Override public long getLength() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public String[] getLocations() throws IOException, InterruptedException { // TODO Auto-generated method stub return new String[] { tableName }; } }
InputFormat code
MySqlInputFormat contains configuration variables to know which database to connect. In the getSplits method, the configuration is verified and a number of MySqlInputSplits is created based on the number of collector tables. This will create one map task for each configured collector table. The createRecordReader method is called by the framework to get a new instance of a record reader. The record reader’s initialize method is called by the framework, so we can just create a new instance and return it.
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class MySqlInputFormat extends InputFormat<Text, Text> { public static final String MYSQL_USERNAME = "mysql.user.name"; public static final String MYSQL_PASSWORD = "mysql.password"; public static final String MYSQL_DRIVER = "mysql.driver"; public static final String MYSQL_DB_URL = "mysql.url"; public static final String TABLE_NAME = "mysql.tablename"; public static void setMySqlUsername(Job job, String username) { job.getConfiguration().set(MYSQL_USERNAME, username); } public static void setMySqlPassword(Job job, String pass) { job.getConfiguration().set(MYSQL_PASSWORD, pass); } public static void setMySqlDriver(Job job, String driver) { job.getConfiguration().set(MYSQL_DRIVER, driver); } public static void setMySqlUrl(Job job, String url) { job.getConfiguration().set(MYSQL_DB_URL, url); } public static void setMySqlTableName(Job job, String tableName) { job.getConfiguration().set(TABLE_NAME, tableName); } @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub String driver = context.getConfiguration().get(MYSQL_DRIVER); String dburl = context.getConfiguration().get(MYSQL_DB_URL); String username = context.getConfiguration().get(MYSQL_USERNAME); String pass = context.getConfiguration().get(MYSQL_PASSWORD); String tableName = context.getConfiguration().get(TABLE_NAME); if (driver == null || dburl == null || username == null || pass == null || tableName == null) { throw new IOException("Check configuration."); } // create an inputsplit for each collector table List<InputSplit> splits = new ArrayList<InputSplit>(); for (String table : tableName.split(",")) { splits.add(new MySqlInputSplit(username,pass,driver,dburl, table)); } return splits; } @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new MySqlRecordReader(); } }
RecordReader code
MySqlRecordReader is where most of the work is done. The initialize method is called by the framework and provided with an input split we
created in the input format. Here, we get the mysql parameters and also the table name which this split should process.In nextKeyValue, we iterate through the resultset one at a time and set the record reader’s writable objects for the key and value. A return value of true informs the framework that there is a key/value pair to process. Once we have exhausted all the key/value pairs, false is returned so the map task can complete.
The other methods of the record reader are used by the framework to get the current key and value for the mapper to process. It is worthwhile to reuse this object whenever possible. The getProgress method is useful for reporting gradual status to the Job Tracker and should also be reused if possible. Finally, the close method is for finalizing the process.
package com.hadoop.design.external.input; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class MySqlRecordReader extends RecordReader<Text, Text> { public Connection conn = null; public Statement stmt = null; public Text key = new Text(); public Text value = new Text(); public ResultSet rs = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { MySqlInputSplit mySqlInputSplit = (MySqlInputSplit) split; try { Class.forName(mySqlInputSplit.getDriver()); } catch (ClassNotFoundException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } try { conn = DriverManager.getConnection(mySqlInputSplit.getUrl(), mySqlInputSplit.getUsername(), mySqlInputSplit.getPassword()); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { stmt = conn.createStatement(); String sql = "select * from " + mySqlInputSplit.getTableName(); rs = stmt.executeQuery(sql); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public boolean nextKeyValue() throws IOException, InterruptedException { try { while (rs.next()) { String metric_id = rs.getString("metric_id"); String metric_value = rs.getString("value"); key.set(metric_id); value.set(metric_value); return true; } } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return value; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 50; } @Override public void close() throws IOException { try { rs.close(); stmt.close(); conn.close(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
Driver code
In the driver code we are passing the parameters required for a mysql database connection and the output path location. Since we are just using the identity mapper, we don’t need to set any special classes. The number of reduce tasks is set to zero to specify that this is a map-only job.c1_metric,c2_metric,c3_metric,c4_metric,c5_metric table name configured are the table name in mysql which will be processed by 5 mappers each for one table.
import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[] { "com.mysql.jdbc.Driver", "Replace this string with mysql database url", "Replace this string with mysql username","Replace this string with mysql password","c1_metric,c2_metric,c3_metric,c4_metric,c5_metric", "Replace this string with output path location" }; FileUtils.deleteDirectory(new File(args[2])); Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Driver.class); job.setNumReduceTasks(0); job.setInputFormatClass(MySqlInputFormat.class); MySqlInputFormat.setMySqlDriver(job, args[0]); MySqlInputFormat.setMySqlUrl(job, args[1]); MySqlInputFormat.setMySqlUsername(job, args[2]); MySqlInputFormat.setMySqlPassword(job, args[3]); MySqlInputFormat.setMySqlTableName(job,args[4]); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(args[5])); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : 3); } }
Very Well written. Thanks for posting. Good job .
Why getProgress method is returning 50? . Can you please explain why?
hi Victoria get progress method should actually return a number between 0.0 and 1.0 that is the fraction of the data read which indicates the the current progress of the record reader through its data. I have returned a random number here to keep the code simpler but in a real scenario you should ideally come with a logic which can give a fair progress of the job.