Follow me on Twitter, Project Source Code, Powerpoint Slides, PDF Slides
Solution Overview
This solution diagram overviews a typical IoT solution. Azure IoT Hub is responsible for internet scale, secure, bi-directional communication with devices and backend services.
Telemetry can be routed by Azure IoT Hub to various services and also to storage in Apache Avro or JSON format for purposes such as audit, integration or driving machine learning processes.
This posting takes a slice of this scenario and is about the straight through serverless processing of telemetry from Azure IoT Hub, via Kotlin Azure Functions and Azure SignalR for a near real-time dashboard.
Azure Services
The following Azure services are used in this solution and available in Free tiers: Azure IoT Hub, Azure Functions, Azure SignalR, Azure Storage, Azure Storage Static Websites
You can sign up for a Free Azure Account, if you are a student then be sure to sign up for Azure for Students, no credit card required.
Developing Kotlin Azure Functions
The Creating Kotlin based Azure Function with IntelliJ guide has all the information on insrallation requirements, JDKs, IntelliJ, Azure CLI etc and has a step by step guide to creating your first Kotlin Azure Functions with IntelliJ.
Resources for Java and Kotlin Azure Functions
- Create your first function with Java and Maven
- Create your first Azure function with Java and IntelliJ
- Announcing the general availability of Java support in Azure Functions
- Azure Functions triggers and bindings concepts
- Maven Plugin for Azure Functions
- Azure Functions Java developer guide
- Java API for Azure Functions
Solution Components (included in this GitHub repo)
Kotlin Azure Function. This Azure Function processes batches of telemetry, then calibrations and validates telemetry, and updates the Device State Azure Storage Table, and then passes the telemetry to the Azure SignalR service for near real-time web client update.
Web Dashboard. This Single Page Web App is hosted on Azure Storage as a Static Website. So it too is serverless.
Design Considerations
Optimistic Concurrency
First up, it is useful to understand Event Hub Trigger Scaling and how additional function instances can be started to process events.
I wanted to maintain a count in the Device State table of the number of times a device had sent telemetry. The solution implements Azure Storage/CosmosDB Optimistic Concurrency.
Optimistic Concurrency (OCC) assumes that multiple transactions can frequently complete without interfering with each other. While running, transactions use data resources without acquiring locks on those resources. Before committing, each transaction verifies that no other transaction has modified the data it has read. OCC is generally used in environments with low data contention.
If there are multiple functions instances updating and there is a clash, I have implemented Exponential Backoff and added a random factor to allow for retry.
Pseudo code: random(occBase, min(occCap, occBase * 2 ^ attempt))
private fun calcExponentialBackoff(attempt: Int) : Long{
val base = occBase * Math.pow(2.0, attempt.toDouble())
return ThreadLocalRandom.current().nextLong(occBase,min(occCap, base.toLong()))
}
From my limited testing Exponential Backoff was effective.
Telemetry Processing
As each message is processed, retrieves the existing entity, updates the 'count', and updates the entity with the new telemetry. If there is an OCC clash then the process backs off for the calculated time and retries the update.
message.forEach { environment ->
maxRetry = 0
while (maxRetry < 10) {
maxRetry++
try {
top = TableOperation.retrieve(_partitionKey, environment.deviceId, EnvironmentEntity::class.java)
val existingEntity = deviceStateTable.execute(top).getResultAsType<EnvironmentEntity>()
calibrate(environment)
if (!validateTelemetry(environment)) {
context.logger.info("Data failed validation.")
break
}
with(environment) {
partitionKey = _partitionKey
rowKey = environment.deviceId
timestamp = Date()
}
if (existingEntity?.etag != null) {
environment.etag = existingEntity.etag
environment.count = existingEntity.count
environment.count++
top = TableOperation.replace(environment)
deviceStateTable.execute(top)
} else {
environment.count = 1
top = TableOperation.insert(environment)
deviceStateTable.execute(top)
}
deviceState[environment.deviceId!!] = environment
break
} catch (e: java.lang.Exception) {
val interval = calcExponentialBackoff(maxRetry)
Thread.sleep(interval)
context.logger.info("Optimistic Consistency Backoff interval $interval")
}
}
if (maxRetry >= 10){
context.logger.info("Failed to commit")
}
}
Telemetry Calibration Optimization
You can either calibrate data on the device or in the cloud. I prefer to calibrate cloud-side. The calibration data could be loaded with Azure Function Data Binding but I prefer to lazy load the calibration data. There could be a lot of calibration data so it does not make sense to load it all at once when the function is triggered.
private fun calibrate(environment: EnvironmentEntity) {
val calibrationData:CalibrationEntity?
if (calibrationMap.containsKey(environment.deviceId)){
calibrationData = calibrationMap[environment.deviceId]
}
else {
top = TableOperation.retrieve(_partitionKey, environment.deviceId, CalibrationEntity::class.java)
calibrationData = calibrationTable.execute(top).getResultAsType<CalibrationEntity>()
calibrationMap[environment.deviceId] = calibrationData
}
with(environment) {
calibrationData?.let {
temperature = scale(temperature, it.TemperatureSlope, it.TemperatureYIntercept)
humidity = scale(humidity, it.HumiditySlope, it.HumidityYIntercept)
pressure = scale(pressure, it.PressureSlope, it.PressureYIntercept)
}
}
}
Telemetry Validation
IoT solutions should validate telemetry to ensure data is within sensible ranges to allow for faulty sensors.
private fun validateTelemetry(telemetry: EnvironmentEntity): Boolean {
telemetry.temperature?.let {
if (it < -10 || it > 70) {
return false
}
}
telemetry.humidity?.let {
if (it < 0 || it > 100) {
return false
}
}
telemetry.pressure?.let {
if (it < 0 || it > 1500) {
return false
}
}
return true
}
Azure SignalR Integration
There is no Service-Side Azure SignalR SDK. To send telemetry from the Event Hub Trigger Azure Function to the Dashboard Web Client you need to call an HTTP Azure Function that is bound to the SignalR service. This SignalR Azure Function then sends the telemetry via SignalR as if the data was coming from a client-side app.
The flow for Azure SignalR integration is as follows:
- The Web client makes a REST call to 'negotiate', amongst other things, the SignalR 'Hubname' is returned to the client.
- The Web client then makes a REST call to 'getdevicestate', this HTTP Trigger retrieves the state for all devices from the Device State Table. The data is returned to the client via SignalR via the same 'Hubname' that was returned from the call to 'negotiate'.
- When new telemetry arrives via IoT Hub, the 'TelemetryProcessing' trigger fires, the telemetry is updated in the Device State table and a REST call is made to the 'SendSignalRMessage' and telemetry is sent to all the SignalR clients listening on the 'Hubname' channel.
Set Up Overview
This lab uses free of charge services on Azure. The following need to be set up:
- Azure IoT Hub and Azure IoT Device
- Azure SignalR Service
- Deploy the Kotlin Azure Function
- Deploy the SignalR .NET Core Azure Function
Step 1: Follow the Raspberry Pi Simulator Guide to set up Azure IoT Hub
Setting up the Raspberry Pi Simulator
Step 2: Create an Azure Resource Group
az group create -l westus -n enviromon-kotlin
Step 3: Create a Azure Signal Service
- az signalr create creates the Azure SignalR Service
- az signalr key list returns the connection string you need for the SignalR .NET Core Azure Function.
az signalr create -n <Your SignalR Name> -g enviromon-kotlin --sku Free_DS2 --unit-count 1
az signalr key list -n <Your SignalR Name> -g enviromon-kotlin
Step 4: Create a Storage Account
az storage account create -n enviromonstorage -g enviromon-kotlin -l westus --sku Standard_LRS --kind StorageV2
Step 5: Clone the project
git clone https://github.com/gloveboxes/Going-Serverless-with-Kotlin-Azure-Functions-SignalR.git
Step 6: Deploy the SignalR .NET Core Azure Function
cd Going-Serverless-with-Kotlin-Azure-Functions-SignalR
cd iot-serverless-kotlin-azure-functions-signalr
cp local.settings.sample.json local.settings.json
Step 7: Open the Kotlin Azure Functions Project with IntelliJ
From Terminal on Linux and macOS, or Powershell on Windows.
cd Going-Serverless-with-Kotlin-Azure-Functions-SignalR
cd iot-serverless-kotlin-azure-functions-signalr
cp local.settings.sample.json local.settings.json
Start IntelliJ and open the project in the 'iot-serverless-kotlin-azure-functions-signalr' folder.
Step 8: Update the local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "java",
"StorageConnectionString": "",
"PartitionKey": "Sydney",
"IotHubConnectionString": "",
"AzureSignalRConnectionString": "",
"AzureSignalRUrl": ""
},
"Host": {
"LocalHttpPort": 7071,
"CORS": "http://127.0.0.1:5500,http://localhost:5500",
"CORSCredentials": true
}
}
Step 9: Deploy the Kotlin Azure Function
Open the 'Maven' tab, run 'clean', the 'package', then 'azure-functions:deploy'
Step 10: Enable Static Websites for Azure Storage
The Dashboard project contains the Static Website project.
Follow the guide for Static website hosting in Azure Storage.
The page used for this sample is enviromon.html. Be sure to modify the "apiBaseUrl" url in the web page javascript to point your instance of the SignalR Azure Function.
Copy the contents of the dashboard project to the static website.
Step 11: Enable CORS for the SignalR .NET Core Azure Function
az functionapp cors add -g enviromon-kotlin -n <Your SignalR Function Name> --allowed-origins <https://my-static-website-url>
Step 12: Start the Dashboard
From your web browser, navigate to https://your-start-web-site/enviromon.html
The telemetry from the Raspberry Pi Simulator will be displayed on the dashboard.
Top comments (0)