Spark: creating Avro data using a DataFrame

Avro is a language-neutral data serialization system. Its schemas are usually written in JSON, and its data is usually encoded in a compact binary format. Avro has rich schema resolution capabilities: the schema used to read data need not be identical to the schema that was used to write it, and this is the mechanism by which Avro supports schema evolution.
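As a concrete illustration of schema resolution, a reader can supply its own schema when deserializing: a field missing from the writer schema is filled in with its default value, and the two schemas are reconciled at read time. Below is a minimal sketch using the plain Avro Java API; the schemas, the "shelf" default, and the file name metrics.avro are illustrative, not part of the Spark example that follows.

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class SchemaEvolutionExample {

    public static void main(String[] args) throws Exception {
        // Writer schema: the schema the data was originally written with (illustrative).
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Metric\",\"fields\":["
          + "{\"name\":\"device\",\"type\":\"string\"},"
          + "{\"name\":\"usage\",\"type\":\"string\"}]}");

        // Reader schema: adds a new field with a default, so older files still resolve.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Metric\",\"fields\":["
          + "{\"name\":\"device\",\"type\":\"string\"},"
          + "{\"name\":\"usage\",\"type\":\"string\"},"
          + "{\"name\":\"shelf\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        // Avro resolves writer and reader schemas at read time.
        DatumReader<GenericRecord> datumReader =
            new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
        DataFileReader<GenericRecord> fileReader =
            new DataFileReader<GenericRecord>(new File("metrics.avro"), datumReader);
        while (fileReader.hasNext()) {
            GenericRecord record = fileReader.next();
            // Records written without "shelf" come back with the default "unknown".
            System.out.println(record.get("device") + " -> " + record.get("shelf"));
        }
        fileReader.close();
    }
}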

Below is an example of creating Avro data using a Spark DataFrame. We read input data that is in text format and convert it into Avro format.
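For illustration, assume the input file contains comma-separated lines with a device name, a shelf identifier, and a usage value, for example:

deviceA,shelf1,75
deviceB,shelf2,40
deviceC,shelf1,88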


import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateAvroData {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Define the schema of the DataFrame: three nullable string columns.
        List<StructField> listOfStructField = new ArrayList<StructField>();
        listOfStructField.add(DataTypes.createStructField("device", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("shelf", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("usage", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(listOfStructField);

        // Read the comma-separated input text file and convert each line into a Row.
        JavaRDD<Row> rddData = sc.textFile("Input Data In Text Format")
                .map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1212;

                    @Override
                    public Row call(String line) throws Exception {
                        String[] data = line.split(",");
                        return RowFactory.create(data[0], data[1], data[2]);
                    }
                });

        // Create a DataFrame from the RDD of Rows and the schema. A plain
        // SQLContext would also work here, since no Hive features are used.
        SQLContext hiveContext = new HiveContext(sc);
        DataFrame dataFrame = hiveContext.createDataFrame(rddData, structType);

        // Write the DataFrame out in Avro format via the spark-avro data source.
        dataFrame.write().format("com.databricks.spark.avro").save("Output Path");

        sc.stop();
    }
}
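Note that the spark-avro package must be on the classpath for the com.databricks.spark.avro data source to be found, for example via --packages com.databricks:spark-avro_2.10:2.0.1 when submitting the job (the version shown is illustrative; pick one matching your Spark and Scala versions). The Avro output can then be read back through the same data source, as in this short sketch:

DataFrame avroData = hiveContext.read()
        .format("com.databricks.spark.avro")
        .load("Output Path");
avroData.show();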