Spark and Couchbase: Reading and Writing Data

1. Overview

In this article I will explain how to integrate Spark and Couchbase using the Couchbase Spark Connector to read and write data. The prerequisite is a running Couchbase instance along with a bucket that we will write to and read from. Below is the link to download Couchbase:

https://www.couchbase.com/downloads

2. Setup

Below are the required dependencies. Note that spark-connector 2.2.0 is published for Scala 2.11, so the project's Scala build version must match.

SBT

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

libraryDependencies += "com.couchbase.client" %% "spark-connector" % "2.2.0"

MAVEN

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.couchbase.client/spark-connector -->
<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>spark-connector_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

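If you just want to experiment from the shell instead of building a project, the connector can also be pulled in at launch time via --packages (a hedged alternative; the coordinate assumes the same Scala 2.11 build used above):

spark-shell --packages com.couchbase.client:spark-connector_2.11:2.2.0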

3. Writing data into Couchbase from Spark

Let's create a dummy employee DataFrame:

import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

object Data {

  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    // (id, name, gender, profession, age)
    val sequenceOfOverview = ListBuffer[(Integer, String, String, String, Integer)]()
    sequenceOfOverview += Tuple5(1, "mark", "male", "IT", 34)
    sequenceOfOverview += Tuple5(2, "steve", "male", "Automobile", 28)
    sequenceOfOverview += Tuple5(3, "stella", "female", "marketing", 23)
    sequenceOfOverview += Tuple5(4, "taylor", "male", "Professor", 43)

    sequenceOfOverview.toDF("id", "name", "gender", "profession", "age")
  }
}
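For reference, calling .show() on the DataFrame returned by getSampleDataFrame prints the four sample rows:

+--+------+------+----------+---+
|id|  name|gender|profession|age|
+--+------+------+----------+---+
| 1|  mark|  male|        IT| 34|
| 2| steve|  male|Automobile| 28|
| 3|stella|female| marketing| 23|
| 4|taylor|  male| Professor| 43|
+--+------+------+----------+---+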

Let's write the above DataFrame into Couchbase. In the example below, the DataFrame is persisted into Couchbase with the document ID mapped to the id field; if you execute the code, 4 documents will be inserted into the employee bucket.

import com.couchbase.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "username")
    .config("com.couchbase.password", "password")
    .config("com.couchbase.bucket.employee", "password")
    .config("com.couchbase.bucket.finance", "password")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Writing data into Couchbase
  val dataFrameWrite = Data.getSampleDataFrame(sparkSession)

  dataFrameWrite
    .withColumn("type", lit("bt::employee")) // tag documents so we can filter on read
    .write
    .mode("overwrite")
    .couchbase(Map("idField" -> "id", "bucket" -> "employee", "timeout" -> "50"))

}
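To sanity-check the write, we can fetch the documents back by key with the connector's key-value support. A minimal sketch, assuming connector 2.x's couchbaseGet and that the idField mapping stored the documents under the string keys "1" through "4":

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._

// Fetch the four documents we just wrote, by document ID, from the employee bucket
val docs = sparkSession.sparkContext
  .couchbaseGet[JsonDocument](Seq("1", "2", "3", "4"), "employee")

docs.collect().foreach(doc => println(s"${doc.id()} -> ${doc.content()}"))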

4. Reading data from Couchbase using Spark

Let's read the documents that we created above into a DataFrame. Because the connector pushes filters down to Couchbase as N1QL, we first need to create indexes on the employee bucket; below are the statements to do so:

CREATE INDEX `employee_type` ON `employee`(`type`);

CREATE INDEX `employee_age` ON `employee`(`age`);

import com.couchbase.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources._
import org.apache.spark.storage.StorageLevel

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Using Spark SQL filters, pushed down to Couchbase
  val types = "bt::employee"

  val df = sparkSession.read
    .couchbase(
      And(EqualTo("type", types), GreaterThan("age", 30)),
      Map("bucket" -> "employee")
    )
    .persist(StorageLevel.MEMORY_AND_DISK)

  df.show()

}

If we execute the above code, the matching documents from Couchbase are loaded into a DataFrame based on the Spark filter we passed to the couchbase method (all documents where type = bt::employee and age is greater than 30). Below is the output:

+-------+---+------+------+----------+------------+
|META_ID|age|gender|  name|profession|        type|
+-------+---+------+------+----------+------------+
|      1| 34|  male|  mark|        IT|bt::employee|
|      4| 43|  male|taylor| Professor|bt::employee|
+-------+---+------+------+----------+------------+

We can also read the data by passing a N1QL WHERE-clause string (a schema filter) to filter the data, as below:

import org.apache.spark.sql.SparkSession

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Using a N1QL WHERE-clause string as the schema filter
  val query = "type = 'bt::employee' and age > 30"

  val df = sparkSession.read
    .format("com.couchbase.spark.sql.DefaultSource")
    .option("schemaFilter", query)
    .option("bucket", "employee")
    .load()

  df.show()

}
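Executing this returns the same two documents (mark and taylor) as the pushed-down filter version above, since both express the same predicate.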

And finally, we can also pass in a complete N1QL query and load the results into an RDD, which can be converted into a DataFrame as well (see the sketch after the code below). Let's execute the following N1QL query:

SELECT sum(age) AS total FROM employee WHERE type = 'bt::employee' GROUP BY gender

import java.util.concurrent.TimeUnit

import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}
import com.couchbase.spark._
import org.apache.spark.sql.SparkSession

object Gateway extends App {

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.couchbase.nodes", "localhost")
    .config("spark.couchbase.username", "User")
    .config("com.couchbase.password", "Pass")
    .config("com.couchbase.bucket.employee", "Pass")
    // .config("com.couchbase.bucket.finance", "Pass")
    .config("com.couchbase.connectTimeout", "50000")
    .master("local[*]")
    .getOrCreate()

  // Running a complete N1QL query through the Spark context
  val idRdd = sparkSession.sparkContext.couchbaseQuery(
    N1qlQuery.simple(
      "SELECT sum(age) AS total FROM employee " +
        "WHERE type = 'bt::employee' GROUP BY gender",
      N1qlParams.build().serverSideTimeout(10, TimeUnit.SECONDS)
    ),
    "employee"
  )

  idRdd.collect().foreach(println)

}
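The query rows come back as an RDD rather than a DataFrame. A minimal sketch of the conversion, assuming each element exposes its JSON payload via value (as CouchbaseQueryRow does in connector 2.x):

import sparkSession.implicits._

// Serialize each row's JsonObject to a JSON string and let Spark infer the schema
val jsonDs = sparkSession.createDataset(idRdd.map(_.value.toString))
val aggDf = sparkSession.read.json(jsonDs)

aggDf.show()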

This was a quick but comprehensive intro on how to integrate Spark and Couchbase.