Bobby Iliev

Originally published at devdojo.com

How to use FastAPI with Materialize for real-time data processing

Introduction

[!WARNING]
This demo includes examples for an unsupported version of Materialize (0.26.x).

This is a self-contained demo of FastAPI and Materialize.

This demo project contains the following components:

Diagram

FastAPI and Materialize Demo

Running the demo

Clone the repository:

git clone https://github.com/bobbyiliev/materialize-tutorials.git
cd materialize-tutorials
git checkout lts

Access the FastAPI demo project directory:

cd mz-fastapi-demo

Pull all Docker images:

docker-compose pull

Build the project:

docker-compose build

Finally, run all containers:

docker-compose up

Create the Materialize sources and views

Once the demo is running, you can create the Materialize sources and views.

Let's start by creating a Redpanda/Kafka source:

CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;

Then create a non-materialized view, which you can think of as an alias that we will use to build our materialized views. Non-materialized views do not store the results of the query:

CREATE VIEW sensors_data AS
    SELECT
        *
    FROM (
        SELECT
            (data->>'id')::int AS id,
            (data->>'pm25')::double AS pm25,
            (data->>'pm10')::double AS pm10,
            (data->>'geo_lat')::double AS geo_lat,
            (data->>'geo_lon')::double AS geo_lon,
            (data->>'timestamp')::double AS timestamp
        FROM (
            SELECT CAST(data AS jsonb) AS data
            FROM (
                SELECT convert_from(data, 'utf8') AS data
                FROM sensors
            )
        )
    );
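To make the nested subqueries easier to follow, here is a minimal Python sketch of what the view does to a single Kafka message: decode the bytes as UTF-8, parse the text as JSON, then extract and cast each field. The helper names and the sample payload are made up for illustration and are not part of the demo project. The sketch only roughly mirrors the SQL casts.

```python
import json
from typing import Any, Callable, Optional


def _cast(value: Any, to_type: Callable[[Any], Any]) -> Optional[Any]:
    # data->>'field' yields NULL for a missing key; keep that as None.
    return None if value is None else to_type(value)


def parse_sensor_message(raw: bytes) -> dict:
    """Hypothetical helper: apply the sensors_data steps to one message."""
    text = raw.decode("utf-8")  # convert_from(data, 'utf8')
    doc = json.loads(text)      # CAST(data AS jsonb)
    return {
        "id": _cast(doc.get("id"), int),                   # (data->>'id')::int
        "pm25": _cast(doc.get("pm25"), float),             # (data->>'pm25')::double
        "pm10": _cast(doc.get("pm10"), float),
        "geo_lat": _cast(doc.get("geo_lat"), float),
        "geo_lon": _cast(doc.get("geo_lon"), float),
        "timestamp": _cast(doc.get("timestamp"), float),
    }


# Sample payload (an assumption about what the data generator emits).
sample = b'{"id": 1, "pm25": 12.5, "pm10": 20.0, "geo_lat": 42.7, "geo_lon": 23.3, "timestamp": 1650000000}'
row = parse_sensor_message(sample)
```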

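After that, create a materialized view that holds only recent records. The demo describes this as the last 10 minutes; note that the filter as written keeps each record for 100000 ms (100 seconds) after its timestamp, so use 600000 if you want a full 10-minute window: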

CREATE MATERIALIZED VIEW sensors_view AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 100000)::numeric;
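The WHERE clause above is a temporal filter: a record stays in the view while Materialize's logical time (in milliseconds) is less than the record's timestamp (in seconds, scaled to milliseconds) plus a retention window. A minimal Python sketch of that predicate follows, using a hypothetical helper name and made-up timestamps:

```python
def is_visible(logical_time_ms: int, record_ts_s: float, window_ms: int) -> bool:
    """Same comparison as: mz_logical_timestamp() < (timestamp*1000 + window_ms)."""
    return logical_time_ms < record_ts_s * 1000 + window_ms


# A record stamped at 1650000000 s with the 100000 ms window used in sensors_view.
record_ts = 1650000000.0
window = 100000

fresh = is_visible(1650000050000, record_ts, window)    # 50 s after the record
expired = is_visible(1650000100000, record_ts, window)  # exactly 100 s after
```

Because the comparison is strict, the record drops out of the view at the moment the logical time reaches the timestamp plus the window.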

Note that we are using the mz_logical_timestamp() function rather than the now() function. In Materialize, now() doesn’t represent the system time, as it does in most systems; it represents the time, with time zone, when the query was executed, so it cannot be used when creating views. For more information, see the Materialize documentation on temporal filters.

Next, let's create a materialized view that only includes the most recent data, so we can see the dataflow and use it for our Server-Sent Events (SSE) demo later on. Despite the 1s suffix in its name, the filter as written keeps each record for 6000 ms (6 seconds):

CREATE MATERIALIZED VIEW sensors_view_1s AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 6000)::numeric;

With that, our materialized views are ready and we can visit the FastAPI demo project in the browser!

FastAPI Demo

Finally, visit the FastAPI demo app via your browser:

  • Endpoint for all records in the last 10 minutes:

http://localhost/sensors

  • SSE Endpoint streaming the latest records as they are generated using TAIL:

http://localhost/stream

Example response:

SSE FastAPI with Materialize
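For readers new to SSE, the wire format is simple: each event is a `data:` line followed by a blank line. The sketch below shows how rows could be turned into SSE frames in Python. It is an illustration only, not the demo's actual code: the `sse_events` name and the sample rows are hypothetical, and the rows would in practice come from a TAIL cursor.

```python
import json
from typing import Iterable, Iterator


def sse_events(rows: Iterable[dict]) -> Iterator[str]:
    """Yield one Server-Sent Events frame per row."""
    for row in rows:
        # json.dumps emits a single line, so one "data:" field is enough;
        # the blank line ("\n\n") marks the end of the event.
        yield f"data: {json.dumps(row)}\n\n"


# Made-up rows standing in for what TAIL would return.
frames = list(sse_events([{"id": 1, "pm25": 12.5}, {"id": 2, "pm25": 8.0}]))
```

In a FastAPI app, a generator like this could be passed to `StreamingResponse` with `media_type="text/event-stream"`; whether the demo does exactly that is an assumption.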

Materialize Cloud

To run the demo in the cloud, you need the following:

  • A publicly accessible Redpanda/Kafka instance so that you can connect to it.
  • A Materialize Cloud account. You can sign up for a free account to get started.

If you already have that set up, make the following changes to the demo project:

  • When creating the source, replace redpanda:9092 with the address of your Redpanda/Kafka instance:
CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;
  • Change the DATABASE_URL environment variable to your Materialize Cloud database URL and uncomment the certificate-specific environment variables in the docker-compose.yml file.

  • Download the Materialize instance certificate files from your Materialize Cloud dashboard.
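As a rough illustration of how the database URL and the certificate files fit together, the sketch below appends TLS settings to a PostgreSQL-style connection URL using the standard libpq parameter names (sslmode, sslrootcert, sslcert, sslkey). The helper name, host, and file paths are all placeholders; the demo's docker-compose.yml may wire these values differently, so treat this as an assumption rather than the project's configuration.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def with_tls_params(database_url: str, ca_path: str, cert_path: str, key_path: str) -> str:
    """Hypothetical helper: add libpq TLS parameters to a connection URL."""
    parts = urlsplit(database_url)
    tls = urlencode({
        "sslmode": "verify-full",  # verify the server certificate and host name
        "sslrootcert": ca_path,    # CA certificate
        "sslcert": cert_path,      # client certificate
        "sslkey": key_path,        # client private key
    })
    # Preserve any query parameters already present in the URL.
    query = f"{parts.query}&{tls}" if parts.query else tls
    return urlunsplit(parts._replace(query=query))


# Placeholder host and paths, not real values.
url = with_tls_params(
    "postgresql://materialize@example.invalid:6875/materialize",
    "/certs/ca.crt",
    "/certs/materialize.crt",
    "/certs/materialize.key",
)
```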

Stop the demo

To stop the demo, run the following command:

docker-compose down -v

You can also stop only the data generation container:

docker-compose stop datagen

Helpful Links
