InfluxDB
This snippet provides a function to send IoT data to InfluxDB.
Resources used: HTTPS x 1
Details
The `send_influxdb` function formats the passed time-series data (measurement, tags and fields) as Line Protocol and POSTs it to an InfluxDB 'bucket' using the InfluxDB API, over HTTPS. The result (error/response) is then passed to the callback function.
To start using this snippet, the user must configure `HOST` (address of the InfluxDB instance), `PORT` (port of the InfluxDB server), `ORG_NAME` (name of the Organization), `ACCESS_TOKEN` (token from the InfluxDB account), and `CA` (public certificate of the certificate authority that signed the InfluxDB server certificate).
NOTE: The provided function can handle data sizes up to 4KB. To handle larger data sizes, please refer to divided writing for https objects in the neqto.js documentation.
Click here to learn how to get a CA.
// Example of the expected CA value: a PEM certificate held in a single string, with "\n" marking each line break.
var CA = "-----BEGIN CERTIFICATE-----\n...<CA>...\n-----END CERTIFICATE-----"
//=================================================================
// INFLUXDB SNIPPET
//=================================================================
//=================================================================
// MANDATORY configuration. Every value below must be set by the user.
//=================================================================

/*
 * HOST: address of the InfluxDB server.
 * e.g. '<REGION>.<PROVIDER>.cloud2.influxdata.com' or 'xx.xx.xx.xx'
 */
var HOST = "<YOUR_HOST>";

/*
 * PORT: port of the InfluxDB server.
 * e.g. 443 for InfluxDB Cloud; self-hosted servers commonly use 8086
 * (9999 on early InfluxDB 2.0 beta builds).
 */
var PORT = "<YOUR_PORT>";

/*
 * ACCESS_TOKEN: the Access Token issued by InfluxDB, used for authorization.
 * e.g. 'xxxxxxxxxxxxxxxx'
 */
var ACCESS_TOKEN = "<YOUR_ACCESS-TOKEN>";

/*
 * ORG_NAME: name of the Organization the data is sent to.
 */
var ORG_NAME = "<YOUR_ORGANIZATION>";

/*
 * CA: public certificate of the certificate authority that signed the
 * InfluxDB server certificate; used for the SSL/TLS handshake.
 * e.g. '-----BEGIN CERTIFICATE-----\n...<CA>...\n-----END CERTIFICATE-----'
 */
