DEV Community

Cover image for Why Your MQTT Client Is Silently Losing Messages (And How I Fixed It) - robmqtt
Supun Sriyananda
Supun Sriyananda

Posted on • Edited on

Why Your MQTT Client Is Silently Losing Messages (And How I Fixed It) - robmqtt

Why Your MQTT Client Is Silently Losing Messages (And How I Fixed It)

I learned this the hard way.

I was building a sensor system for a field deployment — Raspberry Pi units publishing temperature and humidity data over 4G cellular to an MQTT broker. The dashboard looked fine. The graphs looked fine. Then one day I compared the raw sensor logs against what actually made it to the broker.

Thousands of readings. Gone. No errors. No warnings. Just gone.

The culprit? paho-mqtt's default behaviour when the broker is unreachable: it silently drops your message and moves on.

After losing enough data I wrote a library to fix it. It's now on PyPI as robmqtt.

pip install robmqtt
Enter fullscreen mode Exit fullscreen mode

But before I show you how it works, let me show you exactly what the problem is — because it's subtler than most people realise.


The Problem with Standard MQTT Clients

When you call client.publish() in paho-mqtt and the broker is unreachable, one of two things happens:

  1. The message is silently discarded (QoS 0)
  2. The message is queued in memory for QoS 1/2 — but that queue is lost on process restart, and there's a second gap that even QoS 1 doesn't close

That second gap is the sneaky one. Here's what happens with QoS 1:

qos1-silent-failure.png

The message was "sent" from your perspective. It was never confirmed from the broker's perspective. And paho has no mechanism to track this gap across reconnections.

On a stable data centre network, this almost never matters. On a Raspberry Pi running on 4G cellular in a field cabinet, it happens constantly.


What a Resilient Edge Client Actually Needs

After losing enough data, I sat down and wrote out what a proper edge MQTT client needs to do:

1. Persist offline messages to disk
If the broker is unreachable when publish() is called, the message should be written to disk and replayed later. Not held in memory — memory is lost on restart.

2. Track in-flight messages separately
Messages that have been handed to the broker but not yet ACK'd need to be tracked. On reconnect, they must be re-sent before any queued messages start draining.

3. Priority-based eviction
When the queue fills up, not all messages are equal. A critical alarm should survive. A routine telemetry reading from 6 hours ago should not block it.

4. Exponential backoff on reconnect
A fleet of 50 devices coming back online after a broker restart should not all hammer the broker at the same second.

5. Thread-safe storage
The MQTT network thread and your application thread are both touching message state. This needs to be safe without forcing the caller to think about locking.

None of this is exotic. All of it is missing from the standard paho-mqtt client when used out of the box.


How robmqtt Solves It

Here's the architecture:

robmqtt-architecture.png

The Offline Queue

When the client detects it's disconnected, publish() routes to an OfflineQueue backed by SQLite:

# Simplified from offline_queue.py
class OfflineQueue:
    def enqueue(self, topic, payload, qos, priority):
        with self._lock:
            self._db.execute("""
                INSERT INTO queue (topic, payload, qos, priority, timestamp)
                VALUES (?, ?, ?, ?, ?)
            """, (topic, payload, qos, priority, time.time()))

    def dequeue_batch(self, batch_size=10):
        with self._lock:
            # Highest priority first, then oldest first within same priority
            return self._db.execute("""
                SELECT id, topic, payload, qos FROM queue
                ORDER BY priority DESC, timestamp ASC
                LIMIT ?
            """, (batch_size,)).fetchall()
Enter fullscreen mode Exit fullscreen mode

SQLite gives you durability without a separate process. It survives power cycles. The threading lock means your application thread and the drain thread never step on each other.

The Inflight Tracker

This closes the gap QoS 1 leaves open:

# Simplified from inflight_tracker.py
class InflightTracker:
    def track(self, mid, topic, payload, qos):
        """Call this when you hand a message to paho."""
        with self._lock:
            self._db.execute("""
                INSERT OR REPLACE INTO inflight (mid, topic, payload, qos)
                VALUES (?, ?, ?, ?)
            """, (mid, topic, payload, qos))

    def acknowledge(self, mid):
        """Call this in on_publish callback."""
        with self._lock:
            self._db.execute("DELETE FROM inflight WHERE mid = ?", (mid,))

    def get_all_pending(self):
        """Call this on reconnect — re-send everything unacknowledged."""
        with self._lock:
            return self._db.execute(
                "SELECT topic, payload, qos FROM inflight"
            ).fetchall()
Enter fullscreen mode Exit fullscreen mode

On reconnect, the client replays all inflight messages first, then starts draining the offline queue. Delivery order is preserved.

Priority Eviction

