DEV Community

Rory | QIS PROTOCOL
Rory | QIS PROTOCOL

Posted on

QIS Outcome Routing with Redis Pub/Sub — When Your Transport Layer Thinks in Topics

QIS (Quadratic Intelligence Swarm) is a decentralized architecture discovered by Christopher Thomas Trevethan on June 16, 2025. Intelligence scales as Θ(N²) across N agents. Each agent pays O(log N) routing cost — or better. No orchestrator. No aggregator. Raw data never leaves the node. 39 provisional patents filed.

Series: Part 1 — In-Memory · Part 2 — ChromaDB · Part 3 — Qdrant · Part 4 — REST API · Part 5 — Redis Pub/Sub (this article)

Understanding QIS — Part 67 · Transport-Agnostic Proof Series


You have a multi-agent system. Your agents produce insights. You want those insights to reach the agents most likely to use them — without a coordinator deciding who gets what.

Parts 1 through 4 of this series showed the same QIS loop running on four different transports: a Python dict, ChromaDB, Qdrant, and a plain REST API. Each time, the quadratic scaling property was identical. Each time, the only thing that changed was the routing layer.

Part 5 adds Redis pub/sub to the list.

Redis pub/sub is interesting for a specific reason: it does not search. A subscriber says "give me everything on this channel." The channel name is the address. If your channel names are semantic fingerprints of problem types, you have just implemented QIS outcome routing — and instead of O(log N) DHT lookup, you get O(1) channel subscription.

This is what the routing requirement in QIS actually means: O(log N) or better. O(1) qualifies. Redis pub/sub qualifies.


What Changes, What Does Not

The QIS loop does not change:

Raw signal → Local processing → Outcome packet (~512 bytes) →
Semantic fingerprint → Publish to fingerprint-channel →
Relevant subscribers receive → Local synthesis → New packet → Loop
Enter fullscreen mode Exit fullscreen mode

What changes in Part 5: instead of posting to a vector database or REST endpoint and querying by cosine similarity, each agent subscribes to channels whose names encode their problem fingerprint. When another agent publishes an outcome packet to a matching channel, the subscriber receives it directly.

The transport layer becomes pub/sub message routing. The quadratic scaling comes from the same place it always did: the loop and the semantic addressing.


The Architecture

┌─────────────────────────────────────────────────────────────┐
│                    QIS REDIS PUB/SUB LAYER                  │
│                                                             │
│  Agent A (oncology)          Agent B (oncology)            │
│  ┌──────────────────┐        ┌──────────────────┐          │
│  │ Local Processing │        │ Local Processing │          │
│  │ Outcome Packet   │        │ Outcome Packet   │          │
│  └────────┬─────────┘        └────────┬─────────┘          │
│           │ PUBLISH                   │ SUBSCRIBE           │
│           │                           │                     │
│           ▼                           ▼                     │
│  ┌────────────────────────────────────────────┐            │
│  │          REDIS PUB/SUB BROKER              │            │
│  │                                            │            │
│  │  Channel: qis:oncology:treatment-outcome   │            │
│  │  Channel: qis:oncology:drug-response       │            │
│  │  Channel: qis:oncology:rare-variant        │            │
│  │                                            │            │
│  │  (Channel name = semantic fingerprint)     │            │
│  └────────────────────────────────────────────┘            │
│                                                             │
│  Agent C (cardiology)         Agent D (multi-domain)       │
│  SUBSCRIBED TO:               SUBSCRIBED TO:               │
│  qis:cardiology:*             qis:oncology:*               │
│                               qis:cardiology:*             │
│                               (pattern subscriptions)      │
└─────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Channel names are the semantic addresses. Pattern subscriptions (qis:oncology:*) give you approximate matching. Exact channel names give you precise routing.


Implementation

import redis
import json
import hashlib
import time
import threading
from dataclasses import dataclass, asdict
from typing import Optional, Callable

@dataclass
class OutcomePacket:
    """~512-byte distilled insight. Raw data never travels."""
    domain: str
    subdomain: str
    problem_type: str
    outcome_delta: float      # What changed (not what the data was)
    confidence: float         # 0.0 - 1.0
    validation_method: str
    n_observations: int
    timestamp: float
    agent_id: str             # Anonymized
    packet_version: str = "1.0"

    def to_channel(self) -> str:
        """Deterministic channel name from problem fingerprint.

        This is the semantic address. Any agent with the same
        domain/subdomain/problem_type subscribes to the same channel.
        No coordinator needed to route — the channel name IS the routing.
        """
        fingerprint = f"qis:{self.domain}:{self.subdomain}:{self.problem_type}"
        return fingerprint

    def to_bytes(self) -> bytes:
        """Serialized packet. Stays well under 512 bytes for typical fields."""
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> "OutcomePacket":
        return cls(**json.loads(data.decode("utf-8")))


