MapReduce pattern for writing data to an external source outside of Hadoop and HDFS, such as a MySQL database

The external source output pattern writes data to a system outside of Hadoop and HDFS. With this pattern, we can output data from the MapReduce framework directly to an external source. This is extremely useful for loading directly into a live system instead of staging the data for later delivery. The pattern skips storing data in a file system entirely and sends output key/value pairs directly where they belong.

MapReduce rarely hosts an application as-is, so using MapReduce to bulk load into an external source in parallel has its uses. In a MapReduce approach, the data is written out in parallel. As with using an external source for input, you need to be sure the destination system can handle the parallel ingest it is bound to endure, with one open connection per task.

Structure

1. The OutputFormat verifies the output specification of the job configuration prior to job submission. This is a great place to ensure that the external source is fully functional, as it won't be good to process all the data only to find out the external source was unavailable when it was time to commit the data. This method is also responsible for creating and initializing a RecordWriter implementation.

2. The RecordWriter writes all key/value pairs to the external source. Much like a RecordReader, the implementation varies depending on the external data source being written to. During construction of the object, establish any needed connections using the external source's API. These connections are then used to write out all the data from each map or reduce task.

OutputFormat

The MySqlOutputFormat is responsible for establishing and verifying the job configuration prior to the job being submitted to the JobTracker. Once the job has been submitted, it also creates the RecordWriter to serialize all the output key/value pairs. In the checkOutputSpecs method, we ensure that the MySQL connection parameters, including the username and password, are set in the driver configuration before we even bother launching the job, as it will surely fail without them. This is where you'll want to verify your configuration.

The getRecordWriter method is used on the back end to create an instance of a RecordWriter for the map or reduce task. Here, we get the configuration variables required by the MySqlRecordWriter and return a new instance of it.

The final method of this output format is getOutputCommitter. The output committer is used by the framework to manage any temporary output before committing, in case the task fails and needs to be re-executed. For this implementation, we don't typically care whether the task fails and needs to be re-executed; as long as the job finishes, we are okay. An output committer is required by the framework, but NullOutputFormat contains an output committer implementation that does nothing.

Problem to solve

Write the top twenty highest-rated movies into MySQL from the MapReduce framework. This solution writes data to a system outside of Hadoop and HDFS.

We are using the output from the highest-rated-movies solution as the input for this solution.

Sample input data

Sanjuro (1962)::4.608695652173913
Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)::4.560509554140127
Shawshank Redemption, The (1994)::4.554557700942973
Godfather, The (1972)::4.524966261808367
Close Shave, A (1995)::4.52054794520548
Usual Suspects, The (1995)::4.517106001121705
Schindlers List (1993)::4.510416666666667
Wrong Trousers, The (1993)::4.507936507936508
Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)::4.491489361702127
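Each record above is a movie title and its average rating separated by a double colon. As a quick sketch of the field split the mapper performs later in this solution (RatingLineParser is a hypothetical helper, not part of the solution itself):

```java
// Hypothetical helper mirroring the "title::rating" split done in the mapper.
public class RatingLineParser {

    // Splits a record into its two fields; returns null for malformed lines.
    public static String[] parse(String line) {
        String[] fields = line.split("::", -1);
        if (fields.length != 2 || fields[0].isEmpty() || fields[1].isEmpty()) {
            return null;
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] fields = parse("Sanjuro (1962)::4.608695652173913");
        System.out.println(fields[0] + " -> " + fields[1]);
    }
}
```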

Input data attached top_twenty_highest_rated_movies.txt

Additional Dependencies for this solution

As we are inserting data into a MySQL database, we need to add the MySQL JDBC driver to our project. The Maven dependency below can be used if the project is built with Maven.

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>

MySqlOutputFormat

We read the MySQL driver, URL, username, and password from the driver configuration. In the getRecordWriter method we instantiate a MySqlRecordWriter, passing it all the required parameters. In checkOutputSpecs we validate that all the parameters required to make a connection to the MySQL database are present.

import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MySqlOutputFormat extends OutputFormat<Text, Text> {

    public static final String MYSQL_USERNAME = "mysql.user.name";
    public static final String MYSQL_PASSWORD = "mysql.password";
    public static final String MYSQL_DRIVER = "mysql.driver";
    public static final String MYSQL_DB_URL = "mysql.url";

    public static void setMySqlUsername(Job job, String username) {
        job.getConfiguration().set(MYSQL_USERNAME, username);
    }

    public static void setMySqlPassword(Job job, String pass) {
        job.getConfiguration().set(MYSQL_PASSWORD, pass);
    }

    public static void setMySqlDriver(Job job, String driver) {
        job.getConfiguration().set(MYSQL_DRIVER, driver);
    }

    public static void setMySqlUrl(Job job, String url) {
        job.getConfiguration().set(MYSQL_DB_URL, url);
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        try {
            return new MySqlRecordWriter(job.getConfiguration().get(MYSQL_DRIVER),
                    job.getConfiguration().get(MYSQL_DB_URL), job.getConfiguration().get(MYSQL_USERNAME),
                    job.getConfiguration().get(MYSQL_PASSWORD));
        } catch (ClassNotFoundException e) {
            // Fail the task rather than returning a null writer.
            throw new IOException("MySQL driver class not found", e);
        } catch (SQLException e) {
            throw new IOException("Unable to connect to MySQL", e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        String driver = context.getConfiguration().get(MYSQL_DRIVER);
        String dburl = context.getConfiguration().get(MYSQL_DB_URL);
        String username = context.getConfiguration().get(MYSQL_USERNAME);
        String pass = context.getConfiguration().get(MYSQL_PASSWORD);

        if (driver == null || dburl == null || username == null || pass == null) {
            throw new IOException("Missing MySQL connection parameters. Check configuration.");
        }
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Reuse the do-nothing committer from NullOutputFormat; there is no temporary output to manage.
        return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
    }

}

RecordWriter code

The MySqlRecordWriter handles connecting to MySQL via the JDBC driver and writing out the data. Finally, the connection and statement resources are closed in the close method.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MySqlRecordWriter extends RecordWriter<Text, Text> {

    private Connection conn = null;
    private PreparedStatement stmt = null;

    public MySqlRecordWriter(String driver, String dbUrl, String username, String password)
            throws SQLException, ClassNotFoundException {
        Class.forName(driver);
        conn = DriverManager.getConnection(dbUrl, username, password);
        // A prepared statement handles quoting, so titles containing quotes insert safely.
        stmt = conn.prepareStatement("INSERT INTO movie_rating_test VALUES (?, ?)");
    }

    @Override
    public void write(Text key, Text value) throws IOException, InterruptedException {
        try {
            stmt.setString(1, key.toString());
            stmt.setString(2, value.toString());
            stmt.executeUpdate();
        } catch (SQLException e) {
            // Fail the task so the framework can retry it.
            throw new IOException(e);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            stmt.close();
            conn.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

}

Mapper Code

The Mapper is very straightforward and looks like any other mapper. The movie name and rating are retrieved from the record and then output. The output format does all the heavy lifting for us, allowing it to be reused to write whatever we want to a MySQL database.


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExternalWriterMapper extends Mapper<Object, Text, Text, Text> {

    private Text movieId = new Text();
    private Text rating = new Text();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        String data = values.toString();
        String[] field = data.split("::", -1);
        if (field.length == 2 && field[0].length() > 0 && field[1].length() > 0) {
            movieId.set(field[0]);
            rating.set(field[1]);
            context.write(movieId, rating);
        }
    }

}

Driver Code

In the driver code we pass in the parameters required for a MySQL database connection and the input data location, set all the other parameters as usual, and submit the job.


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Replace the placeholder strings below with real values before running.
        args = new String[] { "com.mysql.jdbc.Driver", "Replace this string with mysql database url",
                "Replace this string with mysql username", "Replace this string with mysql password",
                "Replace this string with input data path location" };

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setMapperClass(ExternalWriterMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, args[4]);
        job.setOutputFormatClass(MySqlOutputFormat.class);
        MySqlOutputFormat.setMySqlDriver(job, args[0]);
        MySqlOutputFormat.setMySqlUrl(job, args[1]);
        MySqlOutputFormat.setMySqlUsername(job, args[2]);
        MySqlOutputFormat.setMySqlPassword(job, args[3]);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        int code = job.waitForCompletion(true) ? 0 : 2;
        System.exit(code);
    }
}