Managing an IoT fleet with over 2,500 devices in Kenya isn't always straightforward, especially when you're dealing with intermittent connectivity and budget hardware. Recently, I had to set up a pipeline to catch anomalies in our data before they caused real headaches. Here's how I stitched together MQTT, Python, and some basic anomaly detection to get it done.
The context
Our devices are spread across rural areas with spotty internet, sending telemetry data every few minutes. This data includes temperature, humidity, and some custom sensor readings. The challenge was building a real-time monitoring system that alerts us to anomalies, such as an unexpected spike in temperature, without exceeding our budget or falling apart when connectivity drops.
Enter MQTT: The messenger
I chose MQTT for our messaging protocol because it’s lightweight, which is perfect for devices with limited resources. We’ve set up an MQTT broker on a local server that each device publishes to. The setup is straightforward and has worked reliably for us in the field. By using MQTT, we can achieve low-latency communication, which is essential for real-time anomaly detection.
Here's a quick look at the basic MQTT setup using the paho-mqtt library in Python:
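import paho.mqtt.client as mqtt

# Define connection parameters
BROKER_ADDRESS = "localhost"
TOPIC = "sensor/data"

def on_connect(client, userdata, flags, rc):
    # Subscribing here renews the subscription after every reconnect
    print(f"Connected with result code {rc}")
    client.subscribe(TOPIC)

def on_message(client, userdata, msg):
    print(f"Message received: {msg.topic} {msg.payload}")

# paho-mqtt 1.x style. On 2.x, use mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
# and add a fifth `properties` argument to on_connect.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER_ADDRESS)
# Runs the network loop in a background thread; in a standalone
# script, use client.loop_forever() so the process stays alive.
client.loop_start()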
This is just a basic setup to show you how these events are handled. The real work happens in processing the incoming data.
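The subscriber above only prints raw bytes, so the first processing step is turning a payload into numbers. Here's a minimal sketch of that step, assuming the devices publish JSON. The payload format isn't something I've spelled out here, so the field names `device_id`, `temperature`, and `humidity` and both function names are hypothetical:

```python
import json

def encode_reading(device_id, temperature, humidity):
    """Serialize one reading to compact JSON bytes (hypothetical schema)."""
    payload = {
        "device_id": device_id,
        "temperature": temperature,
        "humidity": humidity,
    }
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")

def decode_reading(raw):
    """Parse a payload; return None for malformed messages instead of raising."""
    try:
        reading = json.loads(raw.decode("utf-8"))
        return {
            "device_id": str(reading["device_id"]),
            "temperature": float(reading["temperature"]),
            "humidity": float(reading["humidity"]),
        }
    except (UnicodeDecodeError, ValueError, KeyError, TypeError):
        # Bad encoding, invalid JSON, missing field, or non-numeric value
        return None
```

Inside `on_message`, this would be called as `reading = decode_reading(msg.payload)`, skipping the message when the result is `None`. Returning `None` rather than raising keeps one corrupted message from taking down the subscriber loop.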
Processing data with Python
Once the data hits our broker, we move it through a Python pipeline. The goal is to detect anomalies in sensor readings. For that, I've relied on scikit-learn and NumPy. The great thing about scikit-learn is that it's fairly light and performs decently even on constrained hardware.
I used a basic z-score method for anomaly detection: a reading's z-score is the number of standard deviations it sits from the mean, and anything beyond a fixed threshold gets flagged. It wasn't about using the fanciest model but rather ensuring it runs efficiently across multiple devices under our infrastructure constraints.
import numpy as np
from sklearn.preprocessing import StandardScaler

# Simulating incoming data. With only five readings, a single outlier can never
# exceed |z| = 2 (the maximum is sqrt(n - 1)), so the sample uses eight.
sensor_data = np.array([23.3, 23.5, 24.1, 23.9, 23.6, 50.0, 23.7, 23.4])  # Note the anomaly?

# Standardizing data (StandardScaler expects shape (n_samples, n_features))
scaler = StandardScaler()
sensor_data = sensor_data.reshape(-1, 1)
scaled_data = scaler.fit_transform(sensor_data)

# Checking for anomalies with z-score
z_scores = np.abs(scaled_data)
anomalies = np.where(z_scores > 2.0)[0]  # Assuming anything >2 is an anomaly
print("Anomalies detected at indices:", anomalies)
In practical terms, this helped us flag temperature spikes over 10 degrees above the mean in near real-time. With IoT fleets, speed is critical, so minimizing the lag between data collection and anomaly detection was essential.
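The example above scores a fixed array in one go. For readings that arrive one at a time, one option is to score each new value against a rolling window of recent ones. This is a minimal sketch of that idea in plain Python, not the exact code we run; the class name `RollingZScore` and its parameters (`window`, `threshold`, `min_samples`) are made up for illustration:

```python
import math
from collections import deque

class RollingZScore:
    """Flag readings that sit far from the mean of a recent window."""

    def __init__(self, window=30, threshold=2.0, min_samples=5):
        self.values = deque(maxlen=window)  # oldest readings fall off automatically
        self.threshold = threshold
        self.min_samples = min_samples

    def is_anomaly(self, reading):
        """Score the reading against the window before adding it."""
        flagged = False
        if len(self.values) >= self.min_samples:
            mean = sum(self.values) / len(self.values)
            variance = sum((v - mean) ** 2 for v in self.values) / len(self.values)
            std = math.sqrt(variance)
            if std > 0:
                flagged = abs(reading - mean) / std > self.threshold
        if not flagged:
            # Keep flagged spikes out of the baseline so they don't inflate it
            self.values.append(reading)
        return flagged

detector = RollingZScore(window=10)
for value in [23.3, 23.5, 24.1, 23.9, 23.6, 50.0, 23.7]:
    if detector.is_anomaly(value):
        print("Anomaly:", value)
```

Scoring before appending means the spike is compared against normal readings only, which sidesteps the small-sample problem where an outlier drags the mean and standard deviation toward itself.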
Connectivity challenges
One of the main challenges was dropped connections. MQTT's Quality of Service (QoS) levels helped, but didn't solve the problem entirely. Initially, about 20% of our data was missing due to dropped connections. By adding retries and redundancy to the data publication, I managed to bring that down to about 5%.
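To give a feel for the retry side, here's a minimal sketch of retrying with exponential backoff. It's an illustration under assumptions rather than our production code: `publish_with_retry`, its parameters, and the `publish_once` callable (anything that returns `True` on success) are hypothetical names.

```python
import time

def publish_with_retry(publish_once, payload, max_attempts=4,
                       base_delay=0.5, sleep=time.sleep):
    """Call publish_once(payload) until it returns True or attempts run out.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
    Returns True on success, False if every attempt failed.
    """
    for attempt in range(max_attempts):
        try:
            if publish_once(payload):
                return True
        except OSError:
            pass  # treat socket-level errors like a failed attempt
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return False
```

Passing `sleep` in as a parameter makes the function easy to test without real delays. Doubling the wait between attempts keeps a device from hammering a link that's already struggling.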
We also set up a local buffer on each device. When there’s a connection issue, the device holds onto its data and publishes it once a stable connection is back. Here’s a rough outline of how the buffering looked:
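from collections import deque

import paho.mqtt.client as mqtt

local_buffer = deque()  # readings held until the connection is back

def publish_sensor_data(client, data):
    try:
        # publish() mostly reports failure via its return code, not an exception
        info = client.publish(TOPIC, data, qos=1)
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            raise ConnectionError(f"publish returned rc={info.rc}")
    except Exception as e:
        local_buffer.append(data)
        print("Buffering data due to connection issue:", e)

def retry_buffered_data(client):
    # Drain oldest-first and stop at the first failure. Removing items from a
    # list while iterating over it would silently skip entries.
    while local_buffer:
        try:
            info = client.publish(TOPIC, local_buffer[0], qos=1)
            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                raise ConnectionError(f"publish returned rc={info.rc}")
            local_buffer.popleft()
        except Exception as e:
            print("Retry failed, keeping data in buffer:", e)
            break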
This local buffer was a lifesaver many times, especially in rural areas where network stability is unpredictable.
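One thing the buffering functions above leave open is how large the buffer is allowed to grow during a long outage, which matters on devices with little memory. Here's a minimal sketch of one way to cap it, dropping the oldest readings first. The class name `BoundedBuffer`, the `max_items` default, and the `send` callable are all hypothetical, and dropping the oldest is just one possible policy:

```python
from collections import deque

class BoundedBuffer:
    """Fixed-capacity FIFO that evicts the oldest item when full."""

    def __init__(self, max_items=1000):
        self._items = deque(maxlen=max_items)
        self.dropped = 0  # how many readings were evicted

    def append(self, item):
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the deque is about to evict its oldest item
        self._items.append(item)

    def drain(self, send):
        """Send oldest-first; stop at the first failure. Returns the number sent."""
        sent = 0
        while self._items:
            if not send(self._items[0]):
                break
            self._items.popleft()
            sent += 1
        return sent

    def __len__(self):
        return len(self._items)
```

Tracking `dropped` gives you a number to report once the link is back, so gaps in the data can be told apart from a sensor that simply stopped reading.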
Cost management
Budgeting is a constant concern in our setup. Every additional processing step could mean higher costs, either from energy consumption or added computational load. We've found that keeping our anomaly detection model simple helps us maintain a balance between performance and cost.
Switching from cloud-based heavy analyzers to these lightweight solutions reduced our AWS bills by about $200/month. That's a significant saving when you're looking at scale.
What’s next?
I’m considering expanding our pipeline to explore more advanced anomaly detection methods without drastically increasing resource usage. One promising direction is incorporating edge computing. Processing data closer to where it's collected could cut down on latency and further improve reliability.
I’m also looking at solutions like TinyML, which could integrate nicely with our existing infrastructure. They seem promising for running models directly on the devices, given how resource-hungry transmitting data can get.
For anyone in a similar position, especially dealing with infrastructure constraints, remember: The simplest solution that works is usually the best. Keep iterating, refine based on field feedback, and don't shy away from getting your hands dirty with what you’ve got.