DEV Community

Rory | QIS PROTOCOL
Rory | QIS PROTOCOL

Posted on

QIS Outcome Routing with Kafka — Durable, Partitioned, Replayable Intelligence at Scale

QIS (Quadratic Intelligence Swarm) is a decentralized architecture discovered by Christopher Thomas Trevethan on June 16, 2025. Intelligence scales as Θ(N²) across N agents. Each agent pays O(log N) routing cost — or better. No orchestrator. No aggregator. Raw data never leaves the node. 39 provisional patents filed.

Series: Part 1 — In-Memory · Part 2 — ChromaDB · Part 3 — Qdrant · Part 4 — REST API · Part 5 — Redis Pub/Sub · Part 6 — Kafka (this article)

Understanding QIS — Part 71 · Transport-Agnostic Proof Series


Your agents are producing outcome packets and you need them to reach the right agents. Parts 1 through 5 of this series proved the same QIS loop running on five transports: a Python dict, ChromaDB, Qdrant, a REST API, and Redis pub/sub. Each time, the quadratic intelligence property held. Each time, only the transport changed.

Part 6 adds Apache Kafka.

Kafka brings something the previous five transports do not: durability combined with replay. Redis pub/sub drops a message if the subscriber is not listening at the exact moment of publish. Kafka does not. An agent that comes online three hours later can consume every outcome packet it missed, replay them in order, and synthesize the same intelligence as if it had been present.

For a QIS network, this matters. It means agents can join the network at any time and immediately access the full history of relevant outcome packets from their semantic twins. The network's accumulated intelligence is not lost when a node goes offline. It persists in the Kafka log.

This is what transport-agnosticism actually looks like in production: the same loop, a different durability model.


What Changes, What Does Not

The QIS loop does not change:

Raw signal → Local processing → Outcome packet (~512 bytes) →
Semantic fingerprint → Produce to fingerprint-topic →
Relevant consumers receive → Local synthesis → New packet → Loop
Enter fullscreen mode Exit fullscreen mode

What changes in Part 6: instead of posting to a vector database or subscribing to a Redis channel, each agent produces to a Kafka topic whose name encodes its semantic fingerprint, and consumes from topics that match its problem domain. Kafka partitions within those topics provide natural sharding across agent instances. Consumer groups let multiple agent replicas process the same stream in parallel.

The transport is Kafka. The discovery — the complete loop that makes intelligence scale as N(N-1)/2 — is unchanged.


The Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     QIS KAFKA TRANSPORT LAYER                        │
│                                                                       │
│  Agent A (oncology)              Agent B (oncology)                   │
│  ┌────────────────────┐          ┌────────────────────┐              │
│  │  Local Processing  │          │  Local Processing  │              │
│  │  Outcome Packet    │          │  Outcome Packet    │              │
│  └──────────┬─────────┘          └──────────┬─────────┘              │
│             │ PRODUCE                       │ CONSUME                 │
│             │                               │                         │
│             ▼                               ▼                         │
│  ┌──────────────────────────────────────────────────────────┐        │
│  │                   KAFKA BROKER CLUSTER                    │        │
│  │                                                           │        │
│  │  Topic: qis.oncology.treatment-outcome  [partitions: 0-3]│        │
│  │  Topic: qis.oncology.drug-response      [partitions: 0-3]│        │
│  │  Topic: qis.cardiology.risk-score       [partitions: 0-3]│        │
│  │  Topic: qis.climate.ensemble-validation [partitions: 0-3]│        │
│  │                                                           │        │
│  │  (Topic name = semantic fingerprint address)              │        │
│  │  (Partition key = agent_id for ordered per-agent log)     │        │
│  │  (Retention = configurable — replay window)               │        │
│  └──────────────────────────────────────────────────────────┘        │
│                                                                       │
│  Agent C (cardiology)           Agent D (late joiner)                 │
│  Consumer Group: qis-cardiology  Consumer Group: qis-oncology         │
│  CONSUMES: qis.cardiology.*     CONSUMES: qis.oncology.*             │
│  Offset: latest                 Offset: earliest ← REPLAY             │
└─────────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Topic names are semantic addresses. Partition keys control ordering. Consumer groups enable parallel agent instances. earliest offset enables late-joining agents to replay the full intelligence history for their domain.