class QISPubSubNode:
    """
    A QIS agent node using Redis pub/sub as the transport layer.

    Routing complexity: O(1) — Redis channel subscription is direct.
    This is better than O(log N) DHT lookup.

    The quadratic scaling N(N-1)/2 comes from the loop architecture
    and semantic addressing — not from this transport layer.
    """

    def __init__(
        self,
        agent_id: str,
        domain: str,
        subdomain: str,
        problem_type: str,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        on_packet_received: Optional[Callable] = None
    ):
        self.agent_id = agent_id
        self.domain = domain
        self.subdomain = subdomain
        self.problem_type = problem_type
        self.on_packet_received = on_packet_received

        # Redis connections — separate for pub and sub (Redis requirement)
        self.publish_client = redis.Redis(host=redis_host, port=redis_port, db=0)
        self.subscribe_client = redis.Redis(host=redis_host, port=redis_port, db=0)

        # Build channel fingerprint for this node's problem type
        self.my_channel = f"qis:{domain}:{subdomain}:{problem_type}"

        # Pubsub object for subscriptions
        self.pubsub = self.subscribe_client.pubsub()
        self._listener_thread: Optional[threading.Thread] = None

        print(f"[{self.agent_id}] Node initialized. Channel: {self.my_channel}")

    def subscribe(self, *additional_channels: str):
        """
        Subscribe to own channel + any additional channels.

        Pattern subscriptions (e.g. 'qis:oncology:*') enable
        approximate semantic matching — subscribe to a domain,
        receive from all sub-problems within that domain.
        """
        channels = [self.my_channel] + list(additional_channels)
        self.pubsub.subscribe(**{ch: self._handle_message for ch in channels})
        print(f"[{self.agent_id}] Subscribed to: {channels}")

    def subscribe_pattern(self, *patterns: str):
        """Pattern subscription — e.g. 'qis:oncology:*' matches all oncology channels."""
        self.pubsub.psubscribe(**{p: self._handle_message for p in patterns})
        print(f"[{self.agent_id}] Pattern subscribed to: {patterns}")

    def start_listening(self):
        """Non-blocking listener thread."""
        self._listener_thread = threading.Thread(
            target=self._listen_loop,
            daemon=True,
            name=f"qis-listener-{self.agent_id}"
        )
        self._listener_thread.start()
        print(f"[{self.agent_id}] Listening for outcome packets...")

    def _listen_loop(self):
        """Listen for incoming outcome packets. Runs in background thread."""
        for message in self.pubsub.listen():
            if message["type"] in ("message", "pmessage"):
                self._handle_message(message)

    def _handle_message(self, message: dict):
        """Process received outcome packet."""
        try:
            data = message.get("data")
            if not data or not isinstance(data, bytes):
                return

            packet = OutcomePacket.from_bytes(data)

            # Skip own packets (don't synthesize with yourself)
            if packet.agent_id == self.agent_id:
                return

            print(f"[{self.agent_id}] Received from {packet.agent_id}: "
                  f"delta={packet.outcome_delta:.3f}, "
                  f"confidence={packet.confidence:.2f}, "
                  f"n={packet.n_observations}")

            # Local synthesis — this is where intelligence compounds
            if self.on_packet_received:
                self.on_packet_received(packet)

        except Exception as e:
            print(f"[{self.agent_id}] Packet decode error: {e}")

    def publish_outcome(self, packet: OutcomePacket) -> int:
        """
        Publish outcome packet to the semantic channel.

        Returns: number of subscribers who received the packet.
        This is O(1) — Redis routes directly to channel subscribers.
        No coordinator. No lookup. No DHT traversal needed.
        """
        channel = packet.to_channel()
        payload = packet.to_bytes()

        # Enforce packet size discipline
        if len(payload) > 512:
            print(f"[{self.agent_id}] WARNING: Packet {len(payload)} bytes > 512 target")

        receivers = self.publish_client.publish(channel, payload)
        print(f"[{self.agent_id}] Published to '{channel}'{receivers} receivers "
              f"({len(payload)} bytes)")
        return receivers

    def close(self):
        self.pubsub.unsubscribe()
        self.pubsub.punsubscribe()
        self.pubsub.close()
Enter fullscreen mode Exit fullscreen mode

Running a QIS Swarm on Redis

Three agents, same domain, synthesizing treatment outcomes:

import time
import threading
from collections import defaultdict

# Local synthesis state — per-agent, never shared raw
class AgentIntelligence:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.received_packets: list[OutcomePacket] = []
        self.synthesized_delta: float = 0.0

    def synthesize(self, packet: OutcomePacket):
        """Integrate incoming outcome packet into local intelligence."""
        self.received_packets.append(packet)
        # Weighted running average — local synthesis, local compute
        n = len(self.received_packets)
        weight = packet.confidence * packet.n_observations
        self.synthesized_delta = (
            (self.synthesized_delta * (n - 1) + packet.outcome_delta * weight) /
            (n + weight - 1)
        )
        print(f"  [{self.agent_id}] Synthesized delta → {self.synthesized_delta:.4f} "
              f"(from {n} packets)")


