Stream IoT sensor data from Azure IoT Hub into Databricks Delta Lake

IoT devices produce a lot of data very fast. Capturing and managing the data from all those devices, which could number in the millions, is the very first step in building a successful and effective IoT platform.

Like any other data solution, an IoT data platform can be built on-premises or in the cloud. I'm a huge fan of cloud-based solutions, especially PaaS offerings. After doing a bit of research, I decided to go with Azure, since it has the most comprehensive and easy-to-use set of IoT service offerings, and they are reasonably priced. In this post, I am going to show how to build the architecture displayed in the diagram below: connect your devices to Azure IoT Hub and then ingest the records into Databricks Delta Lake as they stream in, using Spark Streaming.

Solution Architecture

Setup Azure IoT Hub and Register a Device

The very first step is to set up Azure IoT Hub, register a device with it and test it by sending data across. This is very well explained by Microsoft here. Make sure you follow all the steps and you’re able to read the messages sent to IoT Hub at the end.

The only extra step we need to take is to add a new consumer group to the IoT Hub. Doing this means our Spark Streaming application will have its own offset, tracking where in the queue it last read the records coming from the devices. By assigning a unique consumer group to each application that subscribes to IoT Hub, we can send the records coming from IoT devices to multiple destinations, for example storing them in Blob storage, sending them to Azure Stream Analytics for real-time analytics, as well as writing them to a Delta table in Databricks Delta Lake.

Navigate to IoT Hub page on the Azure portal and select your hub. Click on Built-in endpoints and add a new Consumer Group:

Add Consumer Group to IoT Hub

Databricks: Unified Analytics Platform & Delta Lake

Moving on to the next layer in our architecture, we're going to set up Databricks. Databricks offers a platform that unifies data engineering, data science and business analytics. It is basically a PaaS offering for Spark in the cloud, which speeds up data exploration and preparation.

Why Delta?

Delta Lake is a storage layer developed by Databricks that brings ACID transactions to big data workloads. It is a response to a limitation of existing big data storage formats like Parquet: they are immutable. To update a record within a Parquet file, you need to rewrite the whole file. With Delta, you can easily write update statements at the record level. This is all we need to know about the Delta format for the purpose of what we want to build here; more about it is here.
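
For example, once a Delta table exists, a record-level correction is a single SQL statement rather than a file rewrite. A minimal sketch (the table and column names here are hypothetical):

// Update a single record in place; with plain Parquet this would require
// rewriting the underlying files (hypothetical table and column names)
spark.sql("UPDATE sensor_readings SET temperature = 21.5 WHERE device_id = 'dev-01'")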

A very important consequence of this for IoT and streaming use cases is that we can query the data as it arrives, instead of having to wait for a whole partition to be updated (re-written).

In this solution we will see how to set up Databricks, use Spark Streaming to subscribe to records coming in to Azure IoT Hub, and write them to a Delta table.

Setup Databricks

Navigate to the Azure portal and click Create a Resource -> Analytics -> Azure Databricks. This is where you create a workspace, from which you can access all your Databricks assets. Fill in the form that opens up and make sure you select Standard for the pricing tier. Then hit Create:

Create Databricks Workspace

When the workspace is created, go to the Azure Databricks Workspace resource page and click on Launch Workspace. You will be taken to your workspace. Create a new cluster with the same properties you see in the picture below. You can ask for bigger nodes or enable autoscaling, but it's not needed for this tutorial:

Create Databricks Cluster

The next step is to create a notebook. Click on Home -> <Your Email Address> -> Create -> Notebook. Give it a name, select Scala as the default language of the notebook (you can change it per cell later using % magic commands), and select the cluster this notebook's commands will run on.

Structured Streaming from IoT Hub

Create and install required Maven library

For our streaming solution to work, we need to install the "azure-eventhubs-spark_2.11:2.3.6" Maven library. The steps to do that are very easy:

Open the notebook and click on Workspace -> [User_Name] -> Create -> Library:

Select Maven and copy and paste "com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6" into the Coordinates box, then click on Create:

After the library is created, you’ll be redirected to the page where you can install your library on the existing clusters. Select the cluster where you’ll be running your Spark streaming code and click Install:

Important: You should restart your cluster after the installation is complete for it to take effect.

Connect to IoT Hub and read the stream

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }

// To connect to an Event Hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("--IOT HUB CONNECTION STRING FROM AZURE PORTAL--")
  .setEventHubName("--IoT Hub Name--")
  .build
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup("delta")
  
val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

The code snippet above first builds a connection string pointing to the IoT Hub we created before. The only extra steps you need to take are to get the connection string from the Azure portal and replace it in ConnectionStringBuilder, and then change the name in .setEventHubName to the "<Event Hub-compatible name>" of your hub. Open the Azure portal and go to your IoT Hub's page. Click on Built-in endpoints, copy what you see below and paste it into the code snippet in the notebook:

IoT Hub Endpoint Details

What we get after those commands complete successfully is a streaming DataFrame with the following fields. The messages coming from our IoT device are in the "body" field:
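
A quick way to inspect those fields is to print the schema of the stream. The exact columns come from the Event Hubs connector, but you should see something close to the commented output in this sketch:

eventhubs.printSchema()
// root
//  |-- body: binary
//  |-- partition: string
//  |-- offset: string
//  |-- sequenceNumber: long
//  |-- enqueuedTime: timestamp
//  |-- publisher: string
//  |-- partitionKey: string
//  |-- properties: map<string, string>
//  |-- systemProperties: map<string, string>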

To see what the incoming data from the IoT sensor looks like, just run:

display(eventhubs)

Extract device data and create a Spark SQL Table

The next step is to extract the device data coming in the body field of the DataFrame we built in the previous step, and build a DataFrame comprising the fields we want to store in our Delta Lake to do analytics on later:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = (new StructType)
    .add("temperature", DoubleType)
    .add("humidity", DoubleType)

val df = eventhubs.select(
    $"enqueuedTime".as("Enqueued_Time"),
    $"systemProperties.iothub-connection-device-id".as("Device_ID"),
    from_json($"body".cast("string"), schema).as("telemetry_json"))
  .select("Enqueued_Time", "Device_ID", "telemetry_json.*")

The resulting DataFrame looks like:

Now we can create a table from our DataFrame and start writing SQL commands on it:

df.createOrReplaceTempView("device_telemetry_data")

Create the final DataFrame and write stream to Delta table

We’re almost there. We have the data we receive from our IoT device in a Spark SQL table, which enables us to transform it easily with SQL commands.

Tables in a big data ecosystem are supposed to be partitioned; they had better be, otherwise they'll cause all sorts of problems. The reason I extracted Enqueued_Time earlier was to be able to partition my table by date and hour. IoT devices produce a lot of data, and partitioning it by hour not only keeps each partition reasonably sized, it also enables certain types of analytics to be performed on the data, for example when companies need to predict the performance of their devices at different times of the day or night.

val finalDF = spark.sql("""
  SELECT Date(Enqueued_Time) AS Date_Enqueued,
         Hour(Enqueued_Time) AS Hour_Enqueued,
         Enqueued_Time,
         Device_ID,
         temperature AS Temperature,
         humidity AS Humidity
  FROM device_telemetry_data
""")

The resulting DataFrame has the following schema:
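
You can double-check it with printSchema; given the query above, the output should be roughly:

finalDF.printSchema()
// root
//  |-- Date_Enqueued: date
//  |-- Hour_Enqueued: integer
//  |-- Enqueued_Time: timestamp
//  |-- Device_ID: string
//  |-- Temperature: double
//  |-- Humidity: double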

The final step is to write the stream to a Delta table:

finalDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .format("delta")
  .partitionBy("Date_Enqueued", "Hour_Enqueued")
  .table("delta_telemetry_data")

Let’s check the options passed to writeStream:

  • outputMode: Specifies how the records of a streaming DataFrame are written to the streaming sink. There are three modes:
    • Append: Only the new records will be written to the sink
    • Complete: All records will be written to the sink every time there is an update
    • Update: Only the updated records will be output to the sink
  • option: checkpointLocation
    • This is needed to ensure fault tolerance. Basically, we specify a location where all application progress information is saved. This is especially important in case of a driver failure; read more here.
  • format: The output sink where the result will be written, obviously “delta”.
  • partitionBy: The column(s) by which we want our table to be partitioned. We decided to partition our table hourly as explained above, so we pass in date and hour.
  • table: The name of the table.

If all the steps above have worked, you should be able to query your table and see the records inserted into the Delta table by running the following command:

%sql
SELECT * FROM delta_telemetry_data
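
And because the table is partitioned by Date_Enqueued and Hour_Enqueued, hourly roll-ups only have to scan the relevant partitions. A small sketch (the aggregation itself is just an illustration):

display(spark.sql("""
  SELECT Hour_Enqueued,
         AVG(Temperature) AS Avg_Temperature,
         AVG(Humidity) AS Avg_Humidity
  FROM delta_telemetry_data
  WHERE Date_Enqueued = current_date()
  GROUP BY Hour_Enqueued
  ORDER BY Hour_Enqueued
"""))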

And we're done! Now we have a table in our Delta Lake that holds our IoT devices' data. You can take it from here and do ML on the collected data or mix it with other tables you might have in your Delta Lake. And definitely feel free to ask your questions below in the comments section.

Create ‘Connected Service’ cases in Dynamics 365 using an IoT device and Microsoft Azure

“Connected Service” is the name we give to the concept of a true end-to-end connection between an IoT-enabled device (such as a modern air conditioning unit or a lift) and the customer service department that would despatch an engineer to repair the device should it break down.

The idea is that, because the IoT device is both connected to the internet and has a bunch of sensors that monitor its temperature, vibration etc., it is able to send ‘health’ messages to its service department who can analyse this health data and decide whether to repair or swap out any given component.  This predictive maintenance model enables continuous, uninterrupted operations, without the down-time resulting from the more traditional and reactive break-fix service model.

This blog post explains how to set up a demo of this concept using a Raspberry Pi, Azure services and Dynamics 365.

Expected Outcome

Once successfully set up, you’ll have a Raspberry Pi with a SenseHat that will trigger a service case to be created in Dynamics 365 when the humidity sensor reports humidity greater than 45%.

Connecting the dots…

This is the chain of events that will be triggered by breathing on the Pi’s SenseHat:

  1. Every second, the Pi SenseHat captures temperature, humidity and pressure data and sends this to an Azure IoT Hub
  2. The Azure IoT Hub authenticates the Pi device and ingests the data, making the data available for the next stage…
  3. …which is an Azure Stream Analytics job.  This job is a simple SQL query that says “if humidity is greater than 45, pass the data to an Azure Service Bus”
  4. The Azure Service Bus queues the data in a service bus message, ready to be picked up by…
  5. …an Azure Logic App that integrates with Dynamics 365 and creates a new Dynamics 365 Device Alert record that contains all the data sent from the Pi
  6. Once the Device Alert record has been created, Dynamics 365’s workflow kicks in to create the service case

Prerequisites

  1. A Raspberry Pi 2 or 3, and a SenseHat
  2. An Azure subscription
  3. A Dynamics 365 instance that has two custom entities (Device and Device Alert) that are related to the default Case entity as follows:
  4. The Device Alert entity should be of the type: Activity
  5. The Device Alert form in Dynamics 365 should contain the following fields:
  6. The Device form in Dynamics 365 can be a lot simpler.  The only field that’s really necessary is the Device Name itself, the rest of the fields are optional:
  7. The Raspberry Pi should have Windows 10 IoT Core installed, which you can download and install through the Windows 10 IoT Core Dashboard
  8. The Raspberry Pi should be provisioned in Azure through the IoT dashboard
  9. The Raspberry Pi should be connected to your network
    1. You can connect from the default Windows 10 app once you’ve installed Windows 10 IoT Core
  10. And most importantly, you need to have followed Dimitris Gkanatsios’s fantastic blog article in order to create the Windows 10 app on the Raspberry Pi.  If you’ve successfully got to the “congratulations” and you’re sending data to Azure, you’re good to go!

NOTE: I made two minor additions to Dimitris’s code.

Firstly, in SenseHatData.cs I added two additional strings, DeviceName and DeviceGUID:

Secondly, in MainPage.xaml.cs I added data for the Location, DeviceName and DeviceGUID strings:

You can get the GUID once you’ve completed Prerequisite 6 and have created a Device record for your Raspberry Pi. 

The reason for adding the GUID in the code (which does seem slightly odd) is because it’s required by the Get Record action in the Azure Logic App to lookup the Device record in Dynamics 365.  The Action requires the GUID rather than the Device record name because the record name may not necessarily be unique.

Step 1 – Create the Azure Service Bus queue

First off, let’s create the Azure Service Bus queue.

  1. In Azure, click New, then search for Service Bus, then click Create
  2. Enter a Name, Pricing Tier, Resource Group and Location, then click Create
  3. Click + Queue, give the queue a name, keep all the default settings, click Create
  4. Open the queue, click Shared access policies and add the following policies for Send and Listen
  5. In the main blade of the service bus, click Connection Strings
  6. Click RootManageSharedAccessKey
  7. Copy the Connection String – Primary Key and paste it somewhere safe to use later

Step 2 – Create the Azure Stream Analytics job

Now we’re going to create a new Azure Stream Analytics job, (i.e. in addition to the one created in Dimitris’s blog).

  1. In Azure, click New, then search for Stream Analytics job, then click Create
  2. Enter a Job Name, Subscription, Resource Group and Location, then click Create

  3. While the job is deploying, go to your IoT Hub, click Shared access policies and copy the primary key for the iothubowner policy
  4. Once the job has deployed, click Inputs, click Add, complete the New Input blade as follows, click Create
  5. Now click Outputs, click Add, select Sink: Service Bus Queue, then complete the rest of the New Output blade as follows, adding the details of the service bus queue you created in Step 1
  6. Click Query, add a T-SQL query that passes readings with humidity above 45 to the service bus output (a sketch follows this list), then click Save
  7. Start the job
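
The exact query text depends on the input and output aliases you chose above. Assuming an input alias of iothubinput and an output alias of servicebusqueue (both placeholder names), it would look something like this:

SELECT
    *
INTO
    [servicebusqueue]
FROM
    [iothubinput]
WHERE
    humidity > 45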

Step 3 – Create the Azure Logic App and add the service bus connection

A Logic App is essentially a workflow tool with pre-built connectors to popular services, such as Twitter.  It’s an incredibly simple way to create system-to-system integrations without using complex code.  In this step we’ll create an Azure Logic App and connect it to our service bus.

  1. In Azure, click New, then search for Logic App, then click Create
  2. Enter a Name, Pricing Tier, Resource Group and Location, then click Create
  3. Click Blank LogicApp
  4. In the search box, enter Service bus to filter the list
  5. Select Service Bus – When a message is received in a queue (auto-complete)
  6. In the Connection Name field, enter the name of the service bus itself (NOT the connection name!)
  7. In the Connection String field, paste the string you copied in Step 1
  8. Click Create
  9. Once the connection has been created, enter the queue name you created in Step 1
  10. Set the frequency to 1 second

Step 4 – Add a Compose action in the Logic App

The below Compose action is necessary to remedy a known feature/bug within Azure whereby the JSON data in the service bus message gets inadvertently wrapped with an XML header.  This prevents the subsequent actions in the logic app from extracting the JSON payload, so we employ this Compose action to strip the message of the unwanted XML wrapper.

  1. Click + New Step, click Add an action
  2. Select Compose
  3. In the Inputs field add the following code:
@json(substring(base64toString(triggerBody()['ContentData']), indexof(base64toString(triggerBody()['ContentData']), '{'), sub(lastindexof(base64toString(triggerBody()['ContentData']), '}'), indexof(base64toString(triggerBody()['ContentData']), '{'))))

Step 5 – Add a Get Record action to the Logic App

Now we use the Get Record action to perform a lookup on the Dynamics 365 Device entity. The logic app action will store the lookup values from the Device entity and enable us to pass them into the Device Alert entity.

  1. Click + New Step, click Add an action
  2. In the search box, enter Dynamics to filter the list
  3. Select Dynamics 365 – Get record
  4. Click Sign In and enter a Dynamics 365 administrator’s username and password to create the connection to Dynamics 365
  5. Select your Organization Name and Entity Name
  6. Click inside the Item Identifier field and from the Dynamic Content dialog select the Compose: Outputs block

Our JSON has been stripped of its unwanted XML wrapper using the Compose in Step 4, and now we need to specify which JSON field/value pair to use in the Get record lookup.  We do this by using the code view to make one modification to the Outputs block.

  1. In the Logic Apps Designer menu bar, click </> Code View
  2. Scroll down to the Get_record: path line
  3. At the end of the path line, add “.deviceGUID” immediately after outputs(‘Compose’) so that the line ends:
  4. In the Logic Apps Designer menu bar, click Designer.  The Item Identifier field should now look like this, i.e. showing “deviceGUID”

Step 6 – Add a Create Record action to the Logic App

Now that we've looked up our Device record, we can create a new Device Alert and populate it with our Device values.

  1. Click + New Step, click Add an action
  2. In the search box, enter Dynamics to filter the list
  3. Select Dynamics 365 – Create a new record
  4. Enter the Organization Name, Entity Name and Subject
  5. Click Show advanced options
  6. For each of the Location, Temperature, Pressure and Humidity fields in the new Device Alert record, add the Compose: Outputs block from the Dynamic Content dialog, e.g.:

Once again, we need to modify the Outputs block code to tell the logic app which specific JSON field/value to add to the new record.

  1. In the Logic Apps Designer menu bar, click </> Code View
  2. Scroll down to the section of code starting “Create_a_new_record”
  3. Under “body” you should see four lines of code, one for each of the Location, Temperature, Pressure and Humidity fields – similar to this:
"body": {
   "al_location": "@{outputs('Compose')}",
   "al_temperature": "@{outputs('Compose')}",
   "al_pressure": "@{outputs('Compose')}",
   "al_humidity": "@{outputs('Compose')}"
}
  1. At the end of the humidity line, add “.humidity” immediately after outputs(‘Compose’) 
  2. At the end of the temperature line, add “.temperature” immediately after outputs(‘Compose’) 
  3. At the end of the location line, add “.location” immediately after outputs(‘Compose’) 
  4. At the end of the pressure line, add “.pressure” immediately after outputs(‘Compose’) 
  5. Your code should now look similar to the sketch shown after this list
  6. In the Logic Apps Designer menu bar, click Designer.
  7. The fields should now look similar to this:
  8. In the Device field, add the Get Record: Device (Unique identifier for entity instances) block from the Dynamic Content dialog
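
For reference (step 5 above), once all four references have been updated, the body section from the earlier snippet should look roughly like this, assuming the same al_ field names:

"body": {
   "al_location": "@{outputs('Compose').location}",
   "al_temperature": "@{outputs('Compose').temperature}",
   "al_pressure": "@{outputs('Compose').pressure}",
   "al_humidity": "@{outputs('Compose').humidity}"
}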

At this point you should have four actions in your Logic App, like this:

  1. Click Save

Step 7 – Create the Dynamics 365 workflow

Thankfully, all the hard work has now been done!  All that remains is to use a Dynamics 365 workflow to create the Case once the Device Alert has been created by the Azure Logic App.

(I won’t go into the details of creating the workflow; I’ll assume you’ve got this part covered).

Basically, create the workflow with two steps:

For step 1 “Create Case”, add these dynamic values to the Case form:

and for step 2 “Set Regarding in Device Alert”, add this dynamic value to the Device Alert form:

Conclusion

Assuming you’ve got this far (yes, I appreciate it was more of a transatlantic long haul than a weekend city break!) then all you have to do now is breathe on the SenseHat to get the humidity reading above 45% and you’ll get a nice list of new Device Alerts and associated Cases in Dynamics 365:

i.e. Connected Service.