Using a Hive UDTF in Spark SQL

In this article I will demonstrate how to build a Hive UDTF and execute it in Apache Spark.

In Hive, a user-defined table-generating function (UDTF) works on a single input row and returns multiple rows as output. One example of a built-in UDTF in Hive is the EXPLODE function.

Let's say we have input data as below:


1920,shelf=0/slot=5/port=1,100
1920,shelf=1/slot=4/port=6,200
1920,shelf=2/slot=5/port=24,300
1920,shelf=3/slot=5/port=0,400

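We need a custom Hive table-generating function that emits, for each input row, one row pairing the first field with the second field and another row pairing the first field with the third field. For example, the input data will be transformed as below: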


1920,shelf=0/slot=5/port=1
1920,100
1920,shelf=1/slot=4/port=6
1920,200
1920,shelf=2/slot=5/port=24
1920,300
1920,shelf=3/slot=5/port=0
1920,400

Let's write a custom Java class that defines the user-defined table-generating function (UDTF). It extends org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implements the initialize(), process() and close() methods.

Below is the code


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Hive table-generating function that splits one comma-separated string into
 * two-column output rows.
 *
 * <p>For an input of the form {@code a,b,c} it forwards the rows {@code (a, b)}
 * and {@code (a, c)}; for {@code a,b} it forwards the single row {@code (a, b)}.
 * Any other token count, as well as a null or empty input, produces no rows.
 */
public class CustomUDTF extends GenericUDTF {

    /** Inspector for the single string argument; assigned in {@link #initialize}. */
    private PrimitiveObjectInspector stringOI = null;

    /**
     * Validates the argument list and declares the output row layout.
     *
     * @param args inspectors for the call arguments; exactly one string is accepted
     * @return a struct inspector with two string fields, {@code device} and {@code slot}
     * @throws UDFArgumentException if the argument count is not one or the
     *         argument is not a primitive string
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException(
                    "CustomUDTF() takes exactly one argument");
        }
        // The two checks must be joined with ||: the category test short-circuits
        // before the cast for non-primitive arguments, and a primitive argument
        // that is not a string is rejected as well.
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory()
                        != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException(
                    "CustomUDTF() takes a string as a parameter");
        }

        // input inspector
        stringOI = (PrimitiveObjectInspector) args[0];

        // output inspectors -- a struct with two string fields
        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
        fieldNames.add("device");
        fieldNames.add("slot");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    /**
     * Splits a comma-separated record into output rows.
     *
     * @param id the raw record, e.g. {@code 1920,shelf=0/slot=5/port=1,100}
     * @return one {@code {first, second}} row for a two-token record, an
     *         additional {@code {first, third}} row for a three-token record,
     *         and an empty list for null, empty, or any other token count
     */
    public ArrayList<Object[]> processInputRecord(String id) {
        ArrayList<Object[]> result = new ArrayList<Object[]>();
        // ignoring null or empty input
        if (id == null || id.isEmpty()) {
            return result;
        }
        String[] tokens = id.split(",");
        if (tokens.length == 2 || tokens.length == 3) {
            result.add(new Object[] { tokens[0], tokens[1] });
        }
        if (tokens.length == 3) {
            result.add(new Object[] { tokens[0], tokens[2] });
        }
        return result;
    }

    /**
     * Converts the single argument to a string and forwards every row
     * produced by {@link #processInputRecord(String)}.
     *
     * @param record the call arguments; only {@code record[0]} is read
     * @throws HiveException if forwarding a row fails
     */
    @Override
    public void process(Object[] record) throws HiveException {
        Object value = stringOI.getPrimitiveJavaObject(record[0]);
        if (value == null) {
            // A NULL column value yields no output rows instead of an NPE.
            return;
        }
        for (Object[] row : processInputRecord(value.toString())) {
            forward(row);
        }
    }

    /** No state is buffered between rows, so there is nothing to flush. */
    @Override
    public void close() throws HiveException {
        // do nothing
    }
}

To deploy the above code, package the Java class into a jar file, add the jar to the Hive classpath, create a temporary function, and execute the UDTF as below:


hive> ADD JAR HiveUdtf.jar;
hive> CREATE TEMPORARY FUNCTION SPLIT_USAGE as 'blog.hadoop.hive.custom.CustomUDTF';
hive> select SPLIT_USAGE(value) from test_table;

Now let's execute the same Hive UDTF using Spark SQL and DataFrames. We need to create a temporary view from the dataset and register the function using the session.sql method. Once the function is registered, we can use it in queries.

Below is the code


import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Runs the Hive UDTF through Spark SQL using the Spark 2.x {@code SparkSession} API.
 *
 * <p>The input is read as plain text (one row per line, in a column named
 * {@code value}), exposed as the temporary view {@code test_table}, and queried
 * with the UDTF registered under the name {@code SPLIT_USAGE}.
 */
public class SparkUDTFExecutor {

    /** Input location used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "C:\\dataset\\input\\test";

    /**
     * @param args optional; {@code args[0]} is the input path. When absent,
     *        {@link #DEFAULT_INPUT_PATH} is used.
     * @throws AnalysisException if the temporary view cannot be created
     */
    public static void main(String[] args) throws AnalysisException {

        // Pass the input path as a parameter; fall back to the default otherwise.
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkSession session = SparkSession.builder().enableHiveSupport().appName("test").master("local").getOrCreate();
        try {
            //session.sparkContext().setLogLevel("DEBUG");
            Dataset<Row> df = session.read().option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").text(inputPath);

            df.printSchema();
            df.show();

            df.createTempView("test_table");
            session.sql("CREATE TEMPORARY FUNCTION SPLIT_USAGE AS 'com.blog.hive.udf.CustomUDTF'");
            session.sql("SELECT SPLIT_USAGE(value) FROM test_table").show();
        } finally {
            // Release the local Spark context even when a query fails.
            session.stop();
        }
    }

}

In the above code we are using Spark 2.0 features such as SparkSession. If we have to execute the Hive UDTF on an older version of Spark, such as Spark 1.6, we can use the code below.

Here we read the file with the hiveContext.read().text() method, which reads the file one line per row into a DataFrame with a single column named value. Once we have the DataFrame, we register it as a temporary table using the dataFrame.registerTempTable method and register the function using the hiveContext.sql method. Once the function is registered, we can use it in queries.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Spark 1.x variant of the UDTF driver, built on {@code HiveContext} and
 * {@code DataFrame} instead of {@code SparkSession}.
 */
public class Test {

    /**
     * Reads the text input, registers it as {@code test_table}, registers the
     * UDTF as {@code SPLIT_USAGE}, and prints the query result.
     *
     * @param args unused
     * @throws AnalysisException declared for Spark SQL analysis failures
     */
    public static void main(String[] args) throws AnalysisException {

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local");
        sparkConf.setAppName("test");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        HiveContext hiveContext = new HiveContext(sparkContext);

        // One row per input line, in a single column named "value".
        DataFrame lines = hiveContext.read().text("Input Path");
        lines.registerTempTable("test_table");

        hiveContext.sql("CREATE TEMPORARY FUNCTION SPLIT_USAGE AS 'com.blog.hive.udf.CustomUDTF'");
        DataFrame splitRows = hiveContext.sql("SELECT SPLIT_USAGE(value) FROM test_table");
        splitRows.show();
    }
}

Let's execute the same UDTF on Avro data. Let's first create the Avro data using the code below; we are using the same input data as in the example above.

We need an additional dependency to process the Avro data. If we are using Maven to build the project, we can add the dependency below:


<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.10</artifactId>
<version>1.1.0-cdh5.9.1</version>
</dependency>


import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Converts the comma-separated text input into an Avro data set with the
 * three nullable string columns {@code device}, {@code shelf} and {@code usage}.
 */
public class CreateAvroData {

    /**
     * Reads the text input, maps each line to a three-column row, and writes
     * the result in Avro format.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        StructType schema = buildSchema();

        // Splits one input line on commas and keeps its first three tokens.
        Function<String, Row> lineToRow = new Function<String, Row>() {

            private static final long serialVersionUID = 1212;

            @Override
            public Row call(String v1) throws Exception {
                String[] fields = v1.split(",");
                return RowFactory.create(fields[0], fields[1], fields[2]);
            }
        };

        JavaRDD<Row> rows = sc.textFile("Input Path").map(lineToRow);

        SQLContext hiveContext = new HiveContext(sc);
        DataFrame avroFrame = hiveContext.createDataFrame(rows, schema);
        avroFrame.write().format("com.databricks.spark.avro").save("Output Location");
    }

    /** Builds the schema: three nullable string columns, in output order. */
    private static StructType buildSchema() {
        List<org.apache.spark.sql.types.StructField> columns = new ArrayList<StructField>();
        for (String columnName : new String[] { "device", "shelf", "usage" }) {
            columns.add(DataTypes.createStructField(columnName, DataTypes.StringType, true));
        }
        return DataTypes.createStructType(columns);
    }

}

We will use the same Hive UDTF that we created above and execute it in Spark SQL to process the Avro data. In the code below we load the query from a configuration file.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Applies the Hive UDTF to Avro input through Spark SQL and writes the query
 * result back out as Avro. The query text is loaded from the classpath
 * resource named by {@code Constants.QUERY3_TXT}.
 */
public class SparkUDTFAvro {

    /**
     * @param args {@code args[0]} is the Avro input path, {@code args[1]} the
     *        Avro output path
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     */
    public static void main(String[] args) {

        // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: SparkUDTFAvro <avro input path> <avro output path>");
        }
        String inputPath = args[0];
        String outputPath = args[1];

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Also read input files that do not carry the .avro extension.
        sc.hadoopConfiguration()
                .set("avro.mapred.ignore.inputs.without.extension", "false");
        HiveContext sql = new HiveContext(sc);
        DataFrame df = sql.read().format("com.databricks.spark.avro")
                .load(inputPath);

        df.registerTempTable("test_table");

        sql.sql("CREATE TEMPORARY FUNCTION split_row AS 'com.blog.hive.udf.CustomUDTF'");
        ConfigFile scriptFile = new ConfigFile(Constants.QUERY3_TXT,
                FileType.script);
        String query = scriptFile.getFileContent();
        sql.sql(query).write().format("com.databricks.spark.avro").save(outputPath);

    }

}

Below are the helper classes used for loading the query from the configuration file


import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

/**
 * Loads a classpath resource either as a properties file or as a raw script,
 * depending on the {@link FileType} given at construction.
 *
 * <p>Loading is best-effort: a missing resource or an I/O failure is logged
 * and leaves the properties empty / the file content {@code null}.
 */
public class ConfigFile {

    /** Initialised eagerly so that the error paths can actually log. */
    private static final Logger logger = Logger.getLogger(ConfigFile.class);

    private String fileName;

    private SequencedProperties properties;

    private FileType fileType;

    private String fileContent;

    /**
     * Creates the holder and immediately loads the resource.
     *
     * @param fileName classpath-relative resource name
     * @param fileType how the resource is to be interpreted
     */
    public ConfigFile(String fileName, FileType fileType) {
        this.fileName = fileName;
        this.properties = new SequencedProperties();
        this.fileType = fileType;
        loadFile();

    }

    /** @return the loaded properties (empty unless the type is {@code property}) */
    public Properties getProperties() {
        return properties;
    }

    public void setProperties(SequencedProperties properties) {
        this.properties = properties;
    }

    public FileType getFileType() {
        return fileType;
    }

    public void setFileType(FileType fileType) {
        this.fileType = fileType;
    }

    /** @return the script text, or {@code null} if none was loaded */
    public String getFileContent() {
        return fileContent;
    }

    public void setFileContent(String fileContent) {
        this.fileContent = fileContent;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /**
     * Reads the resource named by {@link #getFileName()} from the class loader
     * and stores it according to {@link #getFileType()}.
     */
    private void loadFile() {
        InputStream in = getClass().getClassLoader().getResourceAsStream(getFileName());
        if (in == null) {
            // getResourceAsStream signals "not found" with null, not an exception.
            logger.error("Resource not found on classpath: " + getFileName());
            return;
        }
        try {
            if (this.getFileType() == FileType.property) {
                this.getProperties().load(in);
            } else if (this.getFileType() == FileType.script) {
                StringWriter writer = new StringWriter();
                IOUtils.copy(in, writer);
                fileContent = writer.toString();
            }
        } catch (IOException e) {
            // Pass the exception itself so the stack trace is kept and a
            // null message cannot cause a secondary failure.
            logger.error("Failed to load " + getFileName(), e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                logger.error("Failed to close " + getFileName(), e);
            }
        }
    }

    /** @return the same object as {@link #getProperties()} */
    public Properties getProperty() {
        return properties;

    }

    /**
     * @param key property name
     * @return the property value, or {@code null} if the key is absent
     */
    public String getString(String key) {
        return properties.getProperty(key);
    }

}


/**
 * Names of the query resources looked up on the classpath.
 */
public class Constants {

    /** Resource name of the first query script. */
    public static final String QUERY1_TXT = "query1.txt";

    /** Resource name of the second query script. */
    public static final String QUERY2_TXT = "query2.txt";

    /** Resource name of the third query script. */
    public static final String QUERY3_TXT = "query3.txt";

    /**
     * Creates an instance; the class carries no state.
     */
    public Constants() {
    }

}


/**
 * Kinds of configuration resource a file can be treated as.
 */
public enum FileType {

    /** A key/value properties file. */
    property,

    /** A script whose full text is used as-is. */
    script;

}


import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* The Class SequencedProperties is a custom property handler implementation.
*/
/**
 * A {@link Properties} variant whose {@link #keys()} and {@link #keySet()}
 * report keys in the order they were first inserted.
 *
 * <p>Insertion order is tracked in a separate {@link LinkedHashSet} kept in
 * step with the table by {@link #put}, {@link #putAll}, {@link #remove} and
 * {@link #clear}. That set is {@code transient}, so it is not written out
 * when an instance is serialized.
 */
public class SequencedProperties extends Properties {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** Keys in first-insertion order. */
    private transient Set<Object> keySet = new LinkedHashSet<Object>(100);

    /**
     * @return an enumeration over the keys in insertion order
     * @see java.util.Hashtable#keys()
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public synchronized Enumeration keys() {
        return Collections.enumeration(keySet);
    }

    /**
     * Returns the internal ordered set itself, not a copy; removing elements
     * through it does not remove the corresponding table entries.
     *
     * @return the keys in insertion order
     * @see java.util.Hashtable#keySet()
     */
    @Override
    public Set<Object> keySet() {
        return keySet;
    }

    /**
     * Stores the mapping and records the key's position if it is new.
     *
     * @return the previous value for {@code key}, or {@code null}
     * @see java.util.Hashtable#put(java.lang.Object, java.lang.Object)
     */
    @Override
    public synchronized Object put(Object key, Object value) {
        // Store first: if the table rejects the entry (null key or value),
        // the ordered key set is left untouched.
        Object previous = super.put(key, value);
        keySet.add(key); // no-op when the key is already tracked
        return previous;
    }

    /**
     * Removes the mapping and forgets the key's position.
     *
     * @return the removed value, or {@code null}
     * @see java.util.Hashtable#remove(java.lang.Object)
     */
    @Override
    public synchronized Object remove(Object key) {
        keySet.remove(key);
        return super.remove(key);
    }

    /**
     * Copies all mappings, recording new keys in the iteration order of
     * {@code values.keySet()}.
     *
     * @see java.util.Hashtable#putAll(java.util.Map)
     */
    @SuppressWarnings("unchecked")
    @Override
    public synchronized void putAll(@SuppressWarnings("rawtypes") Map values) {
        // Tracked explicitly because the superclass is not guaranteed to
        // route putAll through put().
        keySet.addAll(values.keySet());
        super.putAll(values);
    }

    /**
     * Empties the table and the ordered key set together, so that
     * {@link #keys()} and {@link #keySet()} do not report stale keys.
     *
     * @see java.util.Hashtable#clear()
     */
    @Override
    public synchronized void clear() {
        keySet.clear();
        super.clear();
    }
}