Every time I worked with MQTT in TypeScript, I had the same problem.
You subscribe to a topic, get a Buffer back, do JSON.parse(message.toString()), and cross your fingers that the payload matches what you expect. No types. No validation. Just vibes — and occasional runtime explosions in production.
// The classic "pray it works" pattern
client.subscribe("devices/+/status")
client.on("message", (topic, payload) => {
const data = JSON.parse(payload.toString()) // any 🙏
const deviceId = topic.split("/")[1] // manually extracting params
if (!data.online) alertDeviceOffline(deviceId) // is this even a boolean?
})
After the third time a device sent a malformed payload and silently broke my dashboard, I decided to fix this properly.
The Problem With MQTT + TypeScript
MQTT is a great protocol — lightweight, async, perfect for IoT. But the DX in TypeScript is rough:
- Payloads are untyped — you cast or assert, both feel wrong
-
Topic parameters are invisible —
devices/:deviceId/statusgives you no help extractingdeviceId; you parse strings manually -
Wildcard subscriptions are stringly-typed —
devices/+/statusworks, but the callback knows nothing about what matched - No runtime validation — a bad payload from a misbehaving device can silently corrupt your state or crash a handler
The result: defensive typeof checks scattered everywhere, manual split("/") to get params, and bugs that only show up when a device sends something unexpected at 3am.
Enter topiq
topiq is a small TypeScript library that wraps mqtt.js and adds a typed topic layer on top. The core idea: define topics once with Zod, get fully typed .on(), .emit(), and .stream() everywhere.
Before vs After
Before (plain mqtt.js):
import mqtt from "mqtt"
const client = mqtt.connect("mqtt://broker.example.com")
client.subscribe("devices/+/status")
client.on("message", (topic, payload) => {
const parts = topic.split("/")
const deviceId = parts[1] // hope this index is right
const data = JSON.parse(payload.toString()) as {
online: boolean
battery: number
} // no validation, just wishful casting
if (!data.online) alertDeviceOffline(deviceId)
updateDashboard(deviceId, { battery: data.battery })
})
After (topiq):
import { topic, topiq } from "topiq"
import { z } from "zod"
const deviceStatus = topic("devices/:deviceId/status", z.object({
online: z.boolean(),
battery: z.number(),
}))
const client = topiq(
{ host: "broker.example.com" },
{ topics: { deviceStatus } }
)
const unsubscribe = client.on(deviceStatus, (data, { topic, params }) => {
// data: { online: boolean, battery: number } — inferred from your schema
// params: { deviceId: string } — extracted from "devices/:deviceId/status"
// topic: e.g. "devices/abc-123/status" — the raw incoming string
if (!data.online) alertDeviceOffline(params.deviceId)
updateDashboard(params.deviceId, { battery: data.battery })
})
// Clean up when done
unsubscribe()
data is fully typed and validated at runtime by Zod. If the payload doesn't match the schema, the message is silently discarded — no crashes, no noise, no defensive checks. .on() returns an unsubscribe function so cleanup is trivial.
How params work:
{ deviceId: string }is inferred directly from the path"devices/:deviceId/status"— no manual string splitting. Under the hood, topiq uses tail-recursive TypeScript utilities to extract params from arbitrarily deep paths without hitting compiler limits.
Publishing — emit()
To publish, you build a concrete topic string from the definition and pass the typed payload:
// topic.build() returns a typed concrete string — params are fully typed
client.emit(deviceStatus.build({ deviceId: "abc-123" }), {
online: true,
battery: 87,
// wrong fields → compile error
})
// You can also pass the string directly — TypeScript enforces the pattern
client.emit("devices/abc-123/status", { online: true, battery: 87 })
// "devices/+/status" → compile error, wildcards not allowed in emit
The distinction between Topic (with wildcards, for subscribe/stream) and ConcreteTopic (no wildcards, for emit) is enforced at the type level — you can't accidentally publish to a wildcard pattern.
What's happening at the type level
This is where it gets interesting. build() doesn't just return string — it returns a precise literal type computed from your path and the params you passed:
const deviceStatus = topic("devices/:deviceId/status", z.object({ ... }))
deviceStatus.build({ deviceId: "abc-123" })
// return type: "devices/abc-123/status"
// not string — the exact literal
And ConcreteTopic<T> converts the wildcard pattern into a ${string} template literal, so TypeScript can validate arbitrary strings at compile time:
// ConcreteTopic<"devices/+/status"> resolves to "devices/${string}/status"
client.emit("devices/abc-123/status", data) // ✅
client.emit("devices/+/status", data) // ❌ compile error — wildcard not assignable
client.emit("sensors/abc-123/status", data) // ❌ compile error — wrong pattern
Topics without params also work correctly — build() takes zero arguments and TypeScript enforces it:
const ping = topic("/ping", z.object({ ts: z.number() }))
ping.build() // ✅ no params required
ping.build({ id: "x" }) // ❌ compile error — unexpected argument
All of this is powered by tail-recursive conditional types that handle arbitrarily deep paths without hitting TypeScript's recursion limit.
Streaming — stream()
For continuous data processing, topiq exposes an async iterable API with AbortSignal support:
import { topic, topiq } from "topiq"
import { z } from "zod"
const telemetry = topic("devices/:deviceId/telemetry", z.object({
temperature: z.number(),
humidity: z.number(),
}))
const client = topiq({ host: "broker.example.com" }, { topics: { telemetry } })
const controller = new AbortController()
for await (const { data, topic: rawTopic } of client.stream(telemetry, controller.signal)) {
// data: { temperature: number, humidity: number }
console.log(`[${rawTopic}] ${data.temperature}°C / ${data.humidity}%`)
}
// Stop the stream from anywhere
controller.abort()
No event emitter juggling. No manual cleanup. Just iterate and abort when done.
TLS
Pass tls: true to switch to MQTTS automatically (port 8883), or provide certs explicitly:
// Simple — topiq switches to mqtts:// and port 8883
const client = topiq({ host: "broker.example.com", tls: true }, { topics })
// With certificates (mTLS)
const client = topiq({
host: "broker.example.com",
tls: {
ca: fs.readFileSync("ca.crt"),
cert: fs.readFileSync("client.crt"),
key: fs.readFileSync("client.key"),
},
}, { topics })
Current Status
topiq is at v0.1.0 — stable enough for production use (I'm running it in a real IoT project), but the API may still evolve before v1.0. I'm sharing it now specifically to get feedback from people who actually deal with MQTT day to day.
If you work with IoT, home automation, industrial systems, or any real-time messaging over MQTT, I'd love to hear what you think — especially what's missing or what edge cases I haven't considered.
npm install topiq
# or
bun add topiq
GitHub: github.com/joao-coimbra/topiq
Have you dealt with untyped MQTT payloads in production? How did you handle it? Drop a comment — always curious how others approach this.
Top comments (0)