DEV Community

Rory | QIS PROTOCOL
Rory | QIS PROTOCOL

Posted on

QIS Outcome Routing with Apache Arrow Flight — Quadratic Intelligence at Zero-Copy Speed

Every transport in this series so far has optimized for a different constraint: persistence, reliability, broker-free operation, minimal RAM, zero infrastructure.

Apache Arrow Flight optimizes for one constraint that none of the others touched: serialization overhead.

When an ML training node finishes an epoch and wants to share what it learned, the bottleneck is not network bandwidth. It is the time spent converting data from the format it lives in memory — columnar, typed, cache-aligned — into bytes that can travel across a socket, and then converting those bytes back on the other end. In high-throughput ML pipelines, this serialization/deserialization round-trip can consume 40–80% of data transfer time.

Apache Arrow Flight eliminates that round-trip entirely. Data moves from memory to memory, GPU to GPU, process to process, in the Apache Arrow columnar format — the same format it already occupies in RAM. Zero copy. No conversion. Sub-microsecond per-record overhead at scale.

If the Quadratic Intelligence Swarm loop can run on Arrow Flight, it can deliver outcome packets to ML and HPC pipelines at the speed of memory transfers. That is Part 12.


What the Transport Series Has Proven (So Far)

This is Part 12 of a series proving that the breakthrough in Quadratic Intelligence Swarm — discovered June 16, 2025 by Christopher Thomas Trevethan, covered under 39 provisional patents — is the complete loop, not any particular routing mechanism.

The series has run the same loop through eleven transport implementations:

Part Transport Routing Mechanism Infrastructure Required
1 ChromaDB HNSW vector search, O(log N) Vector DB process
2 Qdrant Distributed vector cluster Cluster of nodes
3 REST API HTTP POST/GET Web server
4 Redis Pub/Sub Topic fan-out Redis server
5 Apache Kafka Partitioned durable log Broker + KRaft/ZK
6 Apache Pulsar Multi-tenant geo-replicated Broker + BookKeeper
7 NATS JetStream Cloud-native edge persistence JetStream server
8 SQLite Single-file database A file
9 MQTT 2-byte header, 8KB RAM capable Mosquitto broker
10 ZeroMQ Binary sockets, no broker Nothing
11 Arrow Flight gRPC streams, columnar zero-copy gRPC server

Part 10 (ZeroMQ) proved the loop needs no infrastructure. Part 11 (Arrow Flight) proves the loop can operate at in-memory speeds inside the data pipelines where ML training actually happens.

The QIS loop works on all eleven. The loop is the discovery. The transport is a choice.


Why Apache Arrow Flight Is Different

Apache Arrow is a language-independent columnar memory format for flat and hierarchical data. If you have used pandas, DuckDB, Polars, Spark, or any modern data engine, your data already lives in Arrow format — or is trivially convertible to it — while it is being processed.

Arrow Flight is the RPC layer built on top of Arrow, using gRPC as the wire protocol. It was designed specifically to solve the cross-system data transfer problem in analytics and ML: data lives in Arrow format in process A, you want it in process B, and every existing solution (REST JSON, CSV export, Parquet file write/read) adds a conversion step that throws away the columnar structure and rebuilds it on the other side.

Flight skips the conversion entirely. The server exposes three primary RPCs:

RPC Direction QIS Mapping
DoPut Client → Server Deposit an outcome packet (or batch)
DoGet Server → Client Query: pull packets for my fingerprint
DoExchange Bidirectional stream Real-time synthesis: send a fingerprint, receive a continuous stream of matching packets

The FlightDescriptor — a path or opaque command attached to every stream — is the routing key. When a training node deposits an outcome packet, it attaches a FlightDescriptor that encodes its semantic fingerprint: domain, problem type, model architecture, training phase. Any other node that queries the same descriptor path gets the matching packets.

This is, structurally, a deterministic address. The FlightDescriptor IS the semantic address. The discovery — that pre-distilled insights routed to deterministic addresses enable N(N-1)/2 synthesis paths at O(log N) cost — applies here exactly as it does to a DHT, a vector database, or a Redis channel.

