How to Generate Realistic IoT Sensor Data for Testing Your MQTT Pipeline
This is part 2 of a series on building robmqtt. Part 1 covered why paho-mqtt silently drops messages and the library I built to fix it. This part is about testing — how to exercise an MQTT pipeline without deploying physical hardware.
In the last post I wrote about robmqtt, a resilient MQTT client for edge devices. Once I had it working, I hit the obvious next problem: how do I test it properly without setting up a rack of Raspberry Pis?
I needed data flowing through the pipeline. Lots of it. From many devices. Behaving differently. And ideally surviving a broker outage so I could watch the offline queue do its job.
So I wrote a device simulator. And in writing it, I learned that generating realistic fake sensor data is harder than it looks — and that most people get it wrong in the same way.
The trap: random data isn't realistic data
The first instinct when simulating a sensor is to reach for random.uniform():
cpu = random.uniform(0, 100) # don't do this
This produces data that looks nothing like a real sensor. Real sensor readings don't jump randomly across the whole range every second. A CPU sitting at 8% doesn't suddenly read 94% then drop to 3%. Temperature drifts slowly. Signal strength wobbles around a baseline. There's continuity from one reading to the next.
If you test your pipeline with pure random noise, your charts look like static, your anomaly detection has nothing meaningful to detect, and your dashboards are useless for spotting whether anything actually works.
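To put a rough number on that, here is a small sketch (not part of the simulator) that measures the average jump between consecutive readings. The helper name `mean_abs_step` and the baseline-plus-jitter comparison series are my own illustration, chosen only to show the contrast.

```python
import random


def mean_abs_step(readings):
    """Average absolute change between consecutive readings."""
    steps = [abs(b - a) for a, b in zip(readings, readings[1:])]
    return sum(steps) / len(steps)


rng = random.Random(42)  # seeded so the comparison is repeatable

# Pure uniform noise across the whole 0-100 range.
uniform_series = [rng.uniform(0, 100) for _ in range(1000)]

# Illustrative alternative: an 8% baseline with a little Gaussian jitter.
baseline_series = [8 + rng.gauss(0, 1.5) for _ in range(1000)]

print(f"uniform:  {mean_abs_step(uniform_series):.1f} points per reading")
print(f"baseline: {mean_abs_step(baseline_series):.1f} points per reading")
```

The uniform series moves by roughly a third of the full range between readings on average, while the jittered baseline moves by a point or two. That gap is what makes the first chart look like static.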
I wanted data that looked like it came from a real device.
Realistic drift with a sine wave
The trick I landed on was a slow sine wave with per-device random phase, plus a little Gaussian noise on top:
def _cpu(self) -> float:
    p = self.profile
    drift = 5 * math.sin(time.time() / 300 + self._drift_phase)
    noise = random.gauss(0, p["cpu_variance"] / 2)
    value = p["cpu_base"] + drift + noise
    return round(max(1.0, min(99.0, value)), 1)
Three things are happening here:
The sine wave (math.sin(time.time() / 300 ...)) creates a slow, smooth oscillation of ±5 percentage points with a period of about 31 minutes (2π × 300 seconds). This stands in for the gradual drift you see in real systems as load rises and falls.
The phase offset (self._drift_phase, a random value set once when the device starts) means every device is at a different point in its cycle. Without it, all your simulated devices would drift up and down in perfect unison, which is a dead giveaway that the data is fake.
self._drift_phase = random.uniform(0, 2 * math.pi)
The Gaussian noise (random.gauss) adds small reading-to-reading variation on top of the drift. Real sensors are never perfectly smooth — there's always measurement jitter.
The result is data that drifts, wobbles, and stays within a believable range — and each device has its own personality. When you chart it, it looks like telemetry, not like a random number generator.
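If you want to try the technique outside the simulator class, a standalone version might look like the sketch below. The function name `simulated_cpu`, the injectable `now` and `rng` parameters, and the two device names are assumptions added for illustration. The formula itself is the one from `_cpu` above, with the sensor profile's base and variance as defaults.

```python
import math
import random


def simulated_cpu(now, phase, base=8.0, variance=6.0, rng=random):
    """One CPU reading: baseline + slow sine drift + Gaussian jitter."""
    drift = 5 * math.sin(now / 300 + phase)
    noise = rng.gauss(0, variance / 2)
    return round(max(1.0, min(99.0, base + drift + noise)), 1)


rng = random.Random(7)
# Two hypothetical devices, each with its own phase drawn once at startup.
phases = {name: rng.uniform(0, 2 * math.pi) for name in ("device_a", "device_b")}

# Sample every 10 seconds across one full drift cycle (2 * pi * 300 seconds).
cycle_seconds = 2 * math.pi * 300
for name, phase in phases.items():
    readings = [simulated_cpu(t, phase, rng=rng)
                for t in range(0, int(cycle_seconds), 10)]
    print(name, min(readings), max(readings), readings[:5])
```

Passing the timestamp in rather than calling `time.time()` inside the function makes it possible to generate a whole cycle instantly, which is handy for plotting or unit tests.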
Device profiles: a camera is not a sensor
A real fleet isn't 15 copies of the same device. A camera runs hot and busy. A simple sensor sips power and idles. A gateway sits in between. If your simulator treats them all identically, your test data doesn't reflect anything real.
So each device type gets a profile:
DEVICE_PROFILES = {
    "sensor": {
        "cpu_base": 8, "cpu_variance": 6,
        "temp_base": 42.0, "temp_variance": 5.0,
        "telemetry_interval": 10,
        "failure_rate": 0.03,
    },
    "camera": {
        "cpu_base": 65, "cpu_variance": 20,
        "temp_base": 61.0, "temp_variance": 10.0,
        "telemetry_interval": 5,
        "failure_rate": 0.02,
    },
    # gateway, controller ...
}
A camera baselines at 65% CPU and 61°C, publishing every 5 seconds. A sensor baselines at 8% CPU and 42°C, publishing every 10 seconds. When this data lands in a dashboard, the device types are visibly different — exactly like a real deployment, where you can often guess a device's role just from its resource profile.
The failure_rate field controls how often the device injects an anomaly — a sudden CPU and temperature spike — so there's something for downstream anomaly detection to actually find:
if self._is_anomaly():
    payload["cpu_percent"] = round(random.uniform(88, 99), 1)
    payload["temperature_c"] = round(random.uniform(78, 92), 2)
    payload["anomaly"] = True
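The body of `_is_anomaly()` isn't shown here. One plausible reading, and it is only an assumption, is an independent random draw per reading against `failure_rate`. The sketch below uses standalone functions (`is_anomaly`, `apply_anomaly`) and a made-up normal payload to show how such a draw would behave over many readings.

```python
import random


def is_anomaly(failure_rate, rng=random):
    """Assumed behaviour: each reading is independently anomalous
    with probability `failure_rate`."""
    return rng.random() < failure_rate


def apply_anomaly(payload, rng=random):
    """Return a copy of the payload with a spike written in and labelled."""
    spiked = dict(payload)
    spiked["cpu_percent"] = round(rng.uniform(88, 99), 1)
    spiked["temperature_c"] = round(rng.uniform(78, 92), 2)
    spiked["anomaly"] = True
    return spiked


rng = random.Random(1)
# Made-up normal reading, for illustration only.
normal = {"cpu_percent": 8.4, "temperature_c": 42.1, "anomaly": False}
readings = [apply_anomaly(normal, rng) if is_anomaly(0.03, rng) else normal
            for _ in range(10_000)]
share = sum(r["anomaly"] for r in readings) / len(readings)
print(f"anomalous share: {share:.3f}")
```

With a rate of 0.03 and a 10-second interval, a sensor under this assumption would spike roughly once every five or six minutes, which is frequent enough to exercise a detector without drowning the normal signal.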
Using robmqtt in the simulator
This is where the simulator doubles as a usage example for the library. Each simulated device is a real robmqtt client:
from robmqtt import ProductionMQTTClient

self.client = ProductionMQTTClient(
    client_id=f"fleet_{device_id}",
    broker_host=broker_host,
    broker_port=broker_port,
    max_queue_size=500,
    db_path=f"./data/{device_id}.db",
    min_backoff=2,
    max_backoff=30,
    log_dir=f"./logs/{device_id}",
)
self.client.connect()
self.client.start()
Each device publishes three kinds of message, and the QoS and priority differ by importance:
# Telemetry: frequent, can tolerate eviction under pressure
self.client.publish(topic=f"fleet/{device_id}/telemetry",
                    payload=json.dumps(payload), qos=1, priority=5)
# Status: operational health, higher priority
self.client.publish(topic=f"fleet/{device_id}/status",
                    payload=json.dumps(payload), qos=1, priority=8)
# Boot/alert events: must not be lost, highest priority, QoS 2
self.client.publish(topic=f"fleet/{device_id}/events",
                    payload=json.dumps(payload), qos=2, priority=10)
This is the priority system from part 1 in action. If the broker goes down and the offline queue fills, routine telemetry (priority 5) gets evicted before status messages (priority 8), and event messages (priority 10, QoS 2) are protected.
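To make the eviction order concrete, here is a toy in-memory queue that drops its lowest-priority message when full. It is not robmqtt's implementation: the class names are invented, and evicting the oldest message among equal priorities is my assumption.

```python
from dataclasses import dataclass, field
from itertools import count

_sequence = count()  # tie-breaker so equal priorities keep arrival order


@dataclass
class QueuedMessage:
    topic: str
    priority: int
    seq: int = field(default_factory=lambda: next(_sequence))


class BoundedPriorityQueue:
    """Illustrative only: drops the lowest-priority, oldest message when full."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.items = []

    def put(self, message):
        self.items.append(message)
        if len(self.items) > self.max_size:
            victim = min(self.items, key=lambda m: (m.priority, m.seq))
            self.items.remove(victim)
            return victim  # the evicted message, possibly the new one
        return None


queue = BoundedPriorityQueue(max_size=3)
for topic, priority in [("telemetry", 5), ("status", 8), ("telemetry", 5),
                        ("events", 10), ("status", 8)]:
    evicted = queue.put(QueuedMessage(topic, priority))
    if evicted:
        print(f"evicted {evicted.topic} (priority {evicted.priority})")

print([m.topic for m in queue.items])
```

With room for only three messages, both telemetry entries are pushed out as the event and the second status message arrive, leaving `['status', 'events', 'status']` in the queue.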
The status messages even report the client's own internal state, pulled straight from robmqtt:
stats = self.client.get_statistics()
payload = {
    "queue_depth": stats.get("offline_queue_size", 0),
    "inflight_count": stats.get("inflight_count", 0),
    "is_connected": stats.get("is_connected", False),
    "reconnect_count": stats.get("reconnect_count", 0),
}
So the simulated fleet reports on its own connectivity health — which means you can build a dashboard that shows the offline queue filling and draining in real time.
Watching the offline queue work
This is the part I find satisfying. Start a device:
python device_simulator.py --device-id device_001 --device-type gateway
[device_001] Started — type=gateway location=warehouse_a
Now kill the broker. The device keeps publishing — but the messages are now being written to SQLite instead of sent:
sudo systemctl stop mosquitto
The device doesn't crash. It doesn't error. It just quietly queues. The queue_depth in the status payload climbs: 5, 12, 28, 45...
Bring the broker back:
sudo systemctl start mosquitto
The queue drains automatically. Every reading that piled up during the outage is delivered, in priority order. The queue_depth falls back to zero. As long as the backlog stayed under the max_queue_size of 500 configured above, nothing was lost.
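For readers who want to see the store-then-drain idea in isolation, the sketch below keeps messages in a SQLite table and delivers them highest priority first. It is a simplified stand-in, not robmqtt's internals: the table layout and the `enqueue` and `drain` helpers are assumptions, and an in-memory database replaces the on-disk file.

```python
import sqlite3

# In-memory database for the demo; a real device would use a file on disk.
db = sqlite3.connect(":memory:")
db.execute("""
    CREATE TABLE offline_queue (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        priority INTEGER NOT NULL
    )
""")


def enqueue(topic, payload, priority):
    """Persist a message while the broker is unreachable."""
    with db:  # commits on success, rolls back on error
        db.execute(
            "INSERT INTO offline_queue (topic, payload, priority) VALUES (?, ?, ?)",
            (topic, payload, priority),
        )


def drain(send):
    """Deliver queued messages: highest priority first, oldest first within a priority."""
    rows = db.execute(
        "SELECT id, topic, payload FROM offline_queue ORDER BY priority DESC, id ASC"
    ).fetchall()
    for row_id, topic, payload in rows:
        send(topic, payload)
        with db:  # delete only after the send call has returned
            db.execute("DELETE FROM offline_queue WHERE id = ?", (row_id,))
    return len(rows)


enqueue("fleet/device_001/telemetry", '{"cpu_percent": 31.2}', 5)
enqueue("fleet/device_001/events", '{"event": "boot"}', 10)
enqueue("fleet/device_001/status", '{"queue_depth": 2}', 8)

delivered = []
drain(lambda topic, payload: delivered.append(topic))
print(delivered)
```

Deleting each row only after `send` returns means a crash in the middle of a drain leaves the undelivered messages in the table, at the cost of a possible duplicate for the message that was in flight.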
That's the whole point of the library, demonstrated in a way you can watch happen.
Why a simulator is worth building
Even if you have real hardware, a simulator earns its place:
It lets you test at scale you don't have hardware for. You can run 15 simulated devices on your laptop and see how your pipeline, database, and dashboards behave under fleet-level load.
It gives you reproducible failure scenarios. Killing a broker on demand is a lot easier than waiting for a real 4G connection to drop in the field.
It produces clean test data with known properties. You injected the anomalies, so you know exactly what your anomaly detection should catch.
And — as a bonus — it doubles as living documentation for how to use your client library. The simulator is the example.
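The point about known properties can be turned into a quick check: because the simulator labels its own spikes with `anomaly`, a detector's output can be scored against those labels. The detector, its thresholds, and the sample readings below are all made up for illustration.

```python
def threshold_detector(reading, cpu_limit=85.0, temp_limit=75.0):
    """Hypothetical detector: flag readings that are both busy and hot."""
    return reading["cpu_percent"] > cpu_limit and reading["temperature_c"] > temp_limit


def score(readings, detector):
    """Compare detector output with the simulator's own `anomaly` labels."""
    tp = fp = missed = 0
    for reading in readings:
        flagged = detector(reading)
        injected = reading.get("anomaly", False)
        tp += int(flagged and injected)
        fp += int(flagged and not injected)
        missed += int(injected and not flagged)
    return {"true_positives": tp, "false_positives": fp, "missed": missed}


# Invented sample readings: two normal, two injected spikes.
sample = [
    {"cpu_percent": 9.1, "temperature_c": 41.8},
    {"cpu_percent": 93.4, "temperature_c": 84.27, "anomaly": True},
    {"cpu_percent": 71.0, "temperature_c": 66.5},
    {"cpu_percent": 88.6, "temperature_c": 79.03, "anomaly": True},
]

print(score(sample, threshold_detector))
```

On real hardware you rarely have ground truth like this, which is why labelled synthetic spikes are useful for a first pass at tuning thresholds.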
What's next
In part 3 I'll cover running this at fleet scale — launching many devices at once and feeding their telemetry through an analytics pipeline into a live dashboard.
The full simulator code is on GitHub alongside robmqtt:
How do you test your IoT pipelines — real hardware, simulators, or something else? I'm curious what others do — let me know in the comments.