DEV Community

Cover image for How to Keep a History of MQTT Data With Node.js
Alexey Timin for ReductStore

Posted on • Originally published at reduct.store

How to Keep a History of MQTT Data With Node.js

The MQTT protocol is widely used in IoT applications because of its simplicity and ability to connect different data sources to applications using a publish/subscribe model. While many MQTT brokers support persistent sessions and can store message history as long as an MQTT client is not available, there may be cases where data needs to be stored for a longer period. In such cases, it is recommended to use a time series database. There are many options available, but if you need to store unstructured data such as images, sensor data, or Protobuf messages, consider using ReductStore. It is a time series database specifically designed for storing large amounts of blob data and optimized for IoT and edge computing.

ReductStore provides client SDKs for many programming languages to integrate it into your infrastructure. In this example, we will use the client SDK for JavaScript.

Let’s make a simple MQTT application to see how it works.

Prerequisites

For this usage example, we have the following requirements:

  • Linux AMD64
  • Docker and Docker Compose
  • NodeJS >= 16

If you are an Ubuntu user, you can install the dependencies by running the following command in your terminal:

$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs
Enter fullscreen mode Exit fullscreen mode

Run MQTT Broker and ReductStore with Docker Compose

The easiest way to run the broker and database is to use Docker Compose with the following docker-compose.yml file:

version: "3"
services:
  reduct-storage:
    image: reduct/store:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"

  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Enter fullscreen mode Exit fullscreen mode

Then run the configuration:

docker-compose up
Enter fullscreen mode Exit fullscreen mode

Docker Compose downloaded the images and ran the containers. Pay attention that we published ports 1883 for MQTT protocol and 8383 for ReductStore HTTP API.

Write NodeJS Script

Now it's time to dive into the code and start working. To begin, let's initialize the NPM package and install the necessary dependencies. We'll need the MQTT Client and the JavaScript Client SDK.

$ npm init
$ npm install --save reduct-js async-mqtt
Enter fullscreen mode Exit fullscreen mode

When we have all the dependencies installed, we can write the script:

const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('#');

  const reductClient = new Client('http://localhost:8383');
  const bucket = await reductClient.getOrCreateBucket('mqtt');

  mqttClient.on('message', async (topic, msg) => {
    const record = await bucket.beginWrite(topic);
    await record.write(msg)
    console.log('Received message "%s" from topic "%s" is written', msg,
        topic);
  });

}).catch(error => console.error(error));
Enter fullscreen mode Exit fullscreen mode

Let's examine the code in detail. First, we need to establish a connection to the MQTT broker and subscribe to all topics using the # wildcard.

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('#');

 // rest of code

}).catch(error => console.error(error));
Enter fullscreen mode Exit fullscreen mode

If the MQTT connection is successful, we can start dealing with ReductStore. To start writing data there, we need a bucket. We create a bucket with the name mqtt or get an existing one:

 const reductClient = new Client('http://localhost:8383');
 const bucket = await reductClient.getOrCreateBucket('mqtt');
Enter fullscreen mode Exit fullscreen mode

The final step is to write the received message to the database. To accomplish this, we need to use a callback for the event message in order to capture the message. Then, we can write the message to the entry that corresponds to the topic name.

  mqttClient.on('message', async (topic, msg) => {
    const record = await bucket.beginWrite(topic);
    await record.write(msg)
    console.log('Received message "%s" from topic "%s" is written', msg,
        topic);
  });
Enter fullscreen mode Exit fullscreen mode

When we call bucket.beginWrite we create an entry in the bucket if it doesn’t exist yet. Then we write data to the entry with the current timestamp. Now our MQTT data is safe and sound in the storage, and we can access them by using the same SDK.

Publish Data To MQTT Topic

When you launch the script, it does nothing because there is no data from MQTT. You have to publish something. I prefer to use mosquitto_pub. For Ubuntu users, it is a part of the mosquitto-clients package:

$ sudo apt-get install mosquitto-clients
$ mosuitto_pub -t topic-1 -m "Hello, topic-1!"
$ mosuitto_pub -t topic-2 -m "Hello, topic-2!"
Enter fullscreen mode Exit fullscreen mode

Getting Data From ReductStore

Now you know how to get data from MQTT and write it to ReductStore, but we need a little NodejS script to read the data from the storage:

const {Client} = require('reduct-js');

const client = new Client('http://localhost:8383');

client.getBucket('mqtt').then(async (bucket) => {
  for (const entry of await bucket.getEntryList()) {

    for await (const record of bucket.query(entry.name)) {
      data = await record.read();
      console.log('Found record "%s" with timestamp "%d"', data, record.time);
    }
  }

}).catch(error => console.error(error));
Enter fullscreen mode Exit fullscreen mode

Here, we browse all entries in the mqtt bucket and query all records from each entry. Reading the latest record in the entry is very easy.

const record  = await bucket.beginRead(entry.name);
const data = await record.readAsString();
Enter fullscreen mode Exit fullscreen mode

To retrieve all the records or records within a given time interval, we can use the query method. This method returns an asynchronous iterator, allowing us to use a for await loop to iterate over the records.

   for await (const record of bucket.query(entry.name)) {
      data = await record.read();
      console.log('Found record "%s" with timestamp "%d"', data, record.time);
   }
Enter fullscreen mode Exit fullscreen mode

Best Practices

The provided usage example is quite simple and lacks many details that you may encounter in a real-world application. Here are a few pieces of advice to help you build a robust and powerful IoT application based on ReductStore and MQTT:

  • Create a ReductStore bucket with a FIFO quota to prevent disk overwriting in the future.
  • Use token authentication to protect your data. You can generate an access token using either the Web Console or the CLI client.
  • Map MQTT5 properties to ReductStore labels. This will facilitate data filtering during querying or replication.
  • Utilize ReductCLI for data replication or backup purposes.

Conclusion

As you can see, the MQTT protocol and ReductStore are both incredibly user-friendly technologies that can be easily integrated with each other in NodeJS. This combination offers a powerful solution for various applications. Whether you're working on a small project or a large-scale system, the MQTT protocol and ReductStore provide a reliable and efficient way to handle data communication and storage.

To help you better understand how to use these technologies, we've prepared an example that demonstrates their seamless integration. You can access the source code of the example by clicking here. This example showcases just how simple and effective it is to use MQTT and ReductStore together..


I hope this tutorial has been helpful. If you have any questions or feedback, don’t hesitate to reach out in Discord or by opening a discussion on GitHub.

Links

Top comments (0)