The quadratic scaling is in the loop. Arrow Flight is the fastest wire you can run it on.


The QIS Loop on Arrow Flight

The fundamental QIS loop:

Raw signal
    → local processing (never leaves the node)
    → distillation: ~512-byte OutcomePacket
    → semantic fingerprint (encodes domain + problem type)
    → post to FlightDescriptor (deterministic address)
    → other nodes with similar fingerprints pull via DoGet
    → local synthesis: integrate relevant packets
    → new outcome packets generated
    → loop continues
Enter fullscreen mode Exit fullscreen mode

On Arrow Flight, the outcome packet is represented as an Arrow RecordBatch — a collection of typed, nullable, columnar arrays. A single OutcomePacket maps to one row. A DoGet response is a stream of RecordBatches: all matching packets for your fingerprint, delivered at memory speed.


Implementation

Install dependencies:

pip install pyarrow grpcio protobuf
Enter fullscreen mode Exit fullscreen mode

Define the OutcomePacket schema:

# qis_arrow_flight/schema.py

import pyarrow as pa

OUTCOME_PACKET_SCHEMA = pa.schema([
    # Identity
    pa.field("node_id", pa.string()),          # Anonymized node identifier
    pa.field("timestamp_utc", pa.int64()),      # Unix epoch ms

    # Semantic fingerprint — the deterministic address
    pa.field("domain", pa.string()),            # e.g., "clinical.oncology"
    pa.field("problem_type", pa.string()),      # e.g., "treatment_response_prediction"
    pa.field("model_architecture", pa.string()),# e.g., "transformer-256d"
    pa.field("training_phase", pa.string()),    # e.g., "epoch_12_of_50"

    # Outcome signal — what was learned
    pa.field("metric_name", pa.string()),       # e.g., "val_auc"
    pa.field("metric_delta", pa.float32()),     # Change this epoch
    pa.field("metric_value", pa.float32()),     # Absolute value
    pa.field("confidence", pa.float32()),       # 0.0–1.0

    # What worked
    pa.field("intervention_type", pa.string()), # e.g., "learning_rate_schedule"
    pa.field("intervention_params", pa.string()),# JSON-encoded params
    pa.field("outcome_decile", pa.int8()),      # Ordinal: 1=bottom 10%, 10=top 10%

    # Privacy
    pa.field("sample_count", pa.int32()),       # Local samples (not shared)
    pa.field("raw_data_hash", pa.string()),     # Commitment — data never leaves node
])
Enter fullscreen mode Exit fullscreen mode

Flight server — the routing layer:

# qis_arrow_flight/server.py

import pyarrow as pa
import pyarrow.flight as flight
from collections import defaultdict
from typing import Dict, List
import threading
import time

from .schema import OUTCOME_PACKET_SCHEMA


