1. Overview
In this article, I will explain how to integrate Spark and Couchbase using the Couchbase Spark Connector to read data from and write data to Couchbase. The prerequisite is a running Couchbase instance along with a bucket that we will read from and write to. Couchbase can be downloaded from the link below:
https://www.couchbase.com/downloads
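If you would rather not install Couchbase directly, a local instance can also be started with Docker (a sketch, assuming Docker is installed; the ports are the ones published by the official couchbase image):

docker run -d --name couchbase \
  -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 \
  couchbase

Once the container is up, open http://localhost:8091 to create a user and the employee bucket used throughout this article.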
2. Setup
Below are the required dependencies.
SBT
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "2.2.0"
MAVEN
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.couchbase.client/spark-connector -->
<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>spark-connector_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
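Note that spark-connector 2.2.0 is published for Scala 2.11 (hence the _2.11 suffix above), so the Scala binary versions of both artifacts must match. With SBT, where %% appends the suffix automatically, this means pinning the project's Scala version; a minimal build.sbt sketch, assuming Scala 2.11.12:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"    %% "spark-sql"       % "2.4.5",
  "com.couchbase.client" %% "spark-connector" % "2.2.0"
)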
3. Writing data into Couchbase from Spark
Let's create a dummy employee DataFrame:
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

class Data {

  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    // Build a small in-memory employee dataset
    val sequenceOfOverview = ListBuffer[(Integer, String, String, String, Integer)]()
    sequenceOfOverview += Tuple5(1, "mark", "male", "IT", 34)
    sequenceOfOverview += Tuple5(2, "steve", "male", "Automobile", 28)
    sequenceOfOverview += Tuple5(3, "stella", "female", "marketing", 23)
    sequenceOfOverview += Tuple5(4, "taylor", "male", "Professor", 43)
    sequenceOfOverview.toDF("id", "name", "gender", "profession", "age")
  }
}
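For reference, calling new Data().getSampleDataFrame(sparkSession).show() prints the following:

+---+------+------+----------+---+
| id|  name|gender|profession|age|
+---+------+------+----------+---+
|  1|  mark|  male|        IT| 34|
|  2| steve|  male|Automobile| 28|
|  3|stella|female| marketing| 23|
|  4|taylor|  male| Professor| 43|
+---+------+------+----------+---+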
Let's write the above DataFrame into Couchbase. In the example below, the DataFrame is persisted into Couchbase with the document ID mapped to the id field; executing the code inserts four documents into the employee bucket.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.couchbase.spark.sql._

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "username")
    .config("com.couchbase.password", "password")
    .config("com.couchbase.bucket.employee", "password")
    .config("com.couchbase.bucket.finance", "password")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Writing data into Couchbase
  val data = new Data
  val dataFrameWrite = data.getSampleDataFrame(sparkSession)

  dataFrameWrite
    .withColumn("type", lit("bt::employee")) // tag each document with a type
    .write
    .mode("overwrite")
    .couchbase(
      Map("idField" -> "id", "bucket" -> "employee", "timeout" -> "50")
    )
}
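For illustration, the first row written above would end up as a document roughly like the one below in the employee bucket: the id value becomes the document key, and the added type column travels with the body (a sketch, not captured from an actual run; depending on the connector version, the id field may or may not also be retained in the body):

{
  "name": "mark",
  "gender": "male",
  "profession": "IT",
  "age": 34,
  "type": "bt::employee"
}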
4. Reading data from Couchbase using Spark
Let's read the documents that we created above into a DataFrame. To query the data, we first need to create indexes on the employee bucket; below are the statements for that:
CREATE INDEX `employee_type` ON `employee`(`type`);
CREATE INDEX `employee_age` ON `employee`(`age`);
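These statements can be run from the Query Workbench in the Couchbase web console, or from the cbq command-line shell (a sketch, assuming the query service on its default port 8093 and the credentials configured in the code below):

cbq -e http://localhost:8093 -u User -p Pass
> CREATE INDEX `employee_type` ON `employee`(`type`);
> CREATE INDEX `employee_age` ON `employee`(`age`);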
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources._
import org.apache.spark.storage.StorageLevel
import com.couchbase.spark.sql._

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Using Spark SQL filters pushed down to Couchbase
  val types = "bt::employee"
  val df = sparkSession.read
    .couchbase(
      And(EqualTo("type", types), GreaterThan("age", 30)),
      Map("bucket" -> "employee")
    )
    .persist(StorageLevel.MEMORY_AND_DISK)

  df.show()
}
If we execute the above code, the documents are loaded from Couchbase into a DataFrame based on the Spark filters passed to the couchbase method (all documents where type = bt::employee and age is greater than 30). Below is the output:
+-------+---+------+------+----------+------------+
|META_ID|age|gender|  name|profession|        type|
+-------+---+------+------+----------+------------+
|      1| 34|  male|  mark|        IT|bt::employee|
|      4| 43|  male|taylor| Professor|bt::employee|
+-------+---+------+------+----------+------------+
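Note that the connector surfaces the document key as a META_ID column rather than the original id field. If downstream code expects the original column name, it can simply be renamed; a minimal sketch:

val renamed = df.withColumnRenamed("META_ID", "id")
renamed.show()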
We can also read the data by passing a N1QL WHERE-clause string to filter the documents, as shown below:
import org.apache.spark.sql.SparkSession

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Using a N1QL filter string
  val query = "type = 'bt::employee' and age > 30"
  val df = sparkSession.read
    .format("com.couchbase.spark.sql.DefaultSource")
    .option("schemaFilter", query)
    .option("bucket", "employee")
    .load()

  df.show()
}
Finally, we can also pass in a complete N1QL query and load the result into an RDD, which can be converted into a DataFrame as well (a conversion sketch follows the code below). Let's execute the following N1QL query:
SELECT SUM(age) AS total FROM employee WHERE type = 'bt::employee' GROUP BY gender
import java.util.concurrent.TimeUnit

import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}
import org.apache.spark.sql.SparkSession
import com.couchbase.spark._

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Running a full N1QL query and collecting the rows as an RDD
  val idRdd = sparkSession.sparkContext.couchbaseQuery(
    N1qlQuery.simple(
      "SELECT SUM(age) AS total FROM employee WHERE type = 'bt::employee' GROUP BY gender",
      N1qlParams.build().serverSideTimeout(10, TimeUnit.SECONDS)
    ),
    "employee"
  )

  idRdd.collect().foreach(println)
}
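The couchbaseQuery call returns an RDD of query rows rather than a DataFrame. One way to complete the conversion mentioned above is to turn each row's JSON content into a string and let Spark infer the schema; a minimal sketch, assuming each row exposes its content as a JsonObject via .value (as the connector's CouchbaseQueryRow does in the 2.x line):

// Round-trip the query rows through JSON strings to obtain a DataFrame
import sparkSession.implicits._
val jsonDs   = sparkSession.createDataset(idRdd.map(_.value.toString))
val resultDf = sparkSession.read.json(jsonDs)
resultDf.show()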
This was a quick but comprehensive introduction to integrating Spark and Couchbase.