def demo_qis_redis_swarm():
    # Three hospital agents observing the same treatment domain
    intelligence = {
        "hospital_a": AgentIntelligence("hospital_a"),
        "hospital_b": AgentIntelligence("hospital_b"),
        "hospital_c": AgentIntelligence("hospital_c"),
    }

    def make_handler(agent_id: str):
        def handler(packet: OutcomePacket):
            intelligence[agent_id].synthesize(packet)
        return handler

    # Create QIS nodes
    node_a = QISPubSubNode(
        agent_id="hospital_a",
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        on_packet_received=make_handler("hospital_a")
    )
    node_b = QISPubSubNode(
        agent_id="hospital_b",
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        on_packet_received=make_handler("hospital_b")
    )
    node_c = QISPubSubNode(
        agent_id="hospital_c",
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        on_packet_received=make_handler("hospital_c")
    )

    # Subscribe to shared channel — O(1) routing
    for node in [node_a, node_b, node_c]:
        node.subscribe()
        node.start_listening()

    time.sleep(0.5)  # Let subscriptions register

    print("\n=== PUBLISHING OUTCOME PACKETS ===\n")

    # Hospital A publishes: treatment X showed +12% 5-year survival delta
    node_a.publish_outcome(OutcomePacket(
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        outcome_delta=0.12, confidence=0.91, validation_method="RCT",
        n_observations=847, timestamp=time.time(), agent_id="hospital_a"
    ))

    time.sleep(0.2)

    # Hospital B publishes: consistent finding, slightly different cohort
    node_b.publish_outcome(OutcomePacket(
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        outcome_delta=0.09, confidence=0.87, validation_method="observational",
        n_observations=412, timestamp=time.time(), agent_id="hospital_b"
    ))

    time.sleep(0.2)

    # Hospital C publishes: larger cohort, confirms direction
    node_c.publish_outcome(OutcomePacket(
        domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
        outcome_delta=0.11, confidence=0.94, validation_method="RCT",
        n_observations=1203, timestamp=time.time(), agent_id="hospital_c"
    ))

    time.sleep(0.5)

    print("\n=== SYNTHESIS RESULTS ===\n")
    for agent_id, intel in intelligence.items():
        print(f"[{agent_id}]")
        print(f"  Packets received: {len(intel.received_packets)}")
        print(f"  Synthesized treatment delta: {intel.synthesized_delta:.4f}")
        print(f"  Raw data shared: 0 bytes")
        print()

    # N=3 agents → N(N-1)/2 = 3 synthesis pairs
    # Each agent holds local synthesis without seeing anyone else's raw data
    print("Synthesis pairs: N(N-1)/2 = 3(2)/2 = 3")
    print("Each agent's compute: O(1) channel subscription")
    print("Raw data transmitted: 0 bytes")

    for node in [node_a, node_b, node_c]:
        node.close()


if __name__ == "__main__":
    demo_qis_redis_swarm()
Enter fullscreen mode Exit fullscreen mode

Expected output:

[hospital_a] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome
[hospital_b] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome
[hospital_c] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome

=== PUBLISHING OUTCOME PACKETS ===

[hospital_a] Published to 'qis:oncology:breast-cancer:treatment-outcome' → 2 receivers (247 bytes)
[hospital_b] Received from hospital_a: delta=0.120, confidence=0.91, n=847
  [hospital_b] Synthesized delta → 0.1092 (from 1 packets)
[hospital_c] Received from hospital_a: delta=0.120, confidence=0.91, n=847
  [hospital_c] Synthesized delta → 0.1092 (from 1 packets)

[hospital_b] Published to 'qis:oncology:breast-cancer:treatment-outcome' → 2 receivers (249 bytes)
[hospital_a] Received from hospital_b: delta=0.090, confidence=0.87, n=412
  [hospital_a] Synthesized delta → 0.1056 (from 1 packets)
...

=== SYNTHESIS RESULTS ===

[hospital_a] Packets received: 2, Synthesized delta: 0.1034
[hospital_b] Packets received: 2, Synthesized delta: 0.1089
[hospital_c] Packets received: 2, Synthesized delta: 0.1081

Synthesis pairs: N(N-1)/2 = 3(2)/2 = 3
Each agent's compute: O(1) channel subscription
Raw data transmitted: 0 bytes
Enter fullscreen mode Exit fullscreen mode

The Routing Complexity Comparison

