In the example below we will explore how to read an object from Amazon S3 and apply a regex to a Spark DataFrame. Say we have a column whose values combine city and temperature in a "city.temperature" format (e.g. "Bangalore.28"), and we want to extract the temperature using a regex on the DataFrame.
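Before wiring this into Spark, the extraction itself can be sketched in plain Java with `java.util.regex`. This mirrors what Spark's `regexp_extract` does with the pattern `(\d+)`: grab the first run of digits, or return an empty string when there is no match. The class and method names here are illustrative, not part of any library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TempRegexDemo {

    // Mimics regexp_extract(col, "(\\d+)", 1): return the first run of
    // digits in the value, or "" when the pattern does not match.
    static String extractTemperature(String cityTemp) {
        Matcher m = Pattern.compile("(\\d+)").matcher(cityTemp);
        return m.find() ? m.group(1) : "";
    }

    public static void main(String[] args) {
        System.out.println(extractTemperature("Bangalore.28")); // prints 28
        System.out.println(extractTemperature("Delhi.31"));     // prints 31
    }
}
```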
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.regexp_extract;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RegExOnDataFrame {

    private static String master = "local";
    private static String access_key_amazon = "UNKNOWN";
    private static String secret_key_amazon = "UNKNOWN";

    static {
        // ConfigFile, Constants, FileType and CipherText are project-specific
        // helpers that load and decrypt the stored AWS credentials.
        ConfigFile aws_credential =
                new ConfigFile(Constants.OBJECT_STORE_CREDENTIALS_CONFIG, FileType.property);
        master = System.getenv("spark.master.conf.mesos");
        if (master == null) {
            master = "local[*]";
        }
        access_key_amazon = CipherText.decrypt(aws_credential.getString("accessKey.amazon"));
        secret_key_amazon = CipherText.decrypt(aws_credential.getString("secretKey.amazon"));
    }

    public static void main(String[] args) throws AnalysisException {
        SparkSession session = SparkSession.builder()
                // Use the resolved master instead of a hard-coded "local[*]",
                // so the value computed in the static block is actually honored.
                .master(master)
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.fs.s3a.access.key", access_key_amazon)
                .config("spark.hadoop.fs.s3a.secret.key", secret_key_amazon)
                .config("spark.speculation", "false")
                .config("fs.s3a.connection.ssl.enabled", "false")
                .config("spark.network.timeout", "600s")
                .config("spark.sql.codegen.wholeStage", "false")
                .config("spark.executor.heartbeatInterval", "500s")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("mapreduce.fileoutputcommitter.algorithm.version", "2")
                .config("fs.s3a.connection.establish.timeout", "501000")
                .config("fs.s3a.connection.timeout", "501000")
                .getOrCreate();

        // Read the Avro object from S3 into a DataFrame.
        Dataset<Row> dataset = session.read()
                .format("com.databricks.spark.avro")
                .load("s3a://bucket_name/object_key")
                .alias("one");
        dataset.show();

        // Extract the run of digits from the "city.temperature" values,
        // e.g. "Bangalore.28" -> "28"; group index 1 selects the capture group.
        dataset = dataset.withColumn("temp", regexp_extract(col("citytemp"), "(\\d+)", 1));
        dataset.show();
    }
}
```
In the example above we first load the S3 credentials from a configuration file and then create a SparkSession with the required parameters. Using the Spark session we read an Avro object from the object store and create a DataFrame from it. The DataFrame has a column "citytemp" with values in "city.temperature" format, to which we apply the regex to extract the temperature.
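The third argument of `regexp_extract` is a capture-group index, so with a richer pattern the same column can yield either part of the value. The plain-Java sketch below (the pattern `(\w+)\.(\d+)` and the helper name are my own, not from the original code) shows how the group index selects the city versus the temperature:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupDemo {

    // Group 1 captures the city name, group 2 the temperature digits.
    private static final Pattern CITY_TEMP = Pattern.compile("(\\w+)\\.(\\d+)");

    // Returns the requested capture group, like regexp_extract's idx argument;
    // "" when the value does not match the pattern at all.
    static String group(String value, int idx) {
        Matcher m = CITY_TEMP.matcher(value);
        return m.matches() ? m.group(idx) : "";
    }

    public static void main(String[] args) {
        System.out.println(group("Bangalore.28", 1)); // prints Bangalore
        System.out.println(group("Bangalore.28", 2)); // prints 28
    }
}
```

With `regexp_extract(col("citytemp"), "(\\w+)\\.(\\d+)", 2)` the Spark version would produce the temperature column in the same way.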