Hello! In this article, we are going to implement a message queue in Node.js using the BullMQ library, which is built on top of Redis.
We will implement two message queues. The first holds refund tasks for particular orders. When a refund task completes successfully, we add a notification task to the second queue so that the user is told the refund is complete.
Step 1: Setting up Project
Create a new folder named "messaging_queue", initialize it with npm init,
and install the dependencies.
# Create the project folder and move into it
mkdir messaging_queue
cd messaging_queue
# Generate package.json
npm init
# express and bullmq are required by the code at runtime, so install them as
# regular dependencies (no -D / --save-dev)
npm i express bullmq
Step 2: Implementation of Queue
Firstly, create a refundQueue.js file to write the code for implementing a refundQueue and a function to add refund tasks to refundQueue.
const { Queue } =require("bullmq")
// Queue that receives refund jobs; it talks to the Redis server on 127.0.0.1.
const refundQueue = new Queue("refund-queue", {
  connection: {
    host: "127.0.0.1",
    // The Redis port is a numeric option, so pass a number rather than a string.
    port: 6379,
  },
});
/**
 * Adds a refund job for the given order to refundQueue and logs the job id.
 *
 * @param {{id: *, amount: *, user_id: *}} order - Order to refund; only these
 *   three fields are copied into the job payload.
 * @returns {Promise<void>} Resolves once the job has been added and logged.
 */
async function addRefundTask(order) {
  const { id, amount, user_id } = order;
  const jobName = `refund-to-${id}`;
  const payload = { amount, id, user_id };

  const job = await refundQueue.add(jobName, payload);
  console.log("Job added to refundQueue with id: ", job.id);
}
exports.addRefundTask=addRefundTask
Now, create a notificationQueue.js file to write the code for implementing a notificationQueue and a function to add notification tasks to notificationQueue.
const { Queue } =require("bullmq")
// Queue that receives notification jobs; it talks to the Redis server on 127.0.0.1.
const notificationQueue = new Queue("notification-queue", {
  connection: {
    host: "127.0.0.1",
    // The Redis port is a numeric option, so pass a number rather than a string.
    port: 6379,
  },
});
/**
 * Adds a notification job to notificationQueue and logs the job id.
 *
 * @param {*} user_id - User the notification is addressed to; also used in the job name.
 * @param {*} order_id - Order the notification is about.
 * @returns {Promise<void>} Resolves once the job has been added and logged.
 */
async function addNotificationTask(user_id, order_id) {
  const jobName = `notification-to-${user_id}`;
  const payload = { order_id, user_id };

  const job = await notificationQueue.add(jobName, payload);
  console.log("Job added to notificationQueue with id: ", job.id);
}
exports.addNotificationTask=addNotificationTask
Step 3: Refund and Notification Process
Now, create two files refundProcess.js and notificationProcess.js to write the code for implementation of refund and notification process.
refundProcess.js
/**
 * Stand-in for the real refund work: returns a promise that resolves
 * (with no value) after a 4-second timer fires.
 */
const refundComplete = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, 4 * 1000);
  });
/**
 * Logs the refund details, waits for refundComplete(), then logs success.
 *
 * @param {*} id - Order id being refunded.
 * @param {*} amount - Amount to refund.
 * @param {*} user_id - User receiving the refund.
 * @returns {Promise<void>}
 */
async function refundProcess(id, amount, user_id) {
  const details = [
    `Refund for order ${id} has started`,
    `Refund Amount: ${amount}`,
    `User ID: ${user_id}`,
  ];
  for (const line of details) {
    console.log(line);
  }

  await refundComplete();
  console.log("Refund Completed Successfully!");
}
exports.refundProcess=refundProcess
notificationProcess.js
/**
 * Stand-in for actually sending a notification: returns a promise that
 * resolves (with no value) after a 2-second timer fires.
 */
const notificationComplete = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, 2 * 1000);
  });
/**
 * Logs which user is being notified about which order, waits for
 * notificationComplete(), then logs success.
 *
 * @param {*} user_id - User to notify.
 * @param {*} order_id - Order whose refund the notification is about.
 * @returns {Promise<void>}
 */
