Azure Databricks and CosmosDB: Tips on playing nice together

I’ve been working on a project that uses CosmosDB as a data source, extracting the data from it into Azure Databricks, where I apply some data and analytics magic to it. This post is about some of the assumptions I had going in, and some of the issues that I came across.

For setting up Databricks to get data from CosmosDB, the place to go is the Azure CosmosDB Spark connector site. I’m not going to go through installing it, as the readme and guidance on GitHub do a good job and it is straightforward to do. There is also a tutorial here, dealing with ‘On-time flight performance’, that covers how to connect and then process the data, but it doesn’t cover any of the issues you may hit, as it uses a nice, conformed data structure. In the real world, we have all sorts of weird design decisions, taken over the lifetime of a product and/or service.

I’m using the Databricks Python API to connect to CosmosDB, so the start of setting up a connection is set out here:

cosmosConfig = {
  "Endpoint" : "Your endpoint here",
  "Masterkey" : "Your key here",
  "Database" : "Your database here",
  "preferredRegions" : "Your region(s)",
  "Collection" : "Your collection here",
  "ConnectionMode" : "DirectHttps",
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "2000",
  "query_pagesize" : "2147483647"
}

Just add your keys etc.; everything is nice and straightforward so far. I’ve left out the custom query option for the time being, but I’ll be coming back to it in a bit. The next bit sets up the read format using the configuration above, then creates a temp view, in this case called “c”.

# Sets up the CosmosDB connection.
# Creates a temp view called c, which is used in the query
# to define the collection
cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("c")

SQL Queries

Now that we have created a temp view in Databricks called “c” that sits over CosmosDB, we can create a data frame based on a Spark SQL query. So let’s send a Cosmos SQL API style query from Databricks, for example:

family = spark.sql("""
    SELECT {"Name": c.id, "City": c.address.city} AS Family
    FROM c
    WHERE c.address.city = c.address.state
""")

It returns an error. Why? This was my first mistake: you can’t use the Cosmos SQL API syntax here. Spark SQL will not return a document, but columns and rows like a standard SQL table.

family = spark.sql("""
    SELECT c.id AS Name, c.city AS City
    FROM c
    WHERE c.city = c.state
""")

The above will work, but it will return a table-like object, not a document like the Cosmos DB SQL query would.

Tip 1: You can’t reuse your Cosmos DB SQL queries; they will have to be redone in a more standard SQL syntax. But that is a good thing: we can finally use a “Group by” to query data on CosmosDB!
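For example, a simple aggregation over the “c” view now works. This is just a sketch; it assumes the “entity” field that appears in the documents later in this post:

# Counts documents per entity type, straight over the CosmosDB-backed view.
entity_counts = spark.sql("""
    SELECT c.entity, COUNT(*) AS documents
    FROM c
    GROUP BY c.entity
""")
entity_counts.show()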

CosmosDB Document Considerations

One of the issues I came across was that we have a number of documents in our Cosmos DB that are split by type using an “entity” field, so we have, for example, customer and project documents.

Now if you have the same field name in different document types, but with different data types, things can get mixed up. How? Take this example for “customer”:

{
   # some other customer JSON here
   "range": "PWAQ-100",
   "entity": "customer"
}

So in this case “range” is a string.

And this for “project”:

{
   # some other project JSON here
   "range": [
      { "name": "Small Project", "min": 0, "max": 99 },
      { "name": "Big Project", "min": 100, "max": 199 }
   ],
   "entity": "project"
}

So in this case “range” is an array.

Now when I do a query like the following:

test_df = spark.sql("SELECT c.range AS Range FROM c WHERE c.entity = 'project'")

It returns the data as a string like this:

[{name=Small Project, min=0, max=99},{name=Big Project, min=100, max=199}]

The quotes have been stripped out and the “:” changed to “=”, and instead of a column containing the array, which I could then explode into other objects, I get a string back. Weird! So what happened? It’s down, in part, to the configuration we set up earlier:

1"schema_samplesize" : "2000"

The first 2,000 documents read to determine the schema of the “c” view were all of the “customer” type, so “range” was set as a string. In fact, if you do the following query:

test_df = spark.sql("SELECT * FROM c")

It will show all the columns for the customer and project documents as one big table, with repeated column names and datatypes adjusted to fit the data frame’s expected schema.
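A quick way to see what the connector has inferred is to print the schema of the temp view:

# Shows the inferred schema for the "c" view; in my case "range"
# came back as a string rather than an array of structs.
spark.table("c").printSchema()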

There are three ways to solve this.

  1. Don’t mix datatypes for fields that have the same name across your document schemas
  2. Don’t store a mixture of document schemas in your CosmosDB database
  3. Use the custom query option

Using Custom Query

If, like me, you can’t do 1 or 2, you have to use the custom query option. Let’s go back to our setup of the connection:

cosmosConfig = {
  "Endpoint" : "https://cloudsuite-test.documents.azure.com:443/",
  "Masterkey" : "wxElqnT67i3Ey7Mmh7rH2Opxxwjbn22g6RL8LFjboDK6Cqg6M8COfVcQXaJO3pxLq0vEDayidnisn5QmbN5dPQ==",
  "Database" : "cbi_survey",
  "preferredRegions" : "UK South",
  "Collection" : "cbi",
  "ConnectionMode" : "DirectHttps",
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "20000",
  "query_pagesize" : "2147483647",
  "query_custom" : "SELECT c.* FROM c WHERE c.entity = 'project'"
}

So we have now added the following line to the config:

"query_custom" : "SELECT c.* FROM c WHERE c.entity = 'project'"

So we now know that we will only get documents back of the “project” schema, and the datatypes should be mapped correctly.

# Sets up the CosmosDB connection.
# Creates a temp view called project, which is used in the query
# to define the collection
cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("project")

I’ve also now created a distinct temp view called ‘project’ that I can query. I will do the same for ‘customer’ and any other document schemas stored in Cosmos DB.

Tip 2: Make sure to declare your temp views based on the documents you need
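With the ‘project’ view in place, the “range” field comes through as a proper array and can be exploded into rows. A rough sketch, using the field names from the example document above and the standard Cosmos DB “id” field:

from pyspark.sql.functions import explode

# Turn each element of the range array into its own row,
# then flatten the struct fields into ordinary columns.
ranges_df = (spark.table("project")
    .select("id", explode("range").alias("r"))
    .select("id", "r.name", "r.min", "r.max"))
ranges_df.show()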

With CosmosDB and Databricks, when you create the connection and query, it is schema on read. So if you have added items to your JSON structure over time, make sure that you set your schema sample size high enough to read enough items; otherwise a column that only appears in later versions of your document schema may not be read correctly, and any SQL query that references it will fail.

Tip 3: “schema_samplesize” should be adjusted where needed
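For example, re-using the cosmosConfig dict from earlier, you can bump the sample size and re-create the view (the value here is purely illustrative):

# Read more documents when inferring the schema, so fields added in
# later versions of the document are picked up.
cosmosConfig["schema_samplesize"] = "50000"
cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("project")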

Now I’ve wrapped up the connection and table details into a function that I can call with my entity value, and it will set that temp view up nice and cleanly.
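I haven’t reproduced my exact function here, but a minimal sketch of the idea, assuming the config dict and “entity” field shown above, might look something like this:

def create_entity_view(spark, base_config, entity):
    # Copy the base config and add a custom query that filters on the entity type.
    config = dict(base_config)
    config["query_custom"] = "SELECT c.* FROM c WHERE c.entity = '{0}'".format(entity)
    # Load the filtered documents and expose them as a temp view named after the entity.
    df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**config).load()
    df.createOrReplaceTempView(entity)
    return df

# For example:
# create_entity_view(spark, cosmosConfig, "project")
# create_entity_view(spark, cosmosConfig, "customer")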

Overall the Azure CosmosDB Spark connector works well, and pulling data from Cosmos is fast. Handling JSON in Databricks is a lot easier than querying it in SQL Server, and we have been using it more in the ETL pipelines for some of our projects.

On the downside, the GitHub site needs a bit of an update, as it has a number of broken links and a number of open issues that haven’t been looked at or assigned.