class QISFlightServer(flight.FlightServerBase):
    """
    QIS outcome routing server using Apache Arrow Flight.

    FlightDescriptors are semantic addresses. Nodes deposit outcome packets
    via DoPut; nodes query for similar packets via DoGet.

    The routing mechanism is Flight's built-in descriptor matching.
    Quadratic scaling (N(N-1)/2 synthesis paths) emerges from the loop —
    not from any property of Arrow Flight itself.
    """

    def __init__(self, location: str, **kwargs):
        super().__init__(location, **kwargs)
        # descriptor_path -> list of RecordBatches
        self._store: Dict[str, List[pa.RecordBatch]] = defaultdict(list)
        self._lock = threading.Lock()
        self._deposit_count = 0

    def _descriptor_key(self, descriptor: flight.FlightDescriptor) -> str:
        """
        Convert a FlightDescriptor to a routing key.

        Path-type descriptors: path segments joined as 'domain/problem_type/...'
        Command-type descriptors: the command bytes decoded as UTF-8
        """
        if descriptor.descriptor_type == flight.DescriptorType.PATH:
            return "/".join(descriptor.path)
        else:
            return descriptor.command.decode("utf-8")

    def do_put(self, context, descriptor, reader, writer):
        """
        DoPut: A node deposits outcome packets.

        The FlightDescriptor encodes the semantic fingerprint (deterministic address).
        The packet stream is Arrow RecordBatches — zero-copy columnar data.
        """
        key = self._descriptor_key(descriptor)
        batches = []

        for chunk in reader:
            batch = chunk.data
            # Validate schema
            if not batch.schema.equals(OUTCOME_PACKET_SCHEMA):
                raise flight.FlightServerError(
                    f"Schema mismatch. Expected QIS OutcomePacket schema."
                )
            batches.append(batch)

        with self._lock:
            self._store[key].extend(batches)
            self._deposit_count += len(batches)

        print(f"[QISFlight] Deposited {len(batches)} batch(es) at '{key}'. "
              f"Total deposits: {self._deposit_count}")

    def do_get(self, context, ticket):
        """
        DoGet: A node queries for outcome packets at a deterministic address.

        The ticket encodes the descriptor key. Returns all matching packets
        as a stream of RecordBatches — the node synthesizes locally.
        """
        key = ticket.ticket.decode("utf-8")

        with self._lock:
            batches = self._store.get(key, [])

        if not batches:
            # Return empty stream with correct schema
            return flight.RecordBatchStream(
                pa.record_batch([], schema=OUTCOME_PACKET_SCHEMA)
            )

        # Concatenate all matching batches and stream back
        combined = pa.concat_tables(
            [pa.Table.from_batches([b]) for b in batches]
        ).to_batches()

        return flight.RecordBatchStream(
            pa.schema(OUTCOME_PACKET_SCHEMA),
            iter(combined)
        )

    def list_flights(self, context, criteria):
        """
        List all available semantic addresses (FlightDescriptors).
        Nodes can discover what problem domains have active packets.
        """
        with self._lock:
            for key, batches in self._store.items():
                total_rows = sum(b.num_rows for b in batches)
                descriptor = flight.FlightDescriptor.for_path(*key.split("/"))
                info = flight.FlightInfo(
                    schema=OUTCOME_PACKET_SCHEMA,
                    descriptor=descriptor,
                    endpoints=[flight.FlightEndpoint(
                        ticket=flight.Ticket(key.encode()),
                        locations=[]
                    )],
                    total_records=total_rows,
                    total_bytes=-1,
                )
                yield info

    def get_flight_info(self, context, descriptor):
        key = self._descriptor_key(descriptor)
        with self._lock:
            batches = self._store.get(key, [])
        total_rows = sum(b.num_rows for b in batches)
        return flight.FlightInfo(
            schema=OUTCOME_PACKET_SCHEMA,
            descriptor=descriptor,
            endpoints=[flight.FlightEndpoint(
                ticket=flight.Ticket(key.encode()),
                locations=[]
            )],
            total_records=total_rows,
            total_bytes=-1,
        )
Enter fullscreen mode Exit fullscreen mode

Node client — deposit and query:

# qis_arrow_flight/node.py

import pyarrow as pa
import pyarrow.flight as flight
import time
import hashlib
import json
from typing import Optional, List

from .schema import OUTCOME_PACKET_SCHEMA


