Every transport in this series has done the same thing in a slightly different way: accept an outcome packet, route it by semantic address, make it available for local synthesis.
ChromaDB does it with HNSW vector search. Kafka does it with partitions. ZeroMQ does it with zero infrastructure. Arrow Flight does it at zero-copy speed.
Apache Flink does something none of the others do: it synthesizes while it routes.
Flink is not a message queue. It is not a database. It is a stateful stream processing engine — meaning the data flowing through it is not just transported but continuously transformed, aggregated, and maintained as live state inside the processing nodes themselves. When you build QIS on top of Flink, the synthesis step does not happen after the routing. It happens inside the routing infrastructure, continuously, as each new outcome packet arrives.
That is a fundamentally different execution model. This is Part 13.
## The Problem This Transport Solves
Imagine 200 ML training nodes running distributed fine-tuning on a large model. Each node finishes a training batch and has a useful signal: which hyperparameters are working, which loss curves are plateauing, which data subsets are producing the best validation scores.
This is exactly the kind of intelligence that should be shared. Node 47 is overfitting on the same distribution that Node 112 just solved. Node 83 found a learning rate schedule that cut epoch time by 22%. Node 196 has already hit the same gradient vanishing problem you are about to hit.
But no one is sharing this. Why?
Because the current options are bad. You can centralize everything into a parameter server — but then the parameter server becomes the bottleneck, and at 200 nodes it is already saturated. You can run Federated Learning — but FL requires enough local data for a meaningful gradient, and it synchronizes in rounds, not continuously. You can use a message queue — but then someone has to write the consumer that reads from the queue, aggregates the intelligence, and makes it queryable by semantic domain.
What you want is a system that:
- Accepts outcome packets from all 200 nodes
- Routes each packet by semantic domain (hyperparameter tuning, loss curves, data quality)
- Continuously synthesizes across all packets arriving in the same domain
- Makes the current synthesis available for any node to query in real-time
- Handles late arrivals from nodes with intermittent connectivity
Apache Flink does all five natively.
## Why Flink Is Different
The previous twelve transports in this series handled routing. Flink handles routing and synthesis simultaneously.
Here is the primitive that makes it possible: a stateful `process()` operator on a keyed stream.

A Flink KeyedProcessFunction is a stateful operator attached to a keyed stream (keyed state such as ValueState is only available on keyed streams, which is why the keyed variant is needed rather than a plain ProcessFunction). Each key (in QIS terms: each semantic address) has its own isolated state, maintained in Flink's state backend — the heap-based HashMap backend by default, or RocksDB for larger-than-memory state. When a new outcome packet arrives for a key, the function runs, reads the current state for that key, updates it with the new packet, writes the new state back, and optionally emits a downstream result.
The state never centralizes. Each key's state lives on the TaskManager assigned to that key. If the cluster has 10 TaskManagers, the state is distributed across all 10 — sharded by semantic address, exactly like a DHT, except the state is not just an index pointer but the actual synthesized intelligence.
In QIS terms:
- keyBy(semantic_address) = semantic routing (deterministic address lookup, O(1))
- ValueState[SynthesisSummary] = local synthesis (maintained continuously as new packets arrive)
- ctx.output(synthesis_packet) = outcome packet emission (downstream nodes receive updated synthesis)
- Watermarks + event-time windows = handling late arrivals from intermittent edge nodes
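The routing half of this mapping can be sketched in plain Python. This is an illustrative analogue, not Flink's actual algorithm (Flink murmur-hashes the key into one of `maxParallelism` key groups, then maps key groups onto parallel operator instances); the function name and the SHA-256 hash below are our own choices for a deterministic sketch.

```python
# Illustrative key-to-slot assignment: hash the semantic address into a key
# group, then map the group onto a parallel slot — the same two-step shape
# Flink uses internally (with murmur hashing rather than SHA-256).
import hashlib

def slot_for(semantic_key: str, max_parallelism: int = 128, parallelism: int = 10) -> int:
    """Deterministically map a semantic address onto one of `parallelism` slots."""
    key_group = int(hashlib.sha256(semantic_key.encode()).hexdigest(), 16) % max_parallelism
    return key_group * parallelism // max_parallelism

key = "loss_dynamics:research:plateau"
assert slot_for(key) == slot_for(key)   # same address → same slot, every run
assert 0 <= slot_for(key) < 10          # always lands on a valid slot
```

The point of the two-step shape (key → key group → slot) is that the cluster can be rescaled without reshuffling every key: only the group-to-slot mapping changes.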
## The Seven-Layer Loop on Flink
The Quadratic Intelligence Swarm architecture — discovered June 16, 2025 by Christopher Thomas Trevethan, covered under 39 provisional patents — has seven layers. Here is how each maps to Flink primitives:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ QIS LAYER              FLINK PRIMITIVE                                  │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. Data Sources      → StreamExecutionEnvironment.from_collection()     │
│                        or Kafka/Pulsar source connectors                │
├─────────────────────────────────────────────────────────────────────────┤
│ 2. Edge Nodes        → FlatMapFunction (local processing, raw data     │
│                        never leaves — only outcome packet emitted)      │
├─────────────────────────────────────────────────────────────────────────┤
│ 3. Semantic          → packet["semantic_key"] — domain + tier           │
│    Fingerprint         string assigned at the edge before emission      │
├─────────────────────────────────────────────────────────────────────────┤
│ 4. Routing Layer     → keyBy(lambda p: p["semantic_key"])               │
│                        O(1) hash partitioning by semantic address       │
├─────────────────────────────────────────────────────────────────────────┤
│ 5. Outcome           → ~512-byte dict: domain, outcome, confidence,     │
│    Packets             timestamp, edge_id (no raw data)                 │
├─────────────────────────────────────────────────────────────────────────┤
│ 6. Local             → ProcessFunction + ValueState[SynthesisSummary]   │
│    Synthesis           Continuous — runs on every arriving packet       │
├─────────────────────────────────────────────────────────────────────────┤
│ 7. External          → Side outputs, broadcast state, external DB       │
│    Augmentation        sinks (optional — loop works without this)       │
└─────────────────────────────────────────────────────────────────────────┘
```
The key architectural difference from all previous transports: layers 4, 5, and 6 (routing, packets, synthesis) execute inside the same operator. No separate synthesis consumer. No query layer. The synthesis is live state inside the router.
## Implementation

PyFlink DataStream API. Requires Flink 1.18+ with PyFlink installed.
```python
"""
qis_flink_transport.py

QIS Outcome Routing on Apache Flink — Part 13 of the transport-agnostic proof series.

Demonstrates: keyBy() as semantic routing, ValueState as continuous synthesis,
event-time watermarks for late-arriving edge packets.

Discovery: Christopher Thomas Trevethan, June 16, 2025
Patents: 39 provisional patents covering the complete QIS architecture
"""
import json
import hashlib
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner


# ─────────────────────────────────────────────────────────
# OUTCOME PACKET — the only unit that crosses node boundaries
# Raw data NEVER leaves the edge. Only this ~512-byte structure.
# ─────────────────────────────────────────────────────────

@dataclass
class OutcomePacket:
    """
    Pre-distilled insight from a single edge node.

    Semantic fingerprint embedded in semantic_key — the routing address.

    QIS routing requirement: any mechanism that maps packets with the same
    semantic_key to the same processing slot achieves the O(1) routing target.
    Flink's keyBy() satisfies this natively.
    """
    edge_id: str
    semantic_key: str      # domain:tier:subtopic:fingerprint — the deterministic address
    outcome: str           # what worked / what failed
    metric_value: float    # the quantitative signal
    confidence: float      # 0.0–1.0
    event_timestamp: int   # milliseconds, for event-time watermarking
    domain: str = ""
    context: dict = field(default_factory=dict)

    def to_flink_row(self) -> dict:
        return asdict(self)

    @staticmethod
    def semantic_key_for(domain: str, tier: str, subtopic: str) -> str:
        """Deterministic semantic address. Same problem → same key → same slot."""
        raw = f"{domain}:{tier}:{subtopic}"
        fingerprint = hashlib.sha256(raw.encode()).hexdigest()[:12]
        return f"{raw}:{fingerprint}"


@dataclass
class SynthesisSummary:
    """
    Continuously maintained synthesis state for a single semantic address.
    This is the 'local synthesis' layer of QIS — living inside Flink state.
    """
    semantic_key: str
    packet_count: int = 0
    positive_outcomes: int = 0
    total_confidence: float = 0.0
    best_outcome: str = ""
    best_metric: float = float("-inf")
    last_updated: str = ""
    contributing_edges: list = field(default_factory=list)

    def update(self, packet: OutcomePacket) -> None:
        self.packet_count += 1
        self.total_confidence += packet.confidence
        self.last_updated = datetime.utcnow().isoformat()
        if packet.metric_value > 0:
            self.positive_outcomes += 1
        if packet.metric_value > self.best_metric:
            self.best_metric = packet.metric_value
            self.best_outcome = packet.outcome
        if packet.edge_id not in self.contributing_edges:
            self.contributing_edges.append(packet.edge_id)

    @property
    def success_rate(self) -> float:
        if self.packet_count == 0:
            return 0.0
        return self.positive_outcomes / self.packet_count

    @property
    def avg_confidence(self) -> float:
        if self.packet_count == 0:
            return 0.0
        return self.total_confidence / self.packet_count

    def to_dict(self) -> dict:
        return {
            "semantic_key": self.semantic_key,
            "packet_count": self.packet_count,
            "success_rate": round(self.success_rate, 3),
            "avg_confidence": round(self.avg_confidence, 3),
            "best_outcome": self.best_outcome,
            "best_metric": round(self.best_metric, 4),
            "contributing_edges": len(self.contributing_edges),
            "last_updated": self.last_updated,
        }


# ─────────────────────────────────────────────────────────
# QIS SYNTHESIS PROCESS FUNCTION
# This is where routing and synthesis merge into one operator.
# ─────────────────────────────────────────────────────────

class QISSynthesisFunction(KeyedProcessFunction):
    """
    Flink KeyedProcessFunction that implements the QIS synthesis layer.
    (Keyed state such as ValueState is only available on keyed streams,
    hence KeyedProcessFunction rather than plain ProcessFunction.)

    For each semantic key (= semantic address = QIS routing destination):
    - Maintains a ValueState[SynthesisSummary] — continuously updated
    - On each incoming packet: updates state, emits current synthesis
    - The synthesis is never stale: it updates the instant a new packet arrives

    This is the architectural difference from queue-based transports:
    synthesis is not a separate consumer — it is the operator itself.
    """

    def __init__(self):
        self._state = None

    def open(self, runtime_context: RuntimeContext):
        # PICKLED_BYTE_ARRAY lets Flink store arbitrary Python objects,
        # so SynthesisSummary goes into state without manual pickling.
        descriptor = ValueStateDescriptor(
            "synthesis_summary",
            Types.PICKLED_BYTE_ARRAY(),
        )
        self._state = runtime_context.get_state(descriptor)

    def process_element(self, packet_dict: dict, ctx: "KeyedProcessFunction.Context"):
        packet = OutcomePacket(**{
            k: v for k, v in packet_dict.items()
            if k in OutcomePacket.__dataclass_fields__
        })

        # Read current synthesis state for this semantic address
        summary = self._state.value()
        if summary is None:
            summary = SynthesisSummary(semantic_key=packet.semantic_key)

        # Update synthesis with the new packet
        summary.update(packet)

        # Persist updated state (lives in Flink's distributed state backend)
        self._state.update(summary)

        # Emit updated synthesis as a new outcome packet downstream.
        # This is the output of the synthesis layer — feeds the next loop iteration.
        yield json.dumps({
            "type": "synthesis_update",
            "semantic_key": packet.semantic_key,
            "synthesis": summary.to_dict(),
            "trigger_edge": packet.edge_id,
            "trigger_outcome": packet.outcome,
        })


# ─────────────────────────────────────────────────────────
# TIMESTAMP ASSIGNER — enables event-time watermarking
# Handles late-arriving packets from intermittent edge nodes
# ─────────────────────────────────────────────────────────

class PacketTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value.get("event_timestamp", int(time.time() * 1000))


# ─────────────────────────────────────────────────────────
# QIS FLINK PIPELINE
# ─────────────────────────────────────────────────────────

def build_qis_pipeline(packets: list[dict]) -> list[dict]:
    """
    Builds and runs the QIS outcome routing pipeline on Flink.

    Architecture:
        Source → keyBy(semantic_key) → QISSynthesisFunction → collect

    keyBy() performs semantic routing: all packets with the same semantic_key
    route to the same TaskManager slot — O(1), deterministic, distributed.

    QISSynthesisFunction performs continuous synthesis: ValueState updated
    on every packet, no separate aggregation step required.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)  # STREAMING for live deployments

    # Watermark strategy: bounded out-of-orderness (5 seconds).
    # Handles late-arriving packets from intermittent edge nodes.
    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_millis(5000))
        .with_timestamp_assigner(PacketTimestampAssigner())
    )

    stream = (
        env.from_collection(packets, type_info=Types.PICKLED_BYTE_ARRAY())
        .assign_timestamps_and_watermarks(watermark_strategy)
        # SEMANTIC ROUTING: keyBy is QIS semantic addressing.
        # All packets sharing a semantic_key route to the same slot.
        .key_by(lambda p: p["semantic_key"], key_type=Types.STRING())
        # SYNTHESIS: stateful KeyedProcessFunction — routing and synthesis in one operator
        .process(QISSynthesisFunction(), output_type=Types.STRING())
    )

    # Collect results locally (in production: sink to Kafka, a database, or an API endpoint)
    return [json.loads(result) for result in stream.execute_and_collect()]


# ─────────────────────────────────────────────────────────
# SIMULATION: 200 ML training nodes, 3 semantic domains
# ─────────────────────────────────────────────────────────

def simulate_ml_training_network() -> list[dict]:
    """
    Simulates 200 ML training nodes emitting outcome packets across 3 domains.
    Demonstrates quadratic synthesis without centralized aggregation.
    """
    import random
    random.seed(42)

    domains = {
        "hyperparameter_tuning": [
            ("lr_schedule", "cosine decay from 3e-4 outperforms linear warmup at step 500+"),
            ("lr_schedule", "linear warmup stalls after epoch 3 on this distribution"),
            ("batch_size", "batch 256 with gradient accumulation x4 = better throughput than 1024"),
            ("optimizer", "AdamW beta2=0.95 converges 18% faster than default beta2=0.999"),
        ],
        "data_quality": [
            ("dedup", "near-duplicate threshold 0.85 cosine removes 12% corpus, improves val loss"),
            ("dedup", "aggressive dedup at 0.70 removes too much — val accuracy drops 4 points"),
            ("filtering", "perplexity filter >500 removes noise without losing rare domain samples"),
        ],
        "loss_dynamics": [
            ("gradient_vanish", "LayerNorm before attention (pre-norm) eliminates vanishing at depth 32+"),
            ("plateau", "plateau at epoch 7 resolved by reducing weight decay 1e-2 → 5e-3"),
            ("instability", "loss spike at step 2000 — gradient clipping at 0.5 not 1.0 required"),
        ],
    }

    packets = []
    now_ms = int(time.time() * 1000)
    for edge_id in range(1, 201):  # 200 edge nodes
        domain = random.choice(list(domains.keys()))
        subtopic, outcome_text = random.choice(domains[domain])
        tier = "production" if edge_id <= 100 else "research"
        semantic_key = OutcomePacket.semantic_key_for(domain, tier, subtopic)
        # Simulate intermittent connectivity: some packets arrive "late"
        late_offset = random.randint(0, 8000) if random.random() < 0.15 else 0
        packets.append({
            "edge_id": f"node_{edge_id:03d}",
            "semantic_key": semantic_key,
            "outcome": outcome_text,
            "metric_value": random.uniform(-0.1, 0.95),
            "confidence": random.uniform(0.55, 0.99),
            "event_timestamp": now_ms - late_offset,
            "domain": domain,
            "context": {"tier": tier, "subtopic": subtopic},
        })
    return packets


# ─────────────────────────────────────────────────────────
# QUADRATIC SCALING DEMONSTRATION
# ─────────────────────────────────────────────────────────

def demonstrate_quadratic_scaling():
    """
    Shows N(N-1)/2 synthesis opportunities at O(1) routing cost per packet.
    Same math as every other transport in the series — the loop is the discovery.
    """
    print("\n─── QIS QUADRATIC SCALING — FLINK TRANSPORT ───\n")
    print(f"{'Nodes':>8} {'Synthesis Pairs':>18} {'Routing Cost per Packet':>26} {'Infrastructure':>20}")
    print("─" * 78)
    configs = [
        (10, "Local Flink"),
        (100, "Local Flink"),
        (1_000, "Flink 3-node cluster"),
        (10_000, "Flink 10-node cluster"),
        (1_000_000, "Flink 50-node cluster"),
    ]
    for n, infra in configs:
        pairs = n * (n - 1) // 2
        cost = "O(1) keyBy hash"
        print(f"{n:>8,} {pairs:>18,} {cost:>26} {infra:>20}")
    print("\nRouting:   O(1) keyBy hash partitioning")
    print("Synthesis: O(1) ValueState read/write per packet")
    print("Scaling:   N(N-1)/2 = Θ(N²) synthesis pairs, O(1) per operation")
    print("\nThe loop is the discovery. The transport is an implementation detail.")


if __name__ == "__main__":
    print("QIS Outcome Routing — Apache Flink Transport (Part 13)\n")
    print("Discovery: Christopher Thomas Trevethan, June 16, 2025")
    print("Patents: 39 provisional patents — QIS architecture, not any single transport\n")

    demonstrate_quadratic_scaling()

    print("\n─── SIMULATING 200 ML TRAINING NODES ───\n")
    packets = simulate_ml_training_network()
    print(f"Generated {len(packets)} outcome packets across 3 semantic domains")
    print(f"Packet size: ~{len(json.dumps(packets[0]))} bytes (well under 512-byte target)\n")

    # Note: build_qis_pipeline() requires a PyFlink runtime (local mini-cluster or cluster).
    # For local testing without Flink, see the state accumulation demo below.
    print("Pipeline topology:")
    print("  Source[200 nodes] → keyBy(semantic_key) → QISSynthesisFunction → Sink")
    print("\n  keyBy(): O(1) hash routing — semantic address lookup")
    print("  QISSynthesisFunction: ValueState updated on every packet — continuous synthesis")
    print("  Downstream: synthesis packets fed back into network — QIS loop closed\n")

    # Manual accumulation demo (runs without a Flink cluster)
    summaries: dict[str, SynthesisSummary] = {}
    for p in packets:
        key = p["semantic_key"]
        if key not in summaries:
            summaries[key] = SynthesisSummary(semantic_key=key)
        pkt = OutcomePacket(**{k: v for k, v in p.items() if k in OutcomePacket.__dataclass_fields__})
        summaries[key].update(pkt)

    print("─── SYNTHESIS RESULTS BY SEMANTIC ADDRESS ───\n")
    for key, summary in list(summaries.items())[:5]:
        d = summary.to_dict()
        # semantic_key format: domain:tier:subtopic:fingerprint
        parts = key.split(":")
        domain_part = parts[0]
        subtopic_part = parts[2] if len(parts) > 2 else ""
        print(f"Domain: {domain_part} / {subtopic_part}")
        print(f"  Packets received:   {d['packet_count']}")
        print(f"  Contributing edges: {d['contributing_edges']}")
        print(f"  Success rate:       {d['success_rate']:.1%}")
        print(f"  Avg confidence:     {d['avg_confidence']:.2f}")
        print(f"  Best outcome:       {d['best_outcome'][:70]}...")
        print()

    unique_keys = len(summaries)
    total_pairs = sum(
        s.packet_count * (s.packet_count - 1) // 2
        for s in summaries.values()
    )
    print(f"Unique semantic addresses: {unique_keys}")
    print(f"Total synthesis pairs generated: {total_pairs:,}")
    print(f"Total packets processed: {len(packets)}")
    print(f"Packets per synthesis pair: {len(packets) / max(total_pairs, 1):.3f}")
    print("\nN(N-1)/2 synthesis pairs at O(1) routing cost. No central aggregator.")
```
## What Flink Gets You That No Previous Transport Did
The previous twelve transports proved the loop works across routing mechanisms. Flink adds something new: the synthesis layer IS the infrastructure layer.
| Capability | Queue-Based Transports | Flink |
|---|---|---|
| Routing | ✅ | ✅ |
| Continuous synthesis | ❌ Requires separate consumer | ✅ KeyedProcessFunction |
| Late arrival handling | ❌ | ✅ Event-time watermarks |
| Exactly-once semantics | Varies | ✅ Barrier checkpointing (Chandy-Lamport variant) |
| State per semantic address | ❌ External store required | ✅ ValueState (heap or RocksDB) |
| Stateful replay | ❌ | ✅ Savepoints |
| Live synthesis query | ❌ Must read from sink | Varies — Queryable State (beta, deprecated) or sink |
| Throughput at scale | High (Kafka: millions/sec) | High (Flink: millions/sec) |
One capability deserves a caveat: live queryability. Flink ships a Queryable State API that lets clients read a running job's ValueState directly — any node can ask "what is the current synthesis for semantic address X?" without routing a packet through the system at all. But Queryable State has always been a beta feature and is deprecated as of Flink 1.18, so for production QIS deployments the durable pattern is to sink every synthesis update into a fast external store and query that. Either way the synthesis stays live, and a lookup is a single key read.

This is the QIS query pattern implemented close to the infrastructure rather than as an application-level concern.
## Handling Late-Arriving Packets from Intermittent Edge Nodes
One constraint the previous transports in this series glossed over: what happens when an edge node is offline for 20 minutes and then reconnects with 40 outcome packets from the gap?
Queue-based transports deliver them in arrival order. The synthesis consumer sees them as "new" packets and processes them in the order they arrived — not in the order the events actually occurred.
Flink's event-time processing fixes this by design. The watermark strategy in the implementation above (`for_bounded_out_of_orderness(Duration.of_millis(5000))`) tells Flink: "tolerate packets arriving up to 5 seconds out of event-time order before advancing the watermark." For longer connectivity gaps, windowed pipelines can additionally configure allowed lateness, so a window keeps accepting late elements until the watermark passes the window's end plus the configured lateness.
For QIS networks with mobile edge nodes, satellite-connected sensors, or rural clinics with intermittent connectivity, this is not an edge case — it is the normal operating condition. Flink handles it natively. The synthesis never loses late-arriving intelligence.
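The watermark mechanics can be seen in miniature without Flink at all. The sketch below is our own simplification of bounded out-of-orderness (not Flink code): buffer arrivals, advance a watermark at max-seen-timestamp minus the allowed lateness, and release packets in event-time order once the watermark passes them.

```python
# Simplified bounded out-of-orderness model: late packets are held in a buffer
# and re-slotted into event-time order before being released downstream.
import heapq

def reorder(packets, max_out_of_orderness_ms=5000):
    """packets: iterable of dicts with 'event_timestamp'; yields them in event time."""
    buffer, watermark = [], float("-inf")
    for p in packets:
        heapq.heappush(buffer, (p["event_timestamp"], id(p), p))
        # Watermark trails the max seen timestamp by the allowed lateness
        watermark = max(watermark, p["event_timestamp"] - max_out_of_orderness_ms)
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)[2]
    while buffer:  # end of input: flush everything still buffered
        yield heapq.heappop(buffer)[2]

arrivals = [{"event_timestamp": t} for t in (1000, 9000, 2000, 3000, 10000)]
print([p["event_timestamp"] for p in reorder(arrivals)])
# → [1000, 2000, 3000, 9000, 10000]
```

The packets that arrived late (event times 2000 and 3000, arriving after 9000) come out in event-time order, which is exactly the guarantee the synthesis layer relies on.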
## The IP Argument — Part 13
Every part of this series proves the same architectural fact: the Quadratic Intelligence Swarm breakthrough is the complete loop — not any single component.
| Part | Transport | Routing Mechanism | Infrastructure |
|---|---|---|---|
| 1 | Python intro (outcome routing) | Function call | None |
| 2 | ChromaDB | HNSW vector search, O(log N) | Vector DB process |
| 3 | Qdrant | Distributed HNSW, multi-node | Qdrant cluster |
| 4 | REST API | B-tree index, O(log N) | HTTP server |
| 5 | Redis Pub/Sub | Topic matching, O(1) | Redis server |
| 6 | Apache Kafka | Partitioned durable log, O(1) | Kafka broker |
| 7 | Apache Pulsar | Namespace hierarchy, O(1) | Pulsar cluster |
| 8 | NATS JetStream | Subject routing, O(1) | NATS server |
| 9 | SQLite | B-tree index, O(log N) | None (a file) |
| 10 | MQTT | Topic matching, O(1) | Broker (1.5MB RAM) |
| 11 | ZeroMQ | Hash routing, O(1) | Nothing |
| 12 | Apache Arrow Flight | FlightDescriptor path, O(1) | Flight server |
| 13 | Apache Flink | keyBy() partitioning, O(1) | Flink cluster |
If QIS were DHT-based routing specifically — if the breakthrough were a clever use of Kademlia — this series could not exist. You cannot implement Kademlia on SQLite. You cannot implement it on ZeroMQ. You cannot implement it on Arrow Flight's FlightDescriptor.
But the QIS loop runs on all of them. Because the loop is the discovery. The routing mechanism satisfies a complexity requirement (O(log N) or better), nothing more. Any mechanism that routes outcome packets to a deterministic address by semantic similarity closes the loop. Flink's keyBy() does this in one line.
Christopher Thomas Trevethan discovered the architecture. The 39 provisional patents cover the complete loop — the routing requirement, the semantic fingerprinting, the outcome packet structure, the local synthesis, the feedback cycle that produces N(N-1)/2 synthesis opportunities at logarithmic cost. Not any single transport.
## When to Choose Flink Over Other Transports
Flink belongs in your stack when:
- Continuous synthesis is required — You want the synthesis to update in real-time as packets arrive, not on a pull/query cycle
- Event-time ordering matters — Your edge nodes have intermittent connectivity and late arrivals must be handled correctly
- You already run Flink — QIS slots into an existing Flink deployment as a keyed stream with a ProcessFunction. Zero new infrastructure.
- ML/data pipeline integration — Flink has native connectors for Kafka, Pulsar, HDFS, S3, JDBC. QIS outcome packets can enter from and exit to any of these.
- Queryable intelligence — live synthesis can be read without routing packets through the system, via the Queryable State API where available or via a lightweight sink-backed store
For constrained environments (mobile, IoT, offline-first), use MQTT or SQLite. For zero infrastructure, use ZeroMQ. For ML/HPC at memory-transfer speed, use Arrow Flight. For continuous synthesis with stateful intelligence pipelines, use Flink.
The choice is always yours. The loop works on all of them.
## Running This

```bash
# Install PyFlink
pip install apache-flink

# Run the simulation (without a cluster — local accumulation demo)
python qis_flink_transport.py

# For full Flink execution (requires a Flink cluster or local mini-cluster):
#   1. Download Flink: https://flink.apache.org/downloads/
#   2. Start local cluster: ./bin/start-cluster.sh
#   3. Submit job: flink run -py qis_flink_transport.py

# For streaming mode (live ML training feed):
#   Change: env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
#   Add: a Kafka source connector pointing at your training-node telemetry
```
## Series Navigation
- Part 12 — Apache Arrow Flight: Zero-copy columnar transport, FlightDescriptor as semantic address, DoExchange for real-time bilateral synthesis
- Part 13 — Apache Flink: You are here — stateful stream processing, keyBy() as semantic routing, ValueState as continuous synthesis
- Part 14: Coming next
All implementations available in the QIS Protocol repository (link pending GitHub org creation).
Quadratic Intelligence Swarm was discovered — not invented — by Christopher Thomas Trevethan on June 16, 2025. The discovery is covered under 39 provisional patents protecting the complete architecture across all routing implementations. The protocol is open: free for nonprofit, research, and educational use. Commercial licenses fund deployment to underserved communities worldwide.