DEV Community

Rory | QIS PROTOCOL


QIS Outcome Routing with MQTT — Quadratic Intelligence Over a 2-Byte Header

A patient monitor in a rural clinic in Malawi runs on a battery-backed Raspberry Pi Zero W. The 4G signal drops every afternoon. RAM is 512 MB. There is no storage budget for a vector database. There is no DevOps budget to run a Kafka broker.

The MQTT fixed header can be as small as 2 bytes.

MQTT was designed for this. It was built for constrained devices, unreliable networks, and situations where every byte costs money or battery life. It runs on microcontrollers with 8 KB of RAM. AWS IoT Core and Azure IoT Hub both accept MQTT device connections (Google Cloud IoT Core did too until Google retired it in 2023), and it is a common protocol in industrial sensor networks.

And it turns out MQTT is a fully valid transport layer for a Quadratic Intelligence Swarm.
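The "2-byte header" in the title is MQTT's minimum fixed header: one byte carrying the packet type and flags, followed by a Remaining Length field that grows from 1 to 4 bytes with the size of the rest of the packet. A minimal sketch of how a PUBLISH fixed header is encoded under the MQTT 3.1.1/5.0 wire format (the helper names are illustrative, not from any library):

```python
# Minimal sketch of the MQTT fixed header for a PUBLISH packet (MQTT 3.1.1 / 5.0).
PUBLISH = 3                          # control packet type 3 = PUBLISH
MAX_REMAINING_LENGTH = 268_435_455   # largest value 4 length bytes can encode


def encode_remaining_length(n: int) -> bytes:
    """Variable byte integer: 7 data bits per byte, high bit set when more bytes follow."""
    if not 0 <= n <= MAX_REMAINING_LENGTH:
        raise ValueError(f"remaining length {n} out of range")
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80  # continuation bit
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_fixed_header(remaining_length: int, qos: int = 0,
                         retain: bool = False, dup: bool = False) -> bytes:
    """First byte: packet type in the high nibble, then DUP, QoS (2 bits), RETAIN."""
    if qos not in (0, 1, 2):
        raise ValueError("QoS must be 0, 1 or 2")
    first = (PUBLISH << 4) | (int(dup) << 3) | (qos << 1) | int(retain)
    return bytes([first]) + encode_remaining_length(remaining_length)


# Remaining length = variable header (topic, packet id, properties) + payload.
for remaining in (100, 700, 20_000):
    header = publish_fixed_header(remaining, qos=1)
    print(f"remaining={remaining:>6} -> header {header.hex()} ({len(header)} bytes)")
```

The 2-byte case holds while the rest of the packet is under 128 bytes; anything from 128 to 16,383 bytes (a typical outcome packet plus its topic) takes a 3-byte fixed header. Either way the framing overhead is negligible next to the payload.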


Why Part 10 Is the Stress Test

This is Part 10 of a series proving that the breakthrough behind Quadratic Intelligence Swarm (QIS), discovered June 16, 2025 by Christopher Thomas Trevethan and covered under 39 provisional patents, is the complete loop, not any particular routing mechanism.

The series has run the loop through:

  • ChromaDB (HNSW vector search, O(log N))
  • Qdrant (distributed multi-node vector search)
  • Plain REST API (HTTP POST/GET)
  • Redis Pub/Sub (topic-based fan-out)
  • Apache Kafka (durable partitioned streaming)
  • Apache Pulsar (multi-tenant geo-replicated messaging)
  • NATS JetStream (cloud-native edge persistence)
  • SQLite (single file, no server, air-gap capable)

Each step moved further from "enterprise cloud" toward "any device anywhere." MQTT is Part 10 — the protocol that runs on 8 KB microcontrollers.

If the QIS loop runs on MQTT, it runs everywhere.


The Loop, Applied to MQTT's Mental Model

Every QIS node distills a local observation into a ~512-byte outcome packet. The packet carries a semantic fingerprint: a vector encoding what kind of problem this is. The packet is posted to an address derived deterministically from the problem type. Other nodes with the same problem query that address, pull the relevant packets, synthesize locally, and produce new packets. N nodes create N(N-1)/2 synthesis paths; at 200 nodes, that is 19,900 unique pairings. The intelligence scales quadratically. The compute per node scales logarithmically. The loop continues.

MQTT maps to this with no translation:

| QIS Concept | MQTT Primitive |
|---|---|
| Semantic address | Topic hierarchy (e.g., qis/clinical/chest-xray/high-density) |
| Post outcome packet | PUBLISH to topic |
| Query packets from similar nodes | SUBSCRIBE to topic |
| Retained packet (last known state) | RETAIN flag: broker stores the latest packet per topic |
| TTL on packets | Message Expiry Interval (MQTT 5.0 property) |
| Trust weighting | QoS level + downstream outcome feedback |
| Privacy | Only outcome deltas travel; no raw patient data |

MQTT topics are hierarchical text paths. They are deterministic. They are human-readable. They are exactly what a semantic address system needs.
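The router's `_topic` method later in this post maps a dotted domain tag straight to a topic without validation. A stricter sketch (hypothetical helper `domain_to_topic`) that rejects tags MQTT cannot publish to: topic names may not contain the wildcards `+` and `#` or the null character, are limited to 65,535 bytes of UTF-8, and topics beginning with `$` are reserved for broker use. Rejecting empty levels and `/` inside a tag level are extra conventions assumed here, not MQTT rules.

```python
MAX_TOPIC_BYTES = 65_535          # MQTT strings carry a 2-byte length prefix
FORBIDDEN = ("+", "#", "\x00")    # MQTT: no wildcards or null characters in topic names


def domain_to_topic(domain_tag: str, prefix: str = "qis") -> str:
    """Map 'a.b.c' to 'qis/a/b/c', rejecting tags that would yield a bad topic."""
    levels = [prefix, *domain_tag.split(".")]
    for level in levels:
        if not level:
            # Convention assumed here: MQTT allows empty levels, but 'a..b' is almost surely a typo
            raise ValueError(f"empty level in {domain_tag!r}")
        if "/" in level:
            # Convention assumed here: '/' inside a tag level would silently add topic levels
            raise ValueError(f"'/' inside level {level!r}")
        if any(ch in level for ch in FORBIDDEN):
            raise ValueError(f"wildcard or null character in level {level!r}")
    topic = "/".join(levels)
    if topic.startswith("$"):
        raise ValueError("topics starting with '$' are reserved for broker use")
    if len(topic.encode("utf-8")) > MAX_TOPIC_BYTES:
        raise ValueError("topic longer than 65,535 bytes of UTF-8")
    return topic


print(domain_to_topic("clinical.radiology.chest"))
```

Failing loudly at deposit time is cheaper than discovering later that a malformed tag produced a topic no subscriber listens to.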


The Architecture

Node A (Raspberry Pi Zero W, rural clinic)
    │
    ├─ observe: chest X-ray → abnormality probability = 0.73
    ├─ distill → OutcomePacket (512 bytes)
    ├─ fingerprint: "clinical.radiology.chest.bilateral-infiltrates"
    └─ PUBLISH to qis/clinical/radiology/chest ──────────────────┐
                                                                   │
                                              MQTT Broker          │
                                         (Mosquitto / HiveMQ /     │
                                          AWS IoT / Azure IoT)    │
                                                                   │
Node B (similar clinic, 400 km away) ─────────────────────────────┘
    │
    ├─ SUBSCRIBE: qis/clinical/radiology/chest
    ├─ receive packet → synthesize locally
    ├─ update local model: infiltrate threshold recalibrated
    └─ observe next outcome → PUBLISH new packet

The broker is not the intelligence. The broker is the bus. Every node synthesizes locally. The intelligence is distributed. The broker never sees raw data — only 512-byte outcome packets.
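Whether a packet really stays near 512 bytes depends on the wire encoding. A 64-dimensional fingerprint serialized as full-precision JSON floats typically exceeds 1 KB on its own, which would trip the 1 KB `message_size_limit` suggested in the deployment notes. A rough size comparison, assuming JSON on the wire; the stand-in fingerprint and the int8/base64 variant are illustrative choices, not part of the original packet schema:

```python
import base64
import json
from typing import Optional


def encode_packet(fields: dict, compact: bool = False,
                  decimals: Optional[int] = None) -> bytes:
    """Serialize packet fields to UTF-8 JSON, optionally rounding the fingerprint."""
    data = dict(fields)
    if decimals is not None:
        data["semantic_fingerprint"] = [round(x, decimals) for x in data["semantic_fingerprint"]]
    separators = (",", ":") if compact else (", ", ": ")
    return json.dumps(data, separators=separators).encode("utf-8")


def quantize_int8(vec: list) -> str:
    """Alternative encoding: one signed byte per dimension, as base64 text."""
    raw = bytes(max(-127, min(127, round(x * 127))) & 0xFF for x in vec)
    return base64.b64encode(raw).decode("ascii")


def dequantize_int8(text: str) -> list:
    """Inverse of quantize_int8; precision is about 1/127 per component."""
    return [(b - 256 if b > 127 else b) / 127 for b in base64.b64decode(text)]


packet = {
    "schema_version": "1.0",
    "timestamp": 1718000000.0,
    "domain_tag": "clinical.radiology.chest",
    "semantic_fingerprint": [1 / 3] * 64,  # stand-in for a 64-dim fingerprint
    "outcome_delta": 0.18,
    "confidence_score": 0.84,
    "provenance_hash": "0123456789abcdef",
    "ttl": 604800,
}

full = encode_packet(packet)                                  # default json.dumps spacing
compact = encode_packet(packet, compact=True, decimals=4)     # rounded, no whitespace
quantized = dict(packet, semantic_fingerprint=quantize_int8(packet["semantic_fingerprint"]))
tiny = encode_packet(quantized, compact=True)                 # int8 + base64 fingerprint
print(len(full), len(compact), len(tiny))
```

Rounding to four decimals and dropping JSON whitespace keeps the packet well under 1 KB; only the quantized variant gets under 512 bytes, at the cost of roughly 1/127 precision per component and a receiver that decodes the base64 string back into floats.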


Working Code: MQTTOutcomeRouter

import json
import math
import time
import hashlib
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional

import paho.mqtt.client as mqtt

# pip install paho-mqtt


# --- Data model ---

@dataclass
class OutcomePacket:
    """
    A ~512-byte distilled insight from a QIS edge node.
    Contains no raw data. Never contains PII or PHI.
    Discovered by Christopher Thomas Trevethan (QIS, 39 provisional patents).
    """
    schema_version: str = "1.0"
    timestamp: float = field(default_factory=time.time)
    domain_tag: str = ""                   # hierarchical, e.g., "clinical.radiology.chest"
    semantic_fingerprint: list = field(default_factory=list)  # 64-dim float vector
    outcome_delta: float = 0.0             # positive = improvement over baseline
    confidence_score: float = 0.0          # [0, 1]
    provenance_hash: str = ""
    ttl: int = 86400                        # seconds (MQTT 5.0 message expiry)

    def __post_init__(self):
        if not self.provenance_hash:
            payload = f"{self.domain_tag}{self.outcome_delta}{self.timestamp}"
            self.provenance_hash = hashlib.sha256(payload.encode()).hexdigest()[:16]


# --- MQTT transport router ---

class MQTTOutcomeRouter:
    """
    QIS outcome router using MQTT as the transport layer.
    Works with any MQTT broker: Mosquitto, HiveMQ, AWS IoT, Azure IoT Hub.
    Designed for constrained devices — 512 MB RAM, intermittent connectivity.
    """

    TOPIC_PREFIX = "qis"

    def __init__(
        self,
        broker_host: str,
        broker_port: int,
        domain: str,
        node_id: str,
        similarity_threshold: float = 0.80,
        on_synthesis: Optional[Callable] = None,
    ):
        self.broker_host = broker_host
        self.broker_port = broker_port  # 1883 plain, 8883 with TLS (see deployment notes)
        self.domain = domain
        self.node_id = node_id
        self.similarity_threshold = similarity_threshold
        self.on_synthesis = on_synthesis  # callback when a new packet is received

        # In-memory packet cache (replace with SQLite for persistence)
        self._packet_cache: list[OutcomePacket] = []
        # Provenance hashes already handled: our own deposits (the broker echoes
        # them back on our own subscription) and QoS 1 redeliveries.
        self._seen_hashes: set[str] = set()
        self._lock = threading.Lock()

        # paho-mqtt 2.x: pass the callback API version explicitly
        self._client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=f"qis_{node_id}",
            protocol=mqtt.MQTTv5,
        )
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.on_disconnect = self._on_disconnect

    # --- MQTT primitive: deterministic topic from domain tag ---

    def _topic(self, domain_tag: str) -> str:
        """
        Map a QIS domain tag to a deterministic MQTT topic.
        Example: "clinical.radiology.chest" -> "qis/clinical/radiology/chest"
        The topic IS the semantic address. Any node publishing to the same
        domain is a potential synthesis twin.
        """
        return f"{self.TOPIC_PREFIX}/{domain_tag.replace('.', '/')}"

    # --- MQTT callbacks (paho-mqtt 2.x, CallbackAPIVersion.VERSION2 signatures) ---

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            print(f"[{self.node_id}] Connect failed: {reason_code}")
            return
        # Subscribe to own domain on connect (and on every automatic reconnect)
        topic = self._topic(self.domain)
        client.subscribe(topic, qos=1)
        print(f"[{self.node_id}] Connected. Subscribed to: {topic}")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        if reason_code != 0:
            # The loop_start() network thread reconnects automatically
            print(f"[{self.node_id}] Unexpected disconnect: {reason_code}. Will reconnect.")

    def _on_message(self, client, userdata, msg):
        """
        Receive a packet from the broker.
        Decode -> skip own/duplicate packets -> cache -> notify on_synthesis.
        Fingerprint similarity filtering happens later, in synthesize().
        """
        try:
            data = json.loads(msg.payload.decode())
            packet = OutcomePacket(**data)

            with self._lock:
                # Skip our own echoed deposits and QoS 1 redeliveries (same hash)
                if packet.provenance_hash in self._seen_hashes:
                    return
                self._seen_hashes.add(packet.provenance_hash)
                self._packet_cache.append(packet)

            if self.on_synthesis:
                self.on_synthesis(packet)

        except Exception as e:
            # Never let a malformed packet or a callback error kill paho's network thread
            print(f"[{self.node_id}] Message handling error: {e}")

    # --- Core QIS operations ---

    def connect(self):
        """Connect to MQTT broker. Non-blocking; paho's background thread handles reconnects."""
        self._client.connect_async(self.broker_host, self.broker_port, keepalive=60)
        self._client.loop_start()

    def disconnect(self):
        # Send DISCONNECT while the network loop is still running, then stop the loop
        self._client.disconnect()
        self._client.loop_stop()

    def deposit(self, packet: OutcomePacket, retain: bool = False) -> str:
        """
        Publish an outcome packet to the deterministic MQTT topic.

        retain=True: broker stores this packet and delivers to new subscribers.
        Use for long-lived insights (e.g., best treatment for a stable domain).
        Use retain=False for high-frequency time-sensitive outcomes.

        This is the QIS 'deposit to semantic address' step.
        """
        topic = self._topic(packet.domain_tag)
        payload = json.dumps(
            {
                "schema_version": packet.schema_version,
                "timestamp": packet.timestamp,
                "domain_tag": packet.domain_tag,
                # 4 decimals is ample for cosine similarity and keeps the
                # 64-dim fingerprint from dominating the payload size
                "semantic_fingerprint": [round(x, 4) for x in packet.semantic_fingerprint],
                "outcome_delta": packet.outcome_delta,
                "confidence_score": packet.confidence_score,
                "provenance_hash": packet.provenance_hash,
                "ttl": packet.ttl,
            },
            separators=(",", ":"),  # compact JSON: no padding whitespace
        )

        # MQTT 5.0 message expiry
        properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
        properties.MessageExpiryInterval = packet.ttl

        # Record the hash BEFORE publishing so the broker's echo is always recognized
        with self._lock:
            self._seen_hashes.add(packet.provenance_hash)

        self._client.publish(
            topic,
            payload=payload,
            qos=1,                  # At-least-once delivery
            retain=retain,
            properties=properties,
        )

        return packet.provenance_hash

    def synthesize(self, query_fingerprint: list, max_packets: int = 20) -> list[OutcomePacket]:
        """
        Synthesize from cached packets similar to the current query.
        The synthesis is always LOCAL — no computation sent to broker.

        This is the QIS local synthesis step.
        """
        now = time.time()
        results = []

        with self._lock:
            for packet in self._packet_cache:
                # TTL check
                if (packet.timestamp + packet.ttl) < now:
                    continue
                # Fingerprint similarity
                if packet.semantic_fingerprint:
                    sim = self._cosine_sim(query_fingerprint, packet.semantic_fingerprint)
                    if sim >= self.similarity_threshold:
                        results.append((sim, packet))

        results.sort(key=lambda x: x[0], reverse=True)
        return [r[1] for r in results[:max_packets]]

    def prune_expired(self):
        """Remove TTL-expired packets from local cache."""
        now = time.time()
        with self._lock:
            self._packet_cache = [
                p for p in self._packet_cache
                if (p.timestamp + p.ttl) >= now
            ]

    @staticmethod
    def _cosine_sim(a: list, b: list) -> float:
        if not a or not b or len(a) != len(b):
            return 0.0
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x ** 2 for x in a))
        mag_b = math.sqrt(sum(x ** 2 for x in b))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        return dot / (mag_a * mag_b)


# --- Fingerprinting helper ---

def make_clinical_fingerprint(modality: str, anatomy: str, finding: str) -> list:
    """
    Deterministic fingerprint from clinical context.
    In production: use a clinical NLP embedding model or SNOMED CT encoding.
    Here: reproducible hash-based encoding for demonstration.
    """
    seed = f"{modality}:{anatomy}:{finding}"
    # SHA-512 yields 64 bytes, one per dimension (SHA-256 would give only 32)
    h = hashlib.sha512(seed.encode()).digest()
    return [(b / 127.5) - 1.0 for b in h]   # 64-dim vector in [-1, 1]


# --- Example: two rural clinics sharing chest X-ray intelligence ---

def run_example():
    BROKER = "test.mosquitto.org"   # Public test broker — replace in production
    DOMAIN = "clinical.radiology.chest"

    synthesis_log = []

    def on_synthesis(packet: OutcomePacket):
        synthesis_log.append(packet)
        print(f"  [synthesis] delta={packet.outcome_delta:+.3f}, conf={packet.confidence_score:.2f}")

    # Clinic A: rural Malawi
    node_a = MQTTOutcomeRouter(BROKER, 1883, DOMAIN, "clinic_malawi")
    node_a.connect()

    # Clinic B: rural Kenya
    node_b = MQTTOutcomeRouter(BROKER, 1883, DOMAIN, "clinic_kenya", on_synthesis=on_synthesis)
    node_b.connect()

    time.sleep(2)  # Allow connections to establish

    # Clinic A observes: bilateral infiltrates → early TB detection improved 18%
    fp_a = make_clinical_fingerprint("xray", "chest", "bilateral-infiltrates")
    packet_a = OutcomePacket(
        domain_tag=DOMAIN,
        semantic_fingerprint=fp_a,
        outcome_delta=0.18,        # 18% improvement in early detection
        confidence_score=0.84,
        ttl=86400 * 7,             # 7-day TTL
    )
    hash_a = node_a.deposit(packet_a, retain=True)
    print(f"[Malawi] Deposited: {hash_a}")

    time.sleep(1)  # Wait for broker delivery

    # Clinic B synthesizes: similar presentation → pull relevant packets
    fp_b = make_clinical_fingerprint("xray", "chest", "bilateral-infiltrates")
    results = node_b.synthesize(fp_b)
    print(f"\n[Kenya] Synthesized {len(results)} relevant packet(s):")
    for p in results:
        print(f"  outcome_delta={p.outcome_delta:+.3f}, confidence={p.confidence_score:.2f}")

    node_a.disconnect()
    node_b.disconnect()

    # At 200 nodes: N(N-1)/2 = 19,900 synthesis paths
    # At 2,000 nodes: N(N-1)/2 = 1,999,000 synthesis paths
    # Every node pays only O(1) publish cost to MQTT broker
    # No central intelligence. No PHI in transit. No cloud budget required.
    print(f"\nN=200 nodes → {200*199//2:,} synthesis paths. Compute per node: O(1) publish.")
    print(f"N=2000 nodes → {2000*1999//2:,} synthesis paths. Compute per node: O(1) publish.")


if __name__ == "__main__":
    run_example()
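`synthesize()` returns the relevant packets but leaves combining them to the caller. One simple way to fold them into a single local estimate, assuming a similarity-and-confidence weighted mean suits the domain; this weighting scheme and the `Evidence` type are illustrative assumptions, not part of the QIS specification or the router above:

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class Evidence:
    similarity: float     # cosine similarity to the local query, in [-1, 1]
    confidence: float     # the packet's confidence_score, in [0, 1]
    outcome_delta: float  # the packet's outcome_delta


def weighted_delta(evidence: Iterable[Evidence]) -> Optional[float]:
    """Similarity-and-confidence weighted mean of outcome deltas; None if no usable weight."""
    numerator = 0.0
    total_weight = 0.0
    for e in evidence:
        weight = max(e.similarity, 0.0) * e.confidence  # anti-correlated fingerprints get zero weight
        numerator += weight * e.outcome_delta
        total_weight += weight
    return numerator / total_weight if total_weight > 0 else None


# Illustrative values only
packets = [
    Evidence(similarity=1.00, confidence=0.84, outcome_delta=0.18),
    Evidence(similarity=0.85, confidence=0.60, outcome_delta=0.05),
]
print(weighted_delta(packets))
```

With the router, each `Evidence` can be built from a returned packet plus `MQTTOutcomeRouter._cosine_sim(query_fingerprint, packet.semantic_fingerprint)`.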

MQTT-Specific QIS Properties

Retained messages as persistent semantic addresses

MQTT's RETAIN flag instructs the broker to store the most recent packet at each topic address. A new node joining the network immediately receives the last known outcome from each topic it subscribes to — without any historical replay infrastructure. This is the cold-start solution for QIS on MQTT: retained packets seed new nodes instantly.
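A toy, in-memory model of the broker-side rules described above: one retained message per topic, each new retained publish replaces the previous one, a zero-byte retained payload clears the topic, and under MQTT 5.0 a retained message whose Message Expiry Interval has elapsed is no longer delivered. It is a sketch for reasoning about cold start (class names are illustrative), not how Mosquitto or any other broker is implemented, and it handles exact topics only.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RetainedMessage:
    payload: bytes
    expires_at: Optional[float]  # None when no Message Expiry Interval was set


class RetainedStore:
    """Toy model of a broker's retained-message table (exact topics only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._by_topic: dict = {}

    def publish(self, topic: str, payload: bytes, retain: bool,
                expiry_s: Optional[int] = None) -> None:
        if not retain:
            return  # non-retained messages go to current subscribers only
        if payload == b"":
            self._by_topic.pop(topic, None)  # zero-byte retained publish clears the topic
            return
        expires_at = None if expiry_s is None else self._clock() + expiry_s
        self._by_topic[topic] = RetainedMessage(payload, expires_at)  # replaces any older one

    def on_subscribe(self, topic: str) -> Optional[bytes]:
        """The retained payload a brand-new subscriber to `topic` would receive, if any."""
        msg = self._by_topic.get(topic)
        if msg is None:
            return None
        if msg.expires_at is not None and self._clock() >= msg.expires_at:
            del self._by_topic[topic]  # expired: drop instead of delivering
            return None
        return msg.payload


# Fake clock so expiry can be demonstrated without waiting
now = [0.0]
store = RetainedStore(clock=lambda: now[0])
topic = "qis/clinical/radiology/chest"
store.publish(topic, b'{"outcome_delta":0.18}', retain=True, expiry_s=604800)
store.publish(topic, b'{"outcome_delta":0.21}', retain=True, expiry_s=604800)
print(store.on_subscribe(topic))  # only the latest packet survives
```

The model also makes the limitation visible: a late joiner gets one packet per topic, so deeper history still needs a persistent store on the node or a replay-capable transport.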

QoS as trust proxy

MQTT QoS levels map cleanly to QIS trust requirements:

| QoS Level | Delivery Guarantee | QIS Use Case |
|---|---|---|
| QoS 0 (At most once) | Fire and forget | High-frequency low-stakes telemetry |
| QoS 1 (At least once) | Guaranteed delivery, possible duplicate | Standard outcome packets |
| QoS 2 (Exactly once) | Guaranteed unique delivery | High-stakes clinical/financial outcomes |
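Two details matter when mapping QoS to trust: QoS applies per hop (publisher to broker, broker to subscriber), not end to end, and QoS 1 can deliver the same packet more than once. Because every packet carries a `provenance_hash`, a receiver can make QoS 1 delivery idempotent instead of paying for the extra QoS 2 handshake. A bounded sketch (the class name and default capacity are illustrative assumptions):

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember the most recent `capacity` provenance hashes and reject repeats."""

    def __init__(self, capacity: int = 10_000):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self._seen: OrderedDict = OrderedDict()

    def accept(self, provenance_hash: str) -> bool:
        """True the first time a hash is seen; False for a redelivery."""
        if provenance_hash in self._seen:
            self._seen.move_to_end(provenance_hash)  # keep recently repeated hashes from eviction
            return False
        self._seen[provenance_hash] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the least recently seen hash
        return True


seen = DuplicateFilter()
for h in ["a1b2c3d4e5f60718", "a1b2c3d4e5f60718", "99aa88bb77cc66dd"]:
    print(h, "process" if seen.accept(h) else "skip (duplicate)")
```

The capacity bounds memory on constrained nodes; it only needs to exceed the number of packets that can arrive within the broker's redelivery window.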

Hierarchical topics as semantic namespace

MQTT topic wildcards enable powerful subscription patterns:

qis/clinical/#              → subscribe to ALL clinical outcomes
qis/clinical/radiology/#    → all radiology modalities
qis/clinical/radiology/chest → chest X-ray only

A node can subscribe narrowly (exact domain match) or broadly (entire subdomain), pulling the synthesis breadth that fits its computational budget. This is QIS semantic addressing made observable in the topic tree.
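Brokers apply these filter rules, but a node subscribed to a broad filter such as `qis/clinical/#` may still want to route incoming packets locally by topic. paho-mqtt ships a helper for this, `topic_matches_sub`; the hypothetical function below spells out MQTT's rules: `+` matches exactly one level, `#` must be the last level and also matches the parent level, and filters starting with a wildcard do not match topics beginning with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT topic-filter matching for '+' and '#'. A '#' that is not the last level never matches."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Filters that start with a wildcard must not match '$'-prefixed topics such as $SYS/...
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' is only legal as the last level; it also matches the parent level itself
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


for f in ("qis/clinical/#", "qis/clinical/radiology/#",
          "qis/clinical/radiology/chest", "qis/+/radiology/chest"):
    print(f, topic_matches(f, "qis/clinical/radiology/chest"))
```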

Last Will and Testament as graceful exit

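The broker publishes a client's Last Will message when that client disconnects unexpectedly (without a clean DISCONNECT, for example after a dropped link or a missed keep-alive). In a QIS network, the will can announce that the node has gone offline, allowing synthesis partners to down-weight its data before TTL expiry. Because the will is registered when the node connects, it can only carry state known at connect time.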
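A sketch of registering that offline notice with paho-mqtt's `will_set`, which must be called before connecting. The status topic layout (`qis/_status/<node_id>`) and the payload fields are hypothetical conventions for this example, not part of the router above:

```python
import json

STATUS_PREFIX = "qis/_status"  # hypothetical topic convention for node liveness


def status_topic(node_id: str) -> str:
    return f"{STATUS_PREFIX}/{node_id}"


def status_payload(node_id: str, online: bool) -> str:
    return json.dumps({"node_id": node_id, "online": online}, separators=(",", ":"))


def configure_last_will(client, node_id: str) -> None:
    """Register an 'offline' will on a paho-mqtt Client. Call before connect()/connect_async()."""
    client.will_set(
        status_topic(node_id),
        payload=status_payload(node_id, online=False),
        qos=1,
        retain=True,  # nodes that join later also see that this node went offline
    )
```

Publishing a retained `{"online": true}` message to the same topic after each successful connect overwrites the old offline notice, a common online/offline status pattern. In the router, `configure_last_will(self._client, node_id)` would go in `__init__`, before `connect_async`.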


Why the Quadratic Scaling Holds Regardless of QoS

The N(N-1)/2 scaling claim comes from the loop and the semantic addressing, not from the transport's efficiency.

At 200 clinics:

  • Without QIS: each clinic's radiology department operates in isolation. 200 independent practice curves. 200 separate calibration loops. Rare presentation seen once per decade per site.
  • With QIS over MQTT: 19,900 synthesis paths active. The 18% improvement in early TB detection from clinic A's example packet (outcome_delta = 0.18) reaches clinic B within broker delivery latency. Clinic B's confirmation deposits back. The network's collective accuracy improves with every observation.
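The path counts quoted here follow from N(N-1)/2: one potential synthesis path per unordered pair of clinics. A quick check of the numbers used in this series (the function name is illustrative):

```python
def synthesis_paths(n: int) -> int:
    """Unique unordered node pairs, N(N-1)/2."""
    if n < 0:
        raise ValueError("node count must be non-negative")
    return n * (n - 1) // 2


for n in (10, 200, 400, 2000):
    print(f"N={n:>5,} -> {synthesis_paths(n):>9,} synthesis paths")

# Doubling the network roughly quadruples the pairings (quadratic growth)
print(synthesis_paths(400) / synthesis_paths(200))
```

The last ratio is just over 4: going from 200 to 400 clinics takes the pairings from 19,900 to 79,800.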

The MQTT broker carries 512-byte packets. The intelligence is quadratic. The transport cost is O(1) per publish — the same complexity as any message queue.

Christopher Thomas Trevethan's 39 provisional patents cover the architecture: the complete loop. Not the broker. Not the QoS level. Not the topic schema. The discovery is that closing this loop — posting pre-distilled insights to deterministic addresses and synthesizing locally — produces quadratic intelligence at logarithmic cost. MQTT is proof that the loop works at 2-byte-header constraints.


Deployment Notes for Constrained Environments

Self-hosted broker (air-gap or low-bandwidth)
Mosquitto runs on a Raspberry Pi 3 with 512 MB RAM and serves thousands of clients. Install:

sudo apt install mosquitto mosquitto-clients

Mosquitto 1.6 and later speak MQTT 5.0 without extra configuration, and retained messages are enabled by default. A minimal /etc/mosquitto/mosquitto.conf for a local QIS broker:

listener 1883
allow_anonymous true
retain_available true
# Cap each message payload at 1 KB, leaving headroom above the ~512-byte packet target
message_size_limit 1024

TLS for clinical/financial deployments

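# client: a paho-mqtt Client (e.g. MQTTOutcomeRouter._client), configured before connecting
client.tls_set(ca_certs="ca.crt")   # verify the broker's certificate against this CA
client.connect(broker_host, 8883)   # 8883 is the standard MQTT-over-TLS port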

Battery-constrained nodes (ESP32 / microcontroller)
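Use MQTT QoS 0 for high-frequency deposits. Start a clean session so the broker does not buffer messages for the node while it sleeps: clean_session=True on MQTT 3.1.1, or clean_start=True with a Session Expiry Interval of 0 (the default) on MQTT 5.0. Publish one outcome packet per observation cycle, not per sensor reading. The distillation step is what keeps packets sparse.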
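A sketch of the per-cycle discipline, written in Python for readability (on an ESP32 the same logic would live in C or MicroPython). Raw readings stay in a local buffer and at most one summary leaves the device per cycle. The class name, the mean-minus-baseline delta, and the confidence formula are hypothetical stand-ins for a real distillation step:

```python
from typing import List, Optional


class ObservationCycle:
    """Buffer raw readings on the device; emit one summary per cycle (hypothetical distillation)."""

    def __init__(self, baseline: float, min_readings: int = 5):
        self.baseline = baseline
        self.min_readings = min_readings
        self._readings: List[float] = []

    def add(self, value: float) -> None:
        self._readings.append(value)  # raw readings never leave the device

    def close(self) -> Optional[dict]:
        """End the cycle and reset the buffer. Returns fields for one OutcomePacket, or None."""
        readings, self._readings = self._readings, []
        n = len(readings)
        if n < self.min_readings:
            return None  # too little evidence: publish nothing this cycle
        mean = sum(readings) / n
        return {
            "outcome_delta": round(mean - self.baseline, 4),
            # Hypothetical confidence: approaches 1 as the cycle collects more readings
            "confidence_score": round(n / (n + self.min_readings), 2),
        }


cycle = ObservationCycle(baseline=0.5)
for reading in [0.75] * 10:  # ten sensor readings...
    cycle.add(reading)
print(cycle.close())         # ...become one publishable summary
```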


The Series: Transport-Agnostic Proof

| Part | Transport | Minimum Infrastructure | Key QIS Property Demonstrated |
|---|---|---|---|
| 1 | ChromaDB | Python process | O(log N) HNSW semantic routing |
| 2 | Qdrant | Docker container | Distributed multi-node scaling |
| 3 | Plain REST API | HTTP server | No vector DB required |
| 4 | Redis Pub/Sub | Redis instance | Topic-based semantic fan-out |
| 5 | Kafka | Kafka broker | Durable replay, late-joiner synthesis |
| 6 | Apache Pulsar | Pulsar cluster | Geo-replication, multi-tenancy |
| 8 | NATS JetStream | NATS server | 16 MB binary, edge-native |
| 9 | SQLite | Single file | Air-gap, zero installation |
| 10 | MQTT | MQTT broker (e.g., Mosquitto) | 2-byte fixed header, microcontroller clients (8 KB RAM) |

The loop is the discovery. The transport is the variable.


Previous in series: Part 9 — SQLite: No Server, Any Device, Air-Gap Capable

For the full architectural specification: QIS: An Open Protocol | Academic Preprint
