
TOKUJI


BlackBull goes multi-protocol (part 3) — one `app.py`, two protocols

The first two posts were the engineering: part 1 rebuilt the connection
dispatcher so BlackBull's core stopped assuming HTTP, and part 2 put a lock-free,
single-owner MQTT 5 broker on top of that seam. This post is the payoff — what
all of it looks like from an application author's chair.

The short version: one file, one pip install, two protocols.


One decorator for HTTP, one for MQTT

from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

@app.route(path='/')
async def index():
    return "HTTP here; MQTT broker on :1883."

@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temperature(msg: Message, room: str):
    print(f"{room}: {msg.payload.decode()}")    # {room} captured like a path param

app.run(port=8000)    # HTTP on 8000, MQTT on 1883

mqtt.on_message deliberately mirrors app.route: both decorate an async
function, both match on an address pattern, and {room} captures a topic level
the same way {task_id} captures a URL path segment. The difference is in the
semantics. An HTTP route produces the response itself; an MQTT handler is a
tap that only observes the broker's routing, and the broker delivers to
subscribers whether or not any handler is registered.
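
To make the path-parameter analogy concrete, here is a minimal sketch of how a pattern like sensors/{room}/temperature could be matched against a concrete topic. It is not BlackBull's matcher: compile_topic_pattern and match_topic are hypothetical names, and the sketch assumes a placeholder always spans exactly one non-empty topic level.

```python
import re
from typing import Optional


def compile_topic_pattern(pattern: str) -> "re.Pattern[str]":
    """Build a regex with one named group per {placeholder} level.

    Hypothetical helper for illustration; not part of BlackBull.
    """
    parts = []
    for level in pattern.split('/'):
        if level.startswith('{') and level.endswith('}'):
            # A placeholder captures exactly one non-empty topic level.
            parts.append(f'(?P<{level[1:-1]}>[^/]+)')
        else:
            # Literal levels must match verbatim.
            parts.append(re.escape(level))
    return re.compile('/'.join(parts))


def match_topic(pattern: str, topic: str) -> Optional[dict]:
    """Return the captured levels, or None when the topic does not match."""
    m = compile_topic_pattern(pattern).fullmatch(topic)
    return m.groupdict() if m else None


print(match_topic('sensors/{room}/temperature', 'sensors/room1/temperature'))
print(match_topic('sensors/{room}/temperature', 'sensors/room1/humidity'))
```

The captured dictionary is what a framework would pass to the handler as keyword arguments, which is how room arrives in on_temperature above.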

MQTTExtension registers itself through the single extension seam
(app.add_extension) and binds :1883. The core BlackBull class carries zero
MQTT-specific code — the broker lives entirely in blackbull.mqtt.


Drive it with the tools you already have

It's a real MQTT 5 broker — CONNECT / SUBSCRIBE / PUBLISH at QoS 0–2, retained
messages, and Last-Will — so standard clients just work:

mosquitto_sub -t 'sensors/#' -p 1883 -V 5            # terminal 1
mosquitto_pub -t 'sensors/room1/temperature' \
    -m '21.5' -p 1883 -V 5                            # terminal 2

The message appears in the subscriber's terminal and fires the on_temperature
handler above. One process. No separate broker to install (mosquitto_sub and
mosquitto_pub are only clients), no broker sidecar, no C extension.
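
If you're wondering why the sensors/# subscription picks up sensors/room1/temperature: MQTT topic filters have two wildcards, + for exactly one level and # for everything from that level down. The sketch below follows the matching rules from the MQTT specification; filter_matches is a hypothetical name, it says nothing about how BlackBull implements matching internally, and it assumes the filter is already well-formed (# only as the last level).

```python
def filter_matches(topic_filter: str, topic: str) -> bool:
    """Return True when an MQTT topic filter matches a topic name.

    Illustrative only; assumes '#' appears only as the final filter level.
    """
    f_levels = topic_filter.split('/')
    t_levels = topic.split('/')

    # Topics starting with '$' (such as $SYS/...) are not matched by
    # filters whose first level is a wildcard.
    if topic.startswith('$') and f_levels[0] in ('+', '#'):
        return False

    for i, f in enumerate(f_levels):
        if f == '#':
            # Multi-level wildcard: matches the parent and anything below it.
            return True
        if i >= len(t_levels):
            return False
        if f != '+' and f != t_levels[i]:
            return False
    # No '#' was seen, so the level counts must agree exactly.
    return len(f_levels) == len(t_levels)


print(filter_matches('sensors/#', 'sensors/room1/temperature'))
print(filter_matches('sensors/+/temperature', 'sensors/room1/temperature'))
print(filter_matches('sensors/+', 'sensors/room1/temperature'))
```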


When HTTP and MQTT share a process

The real reason to put two protocols in one process isn't cleaner ops — it's
shared memory without a sidecar. Here's a tiny temperature dashboard that
would normally need a broker, a web server, and a Redis instance to bridge
them:

from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

# Plain dict — no locks, no Redis, no cross-process serialization.
latest: dict[str, float] = {}

@mqtt.on_message(topic='sensors/{room}/temperature')
async def ingest(msg: Message, room: str):
    latest[room] = float(msg.payload)

@app.route(path='/sensors')
async def dashboard():
    return latest   # {"room1": 21.5, "room2": 30.1}

app.run(port=8000)

A sensor publishes 21.5 to sensors/room1/temperature via MQTT. The broker
routes it to subscribers and fires ingest, which updates the dictionary. A
browser hits /sensors and sees the current state as JSON. One variable, two
protocols, zero glue infrastructure.

This is what "multi-protocol" buys you in practice: the broker and the web
server live in the same memory space, so the bridge between them is just a
function call that writes to a dict. No redis-py, no pika, no cross-process
serialization. Just Python.
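
One caveat with the dashboard: ingest trusts every payload, and any client can publish anything. A malformed payload would raise inside the handler, and a NaN reading would end up in the JSON response. Here is a hedged sketch of a stricter write path. parse_temperature and store_reading are hypothetical helpers, not BlackBull API; the body of ingest could call store_reading(latest, room, msg.payload), assuming msg.payload is bytes as the first example suggests.

```python
import math
from typing import Optional


def parse_temperature(payload: bytes) -> Optional[float]:
    """Decode a payload such as b'21.5'; return None if it is unusable."""
    try:
        value = float(payload.decode('utf-8').strip())
    except (UnicodeDecodeError, ValueError):
        return None
    # Reject NaN and infinities: they are not valid JSON numbers.
    return value if math.isfinite(value) else None


def store_reading(latest: dict, room: str, payload: bytes) -> bool:
    """Update the shared dict only for valid readings; report success."""
    value = parse_temperature(payload)
    if value is None:
        return False
    latest[room] = value
    return True


latest = {}
store_reading(latest, 'room1', b'21.5')
store_reading(latest, 'room2', b'not-a-number')  # ignored
print(latest)
```

The shared state stays a plain dict; only the write is guarded.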


Machine-readable docs for both protocols

BlackBull already auto-generates an OpenAPI 3.1 document from your route
signatures:

app.enable_openapi()        # publishes /openapi.json and /docs

OpenAPI is an HTTP request/response vocabulary, though, and has nothing to say
about topics and publish/subscribe. The messaging world's counterpart is
AsyncAPI, and BlackBull ships an extension for that too:

from blackbull.mqtt import AsyncAPIExtension

app.add_extension(AsyncAPIExtension(title='Sensor Gateway', version='1.0.0'))

After app.run() the document is served at /asyncapi.json, with a CDN-hosted
HTML viewer at /asyncapi (no new Python dependency). Each on_message topic
filter becomes a channel; each callback a receive operation. It's generated
lazily, so taps registered after the extension are still documented. Your HTTP
surface describes itself with OpenAPI; your MQTT surface describes itself with
AsyncAPI; same idea, right vocabulary for each.
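
To give a feel for the mapping from taps to channels and receive operations, here is a rough sketch that builds a document of that shape by hand. It assumes the AsyncAPI 3.0 layout (channels with an address, operations with action set to receive); the post does not say which AsyncAPI version BlackBull emits or how it names channels, so build_asyncapi_sketch and the channel ids are hypothetical and the real /asyncapi.json may differ.

```python
import json
import re


def build_asyncapi_sketch(title: str, version: str, taps: dict) -> dict:
    """Map {handler name: topic pattern} to an AsyncAPI-3.0-shaped dict.

    Illustration only; not the output of AsyncAPIExtension.
    """
    channels = {}
    operations = {}
    for handler_name, topic in taps.items():
        # Derive an identifier-safe channel id from the topic pattern.
        channel_id = re.sub(r'[^A-Za-z0-9]+', '_', topic).strip('_')
        channel = {'address': topic}
        params = re.findall(r'\{(\w+)\}', topic)
        if params:
            channel['parameters'] = {
                p: {'description': f'Topic level captured as {p}'} for p in params
            }
        channels[channel_id] = channel
        # Each callback becomes a receive operation on its channel.
        operations[handler_name] = {
            'action': 'receive',
            'channel': {'$ref': f'#/channels/{channel_id}'},
        }
    return {
        'asyncapi': '3.0.0',
        'info': {'title': title, 'version': version},
        'channels': channels,
        'operations': operations,
    }


doc = build_asyncapi_sketch(
    'Sensor Gateway', '1.0.0',
    {'on_temperature': 'sensors/{room}/temperature'},
)
print(json.dumps(doc, indent=2))
```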


The whole arc

Start to finish, that's the story:

  • BlackBull decided to be a multi-protocol server.
  • It prepared the ground — a protocol-agnostic dispatcher (peek-and-replay), built behind a regression oracle so the live HTTP server broke nothing.
  • It built on that ground — a lock-free, single-owner MQTT 5 broker on the actor model, with HTTP throughput actually a little higher than before.
  • And it stayed one app.py: HTTP and MQTT side by side, sharing memory, each self-documenting — no sidecars, no C extensions.

What's next

MQTT is the first consumer of the multi-protocol seam, not the last.
@app.raw_handler already lets you bind raw TCP to a port and handle the
reader/writer yourself for custom protocols. A first-class gRPC handler — riding
the existing HTTP/2 stream, flow-control, and trailers machinery — is the
natural next step. Same pattern: register a ProtocolBinding, let the core
detect and dispatch. The core won't have to learn a thing about gRPC. That's
the whole point.
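
For a rough idea of what "detect" can mean at the byte level, here is a sketch that classifies a connection from its first peeked bytes. It relies only on wire-format facts (an MQTT CONNECT packet starts with byte 0x10 followed shortly by the protocol name MQTT; an HTTP/2 connection opens with the PRI preface). sniff_protocol is a hypothetical function, not BlackBull's dispatcher, and the post does not show what a ProtocolBinding's detection hook looks like.

```python
# Request-line prefixes for common HTTP/1.x methods (not exhaustive).
HTTP1_METHODS = (b'GET ', b'POST ', b'PUT ', b'DELETE ',
                 b'HEAD ', b'OPTIONS ', b'PATCH ')


def sniff_protocol(peeked: bytes) -> str:
    """Guess the protocol from the first bytes of a connection.

    Illustrative only: a real dispatcher must also handle short reads
    and replay the peeked bytes to whichever handler it selects.
    """
    if peeked.startswith(b'PRI * HTTP/2'):
        return 'http2'
    if peeked.startswith(HTTP1_METHODS):
        return 'http1'
    # MQTT CONNECT: 0x10, a 1-4 byte remaining length, then 0x00 0x04 'MQTT'.
    if peeked[:1] == b'\x10' and b'MQTT' in peeked[:12]:
        return 'mqtt'
    return 'unknown'


print(sniff_protocol(b'GET / HTTP/1.1\r\n'))
print(sniff_protocol(b'\x10\x10\x00\x04MQTT\x05\x02\x00\x3c'))
print(sniff_protocol(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'))
```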


Try it

pip install 'blackbull[mqtt]'
python examples/mqtt_broker.py

Source: github.com/TOKUJI/BlackBull
Docs: tokuji.github.io/BlackBull