Each message gets a priority from 1 (lowest) to 10 (highest):

# Routine telemetry — can be evicted when queue is full
client.publish(
    topic="sensors/temperature",
    payload='{"value": 23.5}',
    qos=1,
    priority=3
)

# Critical alert — survives eviction, displaces old telemetry
client.publish(
    topic="alerts/critical",
    payload='{"type": "over_temp", "value": 87.2}',
    qos=2,
    priority=9
)
Enter fullscreen mode Exit fullscreen mode

When the queue hits capacity, the lowest-priority messages are evicted first. Your critical alerts are never blocked by a backlog of stale routine data.


Using robmqtt

Install:

pip install robmqtt
Enter fullscreen mode Exit fullscreen mode

Basic usage — this is everything you need:

from robmqtt import ProductionMQTTClient
import json

client = ProductionMQTTClient(
    client_id="field_device_001",
    broker_host="mqtt.yourdomain.com",
    broker_port=1883,
    max_queue_size=5000,    # holds ~5000 messages during outages
    min_backoff=2,          # start retrying after 2s
    max_backoff=60,         # cap retry interval at 60s
    db_path="./device.db",  # SQLite lives here — survives reboots
)

client.connect()
client.start()

# From here just call publish() — routing is handled internally.
# Connected: sends directly and tracks inflight.
# Disconnected: writes to SQLite, drains automatically on reconnect.

while True:
    reading = read_sensor()
    client.publish(
        topic="sensors/temperature",
        payload=json.dumps(reading),
        qos=1,
        priority=5,
    )
    time.sleep(30)
Enter fullscreen mode Exit fullscreen mode

The application code doesn't need to know whether the broker is reachable. That's the point.

Check what's happening at runtime:

stats = client.get_statistics()
print(stats)
# {
#   'is_connected': True,
#   'offline_queue_size': 0,
#   'inflight_count': 2,
#   'reconnect_count': 4,
#   ...
# }
Enter fullscreen mode Exit fullscreen mode

TLS is supported if your broker requires it:

client = ProductionMQTTClient(
    client_id="secure_device_001",
    broker_host="mqtt.yourdomain.com",
    broker_port=8883,
    use_tls=True,
    ca_certs="/etc/ssl/certs/broker-ca.crt",
    username="device001",
    password="your_password",
)
Enter fullscreen mode Exit fullscreen mode

Seeing It in Action

The repo includes test_13.py, a simulation designed specifically to demo the offline behaviour:

# Terminal 1 — run the simulation (publishes every 5 seconds)
python test_13.py

# Terminal 2 — simulate a network outage
sudo systemctl stop mosquitto

# Watch messages queue up in Terminal 1
# Queue stats print every 10 readings

# Restore connectivity
sudo systemctl start mosquitto

# Watch the offline queue drain automatically — zero messages lost
Enter fullscreen mode Exit fullscreen mode

The queue drain happens in a background daemon thread. Your application code does nothing. It just works.


Real-World Context

I've deployed this pattern on:

  • Battery management systems — monitoring cell voltages and temperatures in production energy storage systems. A 10-minute broker outage during a network switch should not cause a gap in the battery health record.
  • Robotics telemetry — ROS2 robots publishing sensor and status data. Process restarts during OTA updates should not lose the last known state.
  • MQTT edge gateways — aggregating data from downstream sensors over serial or CAN and forwarding to a cloud broker over 4G. The gateway may reconnect dozens of times per day.

In all of these cases, the pattern is the same: treat disconnection as normal, not exceptional. Design the client to buffer, not to fail.


Who This Is For

robmqtt is specifically for edge device deployments where:

  • Network connectivity is unreliable (cellular, Wi-Fi roaming, VPNs)
  • Process restarts happen (watchdog resets, power cycles, OTA updates)
  • Message loss has real consequences (industrial monitoring, remote sensors, fleet telemetry)
  • You don't want to build and maintain this infrastructure yourself

If you're running MQTT on a stable cloud-to-cloud connection, paho-mqtt alone is probably fine. If you're deploying on Raspberry Pi, industrial gateways, field sensors, or anything running on 4G/LTE — this is for you.


What's Next

The Prometheus metrics endpoint is on the roadmap. The structured logging already writes .jsonl metrics files — exposing them via HTTP is a small step and would make robmqtt slot naturally into standard observability stacks.

If you try it and hit an issue, open a GitHub issue. If you want a feature, open a discussion.


Have you run into MQTT message loss on edge devices? How did you handle it — drop a comment below.

Top comments (1)

Collapse
 
ranaweerasupun profile image
Supun Sriyananda

Have you run into MQTT message loss on edge devices? How did you handle it — drop a comment below.