In this article I will demonstrate how to build a Hive UDTF and execute it in Apache Spark.
In Hive, a user defined tabular function (UDTF) operates on a single row and returns multiple rows as output. One example of a built-in UDTF in Hive is the EXPLODE function.
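For example, EXPLODE takes an array (or map) and emits one output row per element. A minimal illustration in the Hive shell, using a hypothetical orders table with an array column named items, would look like this:

hive> SELECT EXPLODE(items) AS item FROM orders;

Each element of the items array comes back as its own row, which is exactly the one-row-to-many-rows behaviour we will build ourselves below.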
Let's say we have input data as below
1920,shelf=0/slot=5/port=1,100
1920,shelf=1/slot=4/port=6,200
1920,shelf=2/slot=5/port=24,300
1920,shelf=3/slot=5/port=0,400
We need a custom Hive tabular function that splits each input row into two output rows: one pairing the device with the shelf information and one pairing the device with the usage value. For example, the input data above will be transformed as below
1920,shelf=0/slot=5/port=1
1920,100
1920,shelf=1/slot=4/port=6
1920,200
1920,shelf=2/slot=5/port=24
1920,300
1920,shelf=3/slot=5/port=0
1920,400
Let's write a custom Java class to define the user defined tabular function (UDTF). It extends org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implements the initialize(), process() and close() methods.
Below is the code
package com.blog.hive.udf;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class CustomUDTF extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {

        if (args.length != 1) {
            throw new UDFArgumentException("CustomUDTF() takes exactly one argument");
        }

        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("CustomUDTF() takes a string as a parameter");
        }

        // input inspector
        stringOI = (PrimitiveObjectInspector) args[0];

        // output inspector -- a struct with two string fields
        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
        fieldNames.add("device");
        fieldNames.add("slot");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    public ArrayList<Object[]> processInputRecord(String id) {
        ArrayList<Object[]> result = new ArrayList<Object[]>();

        // ignore null or empty input
        if (id == null || id.isEmpty()) {
            return result;
        }

        String[] tokens = id.split(",");

        if (tokens.length == 2) {
            result.add(new Object[] { tokens[0], tokens[1] });
        } else if (tokens.length == 3) {
            // emit two rows: (device, shelf) and (device, usage)
            result.add(new Object[] { tokens[0], tokens[1] });
            result.add(new Object[] { tokens[0], tokens[2] });
        }

        return result;
    }

    @Override
    public void process(Object[] record) throws HiveException {
        final String id = stringOI.getPrimitiveJavaObject(record[0]).toString();
        ArrayList<Object[]> results = processInputRecord(id);

        Iterator<Object[]> it = results.iterator();
        while (it.hasNext()) {
            Object[] r = it.next();
            forward(r);
        }
    }

    @Override
    public void close() throws HiveException {
        // do nothing
    }
}
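To compile this class the Hive UDF/UDTF API has to be on the build classpath. If the project is built with Maven, a dependency along the following lines should be sufficient; the version shown here is only an example and should be matched to the Hive version on your cluster:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>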
To deploy the above code, package the Java class into a jar file, add the jar to the Hive classpath, create a temporary function and execute the UDTF as below
hive> ADD JAR HiveUdtf.jar;
hive> CREATE TEMPORARY FUNCTION SPLIT_USAGE as 'com.blog.hive.udf.CustomUDTF';
hive> select SPLIT_USAGE(value) from test_table;
So let's execute the same Hive UDTF using Spark SQL and dataframes. We need to create a temporary view from the dataset and register the function using the session.sql method. Once the function is registered we can use it in our queries. Note that the jar containing the UDTF class must be on the Spark classpath (for example by passing it with the --jars option of spark-submit) for the CREATE TEMPORARY FUNCTION statement to succeed.
Below is the code
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkUDTFExecutor {

    public static void main(String[] args) throws AnalysisException {

        // Pass the input path as a parameter
        SparkSession session = SparkSession.builder().enableHiveSupport().appName("test").master("local").getOrCreate();
        //session.sparkContext().setLogLevel("DEBUG");

        Dataset<Row> df = session.read().option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").text("C:\\dataset\\input\\test");
        df.printSchema();
        df.show();
        df.createTempView("test_table");

        session.sql("CREATE TEMPORARY FUNCTION SPLIT_USAGE AS 'com.blog.hive.udf.CustomUDTF'");
        session.sql("SELECT SPLIT_USAGE(value) FROM test_table").show();
    }
}
In the above code we are using Spark 2.0 features like SparkSession. If we have to execute the Hive UDTF on an older 1.x version of Spark, such as Spark 1.6, we can use the below code.
Here we are reading the file with the hiveContext.read().text() method, which loads the file with one line per row into a single column named value. Once we have the dataframe we register it as a temporary table using the dataFrame.registerTempTable method and register the function with hiveContext.sql. Once the function is registered we can use it in our queries.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class Test {

    public static void main(String[] args) throws AnalysisException {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(sc);

        DataFrame df2 = hc.read().text("Input Path");
        df2.registerTempTable("test_table");

        hc.sql("CREATE TEMPORARY FUNCTION SPLIT_USAGE AS 'com.blog.hive.udf.CustomUDTF'");
        hc.sql("SELECT SPLIT_USAGE(value) FROM test_table").show();
    }
}
Let's execute the same UDTF on Avro data. First we create the Avro data using the below code, reusing the same input data from the example above.
We need an additional dependency to process the Avro data. If we are building the project with Maven, we can add the below dependency
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>1.1.0-cdh5.9.1</version>
</dependency>
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateAvroData {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // schema for the three comma-separated fields in the input
        List<StructField> listOfStructField = new ArrayList<StructField>();
        listOfStructField.add(DataTypes.createStructField("device", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("shelf", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("usage", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(listOfStructField);

        JavaRDD<Row> rddData = sc.textFile("Input Path").map(new Function<String, Row>() {

            private static final long serialVersionUID = 1212;

            @Override
            public Row call(String v1) throws Exception {
                String[] data = v1.split(",");
                return RowFactory.create(data[0], data[1], data[2]);
            }
        });

        SQLContext hiveContext = new HiveContext(sc);
        DataFrame dataFrame = hiveContext.createDataFrame(rddData, structType);
        dataFrame.write().format("com.databricks.spark.avro").save("Output Location");
    }
}
We will use the same Hive UDTF that we created above and execute it in Spark SQL to process the Avro data. In the below code we are loading the query from a configuration file.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class SparkUDTFAvro {

    public static void main(String[] args) {

        // args[0] -> input path of the avro data, args[1] -> output path
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.hadoopConfiguration().set("avro.mapred.ignore.inputs.without.extension", "false");

        HiveContext sql = new HiveContext(sc);
        DataFrame df = sql.read().format("com.databricks.spark.avro").load(args[0]);
        df.registerTempTable("test_table");

        sql.sql("CREATE TEMPORARY FUNCTION split_row AS 'com.blog.hive.udf.CustomUDTF'");

        // load the query text from the configuration file query3.txt
        ConfigFile scriptFile = new ConfigFile(Constants.QUERY3_TXT, FileType.script);
        String query = scriptFile.getFileContent();

        sql.sql(query).write().format("com.databricks.spark.avro").save(args[1]);
    }
}
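The contents of query3.txt will depend on your setup. Since the Avro data created above has three columns (device, shelf and usage) while the UDTF expects a single comma-separated string, one plausible query to put in the file is a concat_ws over the three columns, for example:

SELECT split_row(concat_ws(',', device, shelf, usage)) FROM test_table

This reassembles each Avro record into the original comma-separated form before handing it to the UDTF.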
Below are the helper classes used for loading the query from the configuration file
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ConfigFile {

    private String fileName;
    private SequencedProperties properties;
    private FileType fileType;
    private String fileContent;

    private static final Logger logger = Logger.getLogger(ConfigFile.class);

    public ConfigFile(String fileName, FileType fileType) {
        this.fileName = fileName;
        this.properties = new SequencedProperties();
        this.fileType = fileType;
        loadFile();
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(SequencedProperties properties) {
        this.properties = properties;
    }

    public FileType getFileType() {
        return fileType;
    }

    public void setFileType(FileType fileType) {
        this.fileType = fileType;
    }

    public String getFileContent() {
        return fileContent;
    }

    public void setFileContent(String fileContent) {
        this.fileContent = fileContent;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    private void loadFile() {
        InputStream in = getClass().getClassLoader().getResourceAsStream(getFileName());
        try {
            if (this.getFileType() == FileType.property) {
                // property files are loaded into the Properties object
                this.getProperties().load(in);
            } else if (this.getFileType() == FileType.script) {
                // script files are read whole into fileContent
                StringWriter writer = new StringWriter();
                IOUtils.copy(in, writer);
                fileContent = writer.toString();
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public Properties getProperty() {
        return properties;
    }

    public String getString(String key) {
        return properties.getProperty(key);
    }
}
public class Constants {

    public static final String QUERY1_TXT = "query1.txt";
    public static final String QUERY2_TXT = "query2.txt";
    public static final String QUERY3_TXT = "query3.txt";

    /**
     * Instantiates a new constants.
     */
    public Constants() {
        super();
    }
}
public enum FileType {
    property, script
}
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * The Class SequencedProperties is a custom property handler implementation
 * that preserves the insertion order of keys.
 */
public class SequencedProperties extends Properties {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The key set, kept in insertion order. */
    private transient Set<Object> keySet = new LinkedHashSet<Object>(100);

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public synchronized Enumeration keys() {
        return Collections.enumeration(keySet);
    }

    @Override
    public Set<Object> keySet() {
        return keySet;
    }

    @Override
    public synchronized Object put(Object key, Object value) {
        if (!keySet.contains(key)) {
            keySet.add(key);
        }
        return super.put(key, value);
    }

    @Override
    public synchronized Object remove(Object key) {
        keySet.remove(key);
        return super.remove(key);
    }

    @SuppressWarnings("unchecked")
    @Override
    public synchronized void putAll(@SuppressWarnings("rawtypes") Map values) {
        for (Object key : values.keySet()) {
            if (!containsKey(key)) {
                keySet.add(key);
            }
        }
        super.putAll(values);
    }
}