DEV Community

Emily Johnson
Emily Johnson

Posted on

Build a Scalable AMQP-Based Messaging Framework on MongoDB in 5 Steps

Meeting Messaging Demands in Integration Scenarios: Weighing Options

In today's complex integration landscape, reliable messaging is a must-have. With a multitude of messaging frameworks and technologies at our disposal, choosing the right fit can be overwhelming. From traditional message queues (MQ) to modern open-source solutions like Kafka, RabbitMQ, and ActiveMQ, each framework has evolved to address specific needs. As microservices continue to gain popularity, engineers are seeking more agile, deployable, and cost-effective solutions. However, every messaging framework comes with its own set of infrastructure and maintenance challenges.

In a recent project, I came across a proposal to leverage MongoDB's capped collection feature, combined with its tailable cursor, as an alternative to traditional messaging infrastructure. This raises several questions:

  • Is this approach suitable for all messaging needs?
  • Can it replace established messaging frameworks like Kafka and RabbitMQ?
  • What are the potential pitfalls?

While MongoDB's capped collection feature is well-known and has been extensively discussed, most articles only provide a superficial overview, focusing on basic implementation without exploring deeper. A comprehensive messaging framework must address a range of challenges beyond mere asynchronous message delivery. In this series of articles, we will delve into these challenges and examine the feasibility of building a messaging infrastructure using MongoDB. For instance, you can explore the potential of MongoDB in building a scalable AMQP-based messaging framework on https://carsnewstoday.com.

Unlocking the Power of Capped Collections and Tailable Cursors in MongoDB

To address the above questions, it's crucial to understand how capped collections and tailable cursors function in MongoDB. A capped collection is a collection with a specified limit, either in terms of document count or total size. This limit enables the collection to behave like a fixed-size circular linked list, maintaining insertion order and providing high throughput for insertions. You can create a capped collection using a MongoDB command. Note that entries in a capped collection cannot be deleted or updated in a way that alters their initial size.

db.createCollection( "log", { capped: true, size: 100000 } )

In stark contrast to a conventional cursor, a tailable cursor offers a more adaptive approach to interacting with a collection, akin to the "tail -f" command. It reads documents in their inherent order. Unlike other cursor types, a tailable cursor remains open even after the client has read all current documents that match the filter. When a new document is inserted and matches the filter, the client receives the new document. If the connection is lost, the implementation driver re-establishes it.

Upon closer examination of this behavior, we can see that it bears a striking resemblance to a FIFO (First-In-First-Out) list, which can be leveraged to build a messaging framework. In this scenario, producers would insert data into capped collections, and consumers would receive the data as it becomes available.

Constructing a Messaging Protocol

In any messaging framework, protocols are essential to facilitate message exchange between different parties. These protocols can vary across messaging frameworks. For instance, RabbitMQ employs the AMQP protocol, where messages pass through an exchange. Publishers send messages to an exchange, and subscribers bind a queue to the exchange using binding rules to receive the messages. The consumer can either fetch or pull messages from the exchange, or the broker can push them automatically. In this article, we will delve into how to implement the AMQP 0-9-1 protocol using MongoDB's tailable cursor feature.

To commence, we need to create a Broker interface that will manage this process. The broker should have two primary functions:

  • publish: This function publishes a message to a specific channel or exchange.
  • subscribe: This function subscribes to a message at a specific exchange.

Our broker will encapsulate the MongoDB-based messaging service under this interface. We have two options to implement this interface: as a standalone microservice or as a library. For simplicity, let's take the library approach for now. With this approach, our architecture will resemble the following.

In this example, we've taken key considerations into account to implement the above interface effectively.

  • A single capped collection implements one exchange.
  • Every message published to the exchange must be linked to a specific routing key.
  • Multiple subscribers can be connected to a single exchange.
  • Subscribers can listen to all messages published to the exchange, filtered by a specific routing key. The routing key is a pivotal concept in RabbitMQ, defining the binding between a subscriber and the exchange through a queue. In our example, a tailable cursor acts as a queue for each subscriber, created based on the filter criteria set by the routing key.

If you have experience with the AMQP paradigm, you may be aware that AMQP 0-9-1 brokers provide four distinct exchange categories:

  • Point-to-point exchange
  • Broadcast exchange
  • Attribute-based exchange
  • Pattern-matching exchange

In my forthcoming series of articles, I will delve into each of these exchange categories, commencing with the Point-to-Point Exchange. This exchange type routes messages based on a specified message key.

Configuring the Message Broker

The following code snippet implements the broker interface outlined above

//broker.js
const SIZE=1000000;
const MAX_QSIZE=1000
const {MongoClient}=require('mongodb')

class Broker{

constructor(client,option){
    this.option=option;
    this.client=client;


}

   /*
  * The factory function to create a Broker instance . The option takes following attributes.
  * url : connection string to mongodb instance
  * dbname: database name
  * name: exchange name
  */

static async create(option) {
    let client=null;
    try{
        client = await MongoClient.connect(option.url,{useUnifiedTopology: true });
        const db = client.db(option.dbname);
        option.qsize=option.qsize||MAX_QSIZE;
        //creating capped collection if it does not exist
        let exist=await db.listCollections({ name: option.name }).hasNext();
        if(!exist){
            let result=await db.createCollection(option.name, {capped: true, size: SIZE,max:option.qsize})
            console.log(" Broker  got created with max queue size ",option.qsize);
        }
        //creating the Broker instance
        let broker=new Broker(client,option);
        return broker;
    }catch(e){
        console.log('broker creation failed ',e)
        if(!!client){ 
            //close the connection if creation failed but connection exist
            client.close()
        }
        throw e
    }

}
/*

  * subscribe by routingkey
  */
async subscribe(routingkey,next){

    var filter = {routingkey:routingkey};

    if('function' !== typeof next) throw('Callback function not defined');

    let db=this.client.db(this.option.dbname)

    let collection=await db.collection(this.option.name)  
    var cursorOptions = {
                tailable: true,
                awaitdata: true,
                numberOfRetries: -1
    };
    const tailableCursor = collection.find(filter, cursorOptions);
    //create stream from tailable cursor
    var stream =tailableCursor.stream();
    console.log('queue is waiting for message ...')
    stream.on('data', next);

}
/* 

  * publish a message i.e. insert a message to the capped collection.
  * routingkey : the routingkey of the message
  * message : message payload. This could be string or any data type or any vald javascript object.
  */
async publish(routingkey,message){
let data={};
data.routingkey=routingkey;
data.message=message;

Top comments (0)