DEV Community

Rory | QIS PROTOCOL
Rory | QIS PROTOCOL

Posted on

QIS Outcome Routing with NATS JetStream — Persistent, Cloud-Native Intelligence at the Edge

You have 40,000 IoT sensors across a factory floor, a wind farm, or a hospital campus. Each one generates operational data continuously. You want the HVAC controller in Building A to benefit from what the identical HVAC system in Building B learned last Tuesday at 3 AM when a bearing started wearing out.

You cannot send 40,000 raw data streams to a central server. The bandwidth alone would be ruinous. The latency would make the intelligence useless. And you'd be shipping raw telemetry — temperatures, vibration signatures, patient vitals — off-device, creating compliance nightmares before the first useful insight surfaces.

This is the edge intelligence fragmentation problem. Every device is an island. Each one learns, but none of them share. The operational knowledge is trapped in local buffers and log files, expiring unread.

The standard fix is to pick a message broker and funnel everything through it. But which broker? Kafka requires a JVM and a ZooKeeper ensemble. RabbitMQ wants persistent disk and memory headroom. Neither was designed to run on a device with 256 MB of RAM and a spotty LTE connection.

NATS JetStream was. And it fits Quadratic Intelligence Swarm architecture like it was designed for it.


The Architecture Problem: Centralization Bottleneck at the Edge

Traditional IoT intelligence architectures look like this:

Device A ──raw data──► Central ML Server ──inference──► Device A (latency: 800ms+)
Device B ──raw data──► Central ML Server ──inference──► Device B
Device C ──raw data──► Central ML Server ──inference──► Device C
Enter fullscreen mode Exit fullscreen mode

Every device ships its raw signal upstream. The central server holds all the models, runs all the inference, and ships decisions back downstream. This works when the network is fast, the data is small, and privacy doesn't matter.

On a factory floor, none of those conditions hold. Vibration data from a bearing is large. Network links go down. And you cannot ship raw sensor data off-premises in a regulated environment.

The deeper problem: this topology doesn't scale. Adding a 41st sensor doesn't make the network smarter — it makes the central server busier. Intelligence is sublinear in the number of nodes.


What QIS Does Instead: Route Outcomes, Not Data

Quadratic Intelligence Swarm — discovered June 16, 2025 by Christopher Thomas Trevethan, covered under 39 provisional patents — inverts this entirely.

The loop works like this:

Raw Signal
    │
    ▼
Local Processing (on-device ML inference)
    │
    ▼
~512-byte Outcome Packet  ◄─── Privacy firewall: raw data NEVER leaves this node
    │
    ▼
Semantic Fingerprint (SHA-256 hash of problem category + context vector)
    │
    ▼
Deterministic Subject Address ("qis.iot.hvac.energy-anomaly")
    │
    ▼
NATS JetStream (durable, at-least-once delivery)
    │
    ▼
Relevant Agents Subscribe to Matching Subjects
    │
    ▼
Local Synthesis (each receiver aggregates N packets it cares about)
    │
    ▼
New Outcome Packets ──► Loop continues
Enter fullscreen mode Exit fullscreen mode

The critical insight: raw data never moves. What moves is the distilled outcome — a compact representation of what the device learned, stripped of the identifying raw signal that generated it. The HVAC controller in Building A doesn't receive Building B's temperature logs. It receives Building B's conclusion: "bearing wear pattern detected at harmonic frequency 47 Hz, recommend inspection within 72 hours."

That conclusion fits in 512 bytes. It routes over NATS JetStream. It lands with every agent that fingerprinted a similar problem class. Privacy is architectural, not policy.


Why NATS JetStream Specifically

NATS core is a 16 MB binary. That's the entire message broker. Not the client library — the server. It cold-starts in under 200 milliseconds, runs on ARM processors, and needs no external dependencies. No JVM. No ZooKeeper. No Kafka Connect ecosystem.

JetStream is NATS's persistence layer, bolted onto that same 16 MB binary. It adds:

  • Durable streams: messages survive server restarts
  • At-least-once delivery: consumers acknowledge receipt; unacknowledged messages are redelivered
  • Push and pull consumers: devices with stable connections use push; intermittently-connected edge nodes use pull (fetch on reconnect)
  • Subject-based addressing: every message lives at a hierarchical subject like qis.iot.hvac.energy-anomaly