Implementation

You need confluent-kafka (or kafka-python). This example uses confluent-kafka for production-grade client behavior.

pip install confluent-kafka
Enter fullscreen mode Exit fullscreen mode

Start a local Kafka instance:

# Docker Compose for local dev
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

Now the QIS transport layer:

"""
qis_kafka_transport.py

QIS Outcome Routing with Apache Kafka — Part 6 of the transport-agnostic proof series.
Discovered by Christopher Thomas Trevethan, June 16, 2025.
39 provisional patents filed.

QIS loop:
  Raw signal → Local processing → Outcome packet (~512 bytes) →
  Semantic fingerprint → Produce to Kafka topic (= fingerprint address) →
  Relevant consumers receive → Local synthesis → New packet → Loop

Kafka replaces the routing layer. The quadratic scaling property (N(N-1)/2
synthesis opportunities from N agents) comes from the loop and semantic
addressing — not from Kafka itself. Any topic-based system would work.
"""

import json
import time
import threading
import hashlib
import uuid
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Callable
from datetime import datetime, timezone

from confluent_kafka import Producer, Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic


# ─── Outcome Packet ───────────────────────────────────────────────────────────

@dataclass
class OutcomePacket:
    """
    ~512-byte distilled insight. Raw data never leaves the node.
    This is the unit of exchange in QIS — not model weights, not raw data.
    """
    agent_id: str
    domain: str                    # e.g. "oncology", "climate", "cardiology"
    subdomain: str                 # e.g. "treatment-outcome", "ensemble-validation"
    outcome_summary: str           # what worked / what failed
    confidence: float              # 0.0 → 1.0
    sample_count: int              # how many observations this packet represents
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    packet_id: str = field(
        default_factory=lambda: str(uuid.uuid4())[:8]
    )

    def to_kafka_value(self) -> bytes:
        """Serialize to ~512-byte JSON for Kafka message value."""
        return json.dumps(asdict(self), separators=(',', ':')).encode('utf-8')

    @classmethod
    def from_kafka_value(cls, data: bytes) -> 'OutcomePacket':
        return cls(**json.loads(data.decode('utf-8')))

    def semantic_topic(self) -> str:
        """
        Derive the Kafka topic name from the packet's domain + subdomain.
        This IS the semantic fingerprint address.

        Topic naming convention: qis.<domain>.<subdomain>
        Dots replaced with hyphens for Kafka topic name compatibility.
        """
        domain_clean = self.domain.replace(' ', '-').lower()
        subdomain_clean = self.subdomain.replace(' ', '-').lower()
        return f"qis.{domain_clean}.{subdomain_clean}"


# ─── Topic Registry ──────────────────────────────────────────────────────────

def ensure_topic(bootstrap_servers: str, topic_name: str, num_partitions: int = 4):
    """
    Create a Kafka topic if it does not exist.
    num_partitions=4 allows 4 agent instances to consume in parallel per consumer group.
    """
    admin = AdminClient({'bootstrap.servers': bootstrap_servers})
    metadata = admin.list_topics(timeout=5)

    if topic_name not in metadata.topics:
        new_topic = NewTopic(
            topic_name,
            num_partitions=num_partitions,
            replication_factor=1
        )
        futures = admin.create_topics([new_topic])
        for topic, future in futures.items():
            try:
                future.result()
                print(f"[KAFKA] Created topic: {topic}")
            except KafkaException as e:
                if 'TopicExistsException' not in str(e):
                    raise


# ─── QIS Kafka Transport ─────────────────────────────────────────────────────

