06. MQTT
The mqtt object is a built-in object that provides MQTT client functionality.
Functional overview:
- Provides the MQTT client function.
- Supports MQTT v3.1.1 and v3.1.
- Supports TLS.
- Supports server and client authentication.
Limitations:
- This object is supported from version 01.01.00 or later.
- The number of MQTT session resources that can be used is 1.
- Encrypted communications require a certificate signed by a Certificate Authority (CA). Self-signed certificates are not allowed.
mqtt Global Object
Methods()/Properties | Summary | Version | Note |
---|---|---|---|
mqtt.connect() | Starts an MQTT connection. A {Client} instance is generated. | | |
mqtt.set() | Sets the information used for the connection. |
Details
mqtt.connect(url[,options])
Starts an MQTT connection.
Generates a {Client} instance.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
url | string | mandatory | Broker URL, specified in [protocol]://[hostname]:[port] format. protocol: Specify 'mqtt' or 'mqtts'. hostname: Domain name or IP address of the MQTT broker; the maximum length is 64 bytes. port: Port number, specified as a decimal digit string; the default value is 1883. The TLS CA certificate can be set with mqtt.set(). | |
options | Object | optional | MQTT connection options Refer to options for details. | |
return | {Client}, undefined | - | {Client} : The generated {Client} undefined : {Client} generation failed due to invalid arguments or insufficient resources. |
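The URL rules above can be checked before calling mqtt.connect(). The following is a minimal sketch only: buildBrokerUrl is a hypothetical helper, not part of the mqtt object, and it assumes an ASCII hostname so that character count equals byte count.

```javascript
// Hypothetical helper (not part of the mqtt object): builds a broker URL in
// the [protocol]://[hostname]:[port] format, or returns undefined when an
// argument does not match the documented rules.
function buildBrokerUrl(protocol, hostname, port) {
  if (protocol !== 'mqtt' && protocol !== 'mqtts') {
    return undefined; // only 'mqtt' and 'mqtts' are documented
  }
  // Assumption: ASCII hostname, so length in characters equals length in bytes.
  if (typeof hostname !== 'string' || hostname.length < 1 || hostname.length > 64) {
    return undefined;
  }
  if (port === undefined) {
    return protocol + '://' + hostname; // the documented default port applies
  }
  if (!/^[0-9]+$/.test(String(port))) {
    return undefined; // the port must be a decimal digit string
  }
  return protocol + '://' + hostname + ':' + port;
}
```

On the device, the result would typically be passed to mqtt.connect() only when it is not undefined.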
options
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
keepalive | number | optional | Keep-alive time [s] 0: Disabled Range: 0 - 2,147,483,647 Default: 60 | |
clientId | string | optional | MQTT client id Range: 1 - 127 bytes of single-byte alphanumeric characters Default: 'mqttjs_' + 8 random decimal digits | |
protocolId | string | optional | Protocol id Default: MQTT | |
protocolVersion | number | optional | MQTT protocol version 4: MQTT v3.1.1 3: MQTT v3.1 When setting to 3, set protocolId to 'MQIsdp'. Default: 4 | |
reconnectPeriod | number | optional | Time to wait to reconnect when disconnected [ms] Range: 0 - 10000 Default: 1000 | |
connectTimeout | number | optional | Connection timeout (wait for CONNACK) [ms] Range: 0 - 2,147,483,647 Default: 30000 | |
username | string | optional | Username Set if broker requires username. Range: 1 - 511 bytes of single-byte alphanumeric characters Default: None | |
password | string | optional | Password Set if broker requires password. Range: 1 - 511 bytes of single-byte alphanumeric characters Default: None | |
reconnectCount | number | optional | Number of automatic reconnection retries 0: Disabled (no automatic reconnection) 1855: Unlimited Range: 0 - 999, 1855 Default: 0 | |
sendBuffSize | number | optional | Send buffer size [Byte] Range: 128 - 4096 Default: 2560 | Version 02.01.00+ |
recvBuffSize | number | optional | Receive buffer size [Byte] Range: 128 - 4096 Default: 512 | Version 02.01.00+ |
This is a sample that specifies a user name and password and starts a connection to foo.bar.
var mqttOpt = {
  clientId: 'testClientId',
  username: 'foo',
  password: 'bar',
  keepalive: 300
};
var client = mqtt.connect('mqtt://foo.bar', mqttOpt);
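Because mqtt.connect() returns undefined for invalid arguments without saying which one, a pre-check against the ranges in the options table may help during development. This is a sketch under assumptions: findInvalidOptions is a hypothetical helper, and it reads "single-byte alphanumeric characters" strictly as 0-9, A-Z, a-z.

```javascript
// Hypothetical pre-check (not part of the mqtt object). Returns the names of
// options whose values fall outside the ranges listed in the options table.
function findInvalidOptions(opts) {
  var bad = [];

  function checkRange(name, min, max) {
    var v = opts[name];
    if (v === undefined) { return; } // optional: the default applies
    if (typeof v !== 'number' || isNaN(v) || v < min || v > max) { bad.push(name); }
  }

  function checkAlnum(name, maxLen) {
    var v = opts[name];
    if (v === undefined) { return; }
    // Assumption: strict reading of "single-byte alphanumeric characters".
    var pattern = new RegExp('^[0-9A-Za-z]{1,' + maxLen + '}$');
    if (typeof v !== 'string' || !pattern.test(v)) { bad.push(name); }
  }

  checkRange('keepalive', 0, 2147483647);
  checkRange('reconnectPeriod', 0, 10000);
  checkRange('connectTimeout', 0, 2147483647);
  checkRange('sendBuffSize', 128, 4096);
  checkRange('recvBuffSize', 128, 4096);
  if (opts.reconnectCount !== 1855) { checkRange('reconnectCount', 0, 999); }
  checkAlnum('clientId', 127);
  checkAlnum('username', 511);
  checkAlnum('password', 511);
  if (opts.protocolVersion !== undefined &&
      opts.protocolVersion !== 3 && opts.protocolVersion !== 4) {
    bad.push('protocolVersion');
  }
  if (opts.protocolVersion === 3 && opts.protocolId !== 'MQIsdp') {
    bad.push('protocolId'); // v3.1 requires protocolId 'MQIsdp'
  }
  return bad;
}
```

An empty array means that no documented range was violated; it does not guarantee that the broker accepts the connection.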
mqtt.set(name[,arg])
Sets the information used for a connection.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
name | string | mandatory | Setting name Names that can be used are: ssl.ca, ssl.cert, ssl.key | |
arg | string | optional | Setting information | |
return | undefined | - | - |
mqtt.set('ssl.ca',certificate)
Sets the CA certificate used for the MQTTS connection.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
'ssl.ca' | string | mandatory | Name for setting CA certificate | |
certificate | string | optional | CA certificate Specify a string in PEM format. Use \n for line feeds. If omitted, the CA certificate setting is released. | If the certificate is invalid, a connection failure error occurs. Verify the certificate beforehand. |
return | undefined | - | - |
Set as follows:
var sslCa = '-----BEGIN CERTIFICATE-----\n' +
            '(...ContentOmission...)\n' +
            //...Omit...
            '(...ContentOmission...)\n' +
            '-----END CERTIFICATE-----';
mqtt.set('ssl.ca', sslCa);
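If the PEM text comes from a file or another system, its line endings may not be plain \n. The sketch below shows one way to normalize it before passing it to mqtt.set(); normalizePem is a hypothetical helper, not part of the mqtt object.

```javascript
// Hypothetical helper: converts CRLF or CR line endings to '\n' and removes
// leading and trailing whitespace, so the PEM string uses '\n' line feeds.
function normalizePem(text) {
  return text.replace(/\r\n?/g, '\n').trim();
}
```

For example, normalizePem(rawPem) could be passed as the certificate argument, where rawPem is whatever string the application obtained.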
mqtt.set('ssl.cert',certificate)
Sets the client certificate used for the MQTTS connection.
If not set, client authentication is disabled.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
'ssl.cert' | string | mandatory | Name for setting client certificate | |
certificate | string | optional | Client certificate Specify a string in PEM format. Use \n for line feeds. If omitted, the client certificate setting is released. | |
return | undefined | - | - |
mqtt.set('ssl.key',clientkey)
Sets the client private key used for the MQTTS connection.
If not set, client authentication is disabled.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
'ssl.key' | string | mandatory | Name for setting client private key | |
clientkey | string | optional | Client private key Specify a string in PEM format. Use \n for line feeds. If omitted, the client private key setting is released. | |
return | undefined | - | - |
To connect over MQTTS with client authentication, set as follows:
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientCert = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientKey = '-----BEGIN PRIVATE KEY-----\n(...ContentOmission...)\n-----END PRIVATE KEY-----';
mqtt.set('ssl.ca', sslCa);
mqtt.set('ssl.cert', sslClientCert);
mqtt.set('ssl.key', sslClientKey);
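Since omitting the second argument releases a setting, all three TLS settings can be applied or cleared in one place. This is a sketch only: applyTlsSettings is a hypothetical helper, and mqttObj stands in for the built-in mqtt object so that the function can be exercised outside the device.

```javascript
// Hypothetical helper. `mqttObj` is any object exposing set(name[, arg]);
// on the device this would be the built-in mqtt object.
function applyTlsSettings(mqttObj, settings) {
  var names = ['ssl.ca', 'ssl.cert', 'ssl.key'];
  for (var i = 0; i < names.length; i++) {
    var value = settings[names[i]];
    if (typeof value === 'string') {
      mqttObj.set(names[i], value); // set the PEM string
    } else {
      mqttObj.set(names[i]);        // argument omitted: the setting is released
    }
  }
}
```

Calling applyTlsSettings(mqtt, { 'ssl.ca': sslCa }) would set the CA certificate and release any client certificate and key, which disables client authentication.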
{Client}
Methods()/Properties | Summary | Version | Note |
---|---|---|---|
.on() | Registers an event handler. | ||
.publish() | Publishes a message to a topic. | ||
.subscribe() | Subscribes to a topic. | ||
.unsubscribe() | Unsubscribes from a topic. | ||
.end() | Disconnects from MQTT and exits. | ||
.reconnect() | Initiates an MQTT reconnection. | ||
.isConnected() | Gets MQTT connection status. | ||
.isReconnecting() | Gets the MQTT reconnection status. | ||
.canPublish() | Gets whether an MQTT message can be published. | ||
.get() | Gets information related to the given argument. |
Details
.on(event,callback)
Registers an event handler.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
event | string | mandatory | Event name Names that can be used are: connect, reconnect, close, error, end, message | |
callback() | function | mandatory | Executes callback processing when an event occurs. | |
return | undefined | - | - |
event : 'connect'
Executes callback when an MQTT connection or reconnection succeeds.
The arguments for the callback are as follows:
Arguments | Type | Summary | Note |
---|---|---|---|
connack | Object | Content of the received CONNACK packet. This argument is optional. |
event : 'reconnect'
Executes callback when MQTT reconnection is started.
event : 'close'
Executes callback when MQTT is disconnected.
event : 'error'
Executes callback when an error occurs or the MQTT connection fails.
The arguments for the callback are as follows:
Arguments | Type | Summary | Note |
---|---|---|---|
err | {MqttError} | Error information |
event : 'end'
Executes callback when .end() processing starts.
event : 'message'
Executes callback when a publish message is received.
Note that if the received data exceeds the receive buffer size (recvBuffSize), the data is discarded.
The received data includes a 9-byte header field in addition to the topic and the message.
The arguments for the callback are as follows:
Arguments | Type | Summary | Note |
---|---|---|---|
topic | string | The topic of the received publish | |
message | string, ArrayBuffer | The message of the received publish | *1 |
*1: For version 02.00.01 or earlier, there is a size limit of up to 380 bytes.
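The buffer rule (9-byte header plus topic plus message) can be expressed as a small size check. The sketch below is illustrative: utf8Length, payloadLength, and fitsInBuffer are hypothetical helpers, and it assumes that strings are counted as UTF-8 bytes, which this page does not state.

```javascript
var HEADER_BYTES = 9; // header field size stated in this section

// Byte length of a string when encoded as UTF-8 (assumed encoding).
function utf8Length(str) {
  var bytes = 0;
  for (var i = 0; i < str.length; i++) {
    var c = str.charCodeAt(i);
    if (c < 0x80) { bytes += 1; }
    else if (c < 0x800) { bytes += 2; }
    else if (c >= 0xD800 && c <= 0xDBFF) { bytes += 4; i++; } // surrogate pair
    else { bytes += 3; }
  }
  return bytes;
}

// Length of a message given as a string or an ArrayBuffer.
function payloadLength(message) {
  return typeof message === 'string' ? utf8Length(message) : message.byteLength;
}

// True when header + topic + message does not exceed the buffer size.
function fitsInBuffer(topic, message, buffSize) {
  return HEADER_BYTES + utf8Length(topic) + payloadLength(message) <= buffSize;
}
```

With the default recvBuffSize of 512, for instance, a one-byte topic leaves room for a message of up to 502 bytes under this calculation.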
.publish(topic,message[,options][,callback])
Publishes a message to a topic.
If the data to be sent exceeds the send buffer size (sendBuffSize), an argument error occurs.
The sent data includes a 9-byte header field in addition to the topic and the message.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
topic | string | mandatory | Topic name Topic name can be up to 127 bytes of alphanumeric characters and the symbols / , * , and # . | |
message | string, ArrayBuffer | mandatory | The message to publish | *1 |
options | Object | optional | Publish options qos : {number} QoS level Range: 0, 1, 2 Default: 0 retain : {boolean} Retain flag Default: false dup : {boolean} Duplicate flag Default: false Refer to the MQTT specifications for details such as QoS parameters. (http://mqtt.org/) | |
callback(err) | function | optional | Executes the callback when the publish process is completed. err : {MqttError} Error information An err.code value of 0 means success. | |
return | undefined, {MqttError} | - | {MqttError} : If the request is not accepted due to an illegal argument error, disconnection, etc., {MqttError} is returned. (Version 02.00.00+) |
*1: For version 02.00.01 or earlier, there is a size limit of up to 2500 bytes.
Note: If .publish() is called again before the callback of the previous .publish() has been executed, the later .publish() may be discarded. Before calling .publish(), confirm that the callback of the previous .publish() has been executed, or check with .canPublish() that publishing is possible.
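One way to follow the note above is to queue messages and send the next one only after the previous callback has run. This is a minimal sketch, not the platform's own mechanism: createPublishQueue is a hypothetical wrapper, and client is any object with the .publish(topic, message, options, callback) signature documented here.

```javascript
// Hypothetical wrapper around a {Client}-like object. At most one publish is
// in flight; the next one starts when the previous callback has been executed.
function createPublishQueue(client) {
  var pending = [];
  var busy = false;

  function next() {
    if (busy || pending.length === 0) { return; }
    busy = true;
    var job = pending.shift();
    var finished = false;

    function finish(err) {
      if (finished) { return; } // guard against being completed twice
      finished = true;
      busy = false;
      if (job.done) { job.done(err); }
      next();
    }

    var rejected = client.publish(job.topic, job.message, job.options, finish);
    if (rejected) {
      // A returned {MqttError} means the request was not accepted.
      finish(rejected);
    }
  }

  return {
    push: function(topic, message, options, done) {
      pending.push({ topic: topic, message: message, options: options || {}, done: done });
      next();
    },
    size: function() { return pending.length; }
  };
}
```

A caller would create one queue per client and call queue.push(topic, message, { qos: 1 }, callback) instead of calling client.publish() directly.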
.subscribe(topic[,options][,callback])
Subscribes to a topic.
Up to 5 topics can be subscribed to.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
topic | string | mandatory | Topic name Topic name can be up to 127 bytes of alphanumeric characters and the symbols / , * , and # . | |
options | Object | optional | Subscribe options qos : {number} QoS level Range: 0, 1, 2 Default: 0 bin : {boolean} Specifies the type of the 'message' argument of message event. true: ArrayBuffer false: string Default: false | |
callback(err) | function | optional | Executes the callback when the subscribe process is completed. err : {MqttError} Error information An err.code value of 0 means success. | |
return | undefined, {MqttError} | - | {MqttError} : If the request is not accepted due to an illegal argument error, disconnection, etc., {MqttError} is returned. (Version 02.00.00+) |
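The five-topic limit and the {MqttError} return value can be handled together when subscribing to a list of topics. The following sketch assumes that no topics are subscribed yet; subscribeAll is a hypothetical helper, and client is any object with the documented .subscribe(topic, options) signature.

```javascript
var MAX_SUBSCRIPTIONS = 5; // documented upper limit

// Hypothetical helper. Subscribes to each topic in order and returns the
// topics whose request was not attempted or not accepted.
function subscribeAll(client, topics, options) {
  var notAccepted = [];
  for (var i = 0; i < topics.length; i++) {
    if (i >= MAX_SUBSCRIPTIONS) {
      notAccepted.push(topics[i]); // beyond the documented limit: not attempted
      continue;
    }
    var rejected = client.subscribe(topics[i], options || {});
    if (rejected) {
      notAccepted.push(topics[i]); // a returned {MqttError}: request not accepted
    }
  }
  return notAccepted;
}
```

An empty result only means that every request was accepted; the outcome of each subscription is still reported through its callback.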
.unsubscribe(topic[,callback])
Unsubscribes from a topic.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
topic | string | mandatory | Topic name Topic name can be up to 127 bytes of alphanumeric characters and the symbols / , * , and # . | |
callback(err) | function | optional | Executes the callback when the unsubscribe process is completed. err : {MqttError} Error information An err.code value of 0 means success. | |
return | undefined, {MqttError} | - | {MqttError} : If the request is not accepted due to an illegal argument error, disconnection, etc., {MqttError} is returned. (Version 02.00.00+) |
.end([callback])
Disconnects from MQTT and exits.
It may take some time before the MQTT session resource is released.
To connect to MQTT again, wait until the MQTT session resource has been released.
See Sample 4 for an example.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
callback() | function | optional | Executes callback processing when releasing MQTT session resources. | |
return | undefined | - | - |
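Because the .end() callback runs when the session resource is released, it is a natural point from which to start the next connection. The sketch below illustrates that idea under assumptions: restartConnection is a hypothetical helper, mqttObj stands in for the built-in mqtt object, and whether mqtt.connect() may be called directly inside the callback is not confirmed by this page.

```javascript
// Hypothetical helper. Ends `client`, then creates a new connection once the
// .end() callback reports that the session resource has been released.
function restartConnection(mqttObj, client, url, options, onReady) {
  client.end(function() {
    // Assumption: connecting from inside this callback is permitted.
    var next = mqttObj.connect(url, options);
    onReady(next); // undefined if {Client} generation failed
  });
}
```

The caller should still check the value passed to onReady, since mqtt.connect() can return undefined.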
.reconnect()
Initiates an MQTT reconnection.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
return | undefined | - | - |
.isConnected()
Gets MQTT connection status.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
return | boolean | - | MQTT connection status true: MQTT connected false: Not connected |
.isReconnecting()
Gets the MQTT reconnection status.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
return | boolean | - | MQTT reconnecting status true: MQTT reconnecting false: Not reconnecting |
.canPublish()
Gets whether an MQTT message can be published.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
return | boolean | - | Whether the MQTT message can be published true: Can publish false: Cannot publish |
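The three status methods can be combined into a single label for logging. This is a sketch only: connectionState and its label strings are hypothetical, and 'busy' is merely one interpretation of "connected but cannot publish".

```javascript
// Hypothetical helper. Summarizes .isConnected(), .isReconnecting(), and
// .canPublish() as one label; the label names are illustrative.
function connectionState(client) {
  if (client.isConnected()) {
    return client.canPublish() ? 'ready' : 'busy';
  }
  return client.isReconnecting() ? 'reconnecting' : 'disconnected';
}
```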
.get(name)
Gets information related to the given argument.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
name | string | mandatory | The name of the information to be obtained The name that can be used is errnoConnect. | |
return | number, undefined | - | number : Acquired information undefined : When information cannot be obtained |
.get('errnoConnect')
Gets the connection error number.
Name | Type | M/O | Summary | Note |
---|---|---|---|---|
'errnoConnect' | string | mandatory | Name to get connection error number | |
return | number | - | Refer to errnoConnect table for connection error number details. |
Appendix
{MqttError}
Methods()/Properties | Type | Summary | Note |
---|---|---|---|
.code | number | Error code Refer to mqtt errors table for error code details. | |
.message | string | Error contents Refer to mqtt errors table for error content details. |
mqtt errors table
The following is the error code table.
"✓" indicates the callback that is notified. (xx.xx.xx) indicates the supported FW version.
.code | .message | Note | .on('error',cb) | .publish(cb) | .subscribe(cb) | .unsubscribe(cb) |
---|---|---|---|---|---|---|
0 | OK | No error | ✓ | ✓ | ✓ | |
1 | Connection failed | Connection (reconnection) failure | ✓ | |||
10 | Illegal argument | Invalid argument error | ✓ | ✓ (02.00.00+) | ✓ (02.00.00+) | ✓ (02.00.00+) |
11 | Disconnected * | Disconnect error | ✓ | ✓ | ✓ | |
12 | Publish failed | Publish failed | ✓ (02.00.00+) | ✓ | ||
13 | Subscribe failed | Subscribe failed | ✓ (02.00.00+) | ✓ | ||
14 | Unsubscribe failed | Unsubscribe failed | ✓ (02.00.00+) | ✓ | ||
255 | Other error | Other errors | ✓ |
* Notified when publish(), subscribe(), or unsubscribe() is executed while not connected.
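For logging, the codes in the table above can be turned into a single line of text. This is a minimal sketch: formatMqttError and MQTT_ERROR_MESSAGES are hypothetical names, and the fallback table simply copies the .message column for cases where err.message is empty.

```javascript
// Copy of the .code / .message columns from the mqtt errors table.
var MQTT_ERROR_MESSAGES = {
  0: 'OK',
  1: 'Connection failed',
  10: 'Illegal argument',
  11: 'Disconnected',
  12: 'Publish failed',
  13: 'Subscribe failed',
  14: 'Unsubscribe failed',
  255: 'Other error'
};

// Hypothetical helper. Prefers err.message and falls back to the table.
function formatMqttError(err) {
  var text = err.message || MQTT_ERROR_MESSAGES[err.code] || 'Unknown';
  return 'MQTT error ' + err.code + ': ' + text;
}
```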
errnoConnect table
The following is the connection error code table.
errnoConnect | Summary | Note |
---|---|---|
0 | OK | No error |
1 | Unacceptable protocol version | Protocol version not supported |
2 | Identifier rejected | Identifier rejection |
3 | Server unavailable | Server unavailable |
4 | Bad user name or password | Invalid username or password |
5 | Not authorized | Permission error |
6 | Connection failed | Other errors (mainly network errors) |
7 | SSL/TLS error | TLS related errors |
Others | Reserved | Reserved |
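The value returned by .get('errnoConnect') can be mapped to the summaries in this table in the same way. The helper below is a sketch; describeErrnoConnect is a hypothetical name, and 'Unavailable' is an illustrative label for the case where .get() returns undefined.

```javascript
// Summaries copied from the errnoConnect table, indexed by error number.
var ERRNO_CONNECT = [
  'OK',
  'Unacceptable protocol version',
  'Identifier rejected',
  'Server unavailable',
  'Bad user name or password',
  'Not authorized',
  'Connection failed',
  'SSL/TLS error'
];

// Hypothetical helper. Values outside 0-7 are reported as 'Reserved'.
function describeErrnoConnect(errno) {
  if (typeof errno !== 'number') {
    return 'Unavailable'; // .get() returned undefined
  }
  return ERRNO_CONNECT[errno] || 'Reserved';
}
```

In an 'error' handler, this could be used as describeErrnoConnect(client.get('errnoConnect')).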
Object Usage Examples
Sample 1
This is a sample MQTT connection.
It starts a timer with a 10-second period and sends and receives messages.
var client = mqtt.connect('mqtt://foo.bar');
if(!client) {
  print('Failed to create mqtt instance');
} else {
  client.on('connect', function(connack) {
    print('CONNECTED');
    client.subscribe('foo_topic/device01');
  });
  client.on('error', function(err) {
    print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
    if(1 == err.code) { //Connection failed
      client.reconnect();
    }
    //TODO: Error handling
  });
  client.on('close', function() {
    print("DISCONNECTED");
    client.reconnect();
  });
  client.on('message', function(topic, message) {
    print('TOPIC: ' + topic + ' MESSAGE: ' + message);
    //TODO: Message handling
  });
}
var tmobj = setInterval(function() {
  if(!client) {
    return; //No instance was created
  }
  if(client.isConnected()) {
    var dummyBatteryLevel = 10;
    if(10 >= dummyBatteryLevel) {
      client.publish('bar_topic/device01',
                     'Battery level : '
                     + dummyBatteryLevel.toString()
                     + '%');
    }
  } else {
    print('Skipped');
  }
}, 10000);
Sample 2
This is a sample MQTTS connection.
It starts a timer with a 10-second period and sends and receives messages.
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
mqtt.set('ssl.ca', sslCa);
var client = mqtt.connect('mqtts://foo.bar:8883');
if(!client) {
  print('Failed to create mqtt instance');
} else {
  client.on('connect', function(connack) {
    print('CONNECTED');
    client.subscribe('foo_topic/device01');
  });
  client.on('error', function(err) {
    print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
    if(1 == err.code) { //Connection failed
      client.reconnect();
    }
    //TODO: Error handling
  });
  client.on('close', function() {
    print("DISCONNECTED");
    client.reconnect();
  });
  client.on('message', function(topic, message) {
    print('TOPIC: ' + topic + ' MESSAGE: ' + message);
    //TODO: Message handling
  });
}
var tmobj = setInterval(function() {
  if(!client) {
    return; //No instance was created
  }
  if(client.isConnected()) {
    var dummyBatteryLevel = 10;
    if(10 >= dummyBatteryLevel) {
      client.publish('bar_topic/device01',
                     'Battery level : '
                     + dummyBatteryLevel.toString()
                     + '%');
    }
  } else {
    print('Skipped');
  }
}, 10000);
Sample 3
This is a sample MQTTS connection using client authentication.
It starts a timer with a 10-second period and sends and receives messages.
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientCert = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientKey = '-----BEGIN PRIVATE KEY-----\n(...ContentOmission...)\n-----END PRIVATE KEY-----';
mqtt.set('ssl.ca', sslCa);
mqtt.set('ssl.cert', sslClientCert);
mqtt.set('ssl.key', sslClientKey);
var mqttClientId = 'foo';
var mqttUser = 'foo';
var mqttPass = 'foobar';
var mqttOpt = {clientId:mqttClientId, username:mqttUser, password:mqttPass, reconnectCount:1855};
var client = mqtt.connect('mqtts://foo.bar:8883', mqttOpt);
if(!client) {
  print('Failed to create mqtt instance');
} else {
  client.on('connect', function(connack) {
    print('CONNECTED');
    client.subscribe('foo_topic/device01');
  });
  client.on('error', function(err) {
    print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
    if(1 == err.code) { //Connection failed
      client.reconnect();
    }
    //TODO: Error handling
  });
  client.on('close', function() {
    print("DISCONNECTED");
    client.reconnect();
  });
  client.on('message', function(topic, message) {
    print('TOPIC: ' + topic + ' MESSAGE: ' + message);
    //TODO: Message handling
  });
}
var tmobj = setInterval(function() {
  if(!client) {
    return; //No instance was created
  }
  if(client.isConnected()) {
    var dummyBatteryLevel = 10;
    if(10 >= dummyBatteryLevel) {
      client.publish('bar_topic/device01',
                     'Battery level : '
                     + dummyBatteryLevel.toString()
                     + '%');
    }
  } else {
    print('Skipped');
  }
}, 10000);
Sample 4
This sample repeatedly starts and ends an MQTT connection.
If you call mqtt.connect() immediately after ending a connection with .end(), mqtt.connect() may fail because the MQTT resources have not been released yet. In that case, wait a while and try the connection again.
var mqttUrl = 'mqtt://foo.bar:1883';
var mqttc = undefined;
var endEv = 0;
var to = setInterval(function() {
  print('end event');
  endEv = 1;
}, 30000);
while(1) {
  if(!mqttc) {
    mqttc = mqtt.connect(mqttUrl, {reconnectCount:1855});
    if(!mqttc) {
      print('Waiting for mqtt resource release');
      setTimeout(200).wait();
      continue;
    }
    mqttc.on('connect', function(connack) {
      print('CONNECTED');
    });
    // ...Omitted: Registers an event handler.
  }
  // ...Omitted: mqtt communication processing.
  if(1 == endEv) {
    endEv = 0;
    print('ABORT');
    mqttc.end();
    mqttc = undefined;
    continue;
  }
}
Sample 5
This sample connects to MQTT with automatic reconnection enabled.
After connecting, it performs a loopback test of transmission and reception.
var client = mqtt.connect('mqtt://foo.bar:1883', {reconnectCount: 1855});
if(!client) {
  throw 'Failed to create mqtt instance';
}
client.on('error', function(err) {
  print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
  //TODO: Error handling
});
client.on('message', function(topic, message) {
  print('TOPIC: ' + topic + ' MESSAGE: ' + message);
  //TODO: Message handling
});
client.on('connect', function() {
  print('CONNECTED');
  client.subscribe('testtopic/subtopic/#', { qos: 1 }, function(err) {
    if(err.code > 0) {
      print('MQTT SUBSCRIBE ERROR', err.code);
      //TODO: Error handling
    }
  });
  client.subscribe('shadowtopic/#', { qos: 1 }, function(err) {
    if(err.code > 0) {
      print('MQTT SUBSCRIBE ERROR', err.code);
      //TODO: Error handling
    }
  });
});
setInterval(function() {
  if (client.canPublish()) {
    var body = JSON.stringify({ "message" : "dummy" });
    client.publish('testtopic/subtopic/dummy', body, { qos: 1 }, function(err) {
      if(err.code == 0) {
        print('Publish OK');
      } else {
        print('Publish failed');
        //TODO: Error handling
      }
    });
  } else {
    print('Cannot publish');
  }
}, 15000);
Sample 6
This sample reconnects to MQTT manually.
If reconnection fails repeatedly, the instance is released and recreated.
var mqttUrl = 'mqtt://foo.bar:1883';
var reconnectAttempts = 0;
var maxReconnectAttempts = 24;
var registerEventHandlers = function() {
  if(!client) {
    return;
  }
  client.on('error', function(err) {
    print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
    if (err.code == 1) { //Connection failed
      if (reconnectAttempts++ < maxReconnectAttempts) {
        print('reconnectAttempts:', reconnectAttempts);
        client.reconnect();
      } else {
        print('ABORT');
        reconnectAttempts = 0;
        client.end(); //Release mqtt instance
        client = undefined;
      }
    }
  });
  client.on('message', function(topic, message) {
    print('TOPIC: ' + topic + ' MESSAGE: ' + message);
    //TODO: Message handling
  });
  client.on('close', function() {
    print("DISCONNECTED");
    if(client) { //Skip when the instance has already been released
      client.reconnect();
    }
  });
  client.on('connect', function() {
    reconnectAttempts = 0;
    print("CONNECTED");
    client.subscribe("foo_topic/device01/#", { qos: 1 }, function(err) {
      if(err.code > 0) {
        print("MQTT SUBSCRIBE ERROR", err.code);
        //TODO: Error handling
      }
    });
  });
};
var client = mqtt.connect(mqttUrl);
if(!client) {
  throw 'Failed to create mqtt instance';
}
registerEventHandlers();
setInterval(function() {
  if(!client) {
    print('mqtt instance recreate');
    client = mqtt.connect(mqttUrl);
    registerEventHandlers();
  }
  if(client && client.canPublish()) {
    var body = JSON.stringify({ "message" : "dummy" });
    client.publish("foo_topic/device01/dummy", body, { qos: 1 }, function(err) {
      if(err.code == 0) {
        print('Publish OK');
      } else {
        print('Publish failed');
        //TODO: Error handling
      }
    });
  }
}, 15000);