In this article I will demonstrate how to add a column with a constant or static value to a dataframe using the lit function.
Consider that we have avro data on which we want to run an existing hql query. The avro data on hdfs follows an older schema, but the hql query was written against a newer avro schema. As a result, the query references some columns that are not present in the data on hdfs, because that data was created with the older schema. In this scenario it is useful to add these additional columns to the dataframe schema so that we can run the same hql query on the dataframe.
Once the dataframe is created, we can use the withColumn method to add a new column to it. The withColumn method takes a second parameter, which we can use to pass the constant value for the newly added column.
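As a quick sketch, assuming we already have a dataframe named df and the lit function has been statically imported, adding a single constant column looks like this (the column name and value are just placeholders):

// Assumes: import static org.apache.spark.sql.functions.lit;
// df is an existing DataFrame; "metric1" and "value1" are placeholder names
DataFrame withConstant = df.withColumn("metric1", lit("value1"));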
Let's say we have input data as below
1920,shelf=0/slot=5/port=1,100
1920,shelf=1/slot=4/port=6,200
1920,shelf=2/slot=5/port=24,300
1920,shelf=3/slot=5/port=0,400
And we want to add the columns metric1, metric2, metric3, metric4 and metric5 with the constant values value1, value2, value3, value4 and value5 to the dataframe.
Below is the code
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class AddColumnDataFrame {

    public static void main(String[] args) {

        args = new String[]{"Input Data"};
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Process avro files even if they do not have the .avro extension
        sc.hadoopConfiguration()
                .set("avro.mapred.ignore.inputs.without.extension", "false");
        HiveContext sql = new HiveContext(sc);
        // Load the avro data from hdfs into a dataframe
        DataFrame df = sql.read().format("com.databricks.spark.avro")
                .load(args[0]);
        // Add the five metric columns with constant values using lit
        DataFrame df2 = df.withColumn("metric1", lit("value1"))
                .withColumn("metric2", lit("value2"))
                .withColumn("metric3", lit("value3"))
                .withColumn("metric4", lit("value4"))
                .withColumn("metric5", lit("value5"));
        df2.show();
    }
}
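The same approach is not limited to string constants. As a small sketch continuing from the df2 dataframe above (the column names metric6 and metric7 are illustrative and not part of the example), lit can also take numeric values, and the resulting column can be cast when the hql query expects a specific type:

// Illustrative only: "metric6" and "metric7" are not part of the example above
DataFrame df3 = df2
        .withColumn("metric6", lit(100))                               // integer constant
        .withColumn("metric7", lit("0.5").cast(DataTypes.DoubleType)); // cast to the expected type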
Now let's execute an hql query on the dataframe
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

public class AddColumnDataFrame {

    public static void main(String[] args) {

        args = new String[]{"input path", "output path"};
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.hadoopConfiguration()
                .set("avro.mapred.ignore.inputs.without.extension", "false");
        HiveContext sql = new HiveContext(sc);
        DataFrame df = sql.read().format("com.databricks.spark.avro")
                .load(args[0]);
        DataFrame df2 = df.withColumn("metric1", lit("value1"))
                .withColumn("metric2", lit("value2"))
                .withColumn("metric3", lit("value3"))
                .withColumn("metric4", lit("value4"))
                .withColumn("metric5", lit("value5"));
        // Register the dataframe as a temporary table so hql can be run against it
        df2.registerTempTable("test_table");
        // Load the hql query from the query1.txt file on the classpath
        ConfigFile scriptFile = new ConfigFile(Constants.QUERY1_TXT, FileType.script);
        String query = scriptFile.getFileContent();
        // Execute the query and write the result to the output path
        sql.sql(query).save(args[1], SaveMode.Overwrite);
    }
}
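The query itself lives in query1.txt on the classpath and can reference the added metric columns as if they were part of the original avro data. As an illustration only (the column names id, port and value are assumptions, since the real columns come from the avro schema of the input), an inline query against the registered temp table could look like this:

// Illustrative only: id, port and value are assumed column names
String query = "SELECT id, port, value, metric1, metric2 "
        + "FROM test_table WHERE metric3 = 'value3'";
DataFrame result = sql.sql(query);
result.show();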
Below are the helper classes used to load the query from the query file on the classpath.
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ConfigFile {

    private String fileName;
    private SequencedProperties properties;
    private FileType fileType;
    private String fileContent;
    private static final Logger logger = Logger.getLogger(ConfigFile.class);

    public ConfigFile(String fileName, FileType fileType) {
        this.fileName = fileName;
        this.properties = new SequencedProperties();
        this.fileType = fileType;
        loadFile();
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(SequencedProperties properties) {
        this.properties = properties;
    }

    public FileType getFileType() {
        return fileType;
    }

    public void setFileType(FileType fileType) {
        this.fileType = fileType;
    }

    public String getFileContent() {
        return fileContent;
    }

    public void setFileContent(String fileContent) {
        this.fileContent = fileContent;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    private void loadFile() {
        InputStream in = getClass().getClassLoader().getResourceAsStream(getFileName());
        try {
            if (this.getFileType() == FileType.property) {
                // Property files are loaded into the SequencedProperties instance
                this.getProperties().load(in);
            } else if (this.getFileType() == FileType.script) {
                // Script files (like query1.txt) are read completely into a single string
                StringWriter writer = new StringWriter();
                IOUtils.copy(in, writer);
                fileContent = writer.toString();
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public Properties getProperty() {
        return properties;
    }

    public String getString(String key) {
        return properties.getProperty(key);
    }
}
public class Constants {

    public static final String QUERY1_TXT = "query1.txt";
    public static final String QUERY2_TXT = "query2.txt";
    public static final String QUERY3_TXT = "query3.txt";

    /**
     * Instantiates a new constants.
     */
    public Constants() {
        super();
    }
}
public enum FileType {
    property, script
}
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * The Class SequencedProperties is a custom property handler implementation.
 */
public class SequencedProperties extends Properties {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The key set. */
    private transient Set<Object> keySet = new LinkedHashSet<Object>(100);

    /*
     * (non-Javadoc)
     *
     * @see java.util.Hashtable#keys()
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public synchronized Enumeration keys() {
        return Collections.enumeration(keySet);
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.Hashtable#keySet()
     */
    @Override
    public Set<Object> keySet() {
        return keySet;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.Hashtable#put(java.lang.Object, java.lang.Object)
     */
    @Override
    public synchronized Object put(Object key, Object value) {
        if (!keySet.contains(key)) {
            keySet.add(key);
        }
        return super.put(key, value);
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.Hashtable#remove(java.lang.Object)
     */
    @Override
    public synchronized Object remove(Object key) {
        keySet.remove(key);
        return super.remove(key);
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.Hashtable#putAll(java.util.Map)
     */
    @SuppressWarnings("unchecked")
    @Override
    public synchronized void putAll(@SuppressWarnings("rawtypes") Map values) {
        for (Object key : values.keySet()) {
            if (!containsKey(key)) {
                keySet.add(key);
            }
        }
        super.putAll(values);
    }
}
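SequencedProperties backs Properties with a LinkedHashSet so keys keep the order in which they appear in the file. As a small usage sketch (the file name app.properties and the key input.path are hypothetical and not part of the article's code), the same ConfigFile helper can load a property file as well as a script file:

// Hypothetical example: "app.properties" and "input.path" are placeholders
ConfigFile config = new ConfigFile("app.properties", FileType.property);
String inputPath = config.getString("input.path");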