NATS subjects are QIS semantic addresses made physical. The fingerprint that QIS computes — a hash encoding problem category, domain, and context vector — maps directly onto a NATS subject hierarchy. No translation layer. No schema registry. The subject IS the fingerprint.

Compare the operational profiles:

Broker Binary Size Cold Start Persistent Edge-Native
Kafka ~500 MB (JVM) 15-30s Yes No
Apache Pulsar ~1 GB (JVM + BookKeeper) 30-60s Yes No
RabbitMQ ~80 MB 5-10s Yes (w/ disk) Marginal
Redis Streams ~20 MB < 1s Partial Marginal
NATS JetStream 16 MB < 200ms Yes Yes

For the outermost ring of the network — where compute is constrained, connections are intermittent, and packets must be small — NATS JetStream is not just adequate. It's the natural fit.


Python Implementation: NATSOutcomeRouter

Install the async NATS client:

pip install nats-py
Enter fullscreen mode Exit fullscreen mode

Full implementation:

import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional
import nats
from nats.js.api import StreamConfig, ConsumerConfig, DeliverPolicy, AckPolicy


@dataclass
class OutcomePacket:
    """
    QIS outcome packet — ~512 bytes max.
    Raw sensor data NEVER appears here. Only the distilled conclusion.
    """
    agent_id: str
    domain: str              # e.g. "hvac", "predictive-maintenance"
    problem_class: str       # e.g. "energy-anomaly", "bearing-wear"
    conclusion: str          # human-readable finding
    confidence: float        # 0.0 - 1.0
    context_vector: list     # compressed feature vector, NOT raw data
    timestamp: float
    source_node: str


