João Coimbra


I Got Tired of Untyped MQTT Payloads, So I Built a Library

Every time I worked with MQTT in TypeScript, I had the same problem.

You subscribe to a topic, get a Buffer back, do JSON.parse(message.toString()), and cross your fingers that the payload matches what you expect. No types. No validation. Just vibes — and occasional runtime explosions in production.

// The classic "pray it works" pattern
client.subscribe("devices/+/status")

client.on("message", (topic, payload) => {
  const data = JSON.parse(payload.toString()) // any 🙏
  const deviceId = topic.split("/")[1]        // manually extracting params

  if (!data.online) alertDeviceOffline(deviceId) // is this even a boolean?
})

After the third time a device sent a malformed payload and silently broke my dashboard, I decided to fix this properly.

The Problem With MQTT + TypeScript

MQTT is a great protocol — lightweight, async, perfect for IoT. But the DX in TypeScript is rough:

  • Payloads are untyped: you cast or assert, and both feel wrong
  • Topic parameters are invisible: devices/:deviceId/status gives you no help extracting deviceId, so you parse strings manually
  • Wildcard subscriptions are stringly-typed: devices/+/status works, but the callback knows nothing about what matched
  • No runtime validation: a bad payload from a misbehaving device can silently corrupt your state or crash a handler

The result: defensive typeof checks scattered everywhere, manual split("/") to get params, and bugs that only show up when a device sends something unexpected at 3am.

Enter topiq

topiq is a small TypeScript library that wraps mqtt.js and adds a typed topic layer on top. The core idea: define topics once with Zod, get fully typed .on(), .emit(), and .stream() everywhere.

Before vs After

Before (plain mqtt.js):

import mqtt from "mqtt"

const client = mqtt.connect("mqtt://broker.example.com")

client.subscribe("devices/+/status")

client.on("message", (topic, payload) => {
  const parts = topic.split("/")
  const deviceId = parts[1] // hope this index is right

  const data = JSON.parse(payload.toString()) as {
    online: boolean
    battery: number
  } // no validation, just wishful casting

  if (!data.online) alertDeviceOffline(deviceId)
  updateDashboard(deviceId, { battery: data.battery })
})

After (topiq):

import { topic, topiq } from "topiq"
import { z } from "zod"

const deviceStatus = topic("devices/:deviceId/status", z.object({
  online: z.boolean(),
  battery: z.number(),
}))

const client = topiq(
  { host: "broker.example.com" },
  { topics: { deviceStatus } }
)

const unsubscribe = client.on(deviceStatus, (data, { topic, params }) => {
  // data: { online: boolean, battery: number } — inferred from your schema
  // params: { deviceId: string } — extracted from "devices/:deviceId/status"
  // topic: e.g. "devices/abc-123/status" — the raw incoming string
  if (!data.online) alertDeviceOffline(params.deviceId)
  updateDashboard(params.deviceId, { battery: data.battery })
})

// Clean up when done
unsubscribe()

data is fully typed and validated at runtime by Zod. If the payload doesn't match the schema, the message is silently discarded — no crashes, no noise, no defensive checks. .on() returns an unsubscribe function so cleanup is trivial.
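To make the discard-on-invalid behaviour concrete, here is a minimal sketch of the idea with no dependencies. It is not topiq's implementation: a hand-written type guard stands in for the Zod schema, and the names DeviceStatus, isDeviceStatus and decodeOrDiscard are made up for illustration.

```typescript
// Hypothetical names throughout; a hand-written guard stands in for a Zod schema.
interface DeviceStatus {
  online: boolean
  battery: number
}

// Narrow an unknown value to DeviceStatus by checking each field at runtime.
function isDeviceStatus(value: unknown): value is DeviceStatus {
  if (typeof value !== "object" || value === null) return false
  const candidate = value as Record<string, unknown>
  return typeof candidate.online === "boolean" && typeof candidate.battery === "number"
}

// Return the typed payload, or undefined when the message should be dropped.
function decodeOrDiscard(raw: string): DeviceStatus | undefined {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return undefined // not JSON at all
  }
  return isDeviceStatus(parsed) ? parsed : undefined
}

console.log(decodeOrDiscard('{"online":false,"battery":41}')) // valid: the typed object
console.log(decodeOrDiscard('{"online":"yes"}'))              // wrong shape: undefined
console.log(decodeOrDiscard("not json"))                      // parse failure: undefined
```

A handler would only run when the result is not undefined, which is why no typeof checks are needed inside it.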

How params work: { deviceId: string } is inferred directly from the path "devices/:deviceId/status", with no manual string splitting. Under the hood, topiq uses tail-recursive TypeScript utility types to extract the params, so deeply nested paths stay within the compiler's recursion limits.
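For readers curious what such a utility can look like, below is a rough sketch of the technique, assuming nothing about topiq's actual source. ExtractParams and matchParams are hypothetical names. The type walks the path one segment at a time and carries the result in an accumulator, which keeps the recursive call in tail position. The runtime function does the matching string work.

```typescript
// Type level: collect every ":name" segment of a path into an object type.
// Acc carries the result so far, so the recursive call is the entire branch.
type ExtractParams<Path extends string, Acc = {}> =
  Path extends `${infer Head}/${infer Rest}`
    ? ExtractParams<Rest, Head extends `:${infer Name}` ? Acc & Record<Name, string> : Acc>
    : Path extends `:${infer Name}`
      ? Acc & Record<Name, string>
      : Acc

// Runtime: compare segment by segment, capturing values for ":name" segments.
// Returns undefined when the topic does not fit the pattern.
function matchParams<P extends string>(pattern: P, topic: string): ExtractParams<P> | undefined {
  const patternParts = pattern.split("/")
  const topicParts = topic.split("/")
  if (patternParts.length !== topicParts.length) return undefined

  const params: Record<string, string> = {}
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i] ?? ""
    const actual = topicParts[i] ?? ""
    if (expected.startsWith(":")) {
      params[expected.slice(1)] = actual
    } else if (expected !== actual) {
      return undefined
    }
  }
  // The cast is where the runtime result and the type-level computation meet.
  return params as unknown as ExtractParams<P>
}

const matched = matchParams("devices/:deviceId/status", "devices/abc-123/status")
console.log(matched?.deviceId) // "abc-123", typed as string | undefined
```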

Publishing — emit()

To publish, you build a concrete topic string from the definition and pass the typed payload:

// topic.build() returns a typed concrete string — params are fully typed
client.emit(deviceStatus.build({ deviceId: "abc-123" }), {
  online: true,
  battery: 87,
  // wrong fields → compile error
})

// You can also pass the string directly — TypeScript enforces the pattern
client.emit("devices/abc-123/status", { online: true, battery: 87 })
// "devices/+/status" → compile error, wildcards not allowed in emit

The distinction between Topic (with wildcards, for subscribe/stream) and ConcreteTopic (no wildcards, for emit) is enforced at the type level — you can't accidentally publish to a wildcard pattern.

What's happening at the type level

This is where it gets interesting. build() doesn't just return string — it returns a precise literal type computed from your path and the params you passed:

const deviceStatus = topic("devices/:deviceId/status", z.object({ ... }))

deviceStatus.build({ deviceId: "abc-123" })
// return type: "devices/abc-123/status"
// not string — the exact literal

And ConcreteTopic<T> converts the wildcard pattern into a ${string} template literal, so TypeScript can validate arbitrary strings at compile time:

// ConcreteTopic<"devices/+/status"> resolves to `devices/${string}/status`

client.emit("devices/abc-123/status", data) // ✅
client.emit("devices/+/status", data)        // ❌ compile error — wildcard not assignable
client.emit("sensors/abc-123/status", data)  // ❌ compile error — wrong pattern

Topics without params also work correctly — build() takes zero arguments and TypeScript enforces it:

const ping = topic("/ping", z.object({ ts: z.number() }))

ping.build()             // ✅ no params required
ping.build({ id: "x" }) // ❌ compile error — unexpected argument

All of this is powered by tail-recursive conditional types. Since TypeScript 4.5 the compiler evaluates tail-recursive conditional types iteratively, so even long topic paths stay well within its recursion limit.
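The types in this section can be approximated in a few lines. What follows is a sketch under assumptions, not topiq's source: Segment, BuildPath, RejectWildcards and emitStatus are hypothetical names, and ConcreteTopic here is a re-creation of the idea only. One detail worth noting: a bare ${string} slot also matches the text "+", so this sketch adds an explicit wildcard guard. How the library itself rejects wildcards is not shown in the post.

```typescript
// A ":name" segment becomes the value supplied for it; other segments pass through.
type Segment<S extends string, P extends Record<string, string>> =
  S extends `:${infer Name}` ? (Name extends keyof P ? P[Name] : string) : S

// Tail-recursive: the finished prefix travels in Acc.
type BuildPath<Path extends string, P extends Record<string, string>, Acc extends string = ""> =
  Path extends `${infer Head}/${infer Rest}`
    ? BuildPath<Rest, P, `${Acc}${Segment<Head, P>}/`>
    : `${Acc}${Segment<Path, P>}`

// Each "+" level becomes a ${string} slot.
type ConcreteTopic<Pattern extends string, Acc extends string = ""> =
  Pattern extends `${infer Head}/${infer Rest}`
    ? ConcreteTopic<Rest, `${Acc}${Head extends "+" ? string : Head}/`>
    : `${Acc}${Pattern extends "+" ? string : Pattern}`

// Extra guard, because a ${string} slot would accept "+" or "#" as well.
type RejectWildcards<T extends string> =
  T extends `${string}+${string}` | `${string}#${string}` ? never : T

type Built = BuildPath<"devices/:deviceId/status", { deviceId: "abc-123" }>
const built: Built = "devices/abc-123/status" // the only value this type admits

type StatusTopic = ConcreteTopic<"devices/+/status"> // `devices/${string}/status`

// Stand-in for a typed emit: records the topic instead of publishing it.
const sent: string[] = []
function emitStatus<T extends StatusTopic>(topic: RejectWildcards<T>): void {
  sent.push(String(topic))
}

emitStatus("devices/abc-123/status") // accepted
// emitStatus("devices/+/status")       // rejected: the parameter type collapses to never
// emitStatus("sensors/abc-123/status") // rejected: does not match the pattern

console.log(built, sent)
```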

Streaming — stream()

For continuous data processing, topiq exposes an async iterable API with AbortSignal support:

import { topic, topiq } from "topiq"
import { z } from "zod"

const telemetry = topic("devices/:deviceId/telemetry", z.object({
  temperature: z.number(),
  humidity: z.number(),
}))

const client = topiq({ host: "broker.example.com" }, { topics: { telemetry } })

const controller = new AbortController()

// Stop the stream from anywhere. Schedule the abort before the loop starts,
// because code placed after `for await` only runs once the stream has ended.
setTimeout(() => controller.abort(), 60_000)

for await (const { data, topic: rawTopic } of client.stream(telemetry, controller.signal)) {
  // data: { temperature: number, humidity: number }
  console.log(`[${rawTopic}] ${data.temperature}°C / ${data.humidity}%`)
}

No event emitter juggling. No manual cleanup. Just iterate and abort when done.
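If you are wondering how a callback-based subscription turns into something you can for await over, here is one possible shape, written as a generic sketch and not taken from topiq. streamFrom is a hypothetical helper. It buffers pushed values in a queue, wakes the generator when something arrives or the signal aborts, and unsubscribes in a finally block. In this sketch, values still queued at abort time are dropped.

```typescript
// Hypothetical helper: adapt a push-style subscription into an async iterable.
async function* streamFrom<T>(
  subscribe: (push: (value: T) => void) => () => void,
  signal: AbortSignal,
): AsyncGenerator<T> {
  const queue: T[] = []
  let wake: (() => void) | undefined

  // Resume the generator if it is currently waiting.
  const notify = () => {
    wake?.()
    wake = undefined
  }

  const unsubscribe = subscribe((value) => {
    queue.push(value)
    notify()
  })
  signal.addEventListener("abort", notify, { once: true })

  try {
    while (!signal.aborted) {
      if (queue.length === 0) {
        await new Promise<void>((resolve) => {
          wake = () => resolve()
        })
        continue
      }
      yield queue.shift() as T
    }
  } finally {
    // Runs on abort and also when the consumer breaks out of the loop.
    unsubscribe()
    signal.removeEventListener("abort", notify)
  }
}

// Demo with an in-memory publisher instead of a broker.
const listeners = new Set<(value: number) => void>()
const controller = new AbortController()

const stream = streamFrom<number>((push) => {
  listeners.add(push)
  return () => listeners.delete(push)
}, controller.signal)

async function demo(): Promise<number[]> {
  const seen: number[] = []
  setTimeout(() => {
    listeners.forEach((listener) => listener(1))
    listeners.forEach((listener) => listener(2))
  }, 0)
  setTimeout(() => controller.abort(), 10)
  for await (const value of stream) seen.push(value)
  return seen
}

const result: Promise<number[]> = demo()
result.then((seen) => console.log(seen)) // receives 1 and 2, then ends on abort
```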

TLS

Pass tls: true to switch to MQTTS automatically (port 8883), or provide certs explicitly:

import fs from "node:fs"

// `topics` is your map of topic definitions, e.g. { deviceStatus }

// Simple: topiq switches to mqtts:// and port 8883
const client = topiq({ host: "broker.example.com", tls: true }, { topics })

// With certificates (mTLS)
const mtlsClient = topiq({
  host: "broker.example.com",
  tls: {
    ca: fs.readFileSync("ca.crt"),
    cert: fs.readFileSync("client.crt"),
    key: fs.readFileSync("client.key"),
  },
}, { topics })

Current Status

topiq is at v0.1.0 — stable enough for production use (I'm running it in a real IoT project), but the API may still evolve before v1.0. I'm sharing it now specifically to get feedback from people who actually deal with MQTT day to day.

If you work with IoT, home automation, industrial systems, or any real-time messaging over MQTT, I'd love to hear what you think — especially what's missing or what edge cases I haven't considered.

npm install topiq
# or
bun add topiq

GitHub: github.com/joao-coimbra/topiq


Have you dealt with untyped MQTT payloads in production? How did you handle it? Drop a comment — always curious how others approach this.
