What is MQTT?
MQTT (Message Queuing Telemetry Transport) is an open, lightweight messaging protocol, optimized for low latency. It provides a scalable and cost-efficient way to connect devices using a publish/subscribe model. A communication system built on MQTT consists of a broker and one or more clients; each client can publish messages, subscribe to topics, or both, and the broker routes every published message to the clients subscribed to its topic. MQTT is designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks.
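To make the roles concrete, here is a minimal in-memory sketch of the publish/subscribe pattern in TypeScript. It is neither the MQTT wire protocol nor this library's API: `TinyBroker`, `subscribe`, and `publish` are hypothetical names used only for illustration, and topic matching is exact (no wildcards).

```typescript
// Hypothetical in-memory broker; illustrates the pattern only.
type Handler = (topic: string, payload: string) => void;

class TinyBroker {
  private readonly subscribers = new Map<string, Set<Handler>>();

  // A client registers interest in one exact topic.
  subscribe(topic: string, handler: Handler): void {
    const handlers = this.subscribers.get(topic) ?? new Set<Handler>();
    handlers.add(handler);
    this.subscribers.set(topic, handlers);
  }

  // A client publishes; the broker forwards the message to every
  // subscriber of that topic and returns how many received it.
  publish(topic: string, payload: string): number {
    const handlers = this.subscribers.get(topic);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(topic, payload));
    return handlers.size;
  }
}

const broker = new TinyBroker();
const received: string[] = [];
broker.subscribe('sensors/temp', (_topic, payload) => received.push(payload));
const delivered = broker.publish('sensors/temp', '21.5');
```

The publisher never addresses the subscriber directly; both only know the broker and the topic name, which is what lets either side come and go independently.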
Pigeon MQTT Nestjs
Pigeon MQTT Nestjs is an MQTT broker for NestJS, built on aedes, that can run on any stream server.
Installation
Install from npm:
$ npm i pigeon-mqtt-nest
Usage
Pigeon registers as a global module. Import it with your configuration:
@Module({
  imports: [
    PigeonModule.forRoot({
      port: 1884,
      id: "binarybeast",
      concurrency: 100,
      queueLimit: 42,
      maxClientsIdLength: 23,
      connectTimeout: 30000,
      heartbeatInterval: 60000,
    })
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
}
- options <object>
  - mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is also used to share messages between broker instances. Default: mqemitter
  - concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
  - persistence <Persistence> middleware that stores QoS > 0, retained, and will packets, as well as subscriptions. Default: aedes-persistence (in memory)
  - queueLimit <number> maximum number of queued messages before the client session is established. If the number of queued items exceeds this limit, connectionError is raised with the error "Client queue limit reached". Default: 42
  - maxClientsIdLength <number> option to override the MQTT 3.1.0 client ID length limit. Default: 23
  - heartbeatInterval <number> interval in milliseconds at which the server publishes its health signal to the $SYS/<aedes.id>/heartbeat topic. Default: 60000
  - id <string> unique identifier of the aedes broker. Default: uuidv4()
  - connectTimeout <number> maximum time in milliseconds to wait for a CONNECT packet. Default: 30000
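As a rough illustration of how the documented defaults combine with overrides, the sketch below overlays caller-supplied values on the defaults listed above. `BrokerTuning`, `DOCUMENTED_DEFAULTS`, and `withDefaults` are hypothetical names; whether the module merges options exactly this way is an assumption.

```typescript
// Hypothetical shape covering only the numeric options documented above.
interface BrokerTuning {
  concurrency: number;
  queueLimit: number;
  maxClientsIdLength: number;
  heartbeatInterval: number; // milliseconds
  connectTimeout: number; // milliseconds
}

// Default values copied from the option list above.
const DOCUMENTED_DEFAULTS: BrokerTuning = {
  concurrency: 100,
  queueLimit: 42,
  maxClientsIdLength: 23,
  heartbeatInterval: 60000,
  connectTimeout: 30000,
};

// Overlay caller overrides on top of the documented defaults.
function withDefaults(overrides: Partial<BrokerTuning> = {}): BrokerTuning {
  return { ...DOCUMENTED_DEFAULTS, ...overrides };
}

const effective = withDefaults({ queueLimit: 100, connectTimeout: 10000 });
```

Only `queueLimit` and `connectTimeout` change here; every other field keeps its documented default.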
Handlers
| Handler | Invoked When |
|---|---|
| preConnect | The server receives a valid CONNECT packet. |
| authenticate | After preConnect. |
| authorizePublish | An LWT is published to all online clients, or a client publish arrives. |
| authorizeSubscribe | Subscriptions are restored in a non-clean session, or a client SUBSCRIBE arrives. |
| published | Same as Event: publish, but provides backpressure. |
Handler: preConnect
- client: <Client>
- packet: <object> CONNECT
- callback: <Function> (error, successful) => void
  - error <Error> | null
  - successful <boolean>
Invoked when the server receives a valid CONNECT packet. The packet can be modified.
The client object is still in its default state. If the callback is invoked with no error and successful is true, the server
continues to establish a session.
Any error is raised in the connectionError event.
Some use cases:
- Rate limiting / throttling by client.conn.remoteAddress
- Checking aedes.connectedClients to limit maximum connections
- IP blacklisting
@Injectable()
export class TestService {
  @onPreConnect()
  onPreConnect(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onPreConnect()");
    return done(null, true);
  }
}
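Two of the use cases listed above, IP blacklisting and a connection cap, can be sketched as a standalone function with the same (client, packet, callback) shape as the handler. The types below are minimal stand-ins rather than the library's own, and `makePreConnectGuard` and its parameters are hypothetical; in a real service the body would live inside the `@onPreConnect()` method.

```typescript
// Minimal stand-ins for the broker types; only the fields used here.
interface ConnLike { remoteAddress?: string }
interface ClientLike { conn: ConnLike }
type PreConnectDone = (error: Error | null, successful: boolean) => void;

// Build a preConnect-style check from a blacklist and a connection cap.
function makePreConnectGuard(
  blocked: ReadonlySet<string>,
  maxConnections: number,
  connectedClients: () => number, // e.g. a getter around the broker's counter
) {
  return (client: ClientLike, _packet: unknown, done: PreConnectDone): void => {
    const address = client.conn.remoteAddress ?? '';
    if (blocked.has(address)) {
      return done(new Error('address is blacklisted'), false);
    }
    if (connectedClients() >= maxConnections) {
      return done(new Error('too many connections'), false);
    }
    return done(null, true);
  };
}

const guard = makePreConnectGuard(new Set(['10.0.0.66']), 2, () => 1);
const outcomes: boolean[] = [];
guard({ conn: { remoteAddress: '10.0.0.66' } }, {}, (_err, ok) => outcomes.push(ok));
guard({ conn: { remoteAddress: '10.0.0.7' } }, {}, (_err, ok) => outcomes.push(ok));
```

The first call is rejected because the address is on the blacklist; the second is accepted because one connected client is below the cap of two.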
Handler: authenticate
- client: <Client>
- credential: <string>
- callback: <Function> (error, successful) => void
  - error <Error> | null
  - successful <boolean>
Invoked after preConnect.
The server parses the CONNECT packet, initializes the client object (setting client.id to match the one
in the CONNECT packet), and extracts the username and password as parameters for a user-defined authentication flow.
If the callback is invoked with no error and successful is true, the server authenticates the client and continues to set up
the client session.
If the client is authenticated, the server replies with a CONNACK with returnCode=0; otherwise it uses returnCode=5. To send
a different value between 2 and 5, set a returnCode property on the error object.
@Injectable()
export class TestService {
  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    return done(null, true);
  }
}
@Injectable()
export class TestService {
  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    // Attach returnCode without tripping the TypeScript type checker on Error
    const error = Object.assign(new Error('Auth error'), { returnCode: 4 });
    return done(error, false);
  }
}
Please refer to Connect Return Code to see their meanings.
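For reference, the sketch below names the CONNACK return codes defined by MQTT 3.1.1 and uses one of them in a credential check. The in-memory `users` map, `checkCredentials`, and the separate username/password parameters are assumptions for illustration; the exact shape of the `credential` argument passed by this library is not shown in this document.

```typescript
// CONNACK return codes defined by MQTT 3.1.1.
const ConnackReturnCode = {
  ACCEPTED: 0,
  UNACCEPTABLE_PROTOCOL_VERSION: 1,
  IDENTIFIER_REJECTED: 2,
  SERVER_UNAVAILABLE: 3,
  BAD_USERNAME_OR_PASSWORD: 4,
  NOT_AUTHORIZED: 5,
} as const;

type AuthError = Error & { returnCode: number };
type AuthDone = (error: AuthError | null, successful: boolean) => void;

// Hypothetical credential store; a real one would hold password hashes.
const users = new Map<string, string>([['alice', 's3cret']]);

function checkCredentials(
  username: string | undefined,
  password: string | undefined,
  done: AuthDone,
): void {
  // Unknown or missing users never match, even if the password is also missing.
  const expected = username === undefined ? undefined : users.get(username);
  if (expected === undefined || expected !== password) {
    const error: AuthError = Object.assign(new Error('Auth error'), {
      returnCode: ConnackReturnCode.BAD_USERNAME_OR_PASSWORD,
    });
    return done(error, false);
  }
  return done(null, true);
}

const results: Array<number | null> = [];
const record: AuthDone = (error) => { results.push(error ? error.returnCode : null); };
checkCredentials('alice', 's3cret', record);
checkCredentials('alice', 'wrong', record);
```

Using named constants instead of bare numbers keeps the intent of each rejection readable at the call site.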
Handler: authorizePublish
- client: <Client> | null
- packet: <object> PUBLISH
- callback: <Function> (error) => void
  - error <Error> | null
Invoked when:
- an LWT is published to all online clients
- a client publish arrives
client is null when aedes publishes an obsolete LWT with no connected clients.
If the callback is invoked with no error, the server authorizes the packet; otherwise it emits clientError with the error. If
an error occurs, the client connection is closed, but no error is returned to the client (MQTT-3.3.5-2).
@Injectable()
export class TestService {
  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onAuthorizePublish()");
    if (packet.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (packet.topic === 'bbb') {
      packet.payload = Buffer.from('overwrite packet payload')
    }
    return done(null);
  }
}
By default, authorizePublish rejects a client publish to topics with the $SYS/ prefix to prevent a possible
DoS (see #597). If you write your own implementation
of authorizePublish, we suggest adding a check for this. Default implementation:
// Prefix of the broker's reserved system topics
const $SYS_PREFIX = '$SYS/';

@Injectable()
export class TestService {
  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    if (packet.topic.startsWith($SYS_PREFIX)) {
      return done(new Error($SYS_PREFIX + ' topic is reserved'))
    }
    return done(null);
  }
}
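A custom publish policy can keep the $SYS/ check and add its own rules. The following sketch is one possible shape, written as a pure function so it can be called from the handler and tested on its own. `checkPublishTopic` and the `devices/<clientId>/` convention are hypothetical; the wildcard rule reflects the MQTT requirement that a PUBLISH topic name contains no `+` or `#`.

```typescript
const SYS_PREFIX = '$SYS/';

// Returns null when the publish is allowed, or an Error describing the refusal.
function checkPublishTopic(clientId: string | null, topic: string): Error | null {
  if (topic.startsWith(SYS_PREFIX)) {
    return new Error(SYS_PREFIX + ' topic is reserved');
  }
  if (topic.includes('+') || topic.includes('#')) {
    return new Error('wildcards are not allowed in a publish topic');
  }
  // Hypothetical policy: a connected client may only publish under its own prefix.
  // clientId is null for broker-originated packets, which skip this rule.
  if (clientId !== null && !topic.startsWith(`devices/${clientId}/`)) {
    return new Error('client may only publish under its own prefix');
  }
  return null;
}

const verdicts = [
  checkPublishTopic('dev1', 'devices/dev1/temp'),
  checkPublishTopic('dev1', '$SYS/broker/load'),
  checkPublishTopic('dev1', 'devices/dev2/temp'),
].map((error) => (error === null ? 'ok' : error.message));
```

Inside the handler, the result would simply be passed to the callback, for example `done(checkPublishTopic(client ? client.id : null, packet.topic))`.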
Handler: authorizeSubscribe
- client: <Client>
- subscription: <object>
- callback: <Function> (error) => void
  - error <Error> | null
  - subscription: <object> | null
Invoked when:
- subscriptions are restored in a non-clean session
- a client SUBSCRIBE arrives
subscription is a dictionary object like { topic: 'hello', qos: 0 }.
If the callback is invoked with no error, the server authorizes the packet; otherwise it emits clientError with the error.
In general, you should leave the subscription untouched and pass it to the callback, but the server lets you change the
subscription on the fly.
@Injectable()
export class TestService {
  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    console.log("Function: @onAuthorizeSubscribe()");
    if (subscription.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (subscription.topic === 'bbb') {
      // overwrites subscription
      subscription.topic = 'foo'
      subscription.qos = 1
    }
    return done(null, subscription);
  }
}
To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and sets its qos
in the SUBACK to 128, per the MQTT 3.1.1 spec:
@Injectable()
export class TestService {
  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    done(null, subscription.topic === 'aaaa' ? null : subscription)
  }
}
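The effect of negation on the SUBACK can be sketched without a broker. In the snippet below, `authorize` returns either the subscription or null, and `grantedCodes` shows the resulting list of granted QoS values, with 128 marking a refused subscription. Both function names and the prefix-based policy are hypothetical.

```typescript
interface Subscription { topic: string; qos: 0 | 1 | 2 }

// SUBACK return code that signals a refused subscription in MQTT 3.1.1.
const SUBACK_FAILURE = 128;

// Hypothetical policy: only topics under an allowed prefix are granted.
function authorize(sub: Subscription, allowedPrefix: string): Subscription | null {
  return sub.topic.startsWith(allowedPrefix) ? sub : null;
}

// Map each requested subscription to the code a SUBACK would carry for it.
function grantedCodes(subs: Subscription[], allowedPrefix: string): number[] {
  return subs.map((sub) => {
    const result = authorize(sub, allowedPrefix);
    return result === null ? SUBACK_FAILURE : result.qos;
  });
}

const granted = grantedCodes(
  [
    { topic: 'public/news', qos: 1 },
    { topic: 'private/keys', qos: 0 },
  ],
  'public/',
);
```

Here `granted` holds the requested QoS for the allowed topic and 128 for the refused one, matching the `granted` array shown in the Suback packet format.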
Handler: published
- packet: <aedes-packet> & PUBLISH
- client: <Client>
- callback: <Function>
Same as Event: publish, but provides backpressure. TL;DR: if an operation on a packet MUST finish before the next
packet is handled, use this handler; otherwise,
especially for long-running operations, use Event: publish instead.
@Injectable()
export class TestService {
  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }
}
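The idea of backpressure through a callback can be illustrated independently of the broker: the next item is handed over only after the handler signals that it is done with the current one. This is a generic sketch, not the library's implementation; `deliverInOrder`, `Done`, and `Handler` are hypothetical names.

```typescript
type Done = () => void;
type Handler<T> = (item: T, done: Done) => void;

// Hand items to the handler one at a time; the next item is delivered
// only when the handler calls done() for the current one.
function deliverInOrder<T>(
  items: readonly T[],
  handler: Handler<T>,
  onFinished: () => void,
): void {
  let index = 0;
  const next = (): void => {
    if (index >= items.length) {
      onFinished();
      return;
    }
    const item = items[index] as T;
    index += 1;
    handler(item, next);
  };
  next();
}

const log: string[] = [];
const pending: Done[] = [];
deliverInOrder(
  ['a', 'b'],
  // The handler records the item and parks done() until work "completes".
  (item, done) => { log.push(item); pending.push(done); },
  () => { log.push('finished'); },
);
```

Right after the call only the first item has been delivered; the second stays queued until the parked `done` is invoked. A handler that calls `done` more than once would skip items, which a production version would need to guard against.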
Events
| Name | Emitted When |
|---|---|
| Client | A client registers itself with the server. |
| Client Ready | A client has received all its offline messages and is initialized. |
| Client Disconnect | A client disconnects. |
| Client Error | An error occurs. |
| Connection Error | An error occurs while the client is uninitialized. |
| Keep Alive Timeout | A client keepalive timeout occurs. |
| Publish | The server delivers a packet to subscribed clients. |
| Ack | A packet is successfully delivered to the client. |
| Subscribe | A client successfully subscribes to topics on the server. |
| Unsubscribe | A client successfully unsubscribes from topics on the server. |
| Connack Sent | The server sends an acknowledgement to the client. |
| Closed | The server is closed. |
Event: client
- client <Client>
Emitted when the client registers itself with the server. The client is not ready yet;
its connecting state is true.
The server publishes to the SYS topic $SYS/<aedes.id>/new/clients to announce that it has added the client to its registration
pool. client.id is the payload.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClient()
  OnNewClient(@Client() client) {
    console.log("Function: @onClient()");
  }
}
Event: clientReady
- client <Client>
Emitted when the client has received all its offline messages and is initialized.
The client's connected state is true, and it is ready to process incoming
messages.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientReady()
  async onClientReady(@Client() client) {
    console.log("Function: @onClientReady()");
  }
}
Event: clientDisconnect
- client <Client>
Emitted when a client disconnects.
The server publishes to the SYS topic $SYS/<aedes.id>/disconnect/clients to announce that it has deregistered the client. client.id is
the payload.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientDisconnect()
  OnClientDisconnect(@Client() client) {
    console.log("Function: @OnClientDisconnect()");
  }
}
Event: clientError
- client <Client>
- error <Error>
Emitted when an error occurs.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientError()
  OnClientError(@Client() client, @Error() error) {
    console.log("Function: @onClientError()");
  }
}
Event: connectionError
- client <Client>
- error <Error>
Emitted when an error occurs. Unlike clientError, it is raised only while the client is uninitialized.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnectionError()
  OnConnectionError(@Client() client, @Error() error) {
    console.log("Function: @OnConnectionError()");
  }
}
Event: keepaliveTimeout
- client <Client>
Emitted when a client keepalive timeout occurs.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onKeepLiveTimeout()
  onKeepLiveTimeout(@Client() client) {
    console.log("Function: @onKeepLiveTimeout()");
  }
}
Event: publish
- packet <aedes-packet> & PUBLISH
- client <Client> | null
Emitted when the server delivers the packet to subscribed clients. If no clients are subscribed to the packet
topic, the server still publishes the packet and emits the event. client is null when the packet is an internal message,
such as the aedes heartbeat message or an LWT.
Note!
packet is of type aedes-packet. Some of its properties are internal to aedes; changing them will break aedes' internal flow.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }
}
Event: ack
- packet <object> PUBLISH for QoS 1, PUBREL for QoS 2
- client <Client>
Emitted when a QoS 1 or QoS 2 packet has been acknowledged, i.e. successfully delivered to the client.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onAck()
  onAck(@Client() client, @Packet() packet) {
    console.log("Function: @onAck()");
  }
}
Event: subscribe
- subscriptions <object>
- client <Client>
Emitted when the client successfully subscribes to topics on the server.
subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated
subscriptions, for which qos equals 128. See Handler: authorizeSubscribe for details.
The server publishes to the SYS topic $SYS/<aedes.id>/new/subscribers to announce that a client has successfully subscribed to one or more
topics. The payload is a JSON object with clientId and subs properties, where subs is the subscriptions array.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onSubscribe()
  OnSubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnSubscribe()");
  }
}
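A consumer of the $SYS/<aedes.id>/new/subscribers topic has to decode the JSON payload described above. The sketch below parses such a payload and lists only the topics that were actually granted, skipping negated entries (qos 128). `parseSubscribersNotice` and `grantedTopics` are hypothetical helpers, and the sample payload is made up to match the documented clientId/subs shape.

```typescript
interface SubscriptionEntry { topic: string; qos: number }
interface SubscribersNotice { clientId: string; subs: SubscriptionEntry[] }

// Decode and minimally validate the JSON payload of a new/subscribers message.
function parseSubscribersNotice(payload: string): SubscribersNotice {
  const data: unknown = JSON.parse(payload);
  if (typeof data !== 'object' || data === null) {
    throw new Error('payload is not a JSON object');
  }
  const { clientId, subs } = data as { clientId?: unknown; subs?: unknown };
  if (typeof clientId !== 'string' || !Array.isArray(subs)) {
    throw new Error('unexpected payload shape');
  }
  return { clientId, subs: subs as SubscriptionEntry[] };
}

// Negated subscriptions carry qos 128 and are filtered out here.
function grantedTopics(notice: SubscribersNotice): string[] {
  return notice.subs.filter((sub) => sub.qos !== 128).map((sub) => sub.topic);
}

const notice = parseSubscribersNotice(
  '{"clientId":"dev1","subs":[{"topic":"a","qos":1},{"topic":"b","qos":128}]}',
);
const topics = grantedTopics(notice);
```

If the payload arrives as a Buffer, it would first need to be converted to a string, for example with `payload.toString()`.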
Event: unsubscribe
- unsubscriptions Array<string>
- client <Client>
Emitted when the client successfully unsubscribes from topics on the server.
unsubscriptions is an array of unsubscribed topics.
The server publishes to the SYS topic $SYS/<aedes.id>/new/unsubscribers to announce that a client has successfully unsubscribed from one or
more topics. The payload is a JSON object with clientId and subs properties, where subs is the unsubscriptions array.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onUnsubscribe()
  OnUnsubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnUnsubscribe()");
  }
}
Event: connackSent
- packet <object> CONNACK
- client <Client>
Emitted when the server sends an acknowledgement to the client. Please refer to the MQTT specification for an explanation of
the returnCode property in CONNACK.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnackSent()
  onConnackSent(@Client() client, @Packet() packet) {
    console.log("Function: @onConnackSent()");
  }
}
Event: closed
Emitted when the server is closed.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClosed()
  onClosed(@Client() client, @Packet() packet) {
    console.log("Function: @onClosed()");
  }
}
Methods
| Method | Description |
|---|---|
| Publish | Delivers a packet directly to subscribed clients on behalf of the server. |
| Close | Closes the aedes server and disconnects all clients. |
| Get Broker Instance | Returns the broker instance. |
Method: Publish
- packet <object> PUBLISH
Delivers the packet directly to subscribed clients on behalf of the server.
This bypasses authorizePublish.
The callback is invoked with an error argument once delivery finishes.
@Injectable()
export class TestService {
  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    //use this method to publish
    await this.pigeonService.publish({
      topic: "test2", qos: 0, cmd: "publish", payload: "", dup: false, retain: false
    });
  }
}
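Because every call to publish needs the same set of packet fields, a small builder can fill in the defaults used in the example above. This is a sketch under assumptions: `PublishPacket` mirrors only the fields shown in that example, and `buildPublish` is a hypothetical helper, not part of the library.

```typescript
// Fields taken from the publish example above; payload is kept as a string here.
interface PublishPacket {
  cmd: 'publish';
  topic: string;
  payload: string;
  qos: 0 | 1 | 2;
  dup: boolean;
  retain: boolean;
}

// Build a publish packet with qos 0, dup false, and retain false unless overridden.
function buildPublish(
  topic: string,
  payload: string,
  overrides: Partial<Pick<PublishPacket, 'qos' | 'retain'>> = {},
): PublishPacket {
  // A PUBLISH topic name must be non-empty and must not contain wildcards.
  if (topic.length === 0 || topic.includes('+') || topic.includes('#')) {
    throw new Error('invalid publish topic: ' + topic);
  }
  return { cmd: 'publish', topic, payload, qos: 0, dup: false, retain: false, ...overrides };
}

const statusPacket = buildPublish('devices/dev1/status', 'online', { retain: true });
```

The resulting object could then be passed to the service, for example `await this.pigeonService.publish(statusPacket)`, assuming the service accepts a packet of this shape as in the example above.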
Method: Close
- callback: <Function>
Closes the aedes server and disconnects all clients.
The callback is invoked once the server is closed.
@Injectable()
export class TestService {
  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    //use this method to close the broker
    await this.pigeonService.close();
  }
}
Method: Get Broker Instance
Returns the broker instance.
@Injectable()
export class TestService {
  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    await this.pigeonService.getBrokerInstance();
  }
}
Packets
This section describes the format of each packet:
- Connect
- Connack
- Subscribe
- Suback
- Unsubscribe
- Unsuback
- Publish
- Puback
- Pubrec
- Pubrel
- Pubcomp
- Pingreq
- Pingresp
- Disconnect
Packet: Connect
{
  cmd: 'connect',
  protocolId: 'MQTT', // Or 'MQIsdp' in MQTT 3.1
  protocolVersion: 4, // Or 3 in MQTT 3.1, or 5 in MQTT 5.0
  clean: true, // Can also be false
  clientId: 'my-device',
  keepalive: 0, // Seconds which can be any positive number, with 0 as the default setting
  username: 'matteo',
  password: Buffer.from('collina'), // Passwords are buffers
  will: {
    topic: 'mydevice/status',
    payload: Buffer.from('dead'), // Payloads are buffers
    properties: { // MQTT 5.0
      willDelayInterval: 1234,
      payloadFormatIndicator: false,
      messageExpiryInterval: 4321,
      contentType: 'test',
      responseTopic: 'topic',
      correlationData: Buffer.from([1, 2, 3, 4]),
      userProperties: {
        'test': 'test'
      }
    }
  },
  properties: { // MQTT 5.0 properties
    sessionExpiryInterval: 1234,
    receiveMaximum: 432,
    maximumPacketSize: 100,
    topicAliasMaximum: 456,
    requestResponseInformation: true,
    requestProblemInformation: true,
    userProperties: {
      'test': 'test'
    },
    authenticationMethod: 'test',
    authenticationData: Buffer.from([1, 2, 3, 4])
  }
}
If password or will.payload are passed as strings, they will automatically be converted into a Buffer.
Packet: Connack
{
  cmd: 'connack',
  returnCode: 0, // Or any other return code (MQTT < 5.0)
  sessionPresent: false, // Can also be true.
  reasonCode: 0, // reason code MQTT 5.0
  properties: { // MQTT 5.0 properties
    sessionExpiryInterval: 1234,
    receiveMaximum: 432,
    maximumQoS: 2,
    retainAvailable: true,
    maximumPacketSize: 100,
    assignedClientIdentifier: 'test',
    topicAliasMaximum: 456,
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    },
    wildcardSubscriptionAvailable: true,
    subscriptionIdentifiersAvailable: true,
    sharedSubscriptionAvailable: false,
    serverKeepAlive: 1234,
    responseInformation: 'test',
    serverReference: 'test',
    authenticationMethod: 'test',
    authenticationData: Buffer.from([1, 2, 3, 4])
  }
}
Packet: Subscribe
{
  cmd: 'subscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    subscriptionIdentifier: 145,
    userProperties: {
      test: 'test'
    }
  },
  subscriptions: [{
    topic: 'test',
    qos: 0,
    nl: false, // no Local MQTT 5.0 flag
    rap: true, // Retain as Published MQTT 5.0 flag
    rh: 1 // Retain Handling MQTT 5.0
  }]
}
Packet: Suback
{
  cmd: 'suback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  },
  granted: [0, 1, 2, 128]
}
Packet: Unsubscribe
{
  cmd: 'unsubscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    userProperties: {
      'test': 'test'
    }
  },
  unsubscriptions: [
    'test',
    'a/topic'
  ]
}
Packet: Unsuback
{
  cmd: 'unsuback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Packet: Publish
{
  cmd: 'publish',
  messageId: 42,
  qos: 2,
  dup: false,
  topic: 'test',
  payload: Buffer.from('test'),
  retain: false,
  properties: { // optional properties MQTT 5.0
    payloadFormatIndicator: true,
    messageExpiryInterval: 4321,
    topicAlias: 100,
    responseTopic: 'topic',
    correlationData: Buffer.from([1, 2, 3, 4]),
    userProperties: {
      'test': 'test'
    },
    subscriptionIdentifier: 120, // can be an Array in a message from the broker if the message matches several subscriptions
    contentType: 'test'
  }
}
Packet: Puback
{
  cmd: 'puback',
  messageId: 42,
  reasonCode: 16, // only for MQTT 5.0
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Packet: Pubrec
{
  cmd: 'pubrec',
  messageId: 42,
  reasonCode: 16, // only for MQTT 5.0
  properties: { // properties MQTT 5.0
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Packet: Pubrel
{
  cmd: 'pubrel',
  messageId: 42,
  reasonCode: 16, // only for MQTT 5.0
  properties: { // properties MQTT 5.0
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Packet: Pubcomp
{
  cmd: 'pubcomp',
  messageId: 42,
  reasonCode: 16, // only for MQTT 5.0
  properties: { // properties MQTT 5.0
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Packet: Pingreq
{
  cmd: 'pingreq'
}
Packet: Pingresp
{
  cmd: 'pingresp'
}
Packet: Disconnect
{
  cmd: 'disconnect',
  reasonCode: 0, // MQTT 5.0 code
  properties: { // properties MQTT 5.0
    sessionExpiryInterval: 145,
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    },
    serverReference: 'test'
  }
}
Middleware Plugins
Persistence
- aedes-persistence: In-memory implementation of an Aedes persistence
- aedes-persistence-mongodb: MongoDB persistence for Aedes
- aedes-persistence-redis: Redis persistence for Aedes
- aedes-persistence-level: LevelDB persistence for Aedes
- aedes-persistence-nedb: NeDB persistence for Aedes
MQEmitter
- mqemitter: An opinionated memory Message Queue with an emitter-style API
- mqemitter-redis: Redis-powered mqemitter
- mqemitter-mongodb: Mongodb based mqemitter
- mqemitter-child-process: Share the same mqemitter between a hierarchy of child processes
- mqemitter-cs: Expose a MQEmitter via a simple client/server protocol
- mqemitter-p2p: A P2P implementation of MQEmitter, based on HyperEmitter and a Merkle DAG
- mqemitter-aerospike: Aerospike mqemitter
Stay in touch
- Author Twitter - @binarybeast
- Author Linkedin - behnamnasehi