async function notificationProcess(user_id, order_id) {
  const announcement = `Notify user ${user_id} about refund for order ${order_id}`;
  console.log(announcement);

  await notificationComplete();

  console.log("Notification Sent Successfully!");
}
exports.notificationProcess=notificationProcess
Step 4: Implementation of Worker
Now, create a new file worker.js to implement refundWorker and notificationWorker.
const { Worker } =require("bullmq")
const { refundProcess } =require("./refundProcess")
const { notificationProcess } =require("./notificationProcess")
// Worker that consumes jobs from "refund-queue" and runs refundProcess on each
// job's payload (id, amount, user_id).
const refundWorker = new Worker(
  "refund-queue",
  async (job) => {
    console.log(`Refund Job ${job.id} started`);
    await refundProcess(job.data.id, job.data.amount, job.data.user_id);
  },
  {
    // Pass the Redis connection explicitly: creating a Worker without one is
    // deprecated in BullMQ and prints a deprecation warning.
    connection: {
      host: "127.0.0.1",
      port: 6379,
    },
  }
);
// Worker that consumes jobs from "notification-queue" and runs
// notificationProcess on each job's payload (user_id, order_id).
const notificationWorker = new Worker(
  "notification-queue",
  async (job) => {
    console.log(`Notification Job ${job.id} started`);
    await notificationProcess(job.data.user_id, job.data.order_id);
  },
  {
    // Pass the Redis connection explicitly: creating a Worker without one is
    // deprecated in BullMQ and prints a deprecation warning.
    connection: {
      host: "127.0.0.1",
      port: 6379,
    },
  }
);
exports.refundWorker= refundWorker
exports.notificationWorker=notificationWorker
Step 5: Express Server
Now, in the index.js file, write the code for the Express server.
const express=require('express')
const { addRefundTask } = require("./refundQueue")
const { refundWorker } = require("./worker")
const { addNotificationTask } = require('./notificationQueue')
const { notificationWorker }=require('./worker')
/**
 * Boots the demo: wires the worker event listeners, starts the Express server
 * on port 8000, and enqueues two sample refund jobs.
 *
 * When a refund job completes, a notification job for the same user and order
 * is added to the notification queue.
 *
 * @returns {Promise<void>} Resolves once both sample refund jobs are enqueued.
 */
async function init() {
  const app = express();
  const PORT = 8000;

  const order1 = {
    id: "order1",
    amount: 4000,
    user_id: "user1",
  };
  const order2 = {
    id: "order2",
    amount: 10000,
    user_id: "user2",
  };

  // The 'failed' event may be emitted without a job object, so never
  // dereference it blindly.
  const jobLabel = (job) => (job ? job.id : "unknown");

  // Listeners are attached BEFORE any job is enqueued so that no completion
  // or failure event can fire while nobody is listening.
  refundWorker.on('completed', async (job) => {
    console.log(`Refund Job ${job.id} has completed!`);
    try {
      await addNotificationTask(job.data.user_id, job.data.id);
    } catch (err) {
      // An event-emitter callback has no caller to propagate to; report the
      // failure here instead of producing an unhandled rejection.
      console.error(`Could not queue notification for Refund Job ${job.id}: ${err.message}`);
    }
  });
  refundWorker.on('failed', (job, err) => {
    console.log(`Refund Job ${jobLabel(job)} has failed with ${err.message}`);
  });

  notificationWorker.on('completed', (job) => {
    console.log(`Notification Job ${job.id} has completed!`);
  });
  notificationWorker.on('failed', (job, err) => {
    console.log(`Notification Job ${jobLabel(job)} has failed with ${err.message}`);
  });

  app.listen(PORT, () => {
    console.log(`Server running at port ${PORT}`);
  });

  await addRefundTask(order1);
  await addRefundTask(order2);
}
// Start the demo. The returned promise is handled explicitly so that a
// rejection from init() is reported and the process exits with a failure code.
init().catch((err) => {
  console.error("Startup failed:", err);
  process.exit(1);
});
Here, we will add two orders to refundQueue.
We are using two event listeners 'completed' and 'failed' for both refundWorker and notificationWorker. On successful completion of refund task, a notification task is added to notificationQueue.
Step 6: Docker setup
BullMQ needs a Redis server to run, so we will start one on the local machine using Docker. Make sure Docker is installed on your system, then create a docker-compose.yml file.
# Single Redis service for BullMQ, published on port 6379 (the port the
# queues connect to on 127.0.0.1).
version: '3.4'
services:
  redis:
    container_name: redis-server
    image: redis
    ports:
      # host:container, quoted so YAML always reads the mapping as a string
      - "6379:6379"
    stdin_open: true
Now, start the Redis container using the following command:
# Start the services from docker-compose.yml in the background
docker compose up --detach
Now, we can run our express server
# Runs index.js, which loads both workers, starts the Express server on port 8000 and enqueues the two sample refund jobs
node index.js
Here is the output:
Thank you for reading the article !
Top comments (1)
As is github code Compile/Run Suggestions:
As is github code Run Results:
BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker and QueueEvents without providing explicitly a connection or connection options is deprecated. This behaviour will be removed in the next major release