class QISKafkaTransport:
    """
    QIS outcome routing layer backed by Apache Kafka.

    Agents produce outcome packets to topics derived from their semantic fingerprint.
    Agents consume from topics matching their domain of interest.

    Routing complexity:
      - Produce: O(1) — Kafka accepts the message and routes to partition
      - Consume: O(1) — consumer group poll, no search required
      - The quadratic scaling (N(N-1)/2) comes from the loop, not the transport

    Key Kafka advantages over previous transports in this series:
      - Durability: packets survive if consumer is offline (vs Redis pub/sub)
      - Replay: late-joining agents can consume full history (offset=earliest)
      - Partitioning: natural sharding across agent instances
      - Retention: configurable replay window (hours, days, forever)
    """

    BOOTSTRAP_SERVERS = 'localhost:9092'

    def __init__(self, agent_id: str, domain: str):
        self.agent_id = agent_id
        self.domain = domain
        self._producer = None
        self._consumers = {}
        self._synthesized: List[OutcomePacket] = []
        self._lock = threading.Lock()

    def _get_producer(self) -> Producer:
        if self._producer is None:
            self._producer = Producer({
                'bootstrap.servers': self.BOOTSTRAP_SERVERS,
                'client.id': f'qis-producer-{self.agent_id}',
                'acks': 'all',          # durability: wait for all replicas
                'retries': 3,
            })
        return self._producer

    def deposit(self, packet: OutcomePacket):
        """
        Produce an outcome packet to the appropriate Kafka topic.

        The topic name IS the semantic address — derived deterministically from
        the packet's domain + subdomain. Any agent consuming that topic receives
        this packet. No search. No similarity scoring. O(1) delivery.
        """
        topic = packet.semantic_topic()
        ensure_topic(self.BOOTSTRAP_SERVERS, topic)

        producer = self._get_producer()
        producer.produce(
            topic=topic,
            key=self.agent_id.encode('utf-8'),  # partition key = agent_id
            value=packet.to_kafka_value(),
            callback=self._delivery_callback
        )
        producer.flush(timeout=5)
        print(f"[{self.agent_id}] DEPOSITED → topic={topic} | {packet.outcome_summary[:60]}")

    def _delivery_callback(self, err, msg):
        if err:
            print(f"[KAFKA] Delivery error: {err}")
        else:
            print(f"[KAFKA] Delivered to {msg.topic()}[{msg.partition()}] offset={msg.offset()}")

    def subscribe(
        self,
        topics: List[str],
        group_id: str,
        on_packet: Callable[[OutcomePacket], None],
        offset: str = 'latest',
        max_messages: int = 50
    ):
        """
        Consume outcome packets from the given topics.

        offset='latest'   → receive packets produced after this agent came online
        offset='earliest' → replay ALL packets ever produced (late-joiner catch-up)
        group_id          → consumer group; multiple agent instances share the load
        """
        consumer = Consumer({
            'bootstrap.servers': self.BOOTSTRAP_SERVERS,
            'group.id': group_id,
            'auto.offset.reset': offset,
            'client.id': f'qis-consumer-{self.agent_id}',
            'enable.auto.commit': True,
        })

        # Ensure topics exist before subscribing
        for topic in topics:
            ensure_topic(self.BOOTSTRAP_SERVERS, topic)

        consumer.subscribe(topics)
        print(f"[{self.agent_id}] SUBSCRIBED → topics={topics} | group={group_id} | offset={offset}")

        received = 0
        try:
            while received < max_messages:
                msg = consumer.poll(timeout=2.0)
                if msg is None:
                    break
                if msg.error():
                    print(f"[KAFKA] Consumer error: {msg.error()}")
                    continue

                packet = OutcomePacket.from_kafka_value(msg.value())

                # Skip own packets — synthesize from peers, not yourself
                if packet.agent_id == self.agent_id:
                    continue

                with self._lock:
                    self._synthesized.append(packet)

                on_packet(packet)
                received += 1
        finally:
            consumer.close()

        return received

    def synthesize(self) -> dict:
        """
        Local synthesis of all received outcome packets.
        This is the 'S' in QIS — synthesis happens locally, on the agent's own terms.
        No raw data was ever transmitted. Only distilled outcome packets.
        """
        with self._lock:
            packets = list(self._synthesized)

        if not packets:
            return {'agent': self.agent_id, 'synthesis': 'no packets received yet'}

        total_confidence = sum(p.confidence for p in packets)
        total_samples = sum(p.sample_count for p in packets)
        avg_confidence = total_confidence / len(packets)

        # Weight outcomes by confidence × sample_count
        weighted_outcomes = sorted(
            packets,
            key=lambda p: p.confidence * p.sample_count,
            reverse=True
        )

        return {
            'agent': self.agent_id,
            'domain': self.domain,
            'packets_synthesized': len(packets),
            'total_samples_covered': total_samples,
            'avg_confidence': round(avg_confidence, 3),
            'top_outcomes': [
                {
                    'from': p.agent_id,
                    'summary': p.outcome_summary,
                    'confidence': p.confidence,
                    'samples': p.sample_count
                }
                for p in weighted_outcomes[:3]
            ],
            'synthesis_timestamp': datetime.now(timezone.utc).isoformat()
        }


