Jasmine Mae for Fermyon

Measuring Crowd Engagement with an MQTT-based IoT App

By: Kate Goldenring

A key emblem of a conference is the exposition hall. Rows of booths fill a large room, vying to grab attendees’ attention. Booth staff have one objective: scan those badges! Each scanned badge counts as a tick of engagement. But how do you distinguish between a curious booth visitor and one who is only reaching for the swag water bottle? Rather than scans, what if sensors could detect booth traffic?

This blog walks through how to deploy an IoT setup with SpinKube to measure engagement at your booth. Specifically, we will use the volume of sound around the booth as a proxy for booth engagement at any given point. This not only removes human error from measuring engagement (I forgot to scan the badge!) but also records the times of day when visitors are engaged at the booth, which could inform booth staffing.

We will explore a Spin application that uses the MQTT trigger and deploy it to run on your Kubernetes cluster using SpinKube, whether on the edge or in the cloud. MQTT is a lightweight, publish-subscribe messaging protocol that enables devices to send and receive messages through a broker. Our Spin app will receive MQTT messages from sound devices that are at each booth and chart booth volume over time. The result is a visual graph of engagement at each booth.

Overview

In this blog, we will learn how to compose a full-stack IoT Spin application and deploy it to Kubernetes via SpinKube. First, we will dissect our full-stack Spin application, which not only consumes data from the MQTT sound sensors but also contains a frontend to visualize the collected data. Next, we will deploy our Spin application to Kubernetes using SpinKube, which now supports the MQTT trigger. We will use a mock MQTT device for our demo, but the repository that hosts the example contains steps for configuring a WiFi-enabled Arduino board in case you want to bring this to your booth.

Inside a Booth Volume Spin Application

Our booth demo consists of 3 components:

  1. A component that is triggered by messages from sound devices. It takes the volume value and persists it along with the current time and source of the message in a SQLite database.
  2. A backend HTTP API component that returns the booth volume over time from the database.
  3. A frontend component that graphs the volume of a booth over time.

The fully implemented application can be found in the example repository.

MQTT Event Responder Component

Let’s start by looking at the implementation of our first component, which is triggered by messages published to an MQTT topic and persists the data in a SQLite database. It is implemented in Rust and was scaffolded using the MQTT template. In the Spin application manifest (spin.toml), we use a Spin application variable to dynamically configure the address of the MQTT broker. This lets us configure Spin to connect to a broker at "mqtt://localhost:1883" when running locally and at "mqtt://emqx.default.svc.cluster.local:1883" when running in SpinKube. We also set the keep-alive interval (in seconds) for connections with the broker, along with the username and password used for authentication. In this example application, the broker does not require authentication, so the credentials are left empty.

[application.trigger.mqtt]
address = "{{ broker_uri }}"
username = ""
password = ""
keep_alive_interval = "30"

Individual components in a Spin app can be triggered by messages published to specific topics on this broker. For our mqtt-message-persister component, Spin will listen for all messages posted to a topic that matches booth/+, and execute the component each time a message is published to a matching topic. The + sign is a single-level wildcard that matches exactly one topic level, e.g. booth/20 but not booth/20/b. The quality of service (QoS) level is also set on each component. Here, we set a QoS of 1, which indicates that each message must be delivered at least once (so it may occasionally be delivered more than once).

[[trigger.mqtt]]
component = "mqtt-message-persister"
topic = "booth/+"
qos = "1"
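To make the wildcard rule concrete, here is a minimal sketch of MQTT topic-filter matching. It is not the code Spin or the broker uses; the function name topic_matches is hypothetical, and the sketch ignores special cases such as topics beginning with $. Besides the single-level + used in our manifest, it also handles the multi-level # wildcard defined by the MQTT specification.

```rust
/// Returns true when `topic` matches the MQTT topic `filter`.
/// Illustrative only: supports `+` (exactly one level) and `#`
/// (all remaining levels); `$`-prefixed topics are not special-cased.
fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // `#` matches everything that remains, including nothing at all.
            (Some("#"), _) => return true,
            // `+` matches any single level, but the level must exist.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Anything else (length mismatch or differing literal) fails.
            _ => return false,
        }
    }
}
```

With this sketch, topic_matches("booth/+", "booth/20") is true, while "booth/20/b" and a bare "booth" both fail to match the booth/+ filter.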

Finally, since our application persists the volume levels from the sound sensors in a SQLite database, we need to explicitly allow the use of a SQLite database with the “default” label in the component configuration:

[component.mqtt-message-persister]
source = "mqtt-message-persister/target/wasm32-wasi/release/mqtt_message_persister.wasm"
sqlite_databases = ["default"]

The implementation of this component is fairly simple. It deserializes the message payload to extract the volume published by the device. Since we do not want to flood our database with silence, we first check if the volume is above a threshold value. If so, we store the volume, a timestamp, and the topic name in the SQLite database using the Spin SQLite SDK.

#[mqtt_component]
async fn handle_message(message: Payload, metadata: Metadata) -> anyhow::Result<()> {
    let message = String::from_utf8_lossy(&message);
    let data = serde_json::from_str::<Data>(&message)?;
    // Define threshold value to determine whether we should store data or not.
    // A malformed `threshold` variable is returned as an error instead of panicking.
    let threshold = variables::get("threshold")
        .unwrap_or_else(|_| DEFAULT_THRESHOLD.to_string())
        .parse::<i64>()?;

    // Check whether our collected volume exceeds the threshold value
    if data.volume > threshold {
        let datetime: DateTime<Utc> = std::time::SystemTime::now().into();
        let formatted_time = datetime.format("%Y-%m-%d %H:%M:%S.%f").to_string();

        // Open connection to our "default" SQLite database
        let connection = Connection::open_default()?;

        let execute_params = [
            Value::Text(metadata.topic),
            Value::Integer(data.volume),
            Value::Text(formatted_time),
        ];

        // Insert collected data into our SQLite database
        connection.execute(
            "INSERT INTO noise_log (source, volume, timestamp) VALUES (?, ?, ?)",
            execute_params.as_slice(),
        )?;
    }
    Ok(())
}
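The threshold logic is easy to pull out and test on its own. The following is a minimal sketch, not the example repository's code: the helper names resolve_threshold and should_store are hypothetical, and the value 50 is only a stand-in for whatever DEFAULT_THRESHOLD is set to in the real component.

```rust
/// Hypothetical stand-in for the component's DEFAULT_THRESHOLD constant.
const DEFAULT_THRESHOLD: i64 = 50;

/// Resolve the threshold from an optional raw variable value.
/// Falls back to the default when the variable is missing or is not
/// a valid integer.
fn resolve_threshold(raw: Option<&str>) -> i64 {
    raw.and_then(|s| s.trim().parse::<i64>().ok())
        .unwrap_or(DEFAULT_THRESHOLD)
}

/// A reading is stored only when it is strictly louder than the threshold.
fn should_store(volume: i64, threshold: i64) -> bool {
    volume > threshold
}
```

Falling back to the default is one possible design choice; returning an error for a malformed value is equally reasonable if you would rather surface misconfiguration early.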

Backend API Component

With all the sound data being stored in the database by our mqtt-message-persister component, we need another component to act as an API that exposes the data to the frontend. The HTTP-triggered api component, implemented in TypeScript, does this. It simply fetches all rows from the noise_log table and returns them serialized as JSON:

export async function handler(req: Request, res: ResponseBuilder) {
  // Open a connection to our "default" SQLite database
  const conn = Sqlite.openDefault();
  // Retrieve all data from the `noise_log` table
  const result = conn.execute("SELECT * FROM noise_log", []);
  const items = result.rows.map(row => {
    return {
      source: row["source"],
      volume: Number(row["volume"]),
      timestamp: row["timestamp"],
    };
  });
  res.set({ "content-type": "application/json" });
  // Send the data as a JSON array of objects
  res.send(JSON.stringify(items));
}

Frontend Volume Graph

To bring it all together, we need a nice frontend that enables the marketing team to see how loud (aka engaged) our booths were. Our frontend is a static page, served by a simple static file server component, that fetches the data from the api component and graphs one line per topic (in this case, per booth). Say we are monitoring two booths in the expo hall, booths 22 and 33. While booth 22 receives bursts of booth traffic, booth 33 has no visitors, so its sensor is just capturing the constant hum of the room. The frontend would display the following graph:

[Figure: Booth Volume Level]
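Drawing one line per booth comes down to grouping the API's rows by their source topic. The actual frontend does this in the browser; the sketch below only illustrates the data shaping, in Rust for consistency with the component shown earlier. The Reading struct and group_by_source function are hypothetical names, with fields mirroring the JSON objects returned by the api component.

```rust
use std::collections::BTreeMap;

/// One row as returned by the API (hypothetical struct mirroring its JSON).
struct Reading {
    source: String,
    volume: i64,
    timestamp: String,
}

/// Group readings into one series per source topic, each ordered by time.
fn group_by_source(readings: &[Reading]) -> BTreeMap<String, Vec<(String, i64)>> {
    let mut series: BTreeMap<String, Vec<(String, i64)>> = BTreeMap::new();
    for r in readings {
        series
            .entry(r.source.clone())
            .or_default()
            .push((r.timestamp.clone(), r.volume));
    }
    // Timestamps written as "%Y-%m-%d %H:%M:%S.%f" are fixed-width and
    // zero-padded, so sorting them as strings is chronological.
    for points in series.values_mut() {
        points.sort();
    }
    series
}
```

Each map entry (for example "booth/22") then corresponds to one line on the graph, with its points already in time order.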

Deploying our Application to SpinKube

With our Spin application completed, we can now deploy it to Kubernetes by configuring the cluster with SpinKube. SpinKube is an open source project that enables running Spin applications on Kubernetes alongside containers. The project consists of 4 sub-projects: the Spin Operator, the spin kube plugin, the runtime class manager, and the Spin containerd shim. The last of these contains the Spin runtime and executes the Spin applications on your nodes. The v0.16.0 release of the shim added support for the MQTT trigger, enabling us to run our application on SpinKube! You can use one of SpinKube’s installation guides to install SpinKube on your distribution of Kubernetes.

Before applying our application to the cluster, we need to make sure there is an MQTT broker running that can be reached from within the cluster. For simplicity, we deploy an EMQX MQTT broker as a Pod in the cluster, along with a Service whose address we can configure for the MQTT trigger in our Spin application. For testing purposes, we will also apply a fake sound sensor to the cluster that publishes sound values to the broker. Apply these resources from the example repository:

kubectl apply -f spinkube/broker.yaml
kubectl apply -f spinkube/sound-device.yaml
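For a sense of what a sound device sends, here is a minimal sketch of the topic and payload for one reading. It assumes the payload is a JSON object with a single integer volume field, inferred from the data.volume access in the persister component; the example repository's device may send more than this. The function names booth_topic and volume_payload are hypothetical, and actually publishing the message would require an MQTT client library, which is omitted.

```rust
/// Topic a booth's sensor publishes to, e.g. "booth/22".
/// This matches the `booth/+` filter used by the persister component.
fn booth_topic(booth: u32) -> String {
    format!("booth/{booth}")
}

/// JSON payload for one reading.
/// Assumed shape: {"volume": <integer>} (an inference, not confirmed).
fn volume_payload(volume: i64) -> String {
    format!("{{\"volume\":{volume}}}")
}
```

A mock device for booth 22 reporting a level of 72 would publish the string returned by volume_payload(72) to the topic returned by booth_topic(22).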

Before deploying our application to the cluster, we need to install the spin kube plugin and build and push the application to a registry.

spin plugins install kube
spin build
spin registry push ttl.sh/spin-mqtt-booth-volume:v0.1.0

Now, let’s scaffold and apply our Spin application, setting the MQTT broker address through the broker_uri application variable:

spin kube scaffold --from ttl.sh/spin-mqtt-booth-volume:v0.1.0 \
  --variable broker_uri="mqtt://emqx.default.svc.cluster.local:1883" \
  --replicas 1 \
  --runtime-config-file spinkube/runtime-config.toml \
  | kubectl apply -f -

Note: this blog skips the steps for creating the Turso database that persists the data. See the documentation in the example repository for instructions.

Apply an ingress or port-forward your mqtt-booth-volume service, and now you can assess your booth traffic! Cue the applause, away from the sensor please. Hope this served as inspiration for what you can build with Spin and SpinKube.