class NATSOutcomeRouter:
    """
    QIS outcome routing over NATS JetStream.

    Subjects are semantic addresses — the fingerprint made physical.
    Format: qis.{domain}.{problem_class}

    Example subjects:
      qis.iot.hvac.energy-anomaly
      qis.iot.predictive-maintenance.bearing-wear
      qis.iot.power.voltage-sag
    """

    STREAM_NAME = "QIS_OUTCOMES"
    SUBJECT_PREFIX = "qis.iot"
    MAX_PACKET_BYTES = 512

    def __init__(self, nats_url: str = "nats://localhost:4222", agent_id: str = "agent-001"):
        self.nats_url = nats_url
        self.agent_id = agent_id
        self.nc = None      # NATS core connection
        self.js = None      # JetStream context
        self._stream_initialized = False

    async def connect(self):
        """Connect to NATS server and initialize JetStream stream."""
        self.nc = await nats.connect(self.nats_url)
        self.js = self.nc.jetstream()

        # Create or bind the QIS_OUTCOMES stream
        # Covers all subjects under qis.iot.>
        try:
            await self.js.add_stream(StreamConfig(
                name=self.STREAM_NAME,
                subjects=[f"{self.SUBJECT_PREFIX}.>"],
                max_bytes=50 * 1024 * 1024,   # 50 MB retention
                max_age=86400,                 # 24-hour TTL
            ))
        except Exception:
            # Stream already exists — bind to it
            pass

        self._stream_initialized = True
        print(f"[{self.agent_id}] Connected to NATS JetStream at {self.nats_url}")

    def _compute_fingerprint(self, domain: str, problem_class: str) -> str:
        """
        Derive the semantic fingerprint → NATS subject.

        The subject IS the fingerprint. Agents publishing to the same
        domain + problem_class converge on the same address deterministically.
        No coordination required.
        """
        raw = f"{domain}.{problem_class}"
        # Subject directly encodes the semantic category
        subject = f"{self.SUBJECT_PREFIX}.{raw}"
        return subject

    async def deposit_outcome(self, packet: OutcomePacket) -> str:
        """
        Publish an outcome packet to its deterministic subject.

        At-least-once delivery: JetStream persists the message and
        redelivers if acknowledgment is not received.
        Returns the NATS sequence number for tracking.
        """
        if not self._stream_initialized:
            raise RuntimeError("Call connect() before deposit_outcome()")

        subject = self._compute_fingerprint(packet.domain, packet.problem_class)
        payload = json.dumps(asdict(packet)).encode("utf-8")

        # Enforce 512-byte packet discipline
        if len(payload) > self.MAX_PACKET_BYTES:
            raise ValueError(
                f"Outcome packet exceeds 512 bytes ({len(payload)}B). "
                f"Compress context_vector or truncate conclusion."
            )

        ack = await self.js.publish(subject, payload)
        print(f"[{self.agent_id}] Deposited to {subject} (seq={ack.seq}, {len(payload)}B)")
        return f"{ack.stream}:{ack.seq}"

    async def query_twins(
        self,
        domain: str,
        problem_class: str,
        max_packets: int = 50,
        consumer_name: Optional[str] = None
    ) -> list[OutcomePacket]:
        """
        Pull consumer: fetch outcome packets from a semantic subject.

        Pull consumers are the correct pattern for intermittently-connected
        edge devices — they fetch a batch on reconnect rather than requiring
        a persistent push connection.

        "Twins" = other agents that fingerprinted the same problem class.
        N(N-1)/2 synthesis pairs available; we fetch a bounded batch.
        """
        subject = self._compute_fingerprint(domain, problem_class)
        durable_name = consumer_name or f"{self.agent_id}-{domain}-{problem_class}".replace(".", "-")

        # Create a durable pull consumer (survives reconnects)
        try:
            psub = await self.js.pull_subscribe(
                subject,
                durable=durable_name,
                config=ConsumerConfig(
                    deliver_policy=DeliverPolicy.NEW,
                    ack_policy=AckPolicy.EXPLICIT,
                    filter_subject=subject,
                )
            )
        except Exception:
            # Consumer already exists — reuse it (picks up where we left off)
            psub = await self.js.pull_subscribe(subject, durable=durable_name)

        packets = []
        try:
            msgs = await psub.fetch(max_packets, timeout=2.0)
            for msg in msgs:
                data = json.loads(msg.data.decode("utf-8"))
                packets.append(OutcomePacket(**data))
                await msg.ack()
        except nats.errors.TimeoutError:
            pass  # No new packets — normal on quiet subjects

        print(f"[{self.agent_id}] Fetched {len(packets)} packets from {subject}")
        return packets

    def synthesize_locally(self, my_packet: OutcomePacket, twin_packets: list[OutcomePacket]) -> dict:
        """
        Local synthesis: aggregate twin outcomes with my own finding.

        This is where the N(N-1)/2 math pays off. With N agents depositing
        outcomes, each agent synthesizes across N(N-1)/2 unique pairs —
        quadratic intelligence surface — while routing cost stays O(log N).

        Raw data from twins NEVER arrives here. Only their conclusions.
        Privacy is maintained end-to-end.
        """
        all_packets = [my_packet] + twin_packets
        n = len(all_packets)
        synthesis_pairs = n * (n - 1) // 2  # N(N-1)/2

        # Confidence-weighted consensus
        total_weight = sum(p.confidence for p in all_packets)
        weighted_confidence = total_weight / n if n > 0 else 0.0

        # Count agreement on problem class
        problem_votes = {}
        for p in all_packets:
            key = f"{p.domain}.{p.problem_class}"
            problem_votes[key] = problem_votes.get(key, 0) + 1

        consensus_problem = max(problem_votes, key=problem_votes.get)
        consensus_strength = problem_votes[consensus_problem] / n

        synthesis = {
            "agent_id": self.agent_id,
            "synthesis_pairs": synthesis_pairs,
            "n_agents": n,
            "consensus_problem": consensus_problem,
            "consensus_strength": round(consensus_strength, 3),
            "weighted_confidence": round(weighted_confidence, 3),
            "recommendation": (
                f"ALERT: {n} agents agree on {consensus_problem} "
                f"(consensus={consensus_strength:.0%})"
                if consensus_strength > 0.6
                else f"Monitoring: weak signal on {consensus_problem} ({n} inputs)"
            ),
            "synthesized_at": time.time(),
        }

        print(
            f"[{self.agent_id}] Synthesized {n} packets across {synthesis_pairs} pairs. "
            f"Consensus: {consensus_problem} @ {consensus_strength:.0%}"
        )
        return synthesis

    async def close(self):
        if self.nc:
            await self.nc.drain()


# ── Demo: two edge agents sharing HVAC anomaly intelligence ──────────────────