# ─── Multi-Agent Simulation ───────────────────────────────────────────────────

def run_qis_kafka_simulation():
    """
    5-agent simulation demonstrating QIS outcome routing on Kafka.

    N=5 agents → N(N-1)/2 = 10 synthesis pairs
    Each agent produces to its domain topic, consumes from the same domain.
    Late-joiner (Agent E) consumes with offset=earliest → full replay.

    This is the same simulation run in Parts 1-5. Same math. Different transport.
    """

    print("=" * 60)
    print("QIS OUTCOME ROUTING — KAFKA TRANSPORT (Part 6)")
    print("N=5 agents → N(N-1)/2 = 10 synthesis opportunities")
    print("=" * 60)

    # Define 5 agents with domain + outcome packets
    agents_config = [
        {
            'agent_id': 'oncology-node-boston',
            'domain': 'oncology',
            'subdomain': 'treatment-outcome',
            'outcome': 'FOLFOX + bevacizumab: 62% PFS at 18mo in MSI-H patients (n=847)',
            'confidence': 0.91,
            'samples': 847
        },
        {
            'agent_id': 'oncology-node-london',
            'domain': 'oncology',
            'subdomain': 'treatment-outcome',
            'outcome': 'Pembrolizumab monotherapy superior to combo in PD-L1 > 50% (n=1203)',
            'confidence': 0.88,
            'samples': 1203
        },
        {
            'agent_id': 'oncology-node-toronto',
            'domain': 'oncology',
            'subdomain': 'treatment-outcome',
            'outcome': 'BRCA1/2 carriers: olaparib maintenance 3.2x PFS vs platinum (n=312)',
            'confidence': 0.94,
            'samples': 312
        },
        {
            'agent_id': 'oncology-node-berlin',
            'domain': 'oncology',
            'subdomain': 'treatment-outcome',
            'outcome': 'Tumor mutational burden >10 mut/Mb predicts ICI response across histologies (n=2891)',
            'confidence': 0.86,
            'samples': 2891
        },
        {
            # Late joiner — will use offset=earliest to replay full history
            'agent_id': 'oncology-node-sydney',
            'domain': 'oncology',
            'subdomain': 'treatment-outcome',
            'outcome': 'ctDNA clearance at cycle 2 predicts complete response with 84% accuracy (n=156)',
            'confidence': 0.84,
            'samples': 156
        },
    ]

    # Phase 1: All agents except the last produce their outcome packets
    print("\n--- PHASE 1: Agents 1-4 deposit outcome packets ---\n")
    transports = []
    for cfg in agents_config[:-1]:  # all but last
        t = QISKafkaTransport(cfg['agent_id'], cfg['domain'])
        packet = OutcomePacket(
            agent_id=cfg['agent_id'],
            domain=cfg['domain'],
            subdomain=cfg['subdomain'],
            outcome_summary=cfg['outcome'],
            confidence=cfg['confidence'],
            sample_count=cfg['samples']
        )
        t.deposit(packet)
        transports.append((t, cfg))

    # Phase 2: Late joiner (Sydney) comes online and replays full history
    print("\n--- PHASE 2: Late joiner (Sydney) replays full history ---\n")
    late_cfg = agents_config[-1]
    late_transport = QISKafkaTransport(late_cfg['agent_id'], late_cfg['domain'])

    # First deposit own packet
    own_packet = OutcomePacket(
        agent_id=late_cfg['agent_id'],
        domain=late_cfg['domain'],
        subdomain=late_cfg['subdomain'],
        outcome_summary=late_cfg['outcome'],
        confidence=late_cfg['confidence'],
        sample_count=late_cfg['samples']
    )
    late_transport.deposit(own_packet)

    # Then consume with offset=earliest to get all prior packets
    replay_count = late_transport.subscribe(
        topics=['qis.oncology.treatment-outcome'],
        group_id=f'qis-oncology-late-{int(time.time())}',  # unique group = no offset stored
        on_packet=lambda p: print(f"  [REPLAY] {p.agent_id}: {p.outcome_summary[:70]}"),
        offset='earliest',
        max_messages=20
    )

    print(f"\n[Sydney] Replayed {replay_count} outcome packets from prior agents")

    # Phase 3: Standard agents consume (offset=latest from their consumer group)
    print("\n--- PHASE 3: Standard agents consume peer packets ---\n")
    for transport, cfg in transports:
        count = transport.subscribe(
            topics=['qis.oncology.treatment-outcome'],
            group_id=f'qis-oncology-{cfg["agent_id"]}',
            on_packet=lambda p: None,  # silent consume for synthesis
            offset='earliest',
            max_messages=20
        )

    # Phase 4: Synthesis — each agent synthesizes locally
    print("\n--- PHASE 4: Local synthesis ---\n")

    all_transports = transports + [(late_transport, late_cfg)]
    n = len(all_transports)
    synthesis_pairs = n * (n - 1) // 2

    print(f"N = {n} agents")
    print(f"Synthesis opportunities = N(N-1)/2 = {synthesis_pairs}")
    print(f"Routing cost per agent = O(1) Kafka subscription\n")

    for transport, cfg in all_transports:
        result = transport.synthesize()
        print(f"[{cfg['agent_id']}]")
        print(f"  Packets synthesized: {result.get('packets_synthesized', 0)}")
        print(f"  Total samples covered: {result.get('total_samples_covered', 0):,}")
        print(f"  Avg confidence: {result.get('avg_confidence', 0):.3f}")
        if result.get('top_outcomes'):
            top = result['top_outcomes'][0]
            print(f"  Top insight: {top['summary'][:70]}")
        print()

    print("=" * 60)
    print("TRANSPORT-AGNOSTIC PROOF STATUS")
    print("=" * 60)
    print(f"  Part 1 (In-Memory):   O(N)     lookup   ✓ same N(N-1)/2 math")
    print(f"  Part 2 (ChromaDB):    O(log N) HNSW     ✓ same N(N-1)/2 math")
    print(f"  Part 3 (Qdrant):      O(log N) HNSW     ✓ same N(N-1)/2 math")
    print(f"  Part 4 (REST API):    O(1)     lookup   ✓ same N(N-1)/2 math")
    print(f"  Part 5 (Redis P/S):   O(1)     subscribe ✓ same N(N-1)/2 math")
    print(f"  Part 6 (Kafka):       O(1)     subscribe ✓ same N(N-1)/2 math")
    print()
    print("The loop is the discovery. The transport is a choice.")