class QISArrowNode:
    """
    A QIS node using Apache Arrow Flight as the routing transport.

    Each node:
    1. Processes data locally (raw data never leaves)
    2. Distills insights into OutcomePackets (Arrow RecordBatches)
    3. Deposits packets at the deterministic semantic address (FlightDescriptor path)
    4. Queries for packets from similar nodes (same domain + problem type)
    5. Synthesizes locally

    The N(N-1)/2 synthesis paths emerge from N nodes doing steps 3-5.
    Arrow Flight provides the zero-copy transport — the quadratic scaling
    is in the architecture.
    """

    def __init__(
        self,
        node_id: str,
        flight_server: str = "grpc://localhost:8815",
        domain: str = "ml.training",
        problem_type: str = "classification",
    ):
        self.node_id = node_id
        self.domain = domain
        self.problem_type = problem_type
        self.client = flight.FlightClient(flight_server)

        # Semantic fingerprint = deterministic address
        # Other nodes with same domain+problem_type will find these packets
        self.descriptor_path = [domain, problem_type]
        self.descriptor = flight.FlightDescriptor.for_path(*self.descriptor_path)
        self.ticket = flight.Ticket("/".join(self.descriptor_path).encode())

    def _make_data_commitment(self, local_data_ref: str) -> str:
        """
        Cryptographic commitment to local data. The data stays local.
        The hash proves we processed real data without revealing it.
        """
        return hashlib.sha256(local_data_ref.encode()).hexdigest()[:16]

    def deposit_outcome(
        self,
        metric_name: str,
        metric_value: float,
        metric_delta: float,
        confidence: float,
        intervention_type: str,
        intervention_params: dict,
        outcome_decile: int,
        sample_count: int,
        local_data_ref: str,
        model_architecture: str = "unspecified",
        training_phase: str = "unspecified",
    ):
        """
        Deposit an outcome packet to the Flight server.

        This is step 3 of the QIS loop: distill local learning into a
        ~512-byte insight and post it to the deterministic address.
        """
        packet = pa.record_batch(
            {
                "node_id": [self.node_id],
                "timestamp_utc": [int(time.time() * 1000)],
                "domain": [self.domain],
                "problem_type": [self.problem_type],
                "model_architecture": [model_architecture],
                "training_phase": [training_phase],
                "metric_name": [metric_name],
                "metric_delta": [float(metric_delta)],
                "metric_value": [float(metric_value)],
                "confidence": [float(confidence)],
                "intervention_type": [intervention_type],
                "intervention_params": [json.dumps(intervention_params)],
                "outcome_decile": [int(outcome_decile)],
                "sample_count": [int(sample_count)],
                "raw_data_hash": [self._make_data_commitment(local_data_ref)],
            },
            schema=OUTCOME_PACKET_SCHEMA,
        )

        writer, _ = self.client.do_put(
            self.descriptor,
            packet.schema
        )
        writer.write(packet)
        writer.close()

        print(f"[{self.node_id}] Deposited: {metric_name}={metric_value:.4f} "
              f"{metric_delta:+.4f}) at {'/'.join(self.descriptor_path)}")

    def query_similar_nodes(self) -> Optional[pa.Table]:
        """
        Query for outcome packets from nodes with the same semantic fingerprint.

        This is step 4 of the QIS loop: pull insights from your edge twins —
        every node processing the same domain + problem type — without
        any raw data leaving any node.
        """
        try:
            reader = self.client.do_get(self.ticket)
            table = reader.read_all()
            if table.num_rows == 0:
                return None
            return table
        except flight.FlightServerError:
            return None

    def synthesize(self, packets: pa.Table) -> dict:
        """
        Local synthesis: integrate relevant packets from edge twins.

        This is step 5 of the QIS loop. The synthesis happens locally —
        no raw data, no model weights, no centralization required.

        Returns a synthesis summary: what is working for nodes like you,
        mathematically, right now.
        """
        if packets is None or packets.num_rows == 0:
            return {"status": "no_packets", "recommendation": None}

        # Filter to packets that are genuinely informative
        # (confidence above threshold, positive delta)
        import pyarrow.compute as pc

        high_quality = packets.filter(
            pc.and_(
                pc.greater(packets["confidence"], 0.7),
                pc.greater(packets["metric_delta"], 0.0),
            )
        )

        if high_quality.num_rows == 0:
            return {"status": "no_high_quality_packets", "recommendation": None}

        # Identify the top-performing intervention
        deltas = high_quality["metric_delta"].to_pylist()
        interventions = high_quality["intervention_type"].to_pylist()
        params = high_quality["intervention_params"].to_pylist()
        deciles = high_quality["outcome_decile"].to_pylist()

        best_idx = deltas.index(max(deltas))
        mean_delta = sum(deltas) / len(deltas)
        top_decile_count = sum(1 for d in deciles if d >= 8)

        return {
            "status": "synthesized",
            "packets_analyzed": high_quality.num_rows,
            "mean_metric_delta": round(mean_delta, 5),
            "best_intervention": interventions[best_idx],
            "best_params": json.loads(params[best_idx]),
            "best_delta": round(deltas[best_idx], 5),
            "top_decile_nodes": top_decile_count,
            "recommendation": (
                f"Try '{interventions[best_idx]}' with params "
                f"{json.loads(params[best_idx])}"
                f"best observed delta +{deltas[best_idx]:.4f} across "
                f"{high_quality.num_rows} similar nodes."
            ),
        }

    def run_loop_step(
        self,
        metric_name: str,
        metric_value: float,
        metric_delta: float,
        confidence: float,
        intervention_type: str,
        intervention_params: dict,
        outcome_decile: int,
        sample_count: int,
        local_data_ref: str,
        **kwargs,
    ) -> dict:
        """
        One complete QIS loop step: deposit + query + synthesize.

        The full loop in three lines. The N(N-1)/2 synthesis paths emerge
        from N nodes running this method concurrently.
        """
        self.deposit_outcome(
            metric_name, metric_value, metric_delta, confidence,
            intervention_type, intervention_params, outcome_decile,
            sample_count, local_data_ref, **kwargs
        )
        packets = self.query_similar_nodes()
        return self.synthesize(packets)
