Who this is for: IoT platform engineers, data engineers, and anyone who wants to turn raw device uplinks into analytics-ready JSON.
What you’ll build: A Node.js script that subscribes to device uplinks, auto‑detects common payload encodings (JSON, Base64, and a demo Hex/TLV), and normalizes them into a single schema for storage, alerting, or streaming.
Note: This tutorial uses example payloads and a simplified TLV layout for teaching purposes. Replace the TLV layout with your device's official spec in production.
Table of contents
- Prereqs & setup
- Topic & payload examples
- Parsing strategy
- Full script
- Run it and sample output
- Pitfalls & next steps
1) Prereqs & setup
- Node.js ≥ 18
- A reachable MQTT broker (local or hosted)
- Install dependencies:
npm init -y
npm install mqtt dotenv yargs
Create a .env file:
MQTT_URL=mqtts://broker.example.com:8883
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
MQTT_TOPIC=devices/+/up
The topic filter devices/+/up matches uplinks from all devices (the + wildcard matches exactly one topic level). Adjust it to your topic scheme.
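Because the + wildcard stands for exactly one level, the device ID can be recovered from the topic string itself. A minimal sketch, assuming the devices/<deviceId>/up layout used in this tutorial; deviceIdFromTopic is a hypothetical helper name, not part of the script:

```javascript
// Hypothetical helper: extract the device ID from a topic shaped like devices/<deviceId>/up.
function deviceIdFromTopic(topic) {
  const parts = topic.split('/');
  // Expect exactly three levels: "devices", the device ID, and "up".
  if (parts.length !== 3 || parts[0] !== 'devices' || parts[2] !== 'up') {
    return undefined;
  }
  // An empty middle level (devices//up) is treated as "no ID".
  return parts[1] || undefined;
}

console.log(deviceIdFromTopic('devices/ELK-123456/up'));
console.log(deviceIdFromTopic('devices/ELK-123456/gps/up'));
```

This can serve as a fallback when a payload carries no deviceId field of its own.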
2) Topic & payload examples
Here are two demo payloads you might see from a device:
JSON payload (typical)
{
  "deviceId": "ELK-123456",
  "ts": "2025-09-08T06:01:22Z",
  "gnss": { "lat": 22.543096, "lon": 114.057865, "speedKph": 48.3, "heading": 180 },
  "batteryPct": 87,
  "status": ["IGN_ON", "MOVING"]
}
Hex/TLV payload (demo)
# TLV uses Tag(1B) + Len(1B) + Value(N) — example fields:
01 0A 454C4B2D313233343536 # tag=0x01 deviceId ASCII("ELK-123456"), 10 bytes
02 04 66 A2 9C 16 # tag=0x02 ts epoch (uint32 BE)
03 04 01 35 E0 F0 # tag=0x03 lat_i32 /1e6
04 04 04 4D A4 A1 # tag=0x04 lon_i32 /1e6
05 02 01 E5 # tag=0x05 speed 0.1 kph
06 01 57 # tag=0x06 battery pct
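The demo layout can be walked with a simple loop over Tag, Len, and Value. The following is a sketch only, under these assumptions: the timestamp is epoch seconds, speed is an unsigned 16-bit big-endian value in units of 0.1 kph, and unknown tags are skipped. The device ID carries 10 ASCII bytes, so its length byte is 0x0A. Treat parseTLV here as an illustration of the demo layout, not a real device protocol:

```javascript
// Sketch of a parser for the demo layout: Tag (1 byte) + Len (1 byte) + Value (Len bytes).
function parseTLV(buf) {
  const out = {};
  let i = 0;
  while (i + 2 <= buf.length) {
    const tag = buf[i];
    const len = buf[i + 1];
    const end = i + 2 + len;
    if (end > buf.length) throw new Error(`TLV truncated at offset ${i}`);
    const val = buf.subarray(i + 2, end);
    switch (tag) {
      case 0x01: out.deviceId = val.toString('ascii'); break;
      // Assumption: epoch seconds, uint32 big-endian.
      case 0x02: if (len === 4) out.ts = new Date(val.readUInt32BE(0) * 1000).toISOString(); break;
      case 0x03: if (len === 4) out.lat = val.readInt32BE(0) / 1e6; break;
      case 0x04: if (len === 4) out.lon = val.readInt32BE(0) / 1e6; break;
      // Assumption: uint16 big-endian in steps of 0.1 kph.
      case 0x05: if (len === 2) out.speedKph = val.readUInt16BE(0) / 10; break;
      case 0x06: if (len === 1) out.batteryPct = val.readUInt8(0); break;
      default: break; // unknown tags are skipped
    }
    i = end;
  }
  // A single leftover byte means the frame did not end on a TLV boundary.
  if (i !== buf.length) throw new Error('TLV has trailing bytes');
  return out;
}

// The demo frame from above, with spaces removed.
const frame = Buffer.from(
  '010A454C4B2D313233343536' + '020466A29C16' + '03040135E0F0' +
  '0404044DA4A1' + '050201E5' + '060157',
  'hex'
);
console.log(parseTLV(frame));
```

Throwing on truncated or trailing bytes keeps malformed frames out of the normalized output; whether to reject or salvage partial frames is a design choice that depends on your devices.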
3) Parsing strategy
To normalise these payloads, we detect the type of the incoming data and convert it into a consistent schema. The script:
- Detects JSON: payloads starting with { or [ are parsed as JSON.
- Detects Base64: strings that decode to valid JSON or TLV when run through Buffer.from(s, 'base64').
- Detects Hex/TLV: pure hex strings with even length are parsed as TLV; tags define fields such as deviceId, timestamp, coordinates, speed, and battery.
- Fallback: if none of the above apply, treat the message as plain text and include it as _raw for audit.
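The detection steps above can be sketched as a single function that returns the detected format plus the decoded value. This is an illustration under assumptions, not the script's actual helpers: detectFormat, tryJSON, and looksLikeTLV are hypothetical names, and the TLV check is purely structural (the Tag/Len/Value records must fit the buffer exactly). The sketch tests hex before Base64 because every hex digit is also a valid Base64 character, so a hex string could otherwise be misread.

```javascript
// Structural check only: walk Tag(1B) + Len(1B) + Value and require an exact fit.
function looksLikeTLV(buf) {
  if (buf.length < 2) return false;
  let i = 0;
  while (i + 2 <= buf.length) i += 2 + buf[i + 1];
  return i === buf.length;
}

// JSON.parse wrapper that returns undefined instead of throwing.
function tryJSON(s) {
  try { return JSON.parse(s); } catch { return undefined; }
}

// Returns { rawFormat, value }: a parsed object, a Buffer of TLV bytes, or the raw text.
function detectFormat(payloadBuf) {
  const s = payloadBuf.toString('utf8').trim();

  // 1) JSON: starts with { or [ and actually parses.
  if (s.startsWith('{') || s.startsWith('[')) {
    const obj = tryJSON(s);
    if (obj !== undefined) return { rawFormat: 'json', value: obj };
  }

  // 2) Hex/TLV: even-length string of hex digits that walks cleanly as TLV.
  if (s.length > 0 && s.length % 2 === 0 && /^[0-9a-fA-F]+$/.test(s)) {
    const buf = Buffer.from(s, 'hex');
    if (looksLikeTLV(buf)) return { rawFormat: 'hex|tlv', value: buf };
  }

  // 3) Base64: padded alphabet match, then look for JSON or TLV inside.
  if (s.length > 0 && s.length % 4 === 0 && /^[A-Za-z0-9+/]+={0,2}$/.test(s)) {
    const buf = Buffer.from(s, 'base64');
    const inner = buf.toString('utf8').trim();
    if (inner.startsWith('{') || inner.startsWith('[')) {
      const obj = tryJSON(inner);
      if (obj !== undefined) return { rawFormat: 'base64|json', value: obj };
    }
    if (looksLikeTLV(buf)) return { rawFormat: 'base64|tlv', value: buf };
  }

  // 4) Fallback: keep the text so it can be stored as _raw.
  return { rawFormat: 'text', value: s };
}

console.log(detectFormat(Buffer.from('{"batteryPct":87}')).rawFormat);
console.log(detectFormat(Buffer.from('060157')).rawFormat);
console.log(detectFormat(Buffer.from('hello world')).rawFormat);
```

Short binary payloads can pass a structural TLV check by accident, so a production detector would likely also validate known tags or rely on a format hint in the topic.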
The output format looks like this:
{
  sourceTopic: string,
  rawFormat: 'json' | 'base64|json' | 'base64|tlv' | 'hex|tlv' | 'text',
  deviceId?: string,
  ts?: string,
  lat?: number,
  lon?: number,
  speedKph?: number,
  heading?: number,
  batteryPct?: number,
  statusFlags?: { ignOn?: boolean, moving?: boolean, tamper?: boolean },
  _raw?: string
}
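To make the mapping concrete, here is a sketch of how the demo JSON payload could be flattened into this schema. normalizeJsonUplink is a hypothetical function, the range checks are illustrative choices, and 'TAMPER' is an assumed status code (the demo payload only shows IGN_ON and MOVING):

```javascript
// Hypothetical normalizer: maps a decoded JSON uplink onto the flat output schema.
function normalizeJsonUplink(sourceTopic, msg) {
  const rec = { sourceTopic, rawFormat: 'json' };
  if (typeof msg.deviceId === 'string') rec.deviceId = msg.deviceId;

  // Normalize the timestamp to ISO 8601; drop it if it does not parse.
  if (msg.ts !== undefined) {
    const t = new Date(msg.ts);
    if (!Number.isNaN(t.getTime())) rec.ts = t.toISOString();
  }

  // Flatten the nested gnss object and drop out-of-range values.
  const g = msg.gnss || {};
  if (Number.isFinite(g.lat) && Math.abs(g.lat) <= 90) rec.lat = g.lat;
  if (Number.isFinite(g.lon) && Math.abs(g.lon) <= 180) rec.lon = g.lon;
  if (Number.isFinite(g.speedKph) && g.speedKph >= 0) rec.speedKph = g.speedKph;
  if (Number.isFinite(g.heading)) rec.heading = g.heading;

  const b = msg.batteryPct;
  if (Number.isFinite(b) && b >= 0 && b <= 100) rec.batteryPct = b;

  // Turn the status array into boolean flags.
  if (Array.isArray(msg.status)) {
    rec.statusFlags = {
      ignOn: msg.status.includes('IGN_ON'),
      moving: msg.status.includes('MOVING'),
      tamper: msg.status.includes('TAMPER'), // assumed status code
    };
  }
  return rec;
}

const sample = {
  deviceId: 'ELK-123456',
  ts: '2025-09-08T06:01:22Z',
  gnss: { lat: 22.543096, lon: 114.057865, speedKph: 48.3, heading: 180 },
  batteryPct: 87,
  status: ['IGN_ON', 'MOVING'],
};
console.log(JSON.stringify(normalizeJsonUplink('devices/ELK-123456/up', sample)));
```

Dropping invalid fields rather than rejecting the whole record keeps partially useful uplinks in the stream; a stricter pipeline might reject them instead.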
4) Full script
Below is the full Node.js script (parse-eelink-mqtt.js). It subscribes to the topic, decodes each payload, normalises the fields, and writes each result to stdout and, when --save is given, to an NDJSON file. Replace the TLV parser with your actual protocol in production.
#!/usr/bin/env node
/* eslint-disable no-console */
const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');
const { hideBin } = require('yargs/helpers');
const yargs = require('yargs/yargs');
require('dotenv').config();

// CLI options fall back to the values in .env.
const argv = yargs(hideBin(process.argv))
  .option('broker', { type: 'string', default: process.env.MQTT_URL })
  .option('username', { type: 'string', default: process.env.MQTT_USERNAME })
  .option('password', { type: 'string', default: process.env.MQTT_PASSWORD })
  .option('topic', { type: 'string', default: process.env.MQTT_TOPIC || 'devices/+/up' })
  .option('save', { type: 'string' })
  .option('insecure', { type: 'boolean', default: false })
  .help().argv;

// Append to the NDJSON file when --save is given.
const outStream = argv.save ? fs.createWriteStream(path.resolve(argv.save), { flags: 'a' }) : null;

// Helpers: isLikelyJSON(), isBase64ish(), fromBase64ish(), hexBufferFromStringIfValid(),
// parseTLV(), decodePayload(), validateAndClean() … see full version for details.

const client = mqtt.connect(argv.broker, {
  username: argv.username,
  password: argv.password,
  clientId: `eelink-parser-${Math.random().toString(16).slice(2)}`,
  rejectUnauthorized: !argv.insecure, // --insecure skips TLS certificate validation
});

client.on('connect', () => {
  console.log('[MQTT] connected', argv.broker);
  client.subscribe(argv.topic, { qos: 1 }, (err) => {
    if (err) console.error('subscribe error:', err.message);
    else console.log('[MQTT] subscribed:', argv.topic);
  });
});

client.on('message', (topic, payloadBuf) => {
  try {
    const rec = validateAndClean(decodePayload(topic, payloadBuf));
    const line = JSON.stringify(rec);
    console.log(line);
    if (outStream) outStream.write(line + '\n');
  } catch (e) {
    // A bad payload should not stop the subscriber; log it and move on.
    console.error('decode error:', e.message);
  }
});

// Node throws on an 'error' event that has no listener, so always register one.
client.on('error', (err) => {
  console.error('[MQTT] error:', err.message);
});
5) Run it and sample output
You can run the script either using the .env file or by passing CLI options. For example:
# using .env
node parse-eelink-mqtt.js --save out.ndjson
# specifying everything on the command line
node parse-eelink-mqtt.js \
--broker mqtts://broker.example.com:8883 \
--username alice --password secret \
--topic devices/+/up \
--save out.ndjson --insecure
When messages arrive, the script prints a normalized JSON record per message. Here's an example:
{"sourceTopic":"devices/ELK-123456/up","rawFormat":"json","deviceId":"ELK-123456","ts":"2025-09-08T06:01:22.000Z","lat":22.543096,"lon":114.057865,"speedKph":48.3,"heading":180,"batteryPct":87,"statusFlags":{"ignOn":true,"moving":true,"tamper":false}}
{"sourceTopic":"devices/ELK-654321/up","rawFormat":"hex|tlv","deviceId":"ELK-654321","ts":"2025-09-08T06:02:10.000Z","lat":22.54301,"lon":114.0579,"speedKph":50.2,"batteryPct":83}
Each line is valid JSON, so you can feed the NDJSON directly into analytics engines, message brokers or databases.
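As a quick check that a saved file is well formed, the records can be read back line by line. A minimal sketch; parseNdjson is a hypothetical helper, and the sample text below stands in for the contents of out.ndjson:

```javascript
// Parse NDJSON text into an array of records, skipping blank lines.
// JSON.parse throws on a malformed line, which surfaces corrupt output early.
function parseNdjson(text) {
  return text
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));
}

// Stand-in for fs.readFileSync('out.ndjson', 'utf8').
const sampleText =
  '{"deviceId":"ELK-123456","batteryPct":87}\n' +
  '{"deviceId":"ELK-654321","batteryPct":83}\n';

const records = parseNdjson(sampleText);
console.log(records.length, records.map((r) => r.deviceId));
```

For large files, a streaming reader such as Node's readline module avoids loading the whole file into memory.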
6) Pitfalls & next steps
- Use --insecure only for testing; in production, always validate the broker's certificate against a trusted CA.
- Deduplicate messages based on deviceId and ts when writing to a database. QoS 1 delivery is at-least-once, so the same uplink can arrive more than once.
- For higher throughput, avoid synchronous writes inside the message handler; use a queue or batch writes.
- Replace the demo TLV decoder with your device's official protocol specification.
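The deduplication and batching points can be sketched with two small helpers. Both createDeduper and createBatcher are hypothetical names, and the in-memory set only covers a single process; a database-level unique constraint on (deviceId, ts) would still be the durable safeguard.

```javascript
// Bounded in-memory de-duplicator keyed on deviceId + ts.
function createDeduper(maxKeys = 10000) {
  const seen = new Set();
  return function isDuplicate(rec) {
    if (!rec.deviceId || !rec.ts) return false; // no usable key, let it through
    const key = `${rec.deviceId}|${rec.ts}`;
    if (seen.has(key)) return true;
    seen.add(key);
    // A Set iterates in insertion order, so the first value is the oldest key.
    if (seen.size > maxKeys) seen.delete(seen.values().next().value);
    return false;
  };
}

// Collects lines and hands them to flushFn in batches of maxBatch.
function createBatcher(flushFn, maxBatch = 100) {
  let pending = [];
  return {
    push(line) {
      pending.push(line);
      if (pending.length >= maxBatch) this.flush();
    },
    flush() {
      if (pending.length === 0) return;
      const batch = pending;
      pending = [];
      flushFn(batch);
    },
  };
}

// Usage: drop repeats, then write in batches of two.
const isDuplicate = createDeduper();
const written = [];
const batcher = createBatcher((batch) => written.push(batch), 2);
const uplinks = [
  { deviceId: 'ELK-123456', ts: '2025-09-08T06:01:22.000Z' },
  { deviceId: 'ELK-123456', ts: '2025-09-08T06:01:22.000Z' }, // redelivered
  { deviceId: 'ELK-654321', ts: '2025-09-08T06:02:10.000Z' },
];
for (const rec of uplinks) {
  if (!isDuplicate(rec)) batcher.push(JSON.stringify(rec));
}
batcher.flush(); // flush whatever is left, for example on a timer or at shutdown
console.log(written);
```

In a real handler, flushFn would perform one bulk insert or one stream write per batch instead of one per message.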
Normalizing uplinks at the edge allows you to quickly ingest and monitor Eelink data within your data pipelines.