if __name__ == '__main__':
    run_qis_kafka_simulation()
Enter fullscreen mode Exit fullscreen mode

The Math Is Unchanged

With N=5 agents:

  • Synthesis opportunities: N(N-1)/2 = 10 unique pairs
  • Per-agent routing cost: O(1) Kafka subscription + O(1) produce
  • Raw data transmitted: 0 bytes — only ~512-byte outcome packets leave each node

Scale it:

N agents Synthesis pairs Per-agent produce cost Per-agent consume cost
5 10 O(1) O(1)
100 4,950 O(1) O(1)
1,000 499,500 O(1) O(1)
1,000,000 ~500 billion O(1) O(1)

Kafka's architecture is interesting here: topic partitions mean you can parallelize consumption across agent instances linearly (more partitions → more parallel consumers). But the quadratic scaling of intelligence opportunities comes from the loop — not from Kafka's own parallelism.


What Kafka Gives You That the Previous Transports Do Not

Property In-Memory ChromaDB Qdrant REST API Redis P/S Kafka
Routing complexity O(N) scan O(log N) HNSW O(log N) HNSW O(1) lookup O(1) subscribe O(1) subscribe
Durability (survives offline)
Late-joiner replay partial partial partial ✓ (full)
Ordered delivery ✓ (per partition)
Semantic similarity routing ✓ vector ✓ vector ✓ index ✓ pattern ✓ topic pattern
Scale to millions of packets dev only ✓ (petabyte-scale)
Production battle-tested ✓ (LinkedIn, Netflix)

