
jbebar

How to read messages reliably from a RabbitMQ broker?

What is RabbitMQ 🐇?

RabbitMQ is a message broker implementing the AMQP protocol. It makes it possible for applications to communicate synchronously as well as asynchronously. Each application can subscribe and wait for new messages to arrive.

AMQP involves three main actors: publishers, consumers and the broker.
A publisher is an application that pushes messages to a RabbitMQ broker.
The RabbitMQ broker has two important elements: queues and exchanges, bound together by routing rules.
When RabbitMQ receives a message, it checks the routing information of the message and drops it into the right queue according to the routing rules. The messages stay in the queue until an application subscribes to the queue and reads them.

In this article I will only talk about the consuming part: a queue with existing messages and a consumer reading from it, so there is no need to understand the routing part :).

There are two ways a consumer can read a message from a queue:

  • auto acknowledgement mode (autoAck): the messages are automatically removed from the queue as soon as they are sent to the consumer.
  • manual acknowledgement mode (manualAck): the consumer subscribes to a queue, and each message is removed only when the consumer has sent an acknowledgement (Ack) back to the broker.

We will experiment with these two ways of consuming messages.

A simple consumer example 🧪

Let's see the code of a simple consumer made in Kotlin:



package org.jbebar

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Delivery
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    // Parse "key=value" command line arguments into a map.
    val arguments = args.map { it.split("=") }.map { Pair(it[0], it[1]) }.toMap()
    val autoAck = arguments["autoAck"].toBoolean()
    val qOS = arguments["qos"]?.toInt()
    val processDurationSeconds = arguments["processDurationSeconds"]!!.toLong()
    // Reaches zero once 10 messages have been processed.
    val countDownLatch = CountDownLatch(10)

    val processedMessageCount = AtomicInteger(0)
    val factory = ConnectionFactory()
    factory.host = "localhost"

    val connection = factory.newConnection()
    // Apply the prefetch limit only when a qos argument was given.
    val channel = connection.createChannel().apply { qOS?.let { basicQos(it) } }

    val deliveryCallback = { _: String, message: Delivery ->
        val receivedMessage = String(message.body)
        println("Processing : $receivedMessage")
        // Simulate the business process.
        Thread.sleep(processDurationSeconds * 1000)
        if (!autoAck) {
            println("Acknowledging message $receivedMessage")
            // Acknowledge this single delivery (multiple = false).
            channel.basicAck(message.envelope.deliveryTag, false)
        }
        processedMessageCount.incrementAndGet()
        countDownLatch.countDown()
        println("Processed : $receivedMessage")
    }

    // The last argument is the cancel callback, unused here.
    channel.basicConsume("demo-queue", autoAck, deliveryCallback, { _ -> })

    // Wait for 10 processed messages or for 10 seconds, whichever comes first.
    countDownLatch.await(10, TimeUnit.SECONDS)

    println("Processed ${processedMessageCount.get()} messages.")
    exitProcess(0)
}



The first part of this code retrieves the command line arguments that configure our consumer:

  • "autoAck": sets the way our consumer reads messages. If true it is in autoAck mode, if false it is in manualAck mode.

  • "qos": the prefetch count, that is, the number of messages the broker allows to be unacknowledged for one consumer. Also called the number of in-flight messages.
    This parameter is meaningful only if we are in manualAck mode.

  • "processDurationSeconds": sets the time our application takes to process each message. It is not related to the RabbitMQ configuration, but it is a way to simulate a lagging consumer or a fast one.



    val autoAck = arguments["autoAck"].toBoolean()
    val qOS = arguments["qos"]?.toInt()
    val processDurationSeconds = arguments["processDurationSeconds"]!!.toLong()


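The argument parsing in the full listing fails on an argument without "=" and on a missing "processDurationSeconds". Here is a minimal sketch of a more forgiving version. The names ConsumerConfig and parseConsumerArgs are hypothetical, and the defaults (manual ack, no simulated processing time) are my own assumptions, not something the sample repository does.

```kotlin
// Hypothetical holder for the three command line settings used in this article.
data class ConsumerConfig(
    val autoAck: Boolean,
    val qos: Int?,                    // null means "do not call basicQos"
    val processDurationSeconds: Long
)

// Parses arguments of the form key=value; entries without '=' are ignored.
fun parseConsumerArgs(args: Array<String>): ConsumerConfig {
    val arguments = args
        .filter { it.contains("=") }
        .associate { it.substringBefore("=") to it.substringAfter("=") }
    return ConsumerConfig(
        // Assumed default: manual ack when the flag is absent.
        autoAck = arguments["autoAck"]?.toBoolean() ?: false,
        // A value that is not a number is treated as "no qos".
        qos = arguments["qos"]?.toIntOrNull(),
        // Assumed default: no simulated processing time.
        processDurationSeconds = arguments["processDurationSeconds"]?.toLongOrNull() ?: 0L
    )
}

fun main() {
    println(parseConsumerArgs(arrayOf("autoAck=false", "qos=5", "processDurationSeconds=5")))
}
```

Defaulting to manual ack is deliberate in this sketch: forgetting a flag should not silently put the consumer in the mode that can lose messages.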

We create a connection and a channel, and apply the QoS passed earlier to the channel if one was given:



    val factory = ConnectionFactory()
    factory.host = "localhost"
    val connection = factory.newConnection()
    val channel = connection.createChannel().apply {
        qOS?.let { basicQos(it) }
    }



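In the deliveryCallback, we simulate a business process with Thread.sleep and, in manualAck mode, acknowledge the message once it has been processed. The callback is then passed to the channel, which consumes from a queue called "demo-queue".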



    val deliveryCallback = { _: String, message: Delivery ->
        val receivedMessage = String(message.body)
        println("Processing : $receivedMessage")
        Thread.sleep(processDurationSeconds * 1000)
        if (!autoAck) {
            println("Acknowledging message $receivedMessage")
            channel.basicAck(message.envelope.deliveryTag, false)
        }
        processedMessageCount.incrementAndGet()
        countDownLatch.countDown()
        println("Processed : $receivedMessage")
    }


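The callback acknowledges a message only after the simulated process has finished, but it does not say what happens if the process throws. A possible way to handle that case, as a sketch only: the Acker interface and RecordingAcker class are hypothetical stand-ins so the example runs without a broker. With the RabbitMQ Java client, ack would map to channel.basicAck(tag, false) and nack to channel.basicNack(tag, false, requeue).

```kotlin
// Hypothetical minimal view of a channel, so the sketch runs without a broker.
// With the RabbitMQ Java client these would map to
// channel.basicAck(tag, false) and channel.basicNack(tag, false, requeue).
interface Acker {
    fun ack(deliveryTag: Long)
    fun nack(deliveryTag: Long, requeue: Boolean)
}

// Runs the business process, then acknowledges; on failure asks the broker to requeue.
fun handleDelivery(deliveryTag: Long, body: String, acker: Acker, process: (String) -> Unit) {
    try {
        process(body)
        acker.ack(deliveryTag)
    } catch (e: Exception) {
        println("Processing failed for $body: ${e.message}")
        acker.nack(deliveryTag, requeue = true)
    }
}

// In-memory Acker used only to observe what would be sent to the broker.
class RecordingAcker : Acker {
    val events = mutableListOf<String>()
    override fun ack(deliveryTag: Long) { events += "ack:$deliveryTag" }
    override fun nack(deliveryTag: Long, requeue: Boolean) { events += "nack:$deliveryTag:requeue=$requeue" }
}

fun main() {
    val acker = RecordingAcker()
    handleDelivery(1, "Message_1", acker) { /* succeeds */ }
    handleDelivery(2, "Message_2", acker) { error("simulated failure") }
    println(acker.events) // [ack:1, nack:2:requeue=true]
}
```

Requeuing on every failure is a simplification: a message that always fails would come back forever, so a real service would probably need a retry limit of some kind.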

One important part of the code is the CountDownLatch.
It prevents our application from exiting straight after launch, before it has received any message: it blocks and lets the application exit only on one of two conditions:

  • countDownLatch.countDown() has been called 10 times, which means 10 messages have been processed.
  • 10 seconds have elapsed.
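To make the two exit conditions concrete, here is a small sketch that runs without any broker. The function awaitMessages is a hypothetical helper written for this illustration; the worker thread stands in for the delivery callback calling countDown().

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Starts a worker that counts down `processed` times, then waits on a latch sized
// for `expected`. Returns true if the latch reached zero, false if the timeout
// elapsed first.
fun awaitMessages(expected: Int, processed: Int, timeoutMillis: Long): Boolean {
    val latch = CountDownLatch(expected)
    thread(isDaemon = true) {
        repeat(processed) { latch.countDown() }
    }
    return latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}

fun main() {
    // All 10 "messages" processed: the latch opens before the timeout.
    println(awaitMessages(expected = 10, processed = 10, timeoutMillis = 1000))
    // Only 2 processed: await gives up when the timeout elapses.
    println(awaitMessages(expected = 10, processed = 2, timeoutMillis = 200))
}
```

The consumer in this article ignores the Boolean returned by await, but it could be used to log whether the application stopped because of the timeout.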

You can see the full code here :).

Let's try our consumer 🧪 !

3 steps:

  • 1 Start a RabbitMQ broker with Docker:



docker run -d --rm --hostname my-rabbit --name rabbit-broker -p 5672:5672 -p 15672:15672 rabbitmq:3-management


  • 2 Build and launch the publisher to publish 10 messages:


git clone https://github.com/jbebar/rabbit-mq-sample-sender.git
cd rabbit-mq-sample-sender
mvn clean package
java -jar target/amqp-sender-jar-with-dependencies.jar 10



You should see the queue in the RabbitMQ admin interface. If you open the detail of the queue, you will see this:

[Screenshot: detail of the queue in the RabbitMQ admin interface]

  • 3 Build and launch the app:


git clone https://github.com/jbebar/rabbit-mq-sample-receiver.git
cd rabbit-mq-sample-receiver
mvn clean package
java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=true processDurationSeconds=0



Output:



Processing : Message 1
Processed : Message 1
Processing : Message 2
Processed : Message 2
Processing : Message 3
Processed : Message 3
Processing : Message 4
Processed : Message 4
Processing : Message 5
Processed : Message 5
Processing : Message 6
Processed : Message 6
Processing : Message 7
Processed : Message 7
Processing : Message 8
Processed : Message 8
Processing : Message 9
Processed : Message 9
Processing : Message 10
Processed : Message 10
Processed 10 messages.



The process is fast enough to consume all the messages before the 10 second timeout.
But let's try with a business process duration of 5 seconds:



java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=true processDurationSeconds=5

Processing : Message_1
Processing : Message_2
Processing : Message_3
Processed 2 messages.



Our consumer only has time to process two messages before the application exits, and the queue is now empty, meaning we lost 8 messages before they could be processed by our service.
Our consumer held all of its messages in memory and they vanished when it shut down.

In a real life scenario this is dangerous: what if a payment service missed a payment because the service restarted or simply crashed?

Manual ack and QoS to the rescue

Manual ack



java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=false processDurationSeconds=5

Processing : Message_1
Acknowledging message Message_1
Processed : Message_1
Processing : Message_2
Acknowledging message Message_2
Processing : Message_3
Processed 1 messages.



If we look at the history of our queue, we have 9 messages remaining: no more losses.
With manualAck, the broker requeues the messages that have not been acknowledged when the consumer disconnects or, depending on the broker configuration, after an acknowledgement timeout.
In our case, all the messages were delivered to our consumer, which is why the number of ready messages drops suddenly. We then had time to process just one of them and send the ack.
That message was then removed from the queue by the broker.
Finally, the number of ready messages goes back up to 9 because RabbitMQ sees that the connection with the consumer is lost when the consumer shuts down.
You can see this in the following graph: first the ready messages drop while the unacknowledged messages go up. Then, when the consumer disconnects, the number of ready messages, available again for other consumers, goes back up.

[Graph: ready and unacknowledged messages of the queue over time]
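The difference between the two modes can be replayed with a toy in-memory model. This is only a sketch of the ready and unacknowledged counters described above, not how RabbitMQ works internally; ToyQueue and remainingAfterCrash are hypothetical names.

```kotlin
// Toy in-memory model of one queue and one consumer. It only mimics the
// "ready" and "unacked" counters of the admin interface.
class ToyQueue(messages: List<String>) {
    private val ready = ArrayDeque(messages)
    private val unacked = mutableListOf<String>()

    val readyCount: Int get() = ready.size
    val unackedCount: Int get() = unacked.size

    // Delivers every ready message to the consumer (no prefetch limit).
    fun deliverAll(autoAck: Boolean): List<String> {
        val delivered = ready.toList()
        ready.clear()
        // In manual ack mode the broker keeps the messages until they are acked.
        if (!autoAck) unacked += delivered
        return delivered
    }

    fun ack(message: String) { unacked.remove(message) }

    // Consumer connection lost: unacknowledged messages become ready again.
    fun consumerDisconnected() {
        ready.addAll(unacked)
        unacked.clear()
    }
}

// Returns how many messages are still in the queue after a consumer that
// acknowledged only `processed` messages disconnects.
fun remainingAfterCrash(autoAck: Boolean, total: Int, processed: Int): Int {
    val queue = ToyQueue((1..total).map { "Message_$it" })
    val delivered = queue.deliverAll(autoAck)
    if (!autoAck) delivered.take(processed).forEach { queue.ack(it) }
    queue.consumerDisconnected()
    return queue.readyCount
}

fun main() {
    println(remainingAfterCrash(autoAck = true, total = 10, processed = 2))  // 0
    println(remainingAfterCrash(autoAck = false, total = 10, processed = 1)) // 9
}
```

The two results match the runs in this article: an empty queue in autoAck mode, and 9 messages back in the queue in manualAck mode.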

Setting the QoS

The QoS limits the number of unacknowledged messages for a given consumer. This means it limits the number of messages buffered in the memory of the consumer and not yet processed.
This is a way to protect our consumer from crashing when it reads from a queue with many messages. Also, if the consumer takes too long to ack its messages, they are not available to other consumers in the meantime.
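As an illustration of that limit, here is a toy simulation of the delivery loop. It is a sketch under my own simplifying assumptions (one consumer, one message acked at a time) and maxInFlight is a hypothetical function. In the real consumer the limit is set with channel.basicQos(prefetch), as in the listing earlier.

```kotlin
// Toy model of prefetch: the broker delivers a message only while the consumer
// holds fewer than `prefetch` unacknowledged messages.
// Returns the highest number of messages the consumer held at the same time.
fun maxInFlight(totalMessages: Int, prefetch: Int): Int {
    require(prefetch >= 1) { "this toy model needs a prefetch of at least 1" }
    var ready = totalMessages
    var unacked = 0
    var peak = 0
    while (ready > 0 || unacked > 0) {
        // Broker side: top up the consumer until the prefetch limit is reached.
        while (ready > 0 && unacked < prefetch) {
            ready--
            unacked++
        }
        peak = maxOf(peak, unacked)
        // Consumer side: process one message and acknowledge it.
        unacked--
    }
    return peak
}

fun main() {
    println(maxInFlight(totalMessages = 1000, prefetch = 100)) // 100
    println(maxInFlight(totalMessages = 10, prefetch = 100))   // 10
}
```

Whatever the size of the queue, the consumer never holds more than the prefetch value in memory, and the rest stays ready for other consumers.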

The optimal value depends on the capacity of the consumer. The documentation advises a QoS from 100 to 300:

[Image: qos-value-documentation]

To continue this article, an interesting experiment would be to upload a large number of messages to the queue, reduce the memory available to our consumer and see at which QoS it explodes 💥 🤓.

Thanks for reading and feel free to comment or ask any question.

Top comments (3)

Stefano • Edited

May you pass me the code? I am trying to achieve the same thing that you did. Thanks
I am trying to achieve the same thing on C#

jbebar

its here for the receiver :
github.com/jbebar/rabbit-mq-sample...
for the sender :
github.com/jbebar/rabbit-mq-sample...

or also in the "git clone" commands in the article ;)

jbebar

Hello, I will check if I still have it.