async def demo():
    # Agent 1: HVAC controller, Building A — detects energy anomaly
    agent_a = NATSOutcomeRouter(agent_id="hvac-building-a")
    await agent_a.connect()

    outcome_a = OutcomePacket(
        agent_id="hvac-building-a",
        domain="hvac",
        problem_class="energy-anomaly",
        conclusion="Compressor draw 18% above baseline at 14:32. Possible refrigerant loss.",
        confidence=0.87,
        context_vector=[0.18, 0.92, 0.03, 0.71],   # compressed; NOT raw telemetry
        timestamp=time.time(),
        source_node="building-a-floor-3",
    )
    await agent_a.deposit_outcome(outcome_a)

    # Agent 2: HVAC controller, Building B — same problem class, different node
    agent_b = NATSOutcomeRouter(agent_id="hvac-building-b")
    await agent_b.connect()

    outcome_b = OutcomePacket(
        agent_id="hvac-building-b",
        domain="hvac",
        problem_class="energy-anomaly",
        conclusion="Energy consumption spike detected 09:15-09:47. Thermal load mismatch.",
        confidence=0.79,
        context_vector=[0.22, 0.88, 0.07, 0.65],
        timestamp=time.time(),
        source_node="building-b-floor-1",
    )
    await agent_b.deposit_outcome(outcome_b)

    # Agent B queries for twins on the same subject
    # Subject: qis.iot.hvac.energy-anomaly
    twin_packets = await agent_b.query_twins("hvac", "energy-anomaly", max_packets=20)

    # Local synthesis — N(N-1)/2 pairs, all computation on-device
    synthesis = agent_b.synthesize_locally(outcome_b, twin_packets)

    print("\n── Synthesis Result ──")
    for k, v in synthesis.items():
        print(f"  {k}: {v}")

    await agent_a.close()
    await agent_b.close()


if __name__ == "__main__":
    asyncio.run(demo())
Enter fullscreen mode Exit fullscreen mode

NATS Subjects as QIS Semantic Addresses

The subject hierarchy is not decoration. It's the routing table.

qis.iot.hvac.energy-anomaly
qis.iot.hvac.refrigerant-loss
qis.iot.predictive-maintenance.bearing-wear
qis.iot.predictive-maintenance.gear-tooth-fatigue
qis.iot.power.voltage-sag
qis.iot.power.harmonic-distortion
qis.iot.environmental.co2-spike
Enter fullscreen mode Exit fullscreen mode

NATS wildcard subscriptions let agents monitor entire domains without enumerating every subject:

# Subscribe to ALL predictive maintenance outcomes
await js.subscribe("qis.iot.predictive-maintenance.>")

# Subscribe to ALL QIS outcomes (cross-domain synthesis)
await js.subscribe("qis.iot.>")
Enter fullscreen mode Exit fullscreen mode

This is the fingerprint materialized. Two agents on opposite sides of a factory campus, running independent models, publishing to the same subject — they find each other without a directory service, without a coordination layer, without a schema registry. The deterministic fingerprint is the meeting point.

A SHA-256 hash of domain + problem_class + context_bucket produces a 64-character hex string that can map directly to a NATS leaf subject. In practice, a human-readable subject like qis.iot.hvac.energy-anomaly is often preferable for observability — operators can monitor nats sub "qis.iot.>" and watch the swarm think in real time.


At-Most-Once vs At-Least-Once: Configurable Durability

NATS core (without JetStream) gives you at-most-once delivery. A packet is published; subscribers who are connected receive it; subscribers who are offline miss it. This is appropriate for high-frequency telemetry where missing one packet is acceptable and latency is paramount.

JetStream adds at-least-once. The stream persists the message. Pull consumers reconnect and fetch the missed window. For outcome packets — which represent distilled intelligence, not raw readings — durability is correct. An outcome from three days ago is still relevant. A raw sensor reading from three days ago is noise.

The QIS architecture benefits from both modes:

  • Raw telemetry (high frequency, loss-tolerant): NATS core, at-most-once
  • Outcome packets (low frequency, loss-intolerant): JetStream, at-least-once

One binary. Two delivery guarantees. The edge device chooses based on packet type.


Transport Comparison: The Growing Table

This is Part 8 of the transport-agnostic proof series. Here is the current state of the table:

