Using a Hive UDF in Spark SQL

In this article I will demonstrate how to build a Hive UDF and execute it in Apache Spark. Hive user-defined functions operate row-wise, producing one output value for each input row.

Let's say we have input data as below:


1920,shelf=0/slot=5/port=1,100
1920,shelf=1/slot=4/port=6,200
1920,shelf=2/slot=5/port=24,300
1920,shelf=3/slot=5/port=0,400

We need a custom Hive function that converts a string like shelf=0/slot=5/port=1 into shelf=0. To define a user-defined function we write a custom Java class that extends org.apache.hadoop.hive.ql.exec.UDF and implements an evaluate() method.


import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

@UDFType(deterministic = true, stateful = false)
public class CustomUDF extends UDF {

    // Returns the part of the input before the first '/', e.g. "shelf=0"
    public String evaluate(String str) {
        if (str == null) {
            return null;
        }

        String[] sub_id = str.split("/");
        return sub_id[0];
    }
}
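To compile this class we only need hive-exec on the build classpath. Assuming a Maven build, a dependency along the following lines should work; the version shown here is a placeholder and should match the Hive installation on your cluster:


<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <!-- placeholder: match the Hive version running on your cluster -->
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>
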

To deploy the UDF, package the Java class into a jar file, add the jar to the Hive classpath, create a temporary function, and execute it as below:


hive> ADD JAR HiveUdf.jar;
hive> CREATE TEMPORARY FUNCTION stripPorts as 'blog.hadoop.hive.custom.CustomUDF';
hive> select stripPorts(shelf) from service_table;
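
Since evaluate() is an ordinary Java method, the UDF can also be sanity-checked with plain Java before it is packaged and shipped. A minimal check (the CustomUDFCheck class name is just for illustration) could look like this:


public class CustomUDFCheck {

    public static void main(String[] args) {
        CustomUDF udf = new CustomUDF();
        // Expect "shelf=0" for the sample port string
        System.out.println(udf.evaluate("shelf=0/slot=5/port=1"));
        // Null input should pass through as null
        System.out.println(udf.evaluate(null));
    }
}
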

Now let's execute the same Hive UDF using Spark SQL and DataFrames. We create a temporary view from the Dataset and register the function through the session.sql method; once the function is registered we can use it in our queries.


import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkUDFExecutor {

    public static void main(String[] args) throws AnalysisException {

        // Pass the input path as the first program argument
        SparkSession session = SparkSession.builder()
                .enableHiveSupport()
                .appName("test")
                .master("local")
                .getOrCreate();

        Dataset<Row> df = session.read()
                .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
                .csv(args[0])
                .toDF("device", "slot", "usage");

        df.createTempView("test_table");

        // Register the Hive UDF, then use it in a Spark SQL query
        session.sql("CREATE TEMPORARY FUNCTION STRIPDATA AS 'blog.hadoop.hive.custom.CustomUDF'");
        session.sql("SELECT device, STRIPDATA(slot) AS slotMod FROM test_table").show();
    }
}
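
One assumption to keep in mind: the jar containing CustomUDF has to be on Spark's classpath when the job runs, for example through the --jars option of spark-submit. A submission along these lines should work locally; the application jar name and input path are placeholders:


spark-submit --class SparkUDFExecutor \
  --master local \
  --jars HiveUdf.jar \
  spark-udf-demo.jar /path/to/input.csv
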

The SparkUDFExecutor class above uses Spark 2.x features such as SparkSession. If we have to execute the Hive UDF on an older 1.x release of Spark, such as Spark 1.6, we can use the code below.

Here we first load a JavaRDD of strings and define a schema in which all fields are of string type. We create a list of StructField objects, populate it using the DataTypes.createStructField method, and then build a StructType by passing that list to the DataTypes.createStructType method.

Once the StructType object is ready, we convert the JavaRDD of strings into a JavaRDD of Row by calling RowFactory.create inside a map. We then pass the StructType and the JavaRDD of Row to the hiveContext.createDataFrame method to get a DataFrame. Finally we register the DataFrame as a temporary table using the dataFrame.registerTempTable method, register the UDF through hiveContext.sql, and use it in our queries.


import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkUDFExecutor {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Define the schema: all three columns are strings
        List<StructField> listOfStructField = new ArrayList<StructField>();
        listOfStructField.add(DataTypes.createStructField("device", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("shelf", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("usage", DataTypes.StringType, true));

        StructType structType = DataTypes.createStructType(listOfStructField);

        // Convert each CSV line into a Row
        JavaRDD<Row> rddData = sc.textFile("Input Path")
                .map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1212;

                    @Override
                    public Row call(String v1) throws Exception {
                        String[] data = v1.split(",");
                        return RowFactory.create(data[0], data[1], data[2]);
                    }
                });

        SQLContext hiveContext = new HiveContext(sc);
        DataFrame dataFrame = hiveContext.createDataFrame(rddData, structType);
        dataFrame.registerTempTable("test_table");

        // Register the Hive UDF, then use it in a query
        hiveContext.sql("create temporary function STRIPDATA as 'blog.hive.udf.CustomUDF'");
        hiveContext.sql("SELECT device, STRIPDATA(shelf) AS slotMod FROM test_table").show();
    }
}
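
With the sample input shown at the beginning, both the Spark 2.x and the Spark 1.x versions should print roughly the following, since STRIPDATA keeps only the text before the first slash:


+------+-------+
|device|slotMod|
+------+-------+
|  1920|shelf=0|
|  1920|shelf=1|
|  1920|shelf=2|
|  1920|shelf=3|
+------+-------+
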

Let's execute the same UDF on Avro data. First we create the Avro data using the code below, reusing the same input data from the example above.

We need an additional dependency to process Avro data. If we are using Maven to build the project, we can add the dependency below:


<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>1.1.0-cdh5.9.1</version>
</dependency>


import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateAvroData {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same three-column string schema as before
        List<StructField> listOfStructField = new ArrayList<StructField>();
        listOfStructField.add(DataTypes.createStructField("device", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("shelf", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("usage", DataTypes.StringType, true));

        StructType structType = DataTypes.createStructType(listOfStructField);

        // Convert each CSV line into a Row
        JavaRDD<Row> rddData = sc.textFile("Input Path")
                .map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1212;

                    @Override
                    public Row call(String v1) throws Exception {
                        String[] data = v1.split(",");
                        return RowFactory.create(data[0], data[1], data[2]);
                    }
                });

        SQLContext hiveContext = new HiveContext(sc);
        DataFrame dataFrame = hiveContext.createDataFrame(rddData, structType);

        // Write the DataFrame out in Avro format
        dataFrame.write().format("com.databricks.spark.avro").save("Output Location");
    }
}

We will use the same Hive UDF that we created above and execute it in Spark SQL to process the Avro data.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public class SparkUDFExecutorAvro {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read Avro files even if they lack the .avro extension
        sc.hadoopConfiguration().set("avro.mapred.ignore.inputs.without.extension", "false");

        SQLContext hiveContext = new HiveContext(sc);

        DataFrame dataFrame = hiveContext.read().format("com.databricks.spark.avro").load("Input Path");
        dataFrame.show();

        dataFrame.registerTempTable("test_table");

        // Register the Hive UDF, then use it in a query
        hiveContext.sql("create temporary function STRIPDATA as 'blog.hive.udf.CustomUDF'");
        hiveContext.sql("SELECT device, STRIPDATA(shelf) AS slotMod FROM test_table").show();
    }
}