Enter fullscreen mode Exit fullscreen mode

Running the server and two training nodes:

# run_demo.py

import threading
import time
import pyarrow.flight as flight
from qis_arrow_flight.server import QISFlightServer
from qis_arrow_flight.node import QISArrowNode

def run_server():
    server = QISFlightServer("grpc://0.0.0.0:8815")
    server.serve()

# Start server in background thread
t = threading.Thread(target=run_server, daemon=True)
t.start()
time.sleep(1.0)  # Give server a moment to bind

# Node A: training on GPU 0, oncology classification
node_a = QISArrowNode(
    node_id="gpu-node-a",
    domain="clinical.oncology",
    problem_type="treatment_response_classification",
    model_architecture="transformer-256d",
)

# Node B: training on GPU 1, same domain and problem
node_b = QISArrowNode(
    node_id="gpu-node-b",
    domain="clinical.oncology",
    problem_type="treatment_response_classification",
    model_architecture="transformer-256d",
)

# Node A completes epoch 5 — deposits what it learned
result_a = node_a.run_loop_step(
    metric_name="val_auc",
    metric_value=0.847,
    metric_delta=+0.023,
    confidence=0.91,
    intervention_type="learning_rate_schedule",
    intervention_params={"schedule": "cosine_annealing", "T_max": 10, "eta_min": 1e-6},
    outcome_decile=8,
    sample_count=1240,
    local_data_ref="/local/oncology_data/site_a/",
    training_phase="epoch_5_of_50",
)
print(f"\nNode A synthesis: {result_a}\n")

# Node B completes epoch 5 — sees Node A's packet immediately
result_b = node_b.run_loop_step(
    metric_name="val_auc",
    metric_value=0.831,
    metric_delta=+0.014,
    confidence=0.88,
    intervention_type="dropout_tuning",
    intervention_params={"dropout_rate": 0.3, "attention_dropout": 0.1},
    outcome_decile=6,
    sample_count=890,
    local_data_ref="/local/oncology_data/site_b/",
    training_phase="epoch_5_of_50",
)
print(f"Node B synthesis: {result_b}\n")

# Expected output:
# Node A synthesis: {status: synthesized, packets_analyzed: ...,
#                   recommendation: "Try 'cosine_annealing' schedule — ..."}
# Node B synthesis: {status: synthesized, packets_analyzed: 2,
#                   recommendation: "Try 'learning_rate_schedule' — best observed delta +0.023 ..."}
#
# Node B just learned what Node A's GPU discovered — zero raw data shared,
# zero model weight transfer, sub-millisecond delivery.
Enter fullscreen mode Exit fullscreen mode

The two GPUs exchanged insights in the same format their data already lives in. No serialization. No conversion. No raw data moved.


Why Arrow Flight Changes the HPC/ML Calculus

