DEV Community

J Fowler
J Fowler

Posted on • Updated on

Sending IoT Device Data via MQTT broker.

In a previous post, we showed how to send and receive messages from IoT devices using an MQTT broker. In this post we'll extend that idea to a real world example.

Suppose you have an IoT device that measures temperature and humidity in a greenhouse (Not hard to make one using Raspberry Pi or Arduino).

We want to monitor the greenhouse conditions remotely from another computer or, perhaps, a central logging service. In the previous post, we showed a Go implementation of code to send messages so we will extend that example.

Instead of just sending a string saying "temp is x, humidity is y", let's define a structure for the message and device. Consider that you have (or want to add in the future) a device to monitor moisture or rainfall and you want to connect that one as well.

To leave open the possibility of multiple devices and types, we need a data model to handle that.

type Message struct {
    Time       time.Time   `json:"time"`
    DeviceId   string      `json:"device_id"`
    DeviceType string      `json:"device_type"`
    DeviceData interface{} `json:"device_data"`
}

Enter fullscreen mode Exit fullscreen mode

The Message struct is what will be sent to the MQTT broker. It includes the device id, device type, and an interface to the device specific data.

Our device measures temperature and humidity, so that is the data that will be sent in a message.

Next, we need to send the message to the broker. We'll employ JSON format for its simplicity in this example. Note that in a large-scale system with thousands or more devices, we may want to use a more compact format.

message := generateRandomMessage()
payload, err := json.Marshal(message)
if err != nil {
    panic(err)
}
token := client.Publish(topic, 0, false, payload)
Enter fullscreen mode Exit fullscreen mode

Go makes marshalling into JSON pretty easy. Once marshaled, the json message is sent to the broker.

What else would we want to do with the data once we have it: store it to a database, display it on a dashboard, check the values for alarm conditions. We'll need to convert the json to make it usable.

On the receiving side, we just need to unmarshal the json into a struct. We'll use a structure similar to that used on the sending side; but we want to unmarshal it into an object that we can store in a db.

type BaseModel struct {
    ID        uint      `gorm:"primarykey" json:"id"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}
type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}

...

func processMsg(ctx context.Context, ....

...

    case msg, ok := <-input:
    if !ok {
        return
    }
    logger.Info().Msg(fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
    var iotMsg Message
    err := json.Unmarshal(msg.Payload(), &iotMsg)
    if err != nil {
        logger.Error().Err(err).Msg("Error unmarshalling Message")
    } else {
        out <- iotMsg
    }

...

Enter fullscreen mode Exit fullscreen mode

Remember that MQTT can be used to publish to multiple topics and it is common practice to use a hierarchical naming convention for topics. So in the case of multiple device types in the greenhouse example, the recommended way is to have different device types publish to different topics.

For example, in our greenhouse example the topic names could be structured like:

/greenhouse/temprh/deviceid
/greenhouse/moisture/deviceid
Enter fullscreen mode Exit fullscreen mode

In MQTT, we can subscribe to topics using a wildcard topic, such as:

if token := client.Subscribe("/greenhouse/#", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
Enter fullscreen mode Exit fullscreen mode

which will match all devices in the greenhouse namespace. then we would just need to add logic to processMsg() to look at the topic of the incoming message to know how to unmarshal it. Although if we all devices can publish their data using the message model above, we can keep things simplified.

Now that we have a device message in a usable form, what should be done with it. In the next post in this series, we'll demonstrate our approach to persist the message in PostGres.

As usual the full source code for the sender can be found here and the subscriber code can be found here.

Let me know your thoughts in the comments.

Thanks!

Top comments (0)