InfluxDB Snippet
This snippet can be used by copy-pasting to the neqto.js script.
NEQTO Bridge | SPRESENSE |
---|---|
v00.00.30+ | v01.00.00+ |
This snippet provides a function to send IoT data to InfluxDB.
Details
The send_influxdb
function can be used to POST the passed time-series data in Line Protocol format to an InfluxDB 'bucket' using its API, over HTTPS. The result (error/response) is then passed to the callback function.
To start using this snippet, HOST
(Address of the InfluxDB instance), PORT
(Port of the InfluxDB server), ORG_NAME
(Name of the Organization), ACCESS_TOKEN
(Token from InfluxDB account), and CA
are required to be configured by the user.
NOTE: The provided function only allows payloads up to size of 4KB. For bigger sized payloads, please refer to the sample for divided writing in neqto.js docs for NEQTO Bridge and Sony Spresense.
The following CA can be used (as of 2020-07-30),
/* DST Root CA X3 - 44afb080d6a327ba893039862ef8406b */
var CA = "-----BEGIN CERTIFICATE-----\nMIIDSjCCAjKgAwIBAgIQRK+wgNajJ7qJMDmGLvhAazANBgkqhkiG9w0BAQUFADA/\nMSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\nDkRTVCBSb290IENBIFgzMB4XDTAwMDkzMDIxMTIxOVoXDTIxMDkzMDE0MDExNVow\nPzEkMCIGA1UEChMbRGlnaXRhbCBTaWduYXR1cmUgVHJ1c3QgQ28uMRcwFQYDVQQD\nEw5EU1QgUm9vdCBDQSBYMzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB\nAN+v6ZdQCINXtMxiZfaQguzH0yxrMMpb7NnDfcdAwRgUi+DoM3ZJKuM/IUmTrE4O\nrz5Iy2Xu/NMhD2XSKtkyj4zl93ewEnu1lcCJo6m67XMuegwGMoOifooUMM0RoOEq\nOLl5CjH9UL2AZd+3UWODyOKIYepLYYHsUmu5ouJLGiifSKOeDNoJjj4XLh7dIN9b\nxiqKqy69cK3FCxolkHRyxXtqqzTWMIn/5WgTe1QLyNau7Fqckh49ZLOMxt+/yUFw\n7BZy1SbsOFU5Q9D8/RhcQPGX69Wam40dutolucbY38EVAjqr2m7xPi71XAicPNaD\naeQQmxkqtilX4+U9m5/wAl0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNV\nHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFMSnsaR7LHH62+FLkHX/xBVghYkQMA0GCSqG\nSIb3DQEBBQUAA4IBAQCjGiybFwBcqR7uKGY3Or+Dxz9LwwmglSBd49lZRNI+DT69\nikugdB/OEIKcdBodfpga3csTS7MgROSR6cz8faXbauX+5v3gTt23ADq1cEmv8uXr\nAvHRAosZy5Q6XkjEGB5YGV8eAlrwDPGxrancWYaLbumR9YbK+rlmM6pZW87ipxZz\nR8srzJmwN0jP41ZL9c8PDHIyh8bwRLtTcm1D9SZImlJnt1ir/md2cXjbDaJWFBM5\nJDGFoqgCWjBH4d1QB7wCCZAA62RjYJsWvIjJEubSfZGL+T0yjWW06XyxV3bqxbYo\nOb8VZRzI9neWagqNdwvYkQsEjgfbKbYK7p2CNTUQ\n-----END CERTIFICATE-----";
//=================================================================
// INFLUXDB SNIPPET
//=================================================================
//=================================================================
// The following configuration are MANDATORY. Set by user.
//=================================================================
// The address of the InfluxDB server.
// eg. '<REGION>.<PROVIDER>.cloud2.influxdata.com' or 'xx.xx.xx.xx'
var HOST = '<YOUR_HOST>';
// The port of the InfluxDB server.
// eg. 443 for Influx Cloud and 9999 for self-hosted InfluxDB server
var PORT = '<YOUR_PORT>';
// The Access Token for authorization, from InfluxDB.
// eg. 'xxxxxxxxxxxxxxxx'
var ACCESS_TOKEN = '<YOUR_ACCESS-TOKEN>';
// The name of the Organization to send data to.
var ORG_NAME = '<YOUR_ORGANIZATION>';
// Public certificate of the certificate authority that signed the InfluxDB server certificate for SSL/TLS handshake.
// eg. '-----BEGIN CERTIFICATE-----\n...<CA>...\n-----END CERTIFICATE-----'
var CA = '<YOUR_CA>';
//=================================================================
/**
* Upload data to InfluxDB Measurement Field using Access Token.
* https://docs.influxdata.com/influxdb/v2.0/api/#operation/PostWrite
* @function send_influxdb
* @param {string} bucket - The bucket to send data to, as a String.
* @param {string} measurement - The measurement to write data to, as a String.
* @param {object} fieldsObj - All field key-value pairs for the point, as an Object.
* @param {object} tagsObj - All tag key-value pairs for the point, as an Object.
* @param {object} precision - The precision of the timestamp, as a String.
* @param {function} callback - User callback to return the result (error/response).
* @returns {undefined}
*/
var send_influxdb = function (bucket, measurement, fieldsObj, tagsObj, precision, callback) {
var fields = '';
for (var key in fieldsObj) fields += `,${key}=${fieldsObj[key]}`;
var tags = '';
for (var key in tagsObj) tags += `,${key}=${tagsObj[key]}`;
var factor = 1000000; // precision == 'ms'
if (precision == 's') factor = 0.001;
else if (precision == 'ms') factor = 1;
else if (precision == 'us') factor = 1000;
var timestamp = Number(new Date().getTime() * factor);
var body = `${measurement}${tags} ${fields.substring(1)} ${timestamp}`; // <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
var options = {
"method": 'POST',
"host": HOST,
"port": Number(PORT),
"path": `/api/v2/write?org=${ORG_NAME}&bucket=${bucket}&precision=${precision}`,
"headers": {
"Content-Type": 'text/plain',
"Authorization": `Token ${ACCESS_TOKEN}`,
"Content-Length": body.length.toString()
},
"ca": CA
};
var request = https.request(options, function (response) {
response.on('end', function () {
callback(null, { "statusCode": response.statusCode, "statusMessage": response.statusMessage, "body": response.read() });
});
});
request.on('error', function () {
callback({ "errCode": request.errCode }, null);
});
request.end(body.toString(), function () {
print("[request] SUCCESS");
});
}
Function Usage Example
/*
<INSERT ABOVE SNIPPET HERE WITH SET CONFIGURATIONS>
*/
//=================================================================
log.setLevel(-1); //-1:NONE 0:ERROR 1:WARNING 2:DEBUG 3:TRACE
log.printLevel(2); //0:DISABLE 1:LOG 2:CONSOLE 3:BOTH
//=================================================================
// MAIN SCENARIO
//=================================================================
/**
* Callback to fetch error/response from the request.
* @function callback
* @param {object} err - Error returned in case of a failed request. Has one property - `errCode`.
* @param {object} data - Response returned by a successfully completed request. Has three properties - `statusCode`, `statusMessage`, and `body`.
*/
var callback = function (err, data) {
if (err) {
print("[error]", err.errCode);
} else {
print("[status]", data.statusCode, data.statusMessage);
print("[response]", data.body);
}
}
var bucket = "test";
var precision = "ms"; /* "s", "ms", "us", "ns" */
var measurement = "testMeasurement";
var fields = {
"fieldKey1": '"stringValue"', /* string field value must be enclosed in double quotes */
"fieldKey2": 'floatValue', /* float field value can be in scientific notation */
"fieldKey3": 'integerValue_i', /* integer field values must have a trailing 'i' */
"fieldKey4": 'unsignedIntegerValue_u', /* unsigned integer field values must have a trailing 'u' */
"fieldKey5": 'booleanValue' /* t, T, true, True, TRUE; f, F, false, False, FALSE */
};
var tags = {
"tagKey1": 'tagValue1',
"tagKey2": 'tagValue2'
};
send_influxdb(bucket, measurement, fields, tags, precision, callback);