
Spark SQL


This section is outdated and will be updated: it still works, but as of dp2 we also have support for DataFrames!


We do not provide tight integration between N1QL and Spark SQL right now, but you can easily use both together. In the future we might be able to plug the N1QL dialect right into Spark SQL for even tighter control over what gets loaded on the server side.

For now let's look at two easy integrations:

  • Create tables from raw JSON -> suitable when using views and loading full documents
  • Create tables from case classes -> a natural fit for N1QL

Setting up the Contexts

If you want to use Spark SQL, you need to import the required classes and spin up a SQLContext next to your SparkContext:

import com.couchbase.client.java.document.RawJsonDocument
import com.couchbase.client.java.query.Query
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Start your Spark context
val sc = new SparkContext(sparkConf)

// SQL Context
val sqlContext = new SQLContext(sc)
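
The sparkConf above also needs to point the connector at the beer-sample bucket used by the examples below. A minimal sketch, assuming the com.couchbase.bucket.<name> property documented for the 1.x connector (verify the exact property name for your version):

// Minimal sketch: the property name is assumed from the 1.x connector docs
// and may differ in the developer previews.
val sparkConf = new SparkConf()
  .setAppName("SparkSQLExample") // the app name is just an example
  .set("com.couchbase.bucket.beer-sample", "")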

Generating a SchemaRDD from JSON

If you are querying a view or otherwise loading a bunch of JSON documents that you want to run SQL over, the recommended way is to load them as RawJsonDocuments and pass the raw JSON to the SQL context, which generates a schema automatically.

Spark is great at inferring a schema from a diverse set of JSON documents, so we can just let it do its magic. The following example queries a view and loads the full content as raw JSON:

val rawBeerDocs = sc
  .couchbaseView("beer-sample", ViewQuery.from("all", "all")) // query the "all/all" view
  .map(_.id)                                                  // extract the document IDs
  .couchbaseGet[RawJsonDocument]()                            // fetch the full documents
  .map(_.content())                                           // keep just the raw JSON strings

Now we can create a SchemaRDD and register it as a logical table:

val parsedDocs = sqlContext.jsonRDD(rawBeerDocs)

parsedDocs.registerTempTable("beers")

You can print out the inferred schema with the printSchema() method:
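
parsedDocs.printSchema()

This prints: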

root
 |-- abv: double (nullable = true)
 |-- brewery_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ibu: double (nullable = true)
 |-- name: string (nullable = true)
 |-- srm: double (nullable = true)
 |-- style: string (nullable = true)
 |-- type: string (nullable = true)
 |-- upc: integer (nullable = true)
 |-- updated: string (nullable = true)

You're all set! Now you can run arbitrary Spark SQL statements over your datasets:

val strongBeers = sqlContext
  .sql("SELECT name, abv FROM beers WHERE abv > 5.0 ORDER BY abv DESC LIMIT 10")
  .collect()

strongBeers.foreach(println)

Generating a SchemaRDD from Case Classes

If you are working with N1QL, chances are you already know the shape of the data you want to expose. Once you've loaded your data, you can map it onto a case class which is then used as the schema.

The following example loads the name and abv from all beers and exposes them as a SchemaRDD. First, define a case class that describes the rows (matching the fields extracted below):
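
case class Beer(name: String, abv: Double)

Then load the data with N1QL and map it onto the case class: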

val beers = sc
  .couchbaseQuery(Query.simple("SELECT * FROM `beer-sample` WHERE type = \"beer\""))
  .map(row => row.value.getObject("beer-sample")) // SELECT * nests the fields under the bucket name
  .map(value => Beer(value.getString("name"), value.getDouble("abv")))

Now we can create a SchemaRDD and register it as a logical table:

val beerRdd = sqlContext.createSchemaRDD(beers)
beerRdd.registerTempTable("beers")

You can call the printSchema() method on the RDD to verify that everything is mapped properly:
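
beerRdd.printSchema()

This prints: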

root
 |-- name: string (nullable = true)
 |-- abv: double (nullable = false)

Finally, we can query our data with Spark SQL. Of course we could do the same using N1QL in this simple example, but imagine you are aggregating data from lots of sources and want to query them in a distributed fashion:

sqlContext
  .sql("SELECT name, abv FROM beers WHERE abv > 5.0 ORDER BY abv DESC LIMIT 10")
  .collect()
  .foreach(println)

This prints:

[Norfolk Nog Old Dark Ale,99.99]
[Vetter 33,37.0]
[Tactical Nuclear Penguin,32.0]
[Samuel Adams Utopias MMIV,24.0]
[Raison D'Extra,20.0]
[Tokyo*,18.2]
[Fort,18.0]
[120 Minute IPA,18.0]
[World Wide Stout,18.0]
[Samuel Adams Triplebock 1994,17.0]

Don't forget to call collect(): the results live in distributed partitions and need to be gathered on the driver first, otherwise the ordering of the printed rows is not guaranteed.
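
If the full result set is large, collecting everything to the driver can get expensive. A small alternative sketch: take() also gathers rows to the driver, but stops after the requested count, so for a sorted result it returns just the top rows in order:

// take(10) pulls only the first ten rows of the sorted result to the driver.
sqlContext
  .sql("SELECT name, abv FROM beers ORDER BY abv DESC")
  .take(10)
  .foreach(println)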