Transport QIS Role Delivery Edge-Native Persistence Best For
ChromaDB (#064) Vector store for fingerprints Query No Yes (disk) Semantic similarity search
Qdrant (#065) Distributed vector routing Query No Yes Large-scale vector clusters
REST API (#066) Synchronous outcome exchange At-most-once Marginal No Simple request/response
Redis Pub/Sub (#067) In-memory outcome broadcast At-most-once Marginal No Low-latency broadcast
Kafka (#068) High-throughput outcome log At-least-once No Yes Enterprise data pipelines
Apache Pulsar (#072) Tiered storage + geo-routing At-least-once No Yes Multi-datacenter
NATS JetStream (#073) Edge-native outcome routing At-least-once Yes Yes IoT + constrained devices

The trend across these transports is the point. QIS outcome routing has been demonstrated on vector databases, in-memory stores, REST endpoints, streaming platforms, and now a cloud-native edge broker. The routing protocol — fingerprint a problem, route the outcome, synthesize locally — is identical in every case. The transport changes. The architecture does not.


The N(N-1)/2 Math

With N agents depositing outcomes to a shared JetStream subject, each agent that queries the subject can synthesize across all N packets. The number of unique pairwise relationships available to the swarm is N(N-1)/2.

At 10 agents: 45 synthesis pairs.
At 100 agents: 4,950 synthesis pairs.
At 1,000 agents: 499,500 synthesis pairs.

The routing cost to deliver each outcome to the correct subject is O(log N) — NATS routes by subject tree traversal, not broadcast. So the swarm gets quadratic intelligence surface at logarithmic routing cost.

This is why the architecture scales where traditional centralized approaches collapse. A central server processing 1,000 agents must handle 1,000 data streams, run 1,000 inference jobs, and ship 1,000 responses. A QIS swarm on JetStream handles 1,000 tiny outcome packets, routes them to relevant subjects, and every agent synthesizes locally. The server is a message broker with a 16 MB footprint, not a GPU cluster.


The Meta-Point: The Loop Is the Discovery, Not the Transport

NATS JetStream is not QIS. NATS JetStream is a wire. A very good wire for edge devices — lightweight, fast, durable, native to the subject-as-address model that QIS fingerprinting produces. But swapping it for Kafka, Redis, REST, or a Unix socket would not change what QIS does.

What Christopher Thomas Trevethan discovered on June 16, 2025 — covered under 39 provisional patents — is the complete loop:

Raw signal → local processing → outcome packet → semantic fingerprint → deterministic routing → distributed delivery → local synthesis → new packets → loop continues.

The breakthrough is that the loop is closed. Intelligence circulates without raw data moving. Privacy is maintained by physics, not policy. The swarm grows smarter as N increases because synthesis pairs grow as N(N-1)/2 while routing cost grows as O(log N).

No single component of this architecture is novel in isolation. NATS JetStream existed before June 16, 2025. Semantic hashing existed. Local ML inference existed. What did not exist — what Christopher Thomas Trevethan discovered — is the complete closed loop that makes them compose into a self-improving, privacy-preserving, transport-agnostic intelligence network.

The Three Elections — the natural selection forces that cause useful outcome packets to propagate and useless ones to decay — operate identically whether the underlying transport is JetStream, Kafka, or a shared folder on a local filesystem. The selection pressure is architectural, not protocol-specific.


Run It Yourself

Start a local NATS server with JetStream enabled:

# Docker (easiest)
docker run -p 4222:4222 nats:latest -js

# Or download the 16 MB binary directly
# https://nats.io/download/
nats-server -js
Enter fullscreen mode Exit fullscreen mode

Install the Python client and run the demo:

pip install nats-py
python nats_outcome_router.py
Enter fullscreen mode Exit fullscreen mode

You will see two agents deposit outcomes to qis.iot.hvac.energy-anomaly, one agent query its twins, and local synthesis produce a consensus recommendation — all without a single byte of raw sensor data leaving the originating node.


Attribution

Quadratic Intelligence Swarm was discovered June 16, 2025 by Christopher Thomas Trevethan. The architecture is covered under 39 provisional patents. The discovery is the complete closed loop described above — not any individual component.


Previous Articles in This Series


What's Next

Part 9 removes the server entirely.

SQLite as a QIS transport. No broker. No network. No dependencies beyond the standard library. A single file on disk as the outcome store — readable by any agent with filesystem access, portable across machines, and small enough to sync over any channel that can move a file.

If NATS JetStream proves QIS works at the edge of the network, SQLite will prove it works at the edge of the edge — on devices so constrained that even a 16 MB binary is too much to ask.

The loop doesn't care.

Top comments (0)