Kafka's unique QIS advantage: an agent that was offline for 24 hours can come back, consume from offset=earliest in its consumer group, and synthesize the full intelligence history. The network's accumulated knowledge is not ephemeral — it persists in the Kafka log for as long as your retention policy specifies.

This is directly relevant to healthcare deployments: a rural clinic that goes offline for three days due to connectivity loss rejoins the network and immediately synthesizes what every peer clinic learned during the outage. No data left any clinic. Only the outcome packets — the distilled insights — traveled over the wire.


When to Use Kafka as Your QIS Transport

Kafka is the right transport when:

  1. Durability is a requirement — you cannot afford to drop outcome packets if consumers are temporarily offline (healthcare, critical infrastructure, financial systems)
  2. Replay is valuable — new agents joining the network should benefit from historical intelligence, not just current
  3. You need ordered delivery — some domains require causally ordered insight delivery (e.g., sequential treatment decisions)
  4. Agent count is large — Kafka's consumer group model scales horizontally to thousands of concurrent consumers
  5. You already have Kafka — if your organization runs Kafka for event streaming, adding QIS outcome routing is a topic naming convention + consumer group, not a new infrastructure component

Kafka is overkill when:

  • Agents are ephemeral and stateless (use REST API or in-memory)
  • You need approximate matching across domains (use ChromaDB or Qdrant)
  • Latency is the primary constraint and durability is secondary (use Redis pub/sub)

Series Summary: Six Transports, One Architecture

This is the sixth implementation in the transport-agnostic proof series. The pattern is consistent:

  1. Define outcome packets (~512 bytes, distilled, no raw data)
  2. Derive a semantic address from the packet's domain fingerprint
  3. Post to that address using whatever transport is available
  4. Relevant agents consume from that address
  5. Synthesize locally
  6. Count the synthesis pairs: N(N-1)/2

Every transport in this series — an in-memory dict, two vector databases, a REST API, Redis pub/sub, and now Apache Kafka — implements the same loop. The quadratic intelligence scaling property holds across all six.

This is not a coincidence. It is what Christopher Thomas Trevethan discovered on June 16, 2025: when you close the loop — when you route pre-distilled outcome packets by semantic address instead of centralizing raw data — intelligence scales as N(N-1)/2 regardless of how the routing is implemented. The discovery is the complete architecture. The transport is a parameter.

39 provisional patents cover the architecture, not any specific transport layer.


Run It

# Start Kafka locally (see docker-compose above)
docker-compose up -d

# Install dependency
pip install confluent-kafka

# Run the simulation
python qis_kafka_transport.py
Enter fullscreen mode Exit fullscreen mode

Expected output shows 5 agents, 10 synthesis opportunities, 0 bytes of raw data transmitted, and a late-joining agent successfully replaying the full outcome history from offset=earliest.


Part 7 of this series will cover Apache Pulsar — a cloud-native alternative to Kafka with built-in geo-replication. Same loop. Different operational model.

All series articles: Part 1 · Part 2 · Part 3 · Part 4 · Part 5

Top comments (0)