Joe Niemiec for MongoDB

Schema Validation in Atlas Stream Processing

Developers, database administrators, and analysts are all familiar with schemas from relational databases. But schemas appear in many data systems and are a critical part of any application: everything from databases to CSV files uses a schema to organize data so it can be understood and reused. Good schema design is key to keeping developers productive and able to build compelling applications quickly.

In this blog, we'll highlight how our recently announced Atlas Stream Processing embraces MongoDB's unique approach to schemas.

MongoDB takes a different approach from the rigid schemas found in many traditional databases: schemas are flexible and can be enforced, but enforcement isn't required. This makes it easier for developers to adapt as application needs evolve and grow.

Schemas describe the structure of your data rather than the data values themselves. Effective schema design ensures the structure of the data matches what the application expects for each field. For instance, a field can be an integer, string, boolean, object, or another data type. This structure is critical for sharing data between systems and for error-free processing.

In the streaming world, data comes from many sources, and schemas between those sources can vary greatly. Sources include IoT devices like connected vehicles, remote sensors, and change streams or write-ahead logs that describe activity occurring in the database.

Schemas with MongoDB

The flexibility of MongoDB's document model does away with the need for a strictly defined schema and allows data to be structured in ways that best represent it in the application. This is particularly important in stream processing, where not all messages share the same schema. A stream can contain a mixture of devices and message versions that have evolved over time under different compatibility rules. The document model allows messages to be represented in complex ways, not limited by the rules of serialization formats or schema registries.

Consider a weather station reporting sensor results. Flexible schemas allow for reporting in a variety of ways. See the measurements reported in four distinct ways in the example below:

{
    name: 'Station1201',
    measurements: [{
            'temp': 88,
            'u': 'F'
        },
        {
            'press': 12,
            'u': 'bar'
        }
    ]
}

{
    name: 'Station1201',
    firmware: 'v12.00.001',
    measurements: {
        'temp': 88,
        'tempunits': 'F',
        'press': 12,
        'pressunits': 'bar'
    }
}

{
    name: 'Station1201',
    measurements: {
        'temp': 88,
        'u': 'F'
    }
}

{
    name: 'Station1201',
    measurements: {
        'press': 12,
        'u': 'bar'
    }
}

Regardless of these schema differences, Atlas Stream Processing can keep processing data, leave a message's structure unchanged, and handle missing fields or changed data types natively with the MongoDB Query API. This gives developers numerous options for handling schema evolution and compatibility. Schemas that change over time can continue to be processed as long as messages contain the required fields, regardless of whether other fields are added or removed; with traditionally rigid schemas, such changes would break compatibility. Even when required fields are missing, default values can be assigned with the $cond operator so processing can continue.
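
As a quick illustration of that last point, here is a minimal sketch of an $addFields stage that uses $cond to supply a default value. It assumes the weather-station messages above and that a missing firmware field should default to 'unknown' (a hypothetical choice for illustration):

{
    $addFields: {
        firmware: {
            $cond: {
                // $type returns the string "missing" when the field is absent
                if: { $eq: [{ $type: "$firmware" }, "missing"] },
                then: "unknown",    // assumed default value
                else: "$firmware"
            }
        }
    }
}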

Validating schemas in Atlas Stream Processing

Atlas Stream Processing provides the ability to validate the structure of documents using the $validate stage. With $validate, you can use the same MQL query operators available in db.collection.find() to filter and match specific documents, and you can use $jsonSchema to annotate and validate JSON documents against the json-schema.org draft-04 specification. An additional benefit of $validate is that documents failing the query-operator filters or the $jsonSchema requirements don't have to be silently discarded and lost: the validationAction can instead route them to a DLQ (dead letter queue). Options for validationAction include sending to the DLQ, discarding the message, or writing to a log file. This allows valuable messages to be kept and, if required, processed later by other applications.

Let's look at using $validate to process messages for solar panel energy monitoring; below is a sample of the device messages to be processed from Kafka:

{
    device_id: 'device_1',
    group_id: 2,
    timestamp: '2023-08-24T16:46:12.048+00:00',
    max_watts: 250,
    event_type: 0,
    obs: {
        watts: 121,
        temp: 14
    }
} 

{
    device_id: 'device_8',
    group_id: 2,
    timestamp: '2023-08-25T19:40:51.845+00:00',
    max_watts: 250,
    event_type: 1,
    event_details: 'Network error',
    _ts: ISODate("2023-08-25T19:40:51.845Z"),
    _stream_meta: {
        sourceType: 'sampleData',
        timestamp: ISODate("2023-08-25T19:40:51.845Z")
    }
}

From the maintenance crew, it's known that the solar panel with {device_id: 'device_8'} is a test device. Below is a simple $validate using the $ne query operator to discard any document where device_id equals 'device_8', removing the test device's documents from the processor pipeline. As a developer, it's common to need to drop messages that aren't important to the processing pipeline, and $validate makes this simple to do.

{
    $validate: {
        validator: {
            $expr: {
                $ne: ["$device_id", "device_8"]
            }
        },
        validationAction: "discard"
    }
}

To ensure that the solar panel message structure is correct for processing, $jsonSchema can be used to validate against the json-schema draft-04 specification. In the example below, $validate checks that required fields exist, verifies data types (int, string, object, etc.), enforces minimum and maximum numeric ranges, applies regex pattern matching, and confirms that a specific field does not exist. If any of these schema requirements are violated, the message is sent to the DLQ. By using $jsonSchema, developers can focus on making sure a message fits the pipeline's needs, without worrying about additional fields that have been added, removed, or changed. This allows for truly flexible processing of a variety of schemas.

{
    $validate: {
        validator: {
            $jsonSchema: {
                required: ["device_id", "timestamp", "obs", "event_type"],
                not: {
                    required: ["event_details"]
                },
                properties: {
                    device_id: {
                        bsonType: "string",
                        pattern: "^device_\\d+",
                        description: "device_id is required and must be like device_#"
                    },
                    obs: {
                        bsonType: "object",
                        required: ["watts", "temp"],
                        properties: {
                            watts: {
                                bsonType: "int",
                                minimum: 0,
                                maximum: 250,
                                description: "'obs.watts' is required and cannot be less then 0 or more than 250"
                            },
                            temp: {
                                bsonType: "int",
                                description: "'obs.temp' must be an integer"
                            },
                        }
                    },
                    event_type: {
                        bsonType: "int",
                        minimum: 0,
                        maximum: 1,
                    },
                    timestamp: {
                        bsonType: "string",
                        description: "'timestamp' must be a string "
                    }
                }
            }
        },
        validationAction: "dlq"
    }
}

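The DLQ destination itself is not defined inside $validate; it is supplied when the stream processor is created. Here is a minimal sketch, from mongosh, of how a processor using the stage above might be registered. The connection, topic, database, and collection names are placeholders, not values from the original example:

// Placeholder names throughout -- substitute entries from your own connection registry.
let source = {
    $source: { connectionName: 'kafkaConnection', topic: 'solar_events' }
};

let validate = { $validate: { /* validator and validationAction as shown above */ } };

let sink = {
    $merge: { into: { connectionName: 'atlasCluster', db: 'solar', coll: 'readings' } }
};

// The dlq option tells the processor where to write documents that fail validation.
sp.createStreamProcessor('solarReadings', [source, validate, sink], {
    dlq: { connectionName: 'atlasCluster', db: 'solar', coll: 'validation_dlq' }
});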

It is also possible to combine query operators and $jsonSchema in a single $validate by using logical operators like $and. This provides an extremely easy and powerful way to do field-level comparisons while, at the same time, validating that the structure of messages is as expected before doing additional processing, such as aggregating over time with a tumbling window (a sketch of one follows the combined example below).

{
    $validate: {
        validator: {
            $and: [{
                    $expr: {
                        $ne: [
                            "$device_id",
                            "device_8"
                        ]
                    }
                },
                {
                    $jsonSchema: {
                        required: ["device_id", "timestamp", "obs", "event_type"],
                        not: {
                            required: ["event_details"]
                        },
                        properties: {
                            device_id: {
                                bsonType: "string",
                                pattern: "^device_\\d+",
                                description: "device_id is required and must be like device_#"
                            },
                            obs: {
                                bsonType: "object",
                                required: ["watts", "temp"],
                                properties: {
                                    watts: {
                                        bsonType: "int",
                                        minimum: 0,
                                        maximum: 250,
                                        description: "'obs.watts' is required and cannot be less than 0 or more than 250"
                                    },
                                    temp: {
                                        bsonType: "int",
                                        description: "'obs.temp' must be an integer"
                                    },
                                }
                            },
                            event_type: {
                                bsonType: "int",
                                minimum: 0,
                                maximum: 1,
                            },
                            timestamp: {
                                bsonType: "string",
                                description: "'timestamp' must be a string "
                            }
                        }
                    }
                }
            ]
        },
        validationAction: "dlq"
    }
}
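
As a sketch of the additional processing mentioned above, here is a tumbling window stage that could follow the combined $validate stage, averaging watt readings per device group over fixed one-minute windows. The window size and aggregation fields are illustrative choices rather than part of the original example:

{
    $tumblingWindow: {
        // Fixed, non-overlapping one-minute windows
        interval: { size: NumberInt(1), unit: "minute" },
        pipeline: [{
            $group: {
                _id: "$group_id",
                avg_watts: { $avg: "$obs.watts" },
                max_watts_observed: { $max: "$obs.watts" }
            }
        }]
    }
}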

Schemas are essential to an application's ability to process data and integrate with other systems. At the same time, schemas naturally become complex and challenging to manage as applications evolve and organizations grow. Atlas Stream Processing simplifies these complexities inherent to schema management, reducing the amount of code required to process streaming data while making it faster to iterate, easier to validate data correctness, and simpler to gain visibility into your data.

Learn more about MongoDB Atlas Stream Processing

Check out the MongoDB Atlas Stream Processing GA announcement or read our docs.

Try Atlas Stream Processing today

Log in to MongoDB Atlas to get started.
