06. MQTT
mqttオブジェクトは、MQTTクライアント機能を提供する組み込みオブジェクトです。
機能概要:
- MQTTクライアント機能を提供します。
- MQTT v3.1.1 と v3.1 をサポートします。
- TLSをサポートします。
- サーバー認証とクライアント認証をサポートします。
制限事項:
- 使用可能なMQTTセッションリソースは1本です。
- 暗号化通信には、証明機関(CA)によって署名された証明書が必要となります。自己署名証明書は許可されていません。
mqtt Global Object
| Methods()/Properties | Summary | Version | Note | 
|---|---|---|---|
| mqtt.connect() | MQTT接続を開始します。 | {Client}インスタンスが生成されます。 | |
| mqtt.set() | 接続などに使用する情報を設定します。 | 
Details
mqtt.connect(url[,options])
MQTT接続を開始します。
{Client}インスタンスが生成されます。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| url | string | mandatory | brokerのURL URLは、[protocol]://[hostname]:[port] 形式で指定します。 指定可能な文字数は最大255バイトです。 protocol プロトコル ’mqtt’,’mqtts’が指定可能となります。 hostname MQTT Brokerのドメイン名またはIPアドレス port ポート番号 10進数文字列で指定します。 デフォルト値は1883となります。 TLSのCA証明書はmqtt.set()で設定可能となります。 | |
| options | Object | optional | MQTT接続オプション 詳細はoptionsを参照してください。 | |
| return | {Client}, undefined | - | {Client} : 生成された{Client} undefined : 引数不正やリソース不足で{Client}の生成に失敗した場合 | 
options
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| keepalive | number | optional | キープアライブ時間[s] 0: 無効 設定範囲: 0~2,147,483,647 デフォルト値は60となります。 | |
| clientId | string | optional | MQTTクライアントID 設定範囲: 1~127バイトの半角英数字 デフォルト値は’mqttjs_’+ランダム10進数8桁となります。 | |
| protocolId | string | optional | プロトコルID デフォルト値は’MQTT’となります。 | |
| protocolVersion | number | optional | MQTTプロトコルバージョン 4: MQTT v3.1.1 3: MQTT v3.1 設定値が3の場合、protocolIdは’MQIsdp’を設定してください。 デフォルト値は4となります。 | |
| reconnectPeriod | number | optional | 切断から再接続開始までの間隔[ms] 設定範囲: 0~10000 デフォルト値は1000となります。 | |
| connectTimeout | number | optional | 接続タイムアウト(CONNACK受信待ち)時間[ms] 設定範囲: 0~2,147,483,647 デフォルト値は30000となります。 | |
| username | string | optional | ユーザー名 brokerが必要とする場合に設定します。 設定範囲: 1~511バイトの半角英数 デフォルト値はありません。 | |
| password | string | optional | パスワード brokerが必要とする場合に設定します。 設定範囲: 1~511バイトの半角英数 デフォルト値はありません。 | |
| reconnectCount | number | optional | 自動再接続リトライ回数 設定値が0の場合は自動再接続は無効になります。 0: 無効 1855 :無限回 設定範囲: 0~999, 1855 デフォルト値は0となります。 | |
| sendBuffSize | number | optional | 送信バッファサイズ[Byte] 設定範囲: 128~65536 デフォルト値は2560となります。 | |
| recvBuffSize | number | optional | 受信バッファサイズ[Byte] 設定範囲: 128~65536 デフォルト値は512となります。 | 
ユーザー名とパスワードを指定してfoo.barへの接続を開始するサンプルです。
var mqttOpt = {
    clientId: 'testClientId',
    username: 'foo',
    password: 'bar',
    keepalive: 300
};
var client = mqtt.connect('mqtt://foo.bar',mqttOpt);
mqtt.set(name[,arg])
接続などに使用する情報を設定します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| name | string | mandatory | 設定データ名 使用できる名前は、ssl.ca, ssl.cert, ssl.keyとなります。 | |
| arg | string | optional | 設定情報 | |
| return | undefined | - | - | 
mqtt.set(’ssl.ca’,certificate)
MQTTS接続に使用するCA証明書を設定します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| ’ssl.ca’ | string | mandatory | CA証明書を設定するための名前 | |
| certificate | string | optional | CA証明書 PEM形式の文字列を指定します。改行コードには \nを使用します。省略した場合は、CA証明書の設定を解除します。 | 証明書が不正の場合、接続失敗エラーが発生します。こちらを活用して証明書を検証してください。 | 
| return | undefined | - | - | 
下記のように設定します。
var sslCa = '-----BEGIN CERTIFICATE-----\n' + 
'(...ContentOmission...)\n' + 
//...Omit...
'(...ContentOmission...)\n' + 
'-----END CERTIFICATE-----';
mqtt.set('ssl.ca', sslCa);
mqtt.set(’ssl.cert’,certificate)
MQTTS接続に使用するクライアント証明書を設定します。
未設定の場合、クライアント認証は無効となります。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| ’ssl.cert’ | string | mandatory | クライアント証明書を設定するための名前 | |
| certificate | string | optional | クライアント証明書 PEM形式の文字列を指定します。改行コードには \nを使用します。省略した場合は、クライアント証明書の設定を解除します。 | |
| return | undefined | - | - | 
mqtt.set(’ssl.key’,clientkey)
MQTTS接続に使用するクライアント秘密鍵を設定します。
未設定の場合、クライアント認証は無効となります。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| ’ssl.key’ | string | mandatory | クライアント秘密鍵を設定するための名前 | |
| clientkey | string | optional | 秘密鍵 PEM形式の文字列を指定します。改行コードには \nを使用します。省略した場合は、クライアント秘密鍵の設定を解除します。 | |
| return | undefined | - | - | 
クライアント認証を用いたMQTTS接続を行う場合、下記のように設定します。
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientCa  = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientKey = '-----BEGIN PRIVATE KEY-----\n(...ContentOmission...)\n-----END PRIVATE KEY-----';
mqtt.set('ssl.ca', sslCa);
mqtt.set('ssl.cert', sslClientCa);
mqtt.set('ssl.key', sslClientKey);
{Client}
| Methods()/Properties | Summary | Version | Note | 
|---|---|---|---|
| .on() | イベントハンドラを登録します。 | ||
| .publish() | メッセージをトピックへパブリッシュします。 | ||
| .subscribe() | トピックのサブスクライブを登録します。 | ||
| .unsubscribe() | トピックのサブスクライブを解除します。 | ||
| .end() | MQTTを切断し、終了します。 | ||
| .reconnect() | MQTTの再接続を開始します。 | ||
| .isConnected() | MQTTの接続状態を取得します。 | ||
| .isReconnecting() | MQTTの再接続中状態を取得します。 | ||
| .canPublish() | MQTTのパブリッシュ可否状態を取得します。 | ||
| .get() | 与えられた引数に関連した情報を取得します。 | 
Details
.on(event,callback)
イベントハンドラを登録します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| event | string | mandatory | イベント名 使用できるイベント名は、connect, reconnect, close, error, end, messageとなります。 | |
| callback() | function | mandatory | イベント発生時にコールバック処理を実行します。 | |
| return | undefined | - | - | 
event : ’connect’
MQTT接続、再接続が成功した時にコールバック処理を実行します。
コールバックの引数は以下の通りとなります。
| Arguments | Type | Summary | Note | 
|---|---|---|---|
| connack | Object | connackは受信したconnack packetの内容となります。 この引数は、使用する必要はありません。 | 
event : ’reconnect’
MQTTの再接続が開始された時にコールバック処理を実行します。
event : ’close’
MQTTが切断された時にコールバック処理を実行します。
event : ’error’
エラーが発生した時、またはMQTTの接続が失敗した時にコールバック処理を実行します。
コールバックの引数は以下の通りです。
| Arguments | Type | Summary | Note | 
|---|---|---|---|
| err | {MqttError} | エラー情報 | 
event : ’end’
.end()処理が開始された時にコールバック処理を実行します。
event : ’message’
パブリッシュを受信した時にコールバック処理を実行します。
コールバックの引数は以下の通りです。
尚、受信データが受信バッファサイズ(recvBuffSize)を超えた場合、そのデータは破棄されます。
受信データには、トピックおよびメッセージの他に9バイトのヘッダ領域が含まれます。
| Arguments | Type | Summary | Note | 
|---|---|---|---|
| topic | string | 受信したパブリッシュのトピック | |
| message | string, ArrayBuffer | 受信したパブリッシュのメッセージ | 
.publish(topic,message[,options][,callback])
メッセージをトピックへパブリッシュします。
送信データが送信バッファサイズ(sendBuffSize)を超えた場合、引数エラーとなります。
送信データには、トピックおよびメッセージの他に9バイトのヘッダ領域が含まれます。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| topic | string | mandatory | トピック名 最大127バイトとなります。使用可能な文字は半角英数と一部の記号 /,+,#となります。 | |
| message | string, ArrayBuffer | mandatory | パブリッシュするメッセージ | |
| options | Object | optional | パブリッシュのオプション qos : {number} QoSレベル 設定範囲: 0, 1, 2 デフォルト値は0となります。 retain : {boolean} Retain flag デフォルト値はfalseとなります。 dup : {boolean} Duplicate flag デフォルト値はfalseとなります。 QoSパラメータなどの詳細については、MQTT仕様書を参照してください。(http://mqtt.org/) | |
| callback(err) | function | optional | パブリッシュ処理終了時にコールバック処理を実行します。 err : {MqttError} エラー情報 引数のerr.codeが0の場合、成功となります。 | |
| return | undefined, {MqttError} | - | {MqttError} : 引数不正エラーや切断等で要求が受け付けられなかった場合、{MqttError}が返却されます。 | 
注意 : .publish()のコールバック処理が実行される前に、次の.publish()を行うと、後者の.publish()が破棄される場合があります。.publish()を行う前に、前の.publish()のコールバック処理が実行されたかを確認するか、又は.canPublish()でパブリッシュの可否を確認することをお勧めします。
.subscribe(topic[,options][,callback])
トピックのサブスクライブを登録します。
登録可能なトピック数は5です。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| topic | string | mandatory | トピック名 最大127バイトとなります。使用可能な文字は半角英数と一部の記号 /,+,#となります。 | |
| options | Object | optional | サブスクライブのオプション qos : {number} QoSレベル 設定範囲: 0, 1, 2 デフォルト値は0となります。 bin : {boolean} messageイベントの'message'引数の型を指定します。 true: ArrayBuffer false: string デフォルト値はfalseとなります。 | |
| callback(err) | function | optional | サブスクライブ処理終了時にコールバック処理を実行します。 err : {MqttError} エラー情報 引数のerr.codeが0の場合、成功となります。 | |
| return | undefined, {MqttError} | - | {MqttError} : 引数不正エラーや切断等で要求が受け付けられなかった場合、{MqttError}が返却されます。 | 
.unsubscribe(topic[,callback])
トピックのサブスクライブを解除します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| topic | string | mandatory | トピック名 最大127バイトとなります。使用可能な文字は半角英数と一部の記号 /,+,#となります。 | |
| callback(err) | function | optional | サブスクライブ解除処理終了時にコールバック処理を実行します。 err : {MqttError} エラー情報 引数のerr.codeが0の場合、成功となります。 | |
| return | undefined, {MqttError} | - | {MqttError} : 引数不正エラーや切断等で要求が受け付けられなかった場合、{MqttError}が返却されます。 | 
.end([,callback])
MQTTを切断し、終了します。
MQTTセッションリソース解放されるまで、時間を要する場合があります。
再度、MQTT接続を行う際は、MQTTセッションリソースが解放されるまで、待機する必要があります。
サンプルコードを参照してください。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| callback() | function | optional | MQTTセッションリソース解放時にコールバック処理を実行します。 | |
| return | undefined | - | - | 
.reconnect()
MQTTの再接続を開始します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| return | undefined | - | - | 
.isConnected()
MQTTの接続状態を取得します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| return | boolean | - | MQTTの接続状態 true: MQTT接続済み false: MQTT接続済みでない | 
.isReconnecting()
MQTTの再接続中状態を取得します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| return | boolean | - | MQTTの再接続中状態 true: MQTT再接続中 false: MQTT再接続中でない | 
.canPublish()
MQTTのパブリッシュ可否状態を取得します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| return | boolean | - | MQTTのパブリッシュ可否状態 true: パブリッシュできる状態 false: パブリッシュできない状態 | 
.get(name)
与えられた引数に関連した情報を取得します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| name | string | mandatory | 取得する情報の名前 使用できる名前はerrnoConnectとなります。 | |
| return | number, undefined | - | number : 取得した情報 undefined : 取得できなかった場合 | 
.get(’errnoConnect’)
接続エラー番号を取得します。
| Name | Type | M/O | Summary | Note | 
|---|---|---|---|---|
| ’errnoConnect’ | string | mandatory | 接続エラー番号を取得するための名前 | |
| return | number | - | 接続エラー番号 詳細はerrnoConnect tableを参照してください。 | 
Appendix
{MqttError}
| Methods()/Properties | Type | Summary | Note | 
|---|---|---|---|
| .code | number | エラーコード 詳細はmqtt errors tableを参照してください。 | |
| .message | string | エラー内容 詳細はmqtt errors tableを参照してください。 | 
mqtt errors table
下記は、エラーコード表となります。
✓は通知先のコールバックを示します。
| .code | .message | Note | .on('error',cb) | .publish(cb) | .subscribe(cb) | .unsubscribe(cb) | 
|---|---|---|---|---|---|---|
| 0 | OK | エラーなし | ✓ | ✓ | ✓ | |
| 1 | Connection failed | 接続(再接続)失敗 | ✓ | |||
| 10 | Illegal argument | 引数不正エラー | ✓ | ✓ | ✓ | ✓ | 
| 11 | Disconnected ※ | 切断エラー | ✓ | ✓ | ✓ | |
| 12 | Publish failed | パブリッシュ失敗 | ✓ | ✓ | ||
| 13 | Subscribe failed | サブスクライブ登録失敗 | ✓ | ✓ | ||
| 14 | Unsubscribe failed | サブスクライブ解除失敗 | ✓ | ✓ | ||
| 255 | Other error | その他のエラー | ✓ | 
※ 未接続時にpublish()/subscribe()/unsubscribe()が行われた場合に通知されます。
errnoConnect table
下記は、接続エラーコード表となります。
| errnoConnect | Summary | Note | 
|---|---|---|
| 0 | OK | エラーなし | 
| 1 | Unacceptable protocol version | プロトコル・バージョン許容不可 | 
| 2 | Identifier rejected | 識別子拒否 | 
| 3 | Server unavailable | サーバ使用不可 | 
| 4 | Bad user name or password | ユーザ名またはパスワードが不正 | 
| 5 | Not authorized | 権限なしエラー | 
| 6 | Connection failed | その他のエラー(主にネットワークエラー) | 
| 7 | SSL/TLS error | TLS関連のエラー | 
| Others | Reserved | 予約 | 
オブジェクトの使用例
Sample 1
MQTT接続のサンプルです。
10秒の周期タイマで起動し、送信と受信を行います。
var client = mqtt.connect('mqtt://foo.bar');
if(!client) {
    print('Failed to create mqtt instance');
} else {
    client.on('connect', function(connack) {
        print('CONNECTED');
        client.subscribe('foo_topic/device01');
    });
    client.on('error', function(err) {
        print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
        if(1 == err.code) { //Connection failed
            client.reconnect();
        }
        //TODO: Error handling
    });
    client.on('close', function() {
        print("DISCONNECTED");
        client.reconnect();
    });
    client.on('message', function(topic, message) {
        print('TOPIC: ' + topic + ' MESSAGE: ' + message);
        //TODO: Message handling
    });
}
var tmobj = setInterval(function() {
    if(!client) {
    } else {
        if(client.isConnected()) {
            var dummyBatteryLevel = 10;
            if(10 >= dummyBatteryLevel) {
                client.publish( 'bar_topic/device01',
                                'Battery level : '
                                 + dummyBatteryLevel.toString()
                                 + '%');
            }
        } else {
            print('Skipped');
        }
    }
}, 10000);
Sample 2
MQTTS接続のサンプルです。
10秒の周期タイマで起動し、送信と受信を行います。
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
mqtt.set('ssl.ca', sslCa);
var client = mqtt.connect('mqtts://foo.bar:8883');
if(!client) {
    print('Failed to create mqtt instance');
} else {
    client.on('connect', function(connack) {
        print('CONNECTED');
        client.subscribe('foo_topic/device01');
    });
    client.on('error', function(err) {
        print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
        if(1 == err.code) { //Connection failed
            client.reconnect();
        }
        //TODO: Error handling
    });
    client.on('close', function() {
        print("DISCONNECTED");
        client.reconnect();
    });
    client.on('message', function(topic, message) {
        print('TOPIC: ' + topic + ' MESSAGE: ' + message);
        //TODO: Message handling
    });
}
var tmobj = setInterval(function() {
    if(!client) {
    } else {
        if(client.isConnected()) {
            var dummyBatteryLevel = 10;
            if(10 >= dummyBatteryLevel) {
                client.publish( 'bar_topic/device01',
                                'Battery level : '
                                 + dummyBatteryLevel.toString()
                                 + '%');
            }
        } else {
            print('Skipped');
        }
    }
}, 10000);
Sample 3
クライアント認証を用いたMQTTS接続のサンプルです。
10秒の周期タイマで起動し、送信と受信を行います。
var sslCa = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientCert = '-----BEGIN CERTIFICATE-----\n(...ContentOmission...)\n-----END CERTIFICATE-----';
var sslClientKey = '-----BEGIN PRIVATE KEY-----\n(...ContentOmission...)\n-----END PRIVATE KEY-----';
mqtt.set('ssl.ca', sslCa);
mqtt.set('ssl.cert', sslClientCert);
mqtt.set('ssl.key', sslClientKey);
var mqttClientId = 'foo';
var mqttUser = 'foo';
var mqttPass = 'foobar';
var mqttOpt = {clientId:mqttClientId, username:mqttUser, password:mqttPass, reconnectCount:1855};
var client = mqtt.connect('mqtts://foo.bar:8883', mqttOpt);
if(!client) {
    print('Failed to create mqtt instance');
} else {
    client.on('connect', function(connack) {
        print('CONNECTED');
        client.subscribe('foo_topic/device01');
    });
    client.on('error', function(err) {
        print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
        if(1 == err.code) { //Connection failed
            client.reconnect();
        }
        //TODO: Error handling
    });
    client.on('close', function() {
        print("DISCONNECTED");
        client.reconnect();
    });
    client.on('message', function(topic, message) {
        print('TOPIC: ' + topic + ' MESSAGE: ' + message);
        //TODO: Message handling
    });
}
var tmobj = setInterval(function() {
    if(!client) {
    } else {
        if(client.isConnected()) {
            var dummyBatteryLevel = 10;
            if(10 >= dummyBatteryLevel) {
                client.publish( 'bar_topic/device01',
                                'Battery level : '
                                 + dummyBatteryLevel.toString()
                                 + '%');
            }
        } else {
            print('Skipped');
        }
    }
}, 10000);
Sample 4
MQTT接続の開始と終了を繰り返すサンプルです。
.end()で終了した直後にmqtt.connect()を実行すると、MQTTリソースがまだ解放されていないため、mqtt.connect()が失敗することがあります。その場合、しばらく待ってから再度接続してください。
var mqttUrl = 'mqtt://foo.bar:1883';
var mqttc = undefined;
var endEv = 0;
var to = setInterval(function() {
    print('end event');
    endEv = 1;
}, 30000);
while(1) {
    if(!mqttc) {
        mqttc = mqtt.connect(mqttUrl, {reconnectCount:1855});
        if(!mqttc) {
            print('Waiting for mqtt resource release');
            setTimeout(200).wait();
            continue;
        }
        mqttc.on('connect', function(connack) {
            print('CONNECTED');
        });
        //  ...Omitted: Registers an event handler.
    }
    //  ...Omitted: mqtt communication processing.
    if(1 == endEv) {
        endEv = 0;
        print('ABORT');
        mqttc.end();
        mqttc = undefined;
        continue;
    }
}
Sample 5
自動再接続有効でMQTT接続を行うサンプルです。
接続後、送信と受信のループバックテストを行います。
var client = mqtt.connect('mqtt://foo.bar:1883', {reconnectCount: 1855});
if(!client) {
    throw 'Failed to create mqtt instance';
}
client.on('error', function(err) {
    print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
    //TODO: Error handling
});
client.on('message', function(topic, message) {
    print('TOPIC: ' + topic + ' MESSAGE: ' + message);
    //TODO: Message handling
});
client.on('connect', function() {
    print('CONNECTED');
    client.subscribe('testtopic/subtopic/#', { qos: 1 }, function(err) {
        if(err.code > 0) {
            print('MQTT SUBSCRIBE ERROR', err.code);
            //TODO: Error handling
        }
    });
    client.subscribe('shadowtopic/#', { qos: 1 }, function(err) {
        if(err.code > 0) {
            print('MQTT SUBSCRIBE ERROR', err.code);
            //TODO: Error handling
        }
    });
});
setInterval(function() {
    if (client.canPublish()) {
        var body = JSON.stringify({ "message" : "dummy" });
        client.publish('testtopic/subtopic/dummy', body, { qos: 1 }, function(err) {
            if(err.code == 0) {
                print('Publish OK');
            } else {
                print('Publish failed');
                //TODO: Error handling
            }
        });
    } else {
        print('Cannot publish');
    }
}, 15000);
Sample 6
手動でMQTT再接続を行うサンプルです。
再接続が連続して失敗した場合、一度インスタンスを解放し、再生成します。
var mqttUrl = 'mqtt://foo.bar:1883';
var reconnectAttempts = 0;
var maxReconnectAttempts = 24;
var registerEventHandlers = function() {
    if(!client) {
        return;
    }
    client.on('error', function(err) {
        print('ERROR: ' + err.code + ' ERRNO: ' + client.get('errnoConnect'));
        if (err.code == 1) { //Connection failed
            if (reconnectAttempts++ < maxReconnectAttempts) {
                print('reconnectAttempts:', reconnectAttempts);
                client.reconnect();
            } else {
                print('ABORT');
                reconnectAttempts = 0;
                client.end(); //Release mqtt instance
                client = undefined;
            }
        }
    });
    client.on('message', function(topic, message) {
        print('TOPIC: ' + topic + ' MESSAGE: ' + message);
        //TODO: Message handling
    });
    client.on('close', function() {
        print("DISCONNECTED");
        client.reconnect();
    });
    client.on('connect', function() {
        reconnectAttempts = 0;
        print("CONNECTED");
        client.subscribe("foo_topic/device01/#", { qos: 1 }, function(err) {
            if(err.code > 0) {
                print("MQTT SUBSCRIBE ERROR", err.code);
                //TODO: Error handling
            }
        });
    });
}
var client = mqtt.connect(mqttUrl);
if(!client) {
    throw 'Failed to create mqtt instance';
}
registerEventHandlers();
setInterval(function() {
    if(!client) {
        print('mqtt instance recreate');
        client = mqtt.connect(mqttUrl);
        registerEventHandlers();
    }
    if(client && client.canPublish()) {
        var body = JSON.stringify({ "message" : "dummy" });
        client.publish("foo_topic/device01/dummy", body, { qos: 1 }, function(err) {
            if(err.code == 0) {
                print('Publish OK');
            } else {
                print('Publish failed');
                //TODO: Error handling
            }
        });
    }
}, 15000);
