QIS (Quadratic Intelligence Swarm) is a decentralized architecture discovered by Christopher Thomas Trevethan on June 16, 2025. Intelligence scales as Θ(N²) across N agents. Each agent pays O(log N) routing cost — or better. No orchestrator. No aggregator. Raw data never leaves the node. 39 provisional patents filed.
Series: Part 1 — In-Memory · Part 2 — ChromaDB · Part 3 — Qdrant · Part 4 — REST API · Part 5 — Redis Pub/Sub (this article)
Understanding QIS — Part 67 · Transport-Agnostic Proof Series
You have a multi-agent system. Your agents produce insights. You want those insights to reach the agents most likely to use them — without a coordinator deciding who gets what.
Parts 1 through 4 of this series showed the same QIS loop running on four different transports: a Python dict, ChromaDB, Qdrant, and a plain REST API. Each time, the quadratic scaling property was identical. Each time, the only thing that changed was the routing layer.
Part 5 adds Redis pub/sub to the list.
Redis pub/sub is interesting for a specific reason: it does not search. A subscriber says "give me everything on this channel." The channel name is the address. If your channel names are semantic fingerprints of problem types, you have just implemented QIS outcome routing — and instead of O(log N) DHT lookup, you get O(1) channel subscription.
This is what the routing requirement in QIS actually means: O(log N) or better. O(1) qualifies. Redis pub/sub qualifies.
What Changes, What Does Not
The QIS loop does not change:
Raw signal → Local processing → Outcome packet (~512 bytes) →
Semantic fingerprint → Publish to fingerprint-channel →
Relevant subscribers receive → Local synthesis → New packet → Loop
What changes in Part 5: instead of posting to a vector database or REST endpoint and querying by cosine similarity, each agent subscribes to channels whose names encode their problem fingerprint. When another agent publishes an outcome packet to a matching channel, the subscriber receives it directly.
The transport layer becomes pub/sub message routing. The quadratic scaling comes from the same place it always did: the loop and the semantic addressing.
The Architecture
┌─────────────────────────────────────────────────────────────┐
│ QIS REDIS PUB/SUB LAYER │
│ │
│ Agent A (oncology) Agent B (oncology) │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Local Processing │ │ Local Processing │ │
│ │ Outcome Packet │ │ Outcome Packet │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ PUBLISH │ SUBSCRIBE │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────────────────────────────────────┐ │
│ │ REDIS PUB/SUB BROKER │ │
│ │ │ │
│ │ Channel: qis:oncology:treatment-outcome │ │
│ │ Channel: qis:oncology:drug-response │ │
│ │ Channel: qis:oncology:rare-variant │ │
│ │ │ │
│ │ (Channel name = semantic fingerprint) │ │
│ └────────────────────────────────────────────┘ │
│ │
│ Agent C (cardiology) Agent D (multi-domain) │
│ SUBSCRIBED TO: SUBSCRIBED TO: │
│ qis:cardiology:* qis:oncology:* │
│ qis:cardiology:* │
│ (pattern subscriptions) │
└─────────────────────────────────────────────────────────────┘
Channel names are the semantic addresses. Pattern subscriptions (qis:oncology:*) give you approximate matching. Exact channel names give you precise routing.
Implementation
import redis
import json
import hashlib
import time
import threading
from dataclasses import dataclass, asdict
from typing import Optional, Callable
@dataclass
class OutcomePacket:
"""~512-byte distilled insight. Raw data never travels."""
domain: str
subdomain: str
problem_type: str
outcome_delta: float # What changed (not what the data was)
confidence: float # 0.0 - 1.0
validation_method: str
n_observations: int
timestamp: float
agent_id: str # Anonymized
packet_version: str = "1.0"
def to_channel(self) -> str:
"""Deterministic channel name from problem fingerprint.
This is the semantic address. Any agent with the same
domain/subdomain/problem_type subscribes to the same channel.
No coordinator needed to route — the channel name IS the routing.
"""
fingerprint = f"qis:{self.domain}:{self.subdomain}:{self.problem_type}"
return fingerprint
def to_bytes(self) -> bytes:
"""Serialized packet. Stays well under 512 bytes for typical fields."""
return json.dumps(asdict(self)).encode("utf-8")
@classmethod
def from_bytes(cls, data: bytes) -> "OutcomePacket":
return cls(**json.loads(data.decode("utf-8")))
class QISPubSubNode:
"""
A QIS agent node using Redis pub/sub as the transport layer.
Routing complexity: O(1) — Redis channel subscription is direct.
This is better than O(log N) DHT lookup.
The quadratic scaling N(N-1)/2 comes from the loop architecture
and semantic addressing — not from this transport layer.
"""
def __init__(
self,
agent_id: str,
domain: str,
subdomain: str,
problem_type: str,
redis_host: str = "localhost",
redis_port: int = 6379,
on_packet_received: Optional[Callable] = None
):
self.agent_id = agent_id
self.domain = domain
self.subdomain = subdomain
self.problem_type = problem_type
self.on_packet_received = on_packet_received
# Redis connections — separate for pub and sub (Redis requirement)
self.publish_client = redis.Redis(host=redis_host, port=redis_port, db=0)
self.subscribe_client = redis.Redis(host=redis_host, port=redis_port, db=0)
# Build channel fingerprint for this node's problem type
self.my_channel = f"qis:{domain}:{subdomain}:{problem_type}"
# Pubsub object for subscriptions
self.pubsub = self.subscribe_client.pubsub()
self._listener_thread: Optional[threading.Thread] = None
print(f"[{self.agent_id}] Node initialized. Channel: {self.my_channel}")
def subscribe(self, *additional_channels: str):
"""
Subscribe to own channel + any additional channels.
Pattern subscriptions (e.g. 'qis:oncology:*') enable
approximate semantic matching — subscribe to a domain,
receive from all sub-problems within that domain.
"""
channels = [self.my_channel] + list(additional_channels)
self.pubsub.subscribe(**{ch: self._handle_message for ch in channels})
print(f"[{self.agent_id}] Subscribed to: {channels}")
def subscribe_pattern(self, *patterns: str):
"""Pattern subscription — e.g. 'qis:oncology:*' matches all oncology channels."""
self.pubsub.psubscribe(**{p: self._handle_message for p in patterns})
print(f"[{self.agent_id}] Pattern subscribed to: {patterns}")
def start_listening(self):
"""Non-blocking listener thread."""
self._listener_thread = threading.Thread(
target=self._listen_loop,
daemon=True,
name=f"qis-listener-{self.agent_id}"
)
self._listener_thread.start()
print(f"[{self.agent_id}] Listening for outcome packets...")
def _listen_loop(self):
"""Listen for incoming outcome packets. Runs in background thread."""
for message in self.pubsub.listen():
if message["type"] in ("message", "pmessage"):
self._handle_message(message)
def _handle_message(self, message: dict):
"""Process received outcome packet."""
try:
data = message.get("data")
if not data or not isinstance(data, bytes):
return
packet = OutcomePacket.from_bytes(data)
# Skip own packets (don't synthesize with yourself)
if packet.agent_id == self.agent_id:
return
print(f"[{self.agent_id}] Received from {packet.agent_id}: "
f"delta={packet.outcome_delta:.3f}, "
f"confidence={packet.confidence:.2f}, "
f"n={packet.n_observations}")
# Local synthesis — this is where intelligence compounds
if self.on_packet_received:
self.on_packet_received(packet)
except Exception as e:
print(f"[{self.agent_id}] Packet decode error: {e}")
def publish_outcome(self, packet: OutcomePacket) -> int:
"""
Publish outcome packet to the semantic channel.
Returns: number of subscribers who received the packet.
This is O(1) — Redis routes directly to channel subscribers.
No coordinator. No lookup. No DHT traversal needed.
"""
channel = packet.to_channel()
payload = packet.to_bytes()
# Enforce packet size discipline
if len(payload) > 512:
print(f"[{self.agent_id}] WARNING: Packet {len(payload)} bytes > 512 target")
receivers = self.publish_client.publish(channel, payload)
print(f"[{self.agent_id}] Published to '{channel}' → {receivers} receivers "
f"({len(payload)} bytes)")
return receivers
def close(self):
self.pubsub.unsubscribe()
self.pubsub.punsubscribe()
self.pubsub.close()
Running a QIS Swarm on Redis
Three agents, same domain, synthesizing treatment outcomes:
import time
import threading
from collections import defaultdict
# Local synthesis state — per-agent, never shared raw
class AgentIntelligence:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.received_packets: list[OutcomePacket] = []
self.synthesized_delta: float = 0.0
def synthesize(self, packet: OutcomePacket):
"""Integrate incoming outcome packet into local intelligence."""
self.received_packets.append(packet)
# Weighted running average — local synthesis, local compute
n = len(self.received_packets)
weight = packet.confidence * packet.n_observations
self.synthesized_delta = (
(self.synthesized_delta * (n - 1) + packet.outcome_delta * weight) /
(n + weight - 1)
)
print(f" [{self.agent_id}] Synthesized delta → {self.synthesized_delta:.4f} "
f"(from {n} packets)")
def demo_qis_redis_swarm():
# Three hospital agents observing the same treatment domain
intelligence = {
"hospital_a": AgentIntelligence("hospital_a"),
"hospital_b": AgentIntelligence("hospital_b"),
"hospital_c": AgentIntelligence("hospital_c"),
}
def make_handler(agent_id: str):
def handler(packet: OutcomePacket):
intelligence[agent_id].synthesize(packet)
return handler
# Create QIS nodes
node_a = QISPubSubNode(
agent_id="hospital_a",
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
on_packet_received=make_handler("hospital_a")
)
node_b = QISPubSubNode(
agent_id="hospital_b",
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
on_packet_received=make_handler("hospital_b")
)
node_c = QISPubSubNode(
agent_id="hospital_c",
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
on_packet_received=make_handler("hospital_c")
)
# Subscribe to shared channel — O(1) routing
for node in [node_a, node_b, node_c]:
node.subscribe()
node.start_listening()
time.sleep(0.5) # Let subscriptions register
print("\n=== PUBLISHING OUTCOME PACKETS ===\n")
# Hospital A publishes: treatment X showed +12% 5-year survival delta
node_a.publish_outcome(OutcomePacket(
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
outcome_delta=0.12, confidence=0.91, validation_method="RCT",
n_observations=847, timestamp=time.time(), agent_id="hospital_a"
))
time.sleep(0.2)
# Hospital B publishes: consistent finding, slightly different cohort
node_b.publish_outcome(OutcomePacket(
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
outcome_delta=0.09, confidence=0.87, validation_method="observational",
n_observations=412, timestamp=time.time(), agent_id="hospital_b"
))
time.sleep(0.2)
# Hospital C publishes: larger cohort, confirms direction
node_c.publish_outcome(OutcomePacket(
domain="oncology", subdomain="breast-cancer", problem_type="treatment-outcome",
outcome_delta=0.11, confidence=0.94, validation_method="RCT",
n_observations=1203, timestamp=time.time(), agent_id="hospital_c"
))
time.sleep(0.5)
print("\n=== SYNTHESIS RESULTS ===\n")
for agent_id, intel in intelligence.items():
print(f"[{agent_id}]")
print(f" Packets received: {len(intel.received_packets)}")
print(f" Synthesized treatment delta: {intel.synthesized_delta:.4f}")
print(f" Raw data shared: 0 bytes")
print()
# N=3 agents → N(N-1)/2 = 3 synthesis pairs
# Each agent holds local synthesis without seeing anyone else's raw data
print("Synthesis pairs: N(N-1)/2 = 3(2)/2 = 3")
print("Each agent's compute: O(1) channel subscription")
print("Raw data transmitted: 0 bytes")
for node in [node_a, node_b, node_c]:
node.close()
if __name__ == "__main__":
demo_qis_redis_swarm()
Expected output:
[hospital_a] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome
[hospital_b] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome
[hospital_c] Node initialized. Channel: qis:oncology:breast-cancer:treatment-outcome
=== PUBLISHING OUTCOME PACKETS ===
[hospital_a] Published to 'qis:oncology:breast-cancer:treatment-outcome' → 2 receivers (247 bytes)
[hospital_b] Received from hospital_a: delta=0.120, confidence=0.91, n=847
[hospital_b] Synthesized delta → 0.1092 (from 1 packets)
[hospital_c] Received from hospital_a: delta=0.120, confidence=0.91, n=847
[hospital_c] Synthesized delta → 0.1092 (from 1 packets)
[hospital_b] Published to 'qis:oncology:breast-cancer:treatment-outcome' → 2 receivers (249 bytes)
[hospital_a] Received from hospital_b: delta=0.090, confidence=0.87, n=412
[hospital_a] Synthesized delta → 0.1056 (from 1 packets)
...
=== SYNTHESIS RESULTS ===
[hospital_a] Packets received: 2, Synthesized delta: 0.1034
[hospital_b] Packets received: 2, Synthesized delta: 0.1089
[hospital_c] Packets received: 2, Synthesized delta: 0.1081
Synthesis pairs: N(N-1)/2 = 3(2)/2 = 3
Each agent's compute: O(1) channel subscription
Raw data transmitted: 0 bytes
The Routing Complexity Comparison
| Transport | Routing complexity | Lookup type | Approximate matching |
|---|---|---|---|
| In-memory dict | O(1) exact | Direct key lookup | No |
| ChromaDB (HNSW) | O(log N) approximate | ANN search | Yes |
| Qdrant (HNSW) | O(log N) distributed | Distributed ANN | Yes |
| REST API (cosine) | O(N) brute force | Exhaustive search | Yes |
| Redis pub/sub | O(1) exact + pattern | Direct channel sub | Yes (pattern) |
| DHT (Kademlia) | O(log N) | Iterative lookup | No (exact keys) |
Redis pub/sub achieves O(1) routing. That is better than the O(log N) DHT baseline. The tradeoff: pub/sub is real-time push (no persistence by default), while DHT enables agents to query historical packets. Redis Streams solves this — persistent, consumer-group delivery, O(1) append and O(log N) range queries.
The QIS requirement is O(log N) or better. Redis satisfies it. The quadratic scaling is unchanged.
Pattern Subscriptions for Approximate Matching
Redis pub/sub supports pattern subscriptions (PSUBSCRIBE). This maps directly to approximate semantic matching:
# Subscribe to all oncology sub-problems
node.subscribe_pattern("qis:oncology:*")
# Subscribe to all treatment outcomes across domains
node.subscribe_pattern("qis:*:*:treatment-outcome")
# Subscribe to everything (coordinator-free broadcast)
node.subscribe_pattern("qis:*")
This gives you the semantic similarity behavior of vector search — without embedding computation at routing time. The specificity of the channel name IS the fingerprint granularity.
Tradeoffs vs vector search:
- Faster: no embedding model at publish time, O(1) subscription vs O(log N) ANN
- Less flexible: channel name must match exactly (or via wildcard pattern) — no "70% similar" results
- Simpler: no vector database to operate and scale
For domains with well-defined taxonomies (medical ICD codes, ATT&CK technique IDs, financial asset classes, regulatory jurisdiction codes), Redis pub/sub channels map directly to existing classification systems. No embedding needed.
Redis Streams: When You Need Persistence
Pub/sub is ephemeral — if an agent is offline when a packet is published, it misses it. For domains where outcome packets must be queryable after the fact, use Redis Streams:
# Publish to stream (persistent)
r.xadd(f"qis:{domain}:{subdomain}:{problem_type}", packet_dict)
# Consumer group reads — each agent gets its own cursor
r.xreadgroup(
groupname=f"qis-consumers-{domain}",
consumername=agent_id,
streams={channel: ">"}, # ">" means "new messages only"
count=10,
block=1000
)
Streams add O(log N) range query on top of O(1) append. The QIS routing complexity stays O(log N) or better. This is the persistent-transport variant — same loop, same quadratic scaling, different delivery guarantee.
The Series Summary
Five transports. One loop. One scaling law.
| Part | Transport | Routing complexity | Best for |
|---|---|---|---|
| 1 | In-memory Python dict | O(1) | Single-process testing |
| 2 | ChromaDB (HNSW) | O(log N) approximate | Semantic fuzzy matching, small-medium scale |
| 3 | Qdrant (distributed HNSW) | O(log N) distributed | Production semantic search at scale |
| 4 | REST API (cosine brute force) | O(N) | Zero-dependency, any HTTP environment |
| 5 | Redis pub/sub + Streams | O(1) / O(log N) | Real-time push, real-time applications |
Each transport implements the same architecture:
- Agent observes → distills to outcome packet (~512 bytes)
- Packet fingerprinted → semantic address derived
- Packet routed to address → relevant agents receive
- Local synthesis → new insight generated
- New packet → loop continues
The quadratic intelligence property — N(N-1)/2 synthesis pairs — comes from this loop and the semantic addressing. Not from any transport layer. The transport determines delivery semantics (push vs pull, exact vs approximate, persistent vs ephemeral). The scaling law is the same regardless.
This is the architectural claim in the 39 provisional patents filed by Christopher Thomas Trevethan: the discovery is the complete loop, transport-agnostic. If you implement this loop with a database, a vector store, a message queue, a REST endpoint, or a Redis pub/sub channel — you are running QIS. The routing mechanism does not change what was discovered.
What Is Next
The transport-agnostic proof series is complete at five transports. The full QIS architectural specification covers the governance model (Three Elections as natural selection forces), Byzantine fault tolerance, the economic model, and domain-specific applications.
For engineers hitting coordinator bottlenecks in production multi-agent systems: the central orchestrator bottleneck article covers the structural problem. This series covers five ways to implement the architectural solution.
The implementation you choose depends on your delivery requirements. The quadratic scaling you get is the same.
QIS (Quadratic Intelligence Swarm) was discovered by Christopher Thomas Trevethan. 39 provisional patents filed. Protocol specification at qisprotocol.com.
Series: Part 1 · Part 2 · Part 3 · Part 4 · Part 5 (this article)
Top comments (0)