Using regex in a Spark DataFrame

In the example below we will explore how to read an object from Amazon S3 and apply a regex to a Spark DataFrame. Let's say we have a column whose value is a combination of city and temperature separated by a dot ("Bangalore.28"), and we want to extract the temperature using a regex on the Spark DataFrame.


import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.regexp_extract;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RegExOnDataFrame {

    private static String master = "local";
    private static String access_key_amazon = "UNKNOWN";
    private static String secret_key_amazon = "UNKNOWN";

    static {
        // ConfigFile, Constants, FileType and CipherText are project-specific
        // helpers (not part of Spark) used to load and decrypt the S3 credentials.
        ConfigFile aws_credential = new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);

        master = System.getenv("spark.master.conf.mesos");
        if (master == null) {
            master = "local[*]";
        }

        access_key_amazon = CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        secret_key_amazon = CipherText.decrypt(aws_credential.getString("secretKey.amazon"));
    }

    public static void main(String[] args) throws AnalysisException {

        SparkSession session = SparkSession.builder()
                .master(master)
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
                .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
                .config("spark.speculation", "false")
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.sql.codegen.wholeStage", "false")
                .config("spark.executor.heartbeatInterval", "500s")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("mapreduce.fileoutputcommitter.algorithm.version", "2")
                .config("fs.s3a.connection.establish.timeout", "501000")
                .config("fs.s3a.connection.timeout", "501000")
                .getOrCreate();

        // Read the Avro object from S3 into a DataFrame.
        Dataset<Row> dataset = session.read()
                .format("com.databricks.spark.avro")
                .load("s3a://bucket_name/object_key")
                .alias("one");
        dataset.show();

        // "(\\d+)" matches the run of digits after the city name; group index 0
        // returns the entire match, which here is just the temperature.
        dataset = dataset.withColumn("temp", regexp_extract(col("citytemp"), "(\\d+)", 0));
        dataset.show();
    }
}
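
Assuming the Avro object holds "citytemp" values such as "Bangalore.28" and "Chennai.35" (illustrative data, not taken from a real bucket), the second show() would print something along these lines:

+------------+----+
|    citytemp|temp|
+------------+----+
|Bangalore.28|  28|
|  Chennai.35|  35|
+------------+----+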

In the above example we first load the S3 credentials from a configuration file and then create a SparkSession with all the required parameters. Using the Spark session we read an Avro object from the object store and create a DataFrame from it. The DataFrame has a column "citytemp" whose values follow the "city.temperature" format, and we apply the regex to it to extract the temperature information.
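
The regex step itself does not depend on S3, so it can be tried locally before wiring up credentials. Below is a minimal sketch, assuming Spark is on the classpath and using made-up sample rows in place of the Avro object (the column name mirrors the example above; the class name is hypothetical):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.regexp_extract;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RegExOnDataFrameLocalTest {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();

        // Build a small in-memory DataFrame with the same "city.temperature" shape.
        Dataset<Row> dataset = session
                .createDataset(Arrays.asList("Bangalore.28", "Chennai.35"), Encoders.STRING())
                .toDF("citytemp");

        // With a single capturing group wrapping all the digits, group index 1
        // and group index 0 (the whole match) yield the same string here.
        dataset.withColumn("temp", regexp_extract(col("citytemp"), "(\\d+)", 1)).show();

        session.stop();
    }
}

Since the layout is strictly "city.temperature", splitting on the dot would be an equivalent non-regex alternative here, e.g. functions.split(col("citytemp"), "\\.").getItem(1); the regexp_extract version has the advantage of also working when the surrounding text varies.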