The existing approaches to cross-node ML learning have a shared assumption: you must move model weights or gradients to aggregate knowledge.

Federated Learning moves gradients: compressed, but still proportional to model size. A 7B-parameter model's gradient is ~28GB in fp32 before any compression. For 100 nodes, federated aggregation is 2.8TB of gradient traffic per round.

QIS outcome packets are ~512 bytes each. For 100 nodes, one exchange round is 51KB — four orders of magnitude less traffic — because you are sharing what worked, not how the model was updated.

On Arrow Flight, those 512-byte packets are delivered as in-memory columnar data. The transport overhead approaches zero. The bottleneck becomes the synthesis logic on the receiving node, not the network or the serialization layer.

For HPC pipelines running on InfiniBand or NVLink, Arrow Flight's zero-copy design means outcome packets can move GPU-to-GPU without touching the CPU or the host memory bus. A training node on one GPU deposits an insight; a training node on another GPU pulls it. The data stays in GPU-accessible memory formats throughout.

Approach Per-node transfer 100 nodes (1 round) Serialization
Federated Learning (7B model) ~28GB ~2.8TB Gradient→bytes→gradient
Model distillation ~100MB ~10GB Logits→bytes→logits
QIS outcome packets ~512 bytes ~51KB Arrow→Arrow (zero-copy)
QIS via Arrow Flight ~512 bytes ~51KB Zero-copy, in-memory

The quadratic scaling — 100 nodes producing 4,950 synthesis opportunities — stays constant. Arrow Flight just makes each synthesis opportunity essentially free from a transport perspective.


Where This Fits in the Architecture

Christopher Thomas Trevethan's discovery on June 16, 2025, covered under 39 provisional patents, is that pre-distilled insights routed to deterministic addresses enable real-time quadratic intelligence scaling. The seven-layer architecture is:

  1. Data Sources → Raw signals from sensors, databases, training corpora
  2. Edge Nodes → Local processing. Raw data never leaves.
  3. Semantic Fingerprintdomain/problem_type path encodes the problem space
  4. Routing Layer → Arrow Flight DoGet to the deterministic FlightDescriptor path
  5. Outcome Packets → Arrow RecordBatch rows, ~512 bytes per insight
  6. Local Synthesissynthesize() runs locally on the pulling node
  7. External Augmentation → Optional: LLM analysis of synthesis output

The FlightDescriptor path IS the semantic address. Layer 4 is Arrow Flight. The other six layers are unchanged from ChromaDB, Qdrant, REST, Redis, Kafka, Pulsar, NATS, SQLite, MQTT, and ZeroMQ implementations. The loop is constant. The transport is a choice.


What Part 12 Has Proven

Twelve transports. One loop. Twelve different routing mechanisms, infrastructure profiles, latency characteristics, and operational models.

Constraint Transport
Semantic similarity routing ChromaDB, Qdrant
HTTP universality REST API
Real-time pub/sub Redis
Durability + replay Kafka
Multi-tenant geo-replication Pulsar
Edge persistence NATS JetStream
Zero infrastructure SQLite, ZeroMQ
Constrained networks (2-byte header) MQTT
Zero-copy ML/HPC pipelines Arrow Flight

The escalating constraint pattern is the IP argument in narrative form. If QIS were "a DHT implementation," it would only run on DHTs. It runs on everything. The discovery is the complete loop — the architecture that makes any routing mechanism produce real-time quadratic intelligence scaling.

Part 13 will explore a different constraint. Arrow Flight covers in-memory columnar speed. The loop is still the loop.


Run It

pip install pyarrow grpcio protobuf
python run_demo.py
Enter fullscreen mode Exit fullscreen mode

The full code is in the QIS Protocol Notes repository.


Quadratic Intelligence Swarm was discovered by Christopher Thomas Trevethan on June 16, 2025. Covered under 39 provisional patents. The discovery is transport-agnostic by design — the quadratic scaling comes from the complete loop, not from any specific routing layer. This series proves that with working code.

Previous in series: QIS Outcome Routing with ZeroMQ

Top comments (0)