var CA = "<YOUR_CA>";
//=================================================================
/**
* Upload one data point to an InfluxDB bucket through the v2 write API, using an Access Token.
* https://docs.influxdata.com/influxdb/v2.0/api/#operation/PostWrite
*
* The point is serialized as a single Line Protocol record:
*   <measurement>[,<tag_key>=<tag_value>...] <field_key>=<field_value>[,...] <timestamp>
* Keys and values are inserted as-is (no escaping or quoting is applied), so the caller
* must pass values that are already valid Line Protocol.
* The timestamp is the current device time converted to the requested precision.
*
* Relies on the globals HOST, PORT, ORG_NAME, ACCESS_TOKEN and CA, and on the runtime's
* `https` and `print` objects.
*
* @function send_influxdb
* @param {string} bucket - The bucket to send data to, as a String.
* @param {string} measurement - The measurement to write data to, as a String.
* @param {object} fieldsObj - All field key-value pairs for the point, as an Object.
* @param {object} tagsObj - All tag key-value pairs for the point, as an Object.
* @param {string} precision - The precision of the timestamp: 's', 'ms', 'us' or 'ns'.
*   Any other value is forwarded unchanged in the query string while the timestamp
*   is generated in nanoseconds.
* @param {function} callback - User callback to return the result (error/response).
*   Called as callback(null, { statusCode, statusMessage, body }) when the response ends,
*   or as callback({ errCode }, null) when the request emits 'error'.
* @returns {undefined}
*/
var send_influxdb = function (bucket, measurement, fieldsObj, tagsObj, precision, callback) {
    // Number of bytes `str` occupies once encoded as UTF-8.
    // NOTE(review): assumes the runtime transmits string bodies as UTF-8 - confirm for neqto.js.
    var utf8ByteLength = function (str) {
        var bytes = 0;
        for (var i = 0; i < str.length; i++) {
            var code = str.charCodeAt(i);
            if (code < 0x80) {
                bytes += 1;
            } else if (code < 0x800) {
                bytes += 2;
            } else if (code >= 0xD800 && code <= 0xDBFF && i + 1 < str.length &&
                (str.charCodeAt(i + 1) & 0xFC00) === 0xDC00) {
                // A surrogate pair encodes one code point that takes 4 bytes in UTF-8.
                bytes += 4;
                i++;
            } else {
                bytes += 3;
            }
        }
        return bytes;
    };

    // Each entry is prefixed with ','; the leading comma is stripped from the
    // field set below, while the tag set keeps it as the separator after the measurement.
    var fields = '';
    for (var fieldKey in fieldsObj) fields += `,${fieldKey}=${fieldsObj[fieldKey]}`;
    var tags = '';
    for (var tagKey in tagsObj) tags += `,${tagKey}=${tagsObj[tagKey]}`;

    // Line Protocol timestamps must be integers, so seconds are floored
    // (multiplying milliseconds by 0.001 would produce a fractional value).
    var nowMs = new Date().getTime();
    var timestamp;
    if (precision === 's') timestamp = Math.floor(nowMs / 1000);
    else if (precision === 'ms') timestamp = nowMs;
    else if (precision === 'us') timestamp = nowMs * 1000;
    else timestamp = nowMs * 1000000; // 'ns' (also the fallback for any other value)

    // <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
    var body = `${measurement}${tags} ${fields.substring(1)} ${timestamp}`;

    var options = {
        "method": 'POST',
        "host": HOST,
        "port": Number(PORT),
        "path": `/api/v2/write?org=${ORG_NAME}&bucket=${bucket}&precision=${precision}`,
        "headers": {
            "Content-Type": 'text/plain',
            "Authorization": `Token ${ACCESS_TOKEN}`,
            // Content-Length counts bytes, not UTF-16 code units; they differ for non-ASCII text.
            "Content-Length": utf8ByteLength(body).toString()
        },
        "ca": CA
    };

    var request = https.request(options, function (response) {
        // Report the result only once the whole response has been received.
        response.on('end', function () {
            callback(null, { "statusCode": response.statusCode, "statusMessage": response.statusMessage, "body": response.read() });
        });
    });
    request.on('error', function () {
        callback({ "errCode": request.errCode }, null);
    });
    request.end(body, function () {
        print("[request] SUCCESS");
    });
}
Function Usage Example
/*
<INSERT ABOVE SNIPPET HERE WITH SET CONFIGURATIONS>
*/
//=================================================================
// Logging setup for the example; the legend after each call lists the accepted values.
// Per those legends, level -1 selects NONE and print level 2 selects CONSOLE.
log.setLevel(-1); //-1:NONE 0:ERROR 1:WARNING 2:DEBUG 3:TRACE
log.printLevel(2); //0:DISABLE 1:LOG 2:CONSOLE 3:BOTH
//=================================================================
// MAIN SCENARIO
//=================================================================
/**
* Result handler for the request: prints either the failure code or the response.
* @function callback
* @param {object} err - Set when the request failed; carries a single property, `errCode`.
* @param {object} data - Set when the request completed; carries `statusCode`, `statusMessage`, and `body`.
*/
var callback = function (err, data) {
    // Success path first: report the status line, then the response body.
    if (!err) {
        print("[status]", data.statusCode, data.statusMessage);
        print("[response]", data.body);
        return;
    }
    print("[error]", err.errCode);
};
// Destination of the point.
var bucket = "test";
var measurement = "testMeasurement";

// Timestamp precision; one of "s", "ms", "us", "ns".
var precision = "ms";

// Tag set of the point.
var tags = {
    "tagKey1": 'tagValue1',
    "tagKey2": 'tagValue2'
};

// Field set of the point. Values are written verbatim, so each must already
// follow the Line Protocol syntax for its type:
//   fieldKey1 - string: must be enclosed in double quotes
//   fieldKey2 - float: may be in scientific notation
//   fieldKey3 - integer: must have a trailing 'i'
//   fieldKey4 - unsigned integer: must have a trailing 'u'
//   fieldKey5 - boolean: t, T, true, True, TRUE; f, F, false, False, FALSE
var fields = {
    "fieldKey1": '"stringValue"',
    "fieldKey2": 'floatValue',
    "fieldKey3": 'integerValue_i',
    "fieldKey4": 'unsignedIntegerValue_u',
    "fieldKey5": 'booleanValue'
};

send_influxdb(bucket, measurement, fields, tags, precision, callback);
The company names and product names mentioned above are registered trademarks or trademarks of their respective companies.
Updated: 2023-04-14