Every transport in this series so far has optimized for a different constraint: persistence, reliability, broker-free operation, minimal RAM, zero infrastructure.
Apache Arrow Flight optimizes for one constraint that none of the others touched: serialization overhead.
When an ML training node finishes an epoch and wants to share what it learned, the bottleneck is not network bandwidth. It is the time spent converting data from the format it lives in memory — columnar, typed, cache-aligned — into bytes that can travel across a socket, and then converting those bytes back on the other end. In high-throughput ML pipelines, this serialization/deserialization round-trip can consume 40–80% of data transfer time.
Apache Arrow Flight eliminates that round-trip entirely. Data moves from memory to memory, GPU to GPU, process to process, in the Apache Arrow columnar format — the same format it already occupies in RAM. Zero copy. No conversion. Sub-microsecond per-record overhead at scale.
If the Quadratic Intelligence Swarm loop can run on Arrow Flight, it can deliver outcome packets to ML and HPC pipelines at the speed of memory transfers. That is Part 11.
What the Transport Series Has Proven (So Far)
This is Part 11 of a series proving that the breakthrough in Quadratic Intelligence Swarm — discovered June 16, 2025 by Christopher Thomas Trevethan, covered under 39 provisional patents — is the complete loop, not any particular routing mechanism.
The series has run the same loop through eleven transport implementations:
| Part | Transport | Routing Mechanism | Infrastructure Required |
|---|---|---|---|
| 1 | ChromaDB | HNSW vector search, O(log N) | Vector DB process |
| 2 | Qdrant | Distributed vector cluster | Cluster of nodes |
| 3 | REST API | HTTP POST/GET | Web server |
| 4 | Redis Pub/Sub | Topic fan-out | Redis server |
| 5 | Apache Kafka | Partitioned durable log | Broker + KRaft/ZK |
| 6 | Apache Pulsar | Multi-tenant geo-replicated | Broker + BookKeeper |
| 7 | NATS JetStream | Cloud-native edge persistence | JetStream server |
| 8 | SQLite | Single-file database | A file |
| 9 | MQTT | 2-byte header, 8KB RAM capable | Mosquitto broker |
| 10 | ZeroMQ | Binary sockets, no broker | Nothing |
| 11 | Arrow Flight | gRPC streams, columnar zero-copy | gRPC server |
Part 10 (ZeroMQ) proved the loop needs no infrastructure. Part 11 (Arrow Flight) proves the loop can operate at in-memory speeds inside the data pipelines where ML training actually happens.
The QIS loop works on all eleven. The loop is the discovery. The transport is a choice.
Why Apache Arrow Flight Is Different
Apache Arrow is a language-independent columnar memory format for flat and hierarchical data. If you have used pandas, DuckDB, Polars, Spark, or any modern data engine, your data already lives in Arrow format — or is trivially convertible to it — while it is being processed.
Arrow Flight is the RPC layer built on top of Arrow, using gRPC as the wire protocol. It was designed specifically to solve the cross-system data transfer problem in analytics and ML: data lives in Arrow format in process A, you want it in process B, and every existing solution (REST JSON, CSV export, Parquet file write/read) adds a conversion step that throws away the columnar structure and rebuilds it on the other side.
Flight skips the conversion entirely. The server exposes three primary RPCs:
| RPC | Direction | QIS Mapping |
|---|---|---|
| DoPut | Client → Server | Deposit an outcome packet (or batch) |
| DoGet | Server → Client | Query: pull packets for my fingerprint |
| DoExchange | Bidirectional stream | Real-time synthesis: send a fingerprint, receive a continuous stream of matching packets |
The FlightDescriptor — a path or opaque command attached to every stream — is the routing key. When a training node deposits an outcome packet, it attaches a FlightDescriptor that encodes its semantic fingerprint: domain, problem type, model architecture, training phase. Any other node that queries the same descriptor path gets the matching packets.
This is, structurally, a deterministic address. The FlightDescriptor IS the semantic address. The discovery — that pre-distilled insights routed to deterministic addresses enable N(N-1)/2 synthesis paths at O(log N) cost — applies here exactly as it does to a DHT, a vector database, or a Redis channel.
The quadratic scaling is in the loop. Arrow Flight is the fastest wire you can run it on.
The QIS Loop on Arrow Flight
The fundamental QIS loop:
Raw signal
→ local processing (never leaves the node)
→ distillation: ~512-byte OutcomePacket
→ semantic fingerprint (encodes domain + problem type)
→ post to FlightDescriptor (deterministic address)
→ other nodes with similar fingerprints pull via DoGet
→ local synthesis: integrate relevant packets
→ new outcome packets generated
→ loop continues
On Arrow Flight, the outcome packet is represented as an Arrow RecordBatch — a collection of typed, nullable, columnar arrays. A single OutcomePacket maps to one row. A DoGet response is a stream of RecordBatches: all matching packets for your fingerprint, delivered at memory speed.
Implementation
Install dependencies:
pip install pyarrow grpcio protobuf
Define the OutcomePacket schema:
# qis_arrow_flight/schema.py
import pyarrow as pa
OUTCOME_PACKET_SCHEMA = pa.schema([
    # Identity
    pa.field("node_id", pa.string()),            # Anonymized node identifier
    pa.field("timestamp_utc", pa.int64()),       # Unix epoch ms

    # Semantic fingerprint — the deterministic address
    pa.field("domain", pa.string()),             # e.g., "clinical.oncology"
    pa.field("problem_type", pa.string()),       # e.g., "treatment_response_prediction"
    pa.field("model_architecture", pa.string()), # e.g., "transformer-256d"
    pa.field("training_phase", pa.string()),     # e.g., "epoch_12_of_50"

    # Outcome signal — what was learned
    pa.field("metric_name", pa.string()),        # e.g., "val_auc"
    pa.field("metric_delta", pa.float32()),      # Change this epoch
    pa.field("metric_value", pa.float32()),      # Absolute value
    pa.field("confidence", pa.float32()),        # 0.0–1.0

    # What worked
    pa.field("intervention_type", pa.string()),  # e.g., "learning_rate_schedule"
    pa.field("intervention_params", pa.string()),# JSON-encoded params
    pa.field("outcome_decile", pa.int8()),       # Ordinal: 1=bottom 10%, 10=top 10%

    # Privacy
    pa.field("sample_count", pa.int32()),        # Local samples (not shared)
    pa.field("raw_data_hash", pa.string()),      # Commitment — data never leaves node
])
Flight server — the routing layer:
# qis_arrow_flight/server.py
import pyarrow as pa
import pyarrow.flight as flight
from collections import defaultdict
from typing import Dict, List
import threading
import time
from .schema import OUTCOME_PACKET_SCHEMA
class QISFlightServer(flight.FlightServerBase):
    """
    QIS outcome routing server using Apache Arrow Flight.

    FlightDescriptors are semantic addresses. Nodes deposit outcome packets
    via DoPut; nodes query for similar packets via DoGet.

    The routing mechanism is Flight's built-in descriptor matching.
    Quadratic scaling (N(N-1)/2 synthesis paths) emerges from the loop —
    not from any property of Arrow Flight itself.
    """

    def __init__(self, location: str, **kwargs):
        super().__init__(location, **kwargs)
        # descriptor_path -> list of RecordBatches
        self._store: Dict[str, List[pa.RecordBatch]] = defaultdict(list)
        self._lock = threading.Lock()
        self._deposit_count = 0
    def _descriptor_key(self, descriptor: flight.FlightDescriptor) -> str:
        """
        Convert a FlightDescriptor to a routing key.

        Path-type descriptors: path segments joined as 'domain/problem_type/...'
        Command-type descriptors: the command bytes decoded as UTF-8
        """
        if descriptor.descriptor_type == flight.DescriptorType.PATH:
            # Path segments arrive as bytes on the server side; decode before joining
            return "/".join(
                seg.decode("utf-8") if isinstance(seg, bytes) else seg
                for seg in descriptor.path
            )
        else:
            return descriptor.command.decode("utf-8")
    def do_put(self, context, descriptor, reader, writer):
        """
        DoPut: A node deposits outcome packets.

        The FlightDescriptor encodes the semantic fingerprint (deterministic address).
        The packet stream is Arrow RecordBatches — zero-copy columnar data.
        """
        key = self._descriptor_key(descriptor)
        batches = []
        for chunk in reader:
            batch = chunk.data
            # Validate schema before accepting the deposit
            if not batch.schema.equals(OUTCOME_PACKET_SCHEMA):
                raise flight.FlightServerError(
                    "Schema mismatch. Expected QIS OutcomePacket schema."
                )
            batches.append(batch)
        with self._lock:
            self._store[key].extend(batches)
            self._deposit_count += len(batches)
        print(f"[QISFlight] Deposited {len(batches)} batch(es) at '{key}'. "
              f"Total deposits: {self._deposit_count}")
    def do_get(self, context, ticket):
        """
        DoGet: A node queries for outcome packets at a deterministic address.

        The ticket encodes the descriptor key. Returns all matching packets
        as a stream of RecordBatches — the node synthesizes locally.
        """
        key = ticket.ticket.decode("utf-8")
        with self._lock:
            batches = list(self._store.get(key, []))
        if not batches:
            # Return an empty stream with the correct schema
            return flight.RecordBatchStream(OUTCOME_PACKET_SCHEMA.empty_table())
        # Combine all matching batches into one table and stream it back
        combined = pa.Table.from_batches(batches, schema=OUTCOME_PACKET_SCHEMA)
        return flight.RecordBatchStream(combined)
    def list_flights(self, context, criteria):
        """
        List all available semantic addresses (FlightDescriptors).
        Nodes can discover what problem domains have active packets.
        """
        with self._lock:
            for key, batches in self._store.items():
                total_rows = sum(b.num_rows for b in batches)
                descriptor = flight.FlightDescriptor.for_path(*key.split("/"))
                yield flight.FlightInfo(
                    schema=OUTCOME_PACKET_SCHEMA,
                    descriptor=descriptor,
                    endpoints=[flight.FlightEndpoint(
                        ticket=flight.Ticket(key.encode()),
                        locations=[],
                    )],
                    total_records=total_rows,
                    total_bytes=-1,
                )

    def get_flight_info(self, context, descriptor):
        key = self._descriptor_key(descriptor)
        with self._lock:
            batches = self._store.get(key, [])
            total_rows = sum(b.num_rows for b in batches)
        return flight.FlightInfo(
            schema=OUTCOME_PACKET_SCHEMA,
            descriptor=descriptor,
            endpoints=[flight.FlightEndpoint(
                ticket=flight.Ticket(key.encode()),
                locations=[],
            )],
            total_records=total_rows,
            total_bytes=-1,
        )
Node client — deposit and query:
# qis_arrow_flight/node.py
import pyarrow as pa
import pyarrow.flight as flight
import time
import hashlib
import json
from typing import Optional, List
from .schema import OUTCOME_PACKET_SCHEMA
class QISArrowNode:
    """
    A QIS node using Apache Arrow Flight as the routing transport.

    Each node:
      1. Processes data locally (raw data never leaves)
      2. Distills insights into OutcomePackets (Arrow RecordBatches)
      3. Deposits packets at the deterministic semantic address (FlightDescriptor path)
      4. Queries for packets from similar nodes (same domain + problem type)
      5. Synthesizes locally

    The N(N-1)/2 synthesis paths emerge from N nodes doing steps 3-5.
    Arrow Flight provides the zero-copy transport — the quadratic scaling
    is in the architecture.
    """

    def __init__(
        self,
        node_id: str,
        flight_server: str = "grpc://localhost:8815",
        domain: str = "ml.training",
        problem_type: str = "classification",
        model_architecture: str = "unspecified",
    ):
        self.node_id = node_id
        self.domain = domain
        self.problem_type = problem_type
        self.model_architecture = model_architecture
        self.client = flight.FlightClient(flight_server)
        # Semantic fingerprint = deterministic address
        # Other nodes with same domain+problem_type will find these packets
        self.descriptor_path = [domain, problem_type]
        self.descriptor = flight.FlightDescriptor.for_path(*self.descriptor_path)
        self.ticket = flight.Ticket("/".join(self.descriptor_path).encode())
    def _make_data_commitment(self, local_data_ref: str) -> str:
        """
        Cryptographic commitment to local data. The data stays local.
        The hash proves we processed real data without revealing it.
        """
        return hashlib.sha256(local_data_ref.encode()).hexdigest()[:16]
    def deposit_outcome(
        self,
        metric_name: str,
        metric_value: float,
        metric_delta: float,
        confidence: float,
        intervention_type: str,
        intervention_params: dict,
        outcome_decile: int,
        sample_count: int,
        local_data_ref: str,
        model_architecture: Optional[str] = None,
        training_phase: str = "unspecified",
    ):
        """
        Deposit an outcome packet to the Flight server.

        This is step 3 of the QIS loop: distill local learning into a
        ~512-byte insight and post it to the deterministic address.
        """
        packet = pa.record_batch(
            {
                "node_id": [self.node_id],
                "timestamp_utc": [int(time.time() * 1000)],
                "domain": [self.domain],
                "problem_type": [self.problem_type],
                # Per-call value wins; otherwise fall back to the node-level default
                "model_architecture": [
                    model_architecture
                    or getattr(self, "model_architecture", "unspecified")
                ],
                "training_phase": [training_phase],
                "metric_name": [metric_name],
                "metric_delta": [float(metric_delta)],
                "metric_value": [float(metric_value)],
                "confidence": [float(confidence)],
                "intervention_type": [intervention_type],
                "intervention_params": [json.dumps(intervention_params)],
                "outcome_decile": [int(outcome_decile)],
                "sample_count": [int(sample_count)],
                "raw_data_hash": [self._make_data_commitment(local_data_ref)],
            },
            schema=OUTCOME_PACKET_SCHEMA,
        )
        writer, _ = self.client.do_put(self.descriptor, packet.schema)
        writer.write_batch(packet)
        writer.close()
        print(f"[{self.node_id}] Deposited: {metric_name}={metric_value:.4f} "
              f"(Δ{metric_delta:+.4f}) at {'/'.join(self.descriptor_path)}")
    def query_similar_nodes(self) -> Optional[pa.Table]:
        """
        Query for outcome packets from nodes with the same semantic fingerprint.

        This is step 4 of the QIS loop: pull insights from your edge twins —
        every node processing the same domain + problem type — without
        any raw data leaving any node.
        """
        try:
            reader = self.client.do_get(self.ticket)
            table = reader.read_all()
            if table.num_rows == 0:
                return None
            return table
        except flight.FlightServerError:
            return None
    def synthesize(self, packets: pa.Table) -> dict:
        """
        Local synthesis: integrate relevant packets from edge twins.

        This is step 5 of the QIS loop. The synthesis happens locally —
        no raw data, no model weights, no centralization required.

        Returns a synthesis summary: what is working for nodes like you,
        mathematically, right now.
        """
        if packets is None or packets.num_rows == 0:
            return {"status": "no_packets", "recommendation": None}

        # Filter to packets that are genuinely informative
        # (confidence above threshold, positive delta)
        import pyarrow.compute as pc
        high_quality = packets.filter(
            pc.and_(
                pc.greater(packets["confidence"], 0.7),
                pc.greater(packets["metric_delta"], 0.0),
            )
        )
        if high_quality.num_rows == 0:
            return {"status": "no_high_quality_packets", "recommendation": None}

        # Identify the top-performing intervention
        deltas = high_quality["metric_delta"].to_pylist()
        interventions = high_quality["intervention_type"].to_pylist()
        params = high_quality["intervention_params"].to_pylist()
        deciles = high_quality["outcome_decile"].to_pylist()

        best_idx = deltas.index(max(deltas))
        mean_delta = sum(deltas) / len(deltas)
        top_decile_count = sum(1 for d in deciles if d >= 8)

        return {
            "status": "synthesized",
            "packets_analyzed": high_quality.num_rows,
            "mean_metric_delta": round(mean_delta, 5),
            "best_intervention": interventions[best_idx],
            "best_params": json.loads(params[best_idx]),
            "best_delta": round(deltas[best_idx], 5),
            "top_decile_nodes": top_decile_count,
            "recommendation": (
                f"Try '{interventions[best_idx]}' with params "
                f"{json.loads(params[best_idx])} — "
                f"best observed delta +{deltas[best_idx]:.4f} across "
                f"{high_quality.num_rows} similar nodes."
            ),
        }
    def run_loop_step(
        self,
        metric_name: str,
        metric_value: float,
        metric_delta: float,
        confidence: float,
        intervention_type: str,
        intervention_params: dict,
        outcome_decile: int,
        sample_count: int,
        local_data_ref: str,
        **kwargs,
    ) -> dict:
        """
        One complete QIS loop step: deposit + query + synthesize.

        The full loop in three lines. The N(N-1)/2 synthesis paths emerge
        from N nodes running this method concurrently.
        """
        self.deposit_outcome(
            metric_name, metric_value, metric_delta, confidence,
            intervention_type, intervention_params, outcome_decile,
            sample_count, local_data_ref, **kwargs
        )
        packets = self.query_similar_nodes()
        return self.synthesize(packets)
Running the server and two training nodes:
# run_demo.py
import threading
import time
import pyarrow.flight as flight
from qis_arrow_flight.server import QISFlightServer
from qis_arrow_flight.node import QISArrowNode
def run_server():
    server = QISFlightServer("grpc://0.0.0.0:8815")
    server.serve()

# Start server in background thread
t = threading.Thread(target=run_server, daemon=True)
t.start()
time.sleep(1.0)  # Give server a moment to bind
# Node A: training on GPU 0, oncology classification
node_a = QISArrowNode(
    node_id="gpu-node-a",
    domain="clinical.oncology",
    problem_type="treatment_response_classification",
    model_architecture="transformer-256d",
)

# Node B: training on GPU 1, same domain and problem
node_b = QISArrowNode(
    node_id="gpu-node-b",
    domain="clinical.oncology",
    problem_type="treatment_response_classification",
    model_architecture="transformer-256d",
)

# Node A completes epoch 5 — deposits what it learned
result_a = node_a.run_loop_step(
    metric_name="val_auc",
    metric_value=0.847,
    metric_delta=+0.023,
    confidence=0.91,
    intervention_type="learning_rate_schedule",
    intervention_params={"schedule": "cosine_annealing", "T_max": 10, "eta_min": 1e-6},
    outcome_decile=8,
    sample_count=1240,
    local_data_ref="/local/oncology_data/site_a/",
    training_phase="epoch_5_of_50",
)
print(f"\nNode A synthesis: {result_a}\n")

# Node B completes epoch 5 — sees Node A's packet immediately
result_b = node_b.run_loop_step(
    metric_name="val_auc",
    metric_value=0.831,
    metric_delta=+0.014,
    confidence=0.88,
    intervention_type="dropout_tuning",
    intervention_params={"dropout_rate": 0.3, "attention_dropout": 0.1},
    outcome_decile=6,
    sample_count=890,
    local_data_ref="/local/oncology_data/site_b/",
    training_phase="epoch_5_of_50",
)
print(f"Node B synthesis: {result_b}\n")

# Expected output:
# Node A synthesis: {'status': 'synthesized', 'packets_analyzed': 1,
#   'recommendation': "Try 'learning_rate_schedule' with params ..."}
# Node B synthesis: {'status': 'synthesized', 'packets_analyzed': 2,
#   'recommendation': "Try 'learning_rate_schedule' with params ... — best observed delta +0.0230 ..."}
#
# Node B just learned what Node A's GPU discovered — zero raw data shared,
# zero model weight transfer, sub-millisecond delivery.
The two GPUs exchanged insights in the same format their data already lives in. No serialization. No conversion. No raw data moved.
Why Arrow Flight Changes the HPC/ML Calculus
The existing approaches to cross-node ML learning have a shared assumption: you must move model weights or gradients to aggregate knowledge.
Federated Learning moves gradients: compressed, but still proportional to model size. A 7B-parameter model's gradient is ~28GB in fp32 before any compression. For 100 nodes, federated aggregation is 2.8TB of gradient traffic per round.
QIS outcome packets are ~512 bytes each. For 100 nodes, one exchange round is about 51KB — more than seven orders of magnitude less traffic — because you are sharing what worked, not how the model was updated.
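The traffic comparison is simple arithmetic, using the per-node numbers above:

```python
# Federated learning: a 7B-parameter model, fp32 gradients (4 bytes/param)
grad_bytes = 7_000_000_000 * 4          # 28 GB per node per round
fl_round = 100 * grad_bytes             # 2.8 TB for 100 nodes

# QIS: one ~512-byte outcome packet per node
qis_round = 100 * 512                   # 51,200 bytes ≈ 51 KB

print(f"FL round:  {fl_round / 1e12:.1f} TB")
print(f"QIS round: {qis_round / 1e3:.1f} KB")
print(f"ratio:     {fl_round / qis_round:.1e}")  # ≈ 5.5e+07
```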
On Arrow Flight, those 512-byte packets are delivered as in-memory columnar data. The transport overhead approaches zero. The bottleneck becomes the synthesis logic on the receiving node, not the network or the serialization layer.
For HPC pipelines running on InfiniBand or NVLink, Arrow Flight's zero-copy design means outcome packets stay in the Arrow columnar format from sender to receiver, with no re-serialization at either end. With GPU-aware transports — an active area of development in the Arrow ecosystem — that data can remain in device-accessible memory rather than round-tripping through host serialization. A training node on one GPU deposits an insight; a training node on another GPU pulls it.
| Approach | Per-node transfer | 100 nodes (1 round) | Serialization |
|---|---|---|---|
| Federated Learning (7B model) | ~28GB | ~2.8TB | Gradient→bytes→gradient |
| Model distillation | ~100MB | ~10GB | Logits→bytes→logits |
| QIS outcome packets | ~512 bytes | ~51KB | Arrow→Arrow (zero-copy) |
| QIS via Arrow Flight | ~512 bytes | ~51KB | Zero-copy, in-memory |
The quadratic scaling — 100 nodes producing 4,950 synthesis opportunities — stays constant. Arrow Flight just makes each synthesis opportunity essentially free from a transport perspective.
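The path count itself is just the pair formula N(N-1)/2:

```python
def synthesis_paths(n: int) -> int:
    # Unordered pairs of nodes that can cross-pollinate insights
    return n * (n - 1) // 2

print(synthesis_paths(100))   # 4950
print(synthesis_paths(1000))  # 499500
```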
Where This Fits in the Architecture
Christopher Thomas Trevethan's discovery on June 16, 2025, covered under 39 provisional patents, is that pre-distilled insights routed to deterministic addresses enable real-time quadratic intelligence scaling. The seven-layer architecture is:
- Data Sources → Raw signals from sensors, databases, training corpora
- Edge Nodes → Local processing. Raw data never leaves.
- Semantic Fingerprint → domain/problem_type path encodes the problem space
- Routing Layer → Arrow Flight DoGet to the deterministic FlightDescriptor path
- Outcome Packets → Arrow RecordBatch rows, ~512 bytes per insight
- Local Synthesis → synthesize() runs locally on the pulling node
- External Augmentation → Optional: LLM analysis of synthesis output
The FlightDescriptor path IS the semantic address. Layer 4 is Arrow Flight. The other six layers are unchanged from ChromaDB, Qdrant, REST, Redis, Kafka, Pulsar, NATS, SQLite, MQTT, and ZeroMQ implementations. The loop is constant. The transport is a choice.
What Part 11 Has Proven
Eleven transports. One loop. Eleven different routing mechanisms, infrastructure profiles, latency characteristics, and operational models.
| Constraint | Transport |
|---|---|
| Semantic similarity routing | ChromaDB, Qdrant |
| HTTP universality | REST API |
| Real-time pub/sub | Redis |
| Durability + replay | Kafka |
| Multi-tenant geo-replication | Pulsar |
| Edge persistence | NATS JetStream |
| Zero infrastructure | SQLite, ZeroMQ |
| Constrained networks (2-byte header) | MQTT |
| Zero-copy ML/HPC pipelines | Arrow Flight |
The escalating constraint pattern is the IP argument in narrative form. If QIS were "a DHT implementation," it would only run on DHTs. It runs on everything. The discovery is the complete loop — the architecture that makes any routing mechanism produce real-time quadratic intelligence scaling.
Part 12 will explore a different constraint. Arrow Flight covers in-memory columnar speed. The loop is still the loop.
Run It
pip install pyarrow grpcio protobuf
python run_demo.py
The full code is in the QIS Protocol Notes repository.
Quadratic Intelligence Swarm was discovered by Christopher Thomas Trevethan on June 16, 2025. Covered under 39 provisional patents. The discovery is transport-agnostic by design — the quadratic scaling comes from the complete loop, not from any specific routing layer. This series proves that with working code.
Previous in series: QIS Outcome Routing with ZeroMQ