I think one of the most challenging parts of the Starchart app is that it needs to support long-running asynchronous jobs. The app adds a record to the database and creates a record in Route53. Then it needs to get an HTTPS certificate. The whole process takes at least a few minutes. There were multiple technical approaches to this problem, and we chose the BullMQ queue system. I hadn't tried any queue system before, so it was a good opportunity to learn how to use BullMQ, especially Flows.
BullMQ setup
Before using BullMQ, you need to install a couple of dependencies.
npm i bullmq ioredis
You will also need a Redis Docker container to test locally. Here's the Redis Docker image link.
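All the code below passes a Redis connection named redis to BullMQ. A minimal sketch of building its options, assuming the environment variable names REDIS_HOST and REDIS_PORT (hypothetical) and defaulting to a local container on Redis's default port 6379. The BullMQ docs ask for maxRetriesPerRequest to be null on connections used by workers.

```typescript
// Subset of the ioredis connection options used here (assumed shape, not the full ioredis type).
export interface RedisConnectionOptions {
  host: string;
  port: number;
  // BullMQ expects null here for connections used by Workers.
  maxRetriesPerRequest: null;
}

// Builds connection options from environment variables.
// REDIS_HOST and REDIS_PORT are hypothetical names; use whatever your app defines.
export function redisOptionsFromEnv(
  env: Record<string, string | undefined>
): RedisConnectionOptions {
  const port = Number(env.REDIS_PORT ?? '6379');
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST ?? 'localhost',
    port,
    maxRetriesPerRequest: null,
  };
}
```

In the app you would then create the shared connection with something like const redis = new IORedis(redisOptionsFromEnv(process.env)), importing IORedis from ioredis, and pass { connection: redis } to the FlowProducer and every Worker.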
BullMQ Flows
I used BullMQ Flows because our jobs need to run in a specific order. Here's what needs to happen:
- Add a record to the MySQL database
- Create a record in Route53
- Poll Route53 until the change has either been deployed or failed. According to AWS, this takes about a minute
- Update the Route53 sync status in DB
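In a BullMQ flow, a parent job only starts after all of its children have completed, so the four steps above are expressed as a tree rather than a list. As a rough illustration, here is a pure TypeScript sketch that groups the jobs of such a tree into stages, where no job depends on another job in the same stage. FlowNode is a simplified stand-in for BullMQ's FlowJob, and the job names are shortened versions of the ones used in the flow.

```typescript
// Simplified stand-in for a flow job: just a name and optional children.
interface FlowNode {
  name: string;
  children?: FlowNode[];
}

// Height of a node: 0 for a leaf, otherwise 1 + the tallest child.
function height(node: FlowNode): number {
  const kids = node.children ?? [];
  return kids.length === 0 ? 0 : 1 + Math.max(...kids.map(height));
}

// Groups jobs by height: a job in stage i only depends on jobs in earlier stages,
// so jobs in the same stage are independent of each other.
export function stages(root: FlowNode): string[][] {
  const result: string[][] = [];
  const visit = (node: FlowNode): void => {
    (result[height(node)] ??= []).push(node.name);
    (node.children ?? []).forEach(visit);
  };
  visit(root);
  return result;
}

// The Starchart tree: syncDbStatus is the root, createDnsRecord is a child of checkDnsStatus.
const flow: FlowNode = {
  name: 'syncDbStatus',
  children: [
    { name: 'addDbRecord' },
    { name: 'checkDnsStatus', children: [{ name: 'createDnsRecord' }] },
  ],
};

console.log(stages(flow));
// → [['addDbRecord', 'createDnsRecord'], ['checkDnsStatus'], ['syncDbStatus']]
```

The stages ignore timing: in practice checkDnsStatus can start as soon as createDnsRecord finishes, even if addDbRecord is still running.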
Let's write a flow for these jobs.
import { FlowProducer, FlowJob } from 'bullmq';

// redis (an ioredis connection) and the JobRecord type are defined elsewhere in the app.
// The values of these two queue names are assumed; they must match the queues your workers listen on.
const checkDnsStatusQueueName = 'check-dns-status';
const syncDbStatusQueueName = 'sync-db-status';

const flowProducer = new FlowProducer({ connection: redis });

export const addDnsRequest = async ({ username, type, name, value }: JobRecord) => {
  // Step 1. Create a record in MySQL for a domain with pending status
  const addDbRecord: FlowJob = {
    name: `addDbRecord:${name}-${username}`, // Job name that identifies the domain and user
    queueName: 'add-db-record', // Queue that receives this job; a worker must listen on the same name
    data: { username, type, name, value } as JobRecord,
    opts: {
      failParentOnFailure: true, // If this job finally fails, its parent fails too, so the failure propagates up the flow
      attempts: 5, // Up to 5 attempts in total (the first run plus 4 retries)
      backoff: {
        // Wait 15 seconds before the first retry; each later wait doubles
        type: 'exponential',
        delay: 15_000,
      },
    },
  };

  // Step 2. Request Route53 to create a record
  const createDnsRecord: FlowJob = {
    name: `createDnsRecord:${name}-${username}`,
    queueName: 'create-dns-record',
    data: { username, type, name, value } as JobRecord,
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 15_000,
      },
    },
  };

  // Step 3. Poll Route53 to check the status of the domain until it's ready
  const checkDnsStatus: FlowJob = {
    name: `checkDnsStatus:${name}-${username}`,
    queueName: checkDnsStatusQueueName,
    children: [createDnsRecord], // This job stays in the waiting-children state until createDnsRecord completes
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 60_000,
      },
    },
  };

  // Step 4. Update the MySQL record with the active or error status
  const syncDbStatus: FlowJob = {
    name: `syncDbStatus:${name}-${username}`,
    queueName: syncDbStatusQueueName,
    children: [addDbRecord, checkDnsStatus], // Runs only after both children have completed
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 30_000,
      },
    },
  };

  return await flowProducer.add(syncDbStatus); // Adding the root job adds the whole tree of children with it
};
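Since attempts counts the first run, attempts: 5 allows at most four retries. A small sketch of the resulting waits, assuming the exponential formula described in the BullMQ docs (delay × 2^(attemptsMade − 1)) and no jitter. It also shows that checkDnsStatus keeps retrying for up to about 15 minutes of backoff, well beyond the roughly one minute AWS quotes.

```typescript
// Retry delays for BullMQ-style exponential backoff, assuming the documented formula
// delay * 2^(attemptsMade - 1) and ignoring any jitter option.
export function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // attempts includes the first run, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * 2 ** (attemptsMade - 1));
  }
  return delays;
}

// Sum of a list of delays in milliseconds.
const total = (ms: number[]): number => ms.reduce((a, b) => a + b, 0);

// createDnsRecord: attempts 5, delay 15_000
console.log(retryDelays(5, 15_000)); // → [15000, 30000, 60000, 120000]
// checkDnsStatus: attempts 5, delay 60_000, total backoff in minutes
console.log(total(retryDelays(5, 60_000)) / 60_000); // → 15
```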
Set up workers for each queue
Now we need to set up a Worker to execute the jobs in each queue. Since most of the workers are similar, I'll show just one representative example.
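import { Worker } from 'bullmq';

// redis, logger, createRecord and the JobRecord type are defined elsewhere in the app.
export const createDnsRecordWorker = new Worker<JobRecord>(
  'create-dns-record', // Queue this worker consumes; must match the queueName used in the flow
  async (job) => {
    const { username, type, name, value } = job.data; // Data passed in when the job was added to the flow
    try {
      // await so that a rejected promise is caught below instead of escaping the try block
      return await createRecord(username, type, name, value);
    } catch (error) {
      logger.warn('Could not create a record in Route53', error);
      throw error; // Rethrow so BullMQ marks this attempt as failed and schedules a retry
    }
  },
  { connection: redis }
);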
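The worker for the check-dns-status queue isn't shown, but the retry settings can double as the polling loop: the processor throws while the change is still pending, and BullMQ retries it after the backoff delay. A minimal sketch of that decision, assuming the processor has already fetched the change status (for example with GetChangeCommand from @aws-sdk/client-route-53, whose ChangeInfo.Status is PENDING or INSYNC) and that createDnsRecord returned the change ID. assertChangeInSync is a hypothetical helper.

```typescript
// Route53 reports a change as PENDING until it has propagated, then INSYNC.
type ChangeStatus = 'PENDING' | 'INSYNC';

// Hypothetical helper for the check-dns-status processor: returns when the change is in sync,
// otherwise throws so BullMQ retries the job after its backoff delay.
export function assertChangeInSync(
  changeId: string,
  status: ChangeStatus | undefined
): ChangeStatus {
  if (status === 'INSYNC') {
    // The processor's return value is what the parent later reads with getChildrenValues().
    return status;
  }
  throw new Error(`Route53 change ${changeId} is ${status ?? 'unknown'}; retrying later`);
}
```

Once all attempts are used up, the job fails, and because of failParentOnFailure, so does syncDbStatus.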
You can customize your worker based on your needs. When a job should fail immediately instead of using up its remaining retry attempts, throw an UnrecoverableError. Combined with failParentOnFailure, this fails the whole flow.
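To decide when to throw UnrecoverableError, the worker needs to tell permanent errors from transient ones. A minimal sketch, assuming the AWS SDK v3 convention that an error's name holds the error code; the set of codes below is an assumption, not a complete list.

```typescript
// Assumed set of Route53 error names that retrying will not fix.
const PERMANENT_ERRORS = new Set(['InvalidChangeBatch', 'NoSuchHostedZone']);

// Returns true when the worker should give up immediately instead of retrying.
export function isPermanentError(error: unknown): boolean {
  return error instanceof Error && PERMANENT_ERRORS.has(error.name);
}
```

In the worker's catch block you could then throw new UnrecoverableError(String(error)) when isPermanentError(error) is true, and rethrow the original error otherwise so BullMQ keeps retrying.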
A parent job can also read the return values of its children with job.getChildrenValues(), as below.
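// Keys identify each child job; values are whatever each child's processor returned
const values: { [jobKey: string]: string } = await job.getChildrenValues();
const key = Object.keys(values)[0]; // Takes the first child only
const value = values[key];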
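Taking the first key works when a job has a single child, but syncDbStatus has two (addDbRecord and checkDnsStatus), so the first key isn't necessarily the one you want. A sketch that picks a child's value by queue name instead, assuming the keys have the form prefix:queueName:jobId (BullMQ's default prefix is bull; check the keys your version actually returns). childValueByQueue is a hypothetical helper.

```typescript
// Assumes keys look like "<prefix>:<queueName>:<jobId>", e.g. "bull:check-dns-status:42".
// Returns the value of the child that ran on the given queue, or undefined if there is none.
export function childValueByQueue<T>(
  values: Record<string, T>,
  queueName: string
): T | undefined {
  for (const [key, value] of Object.entries(values)) {
    const parts = key.split(':');
    if (parts.length >= 3 && parts[parts.length - 2] === queueName) {
      return value;
    }
  }
  return undefined;
}
```

Inside the syncDbStatus processor this would be used as childValueByQueue(await job.getChildrenValues(), checkDnsStatusQueueName).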
Conclusion
I hadn't used a queue system before this project. With BullMQ, we can solve many problems, such as sending scheduled or notification emails and creating HTTPS certificates. I'm sure there are many more problems a queue system can solve. I hope this article helps you start writing your own workflows.