Transport Routing complexity Lookup type Approximate matching
In-memory dict O(1) exact Direct key lookup No
ChromaDB (HNSW) O(log N) approximate ANN search Yes
Qdrant (HNSW) O(log N) distributed Distributed ANN Yes
REST API (cosine) O(N) brute force Exhaustive search Yes
Redis pub/sub O(1) exact + pattern Direct channel sub Yes (pattern)
DHT (Kademlia) O(log N) Iterative lookup No (exact keys)

Redis pub/sub achieves O(1) routing. That is better than the O(log N) DHT baseline. The tradeoff: pub/sub is real-time push (no persistence by default), while DHT enables agents to query historical packets. Redis Streams solves this — persistent, consumer-group delivery, O(1) append and O(log N) range queries.

The QIS requirement is O(log N) or better. Redis satisfies it. The quadratic scaling is unchanged.


Pattern Subscriptions for Approximate Matching

Redis pub/sub supports pattern subscriptions (PSUBSCRIBE). This maps directly to approximate semantic matching:

# Subscribe to all oncology sub-problems
node.subscribe_pattern("qis:oncology:*")

# Subscribe to all treatment outcomes across domains
node.subscribe_pattern("qis:*:*:treatment-outcome")

# Subscribe to everything (coordinator-free broadcast)
node.subscribe_pattern("qis:*")
Enter fullscreen mode Exit fullscreen mode

This gives you the semantic similarity behavior of vector search — without embedding computation at routing time. The specificity of the channel name IS the fingerprint granularity.

Tradeoffs vs vector search:

  • Faster: no embedding model at publish time, O(1) subscription vs O(log N) ANN
  • Less flexible: channel name must match exactly (or via wildcard pattern) — no "70% similar" results
  • Simpler: no vector database to operate and scale

For domains with well-defined taxonomies (medical ICD codes, ATT&CK technique IDs, financial asset classes, regulatory jurisdiction codes), Redis pub/sub channels map directly to existing classification systems. No embedding needed.


Redis Streams: When You Need Persistence

Pub/sub is ephemeral — if an agent is offline when a packet is published, it misses it. For domains where outcome packets must be queryable after the fact, use Redis Streams:

# Publish to stream (persistent)
r.xadd(f"qis:{domain}:{subdomain}:{problem_type}", packet_dict)

# Consumer group reads — each agent gets its own cursor
r.xreadgroup(
    groupname=f"qis-consumers-{domain}",
    consumername=agent_id,
    streams={channel: ">"},  # ">" means "new messages only"
    count=10,
    block=1000
)
Enter fullscreen mode Exit fullscreen mode

Streams add O(log N) range query on top of O(1) append. The QIS routing complexity stays O(log N) or better. This is the persistent-transport variant — same loop, same quadratic scaling, different delivery guarantee.


The Series Summary

Five transports. One loop. One scaling law.

Part Transport Routing complexity Best for
1 In-memory Python dict O(1) Single-process testing
2 ChromaDB (HNSW) O(log N) approximate Semantic fuzzy matching, small-medium scale
3 Qdrant (distributed HNSW) O(log N) distributed Production semantic search at scale
4 REST API (cosine brute force) O(N) Zero-dependency, any HTTP environment
5 Redis pub/sub + Streams O(1) / O(log N) Real-time push, real-time applications

Each transport implements the same architecture:

  1. Agent observes → distills to outcome packet (~512 bytes)
  2. Packet fingerprinted → semantic address derived
  3. Packet routed to address → relevant agents receive
  4. Local synthesis → new insight generated
  5. New packet → loop continues

The quadratic intelligence property — N(N-1)/2 synthesis pairs — comes from this loop and the semantic addressing. Not from any transport layer. The transport determines delivery semantics (push vs pull, exact vs approximate, persistent vs ephemeral). The scaling law is the same regardless.

This is the architectural claim in the 39 provisional patents filed by Christopher Thomas Trevethan: the discovery is the complete loop, transport-agnostic. If you implement this loop with a database, a vector store, a message queue, a REST endpoint, or a Redis pub/sub channel — you are running QIS. The routing mechanism does not change what was discovered.


What Is Next

The transport-agnostic proof series is complete at five transports. The full QIS architectural specification covers the governance model (Three Elections as natural selection forces), Byzantine fault tolerance, the economic model, and domain-specific applications.

For engineers hitting coordinator bottlenecks in production multi-agent systems: the central orchestrator bottleneck article covers the structural problem. This series covers five ways to implement the architectural solution.

The implementation you choose depends on your delivery requirements. The quadratic scaling you get is the same.


QIS (Quadratic Intelligence Swarm) was discovered by Christopher Thomas Trevethan. 39 provisional patents filed. Protocol specification at qisprotocol.com.

Series: Part 1 · Part 2 · Part 3 · Part 4 · Part 5 (this article)

Top comments (0)