Hello everyone! I'm back after a busy stretch at work and want to introduce an event-driven app. I've been exploring microservices and want to start with event-driven architecture. If you want to know more about event-driven architecture, please visit this page.
Preparation
I'm using Node.js for the sample app, so you will need Node.js and Yarn. I'll share samples in other programming languages, such as C# and Go, in the next post. You can browse this GitHub repository to look around.
bervProject / rabbitmq-demo
RabbitMQ Demo
Simple Publish/Subscribe App.
Tools
Preparation
- (For Linux/macOS users) Copy .env.sh.example to .env.sh.
- (For Windows users, CMD) Copy .env.cmd.example to .env.cmd.
- (For PowerShell) Copy .env.ps1.example to .env.ps1.
- Update the .env file to point to your RabbitMQ host.
- Run yarn --frozen-lockfile to download the dependencies.
Consumer
- Run node consumer.js.
Publisher/Sender
- Run node sender.js.
LICENSE
MIT
Prepare the RabbitMQ in Amazon MQ
We will provision the RabbitMQ broker manually through the AWS Console.
- Search for Amazon MQ in the search box and click Amazon MQ.
- Click the Get started button.
- Select RabbitMQ and click Next.
- We will use Single-instance broker just for testing; you may use Cluster deployment for production. Then click Next.
- Give the broker a name and select the mq.t3.micro instance.
- Set up the username and password for the RabbitMQ host.
- Open the Additional setting section. Make sure you select Public access and Use the default VPC and subnet(s). Please avoid Public access for production use; there you will likely want Private access and a VPC that you set up yourself.
- Finally, click Next. Review your configuration and click Create broker.
Time to Code!
- Init your project with yarn init and enter the details of your project.
- Install the dependencies:
  - yarn add amqplib - to send messages to and consume messages from RabbitMQ.
  - yarn add uuid - to generate the message ID and correlation ID.
Sender Code
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');

// set up the queue name
const queueName = 'test-queue';

/**
 * Send messages
 */
async function send() {
  // connect to RabbitMQ
  const connection = await amqp.connect(process.env.RABBITMQ_HOST || 'amqp://localhost');
  // create a channel
  const channel = await connection.createChannel();
  // create the queue if it does not exist yet (durable: it survives a broker restart)
  await channel.assertQueue(queueName, {
    durable: true,
  });
  // generate a correlation id; it marks messages that belong to the same batch or flow
  const correlationId = uuidv4();
  // send 10 messages and generate a message id for each message
  for (let i = 1; i <= 10; i++) {
    const buff = Buffer.from(JSON.stringify({
      test: `Hello World ${i}!!`
    }), 'utf-8');
    // sendToQueue returns false when the channel's write buffer is full
    const result = channel.sendToQueue(queueName, buff, {
      persistent: true,
      messageId: uuidv4(),
      correlationId: correlationId,
    });
    console.log(result);
  }
  // close the channel
  await channel.close();
  // close the connection
  await connection.close();
}

// report connection or publish errors instead of leaving the rejection unhandled
send().catch((err) => {
  console.error(err);
  process.exit(1);
});
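The sender logs the boolean returned by sendToQueue. In amqplib that value behaves like the return value of a writable stream's write: false means the channel's write buffer is full, and the channel emits a 'drain' event once it is safe to continue. Here is a minimal sketch of how a sender could respect that signal. The helper name sendAll and its parameters are my own and are not part of the repository; the channel is passed in so the function can also be exercised with a stand-in object.

```javascript
const { once } = require('node:events');

/**
 * Hypothetical helper (not in the original repo): publish every payload and
 * pause whenever the channel reports that its write buffer is full.
 * Returns how many times it had to wait for a 'drain' event.
 */
async function sendAll(channel, queueName, payloads, options = {}) {
  let waits = 0;
  for (const payload of payloads) {
    const body = Buffer.from(JSON.stringify(payload), 'utf-8');
    // false means "buffer full, please slow down"
    const ok = channel.sendToQueue(queueName, body, { persistent: true, ...options });
    if (!ok) {
      waits += 1;
      // the channel emits 'drain' when it can accept more writes
      await once(channel, 'drain');
    }
  }
  return waits;
}
```

With only 10 small messages you will almost never see false, so this mostly matters when you publish in bulk.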
Consumer Code
const amqp = require('amqplib');

// set up the queue name
const queueName = 'test-queue';

/**
 * Consume the messages
 */
async function consume() {
  // set up the connection to RabbitMQ
  const connection = await amqp.connect(process.env.RABBITMQ_HOST || 'amqp://localhost');
  // set up the channel
  const channel = await connection.createChannel();
  // make sure the queue exists
  await channel.assertQueue(queueName, {
    durable: true,
  });
  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queueName);
  // set up the consumer
  await channel.consume(queueName, function (message) {
    // the callback receives null when the broker cancels the consumer
    if (message === null) {
      console.log("Consumer cancelled by the server");
      return;
    }
    // just print the message in the console
    console.log("[%s] Received with id (%s) message: %s", message.properties.correlationId, message.properties.messageId, message.content.toString());
    // ack manually
    channel.ack(message);
  }, {
    // we ack manually
    noAck: false,
  });
}

// report connection errors instead of leaving the rejection unhandled
consume().catch((err) => {
  console.error(err);
  process.exit(1);
});
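The consumer above always acks because it only prints the message. Once real processing is involved, manual acknowledgement lets you ack on success and nack on failure. Below is a minimal sketch of that idea. The names makeAckingHandler and processMessage are hypothetical and not from the repository, and passing requeue = false (so a failed message is dropped, or dead-lettered if the queue is configured for that) is just one possible choice.

```javascript
/**
 * Hypothetical helper (not in the original repo): wrap a processing function
 * so that the message is acked only after processing succeeds.
 */
function makeAckingHandler(channel, processMessage) {
  return async function onMessage(message) {
    // amqplib passes null when the broker cancels the consumer
    if (message === null) {
      return;
    }
    try {
      const payload = JSON.parse(message.content.toString());
      await processMessage(payload);
      // success: tell RabbitMQ it can delete the message
      channel.ack(message);
    } catch (err) {
      // failure: reject only this message (allUpTo = false) and do not requeue it
      channel.nack(message, false, false);
    }
  };
}
```

You could pass it to the consumer as channel.consume(queueName, makeAckingHandler(channel, handleOrder), { noAck: false }), where handleOrder stands for your own processing function.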
Test Your Code
- Set up your environment variable. You may use the example scripts to do so. For example:
  - For PowerShell:
    $env:RABBITMQ_HOST = 'amqps://<username>:<password>@<rabbitmq-endpoint>:<rabbitmqport>'
  - For Linux/macOS:
    export RABBITMQ_HOST=amqps://<username>:<password>@<rabbitmq-endpoint>:<rabbitmqport>
- Run the sender with node sender.js. You will get console output like this.
- Run the consumer with node consumer.js. You will get console output like this.
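One thing that can trip you up with the RABBITMQ_HOST value: if the username or password contains characters such as @, : or /, they need to be percent-encoded inside the URL. Here is a small sketch that assembles the URL from its parts. The function name buildAmqpsUrl and the host in the usage comment are made up for illustration, and the default port 5671 is the standard AMQPS port, so please check the endpoint shown for your own broker.

```javascript
/**
 * Hypothetical helper (not in the original repo): build an amqps:// URL
 * with percent-encoded credentials.
 */
function buildAmqpsUrl({ username, password, host, port = 5671 }) {
  // encodeURIComponent escapes characters like '@', ':' and '/' that would
  // otherwise be read as URL delimiters
  const user = encodeURIComponent(username);
  const pass = encodeURIComponent(password);
  return `amqps://${user}:${pass}@${host}:${port}`;
}

// Usage with a made-up endpoint:
// process.env.RABBITMQ_HOST = buildAmqpsUrl({
//   username: 'admin',
//   password: 'p@ss:word/1',
//   host: 'broker.example.com',
// });
```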
Congrats
Congrats! You've made a simple app to send and receive messages. Don't forget to clean up the resources if you won't use them again.
Top comments (3)
Cool post. Can you explain why you chose to ack manually in the consumer function?
Because I want to delete (ack) the message only after it has finished processing. I can also nack when there is an error. You may check this article to learn which method you need.
Very useful, thank you!