High internet adoption and penetration have made it necessary for backend applications to be built for scale. Scaling the infrastructure up/down dynamically based on traffic ensures that tech acts as an enabler for the business rather than a barrier. Scaling down in times of low load makes sure that we aren't wasting dollars in idle cycles. Growth should not be limited by tech.
Most complex applications will require you to write a batch / CRON job at some point in time.
When working with containerized applications and batch jobs you need to take care of the following:
- A job should be executed by only one container. If you have a batch job that is scheduled to run at 12 am every day, only one of the deployed containers should run it. Else it could lead to duplication, and/or unintended consequences. For example, a payments company has a batch job to send out payment reminders to customers whose bill is due. If each container runs the job, the user will be spammed with messages.
- The job status and queue shouldn't be tied to containers. By design, the orchestrator will spawn or kill containers based on the current load. Decoupling job queuing and status from the containers allows the batch jobs to be agnostic of the container that executes them.
To fulfill the above requirements, we will hold the job queue and statuses of the jobs in Redis. The container will get the queue from Redis and will query it to get the status of the job. It will not execute the job if it is already being processed.
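The idea of letting only one container run a job can be sketched without Redis at all. The snippet below is a minimal illustration, not this tutorial's implementation: the in-memory Map stands in for Redis, and claimJob and runIfUnclaimed are hypothetical helpers introduced only for this example. In a real deployment the claim would have to be a single atomic Redis operation (for example SET with the NX option) so that two containers cannot both win.

```javascript
// In-memory stand-in for Redis (assumption made for illustration only).
const sharedStore = new Map();

// Hypothetical helper: returns true only for the first caller that claims jobId.
function claimJob(jobId, containerId) {
  if (sharedStore.has(jobId)) {
    return false; // another container has already claimed this job
  }
  sharedStore.set(jobId, { status: 'processing', owner: containerId });
  return true;
}

// Hypothetical helper: run the work only if this container won the claim.
function runIfUnclaimed(jobId, containerId, work) {
  if (!claimJob(jobId, containerId)) {
    return false;
  }
  work();
  sharedStore.set(jobId, { status: 'completed', owner: containerId });
  return true;
}

const sent = [];
const sendReminders = () => sent.push('payment reminder');

// Two containers wake up for the same scheduled job.
const ranOnA = runIfUnclaimed('payment-reminders', 'container-a', sendReminders);
const ranOnB = runIfUnclaimed('payment-reminders', 'container-b', sendReminders);

console.log({ ranOnA, ranOnB, remindersSent: sent.length });
```

Only the first container sends the reminder; the second sees the existing entry and skips the work.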
This tutorial assumes that you have a good understanding of
In this 3-part series, you will
- Create an endpoint that schedules a job (Part 1)
- Set up batch jobs to be executed at 12 am (Part 2)
- Set up GraphQL subscriptions with Redis support (Part 3)
Starter Project
Please clone the following repository: https://github.com/wednesday-solutions/node-express-batch-jobs-starter. It contains a fully functional GraphQL Node express server.
Note: If you're not familiar with GraphQL, please take a few minutes to read the spec here: https://graphql.org/
Set up Redis locally
In this step, we will install and set up Redis locally and make sure that our application is able to connect to it. We will be creating queues using Bull (https://optimalbits.github.io/bull/) to:
- process scheduled jobs
- execute a job at midnight
- publish an automated notification every minute
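As a rough preview, these three kinds of work could map onto the options object that Bull's queue.add(data, opts) accepts. This is a sketch under assumptions: the constant names are made up for illustration, and the later parts of the series may wire things up differently. The delay and repeat.cron options themselves are part of Bull's job options.

```javascript
// Option objects that could be passed as the second argument of queue.add(data, opts).
// The constant names are hypothetical; only `delay` and `repeat.cron` come from Bull.

// Run once, 2000 milliseconds after the job is added.
const scheduledJobOptions = { delay: 2000 };

// Run every day at midnight (cron fields: minute hour day-of-month month day-of-week).
const midnightJobOptions = { repeat: { cron: '0 0 * * *' } };

// Run at the start of every minute.
const everyMinuteJobOptions = { repeat: { cron: '* * * * *' } };

console.log({ scheduledJobOptions, midnightJobOptions, everyMinuteJobOptions });
```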
Step 1
Install Redis locally. If you're using a Mac, you can use Homebrew to install it
brew update
brew install redis
Start Redis using the command below
brew services start redis
To install and set up Redis for Windows/Linux systems please take a look at the relevant links
- https://flaviocopes.com/redis-installation/
- https://dev.to/divshekhar/how-to-install-redis-on-windows-10-3e99
Step 2
Install bull and string-replace-loader
yarn add bull string-replace-loader
Add this rule to webpack.server.config.js. Without it, Bull will not be able to find its .lua files.
module.exports = (options = {}) => ({
mode: options.mode,
entry: options.entry,
optimization: options.optimization,
...,
externals: {
bull: 'commonjs2 bull'
},
...,
module: {
rules: [
...,
{
test: /node_modules\/bull\/lib\/commands\/index\.js$/,
use: {
loader: 'string-replace-loader',
options: {
search: '__dirname',
replace: `"${path.dirname(require.resolve('bull'))}/lib/commands"`
}
}
},
...,
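To see what this rule targets, the sketch below applies the same test pattern to a couple of module paths and builds the replacement string the way the rule does. The paths are hypothetical (an /app install directory and an index.js entry file for bull are assumed), and the snippet only illustrates the matching and the replacement text; it does not run webpack or the loader.

```javascript
const path = require('path');

// Same pattern as the `test` field in the rule above.
const bullCommandsFile = /node_modules\/bull\/lib\/commands\/index\.js$/;

// Hypothetical absolute paths of modules webpack might process.
const candidates = [
  '/app/node_modules/bull/lib/commands/index.js',
  '/app/node_modules/bull/lib/queue.js'
];
const matched = candidates.filter(file => bullCommandsFile.test(file));

// In the real config this value comes from require.resolve('bull'); it is
// hard-coded here (assumption) so the example runs without bull installed.
const bullEntryFile = '/app/node_modules/bull/index.js';
const replacement = `"${path.posix.dirname(bullEntryFile)}/lib/commands"`;

console.log(matched);
console.log(replacement);
```

Only the commands/index.js file matches, and __dirname in that file would be swapped for the quoted absolute path. Because the pattern uses forward slashes, it matches POSIX-style paths; a Windows build would likely need a pattern that also accepts backslashes.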
Step 3
Add the following values to the .env.local file
REDIS_DOMAIN=localhost
REDIS_PORT=6379
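These two values are combined into the Redis connection string that is passed to Bull. A small sketch of that step follows; buildRedisUrl and its fallback defaults are assumptions added for illustration and are not part of the starter project.

```javascript
// Hypothetical helper: builds the connection string from environment-style values,
// falling back to the local defaults used in this tutorial.
function buildRedisUrl(env) {
  const host = env.REDIS_DOMAIN || 'localhost';
  const port = env.REDIS_PORT || '6379';
  return `redis://${host}:${port}`;
}

// With the values from .env.local
console.log(buildRedisUrl({ REDIS_DOMAIN: 'localhost', REDIS_PORT: '6379' }));

// With nothing set, the defaults apply
console.log(buildRedisUrl({}));
```

In the application itself, process.env would be passed in place of the literal objects.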
Step 4
Create the server/utils/queue.js file
touch server/utils/queue.js
Add the code below to it
import Bull from 'bull';
import moment from 'moment';
const queues = {};
// 1
export const QUEUE_NAMES = {
SCHEDULE_JOB: 'scheduleJob'
};
// 2
export const QUEUE_PROCESSORS = {
[QUEUE_NAMES.SCHEDULE_JOB]: (job, done) => {
console.log(`${moment()}::Job with id: ${job.id} is being executed.\n`, {
message: job.data.message
});
done();
}
};
// 3
export const initQueues = () => {
console.log('init queues');
Object.keys(QUEUE_PROCESSORS).forEach(queueName => {
// 4
queues[queueName] = getQueue(queueName);
// 5
queues[queueName].process(QUEUE_PROCESSORS[queueName]);
});
};
export const getQueue = queueName => {
if (!queues[queueName]) {
queues[queueName] = new Bull(queueName, `redis://${process.env.REDIS_DOMAIN}:${process.env.REDIS_PORT}`);
console.log('created queue: ', queueName, `redis://${process.env.REDIS_DOMAIN}:${process.env.REDIS_PORT}`);
}
return queues[queueName];
};
- (1) Create a constant for the queue names.
- (2) Associate the processors with the queues.
- (3) Initialize all the queues in the initQueues method. Iterate over all the keys in QUEUE_PROCESSORS.
- (4) getQueue will create a queue with queueName if it's not already present and return it.
- (5) queues[queueName].process(QUEUE_PROCESSORS[queueName]); will attach the functions in QUEUE_PROCESSORS to process jobs from the appropriate queue.
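The lazy-creation pattern behind getQueue can be tried in isolation. In the sketch below, FakeQueue is a hypothetical stand-in for Bull so that the snippet runs without Redis; the point is only that repeated calls with the same name return the same instance, and that each processor ends up attached to the queue that shares its name.

```javascript
// Hypothetical stand-in for Bull: records its name and the processor attached to it.
class FakeQueue {
  constructor(name) {
    this.name = name;
    this.processor = null;
  }
  process(processor) {
    this.processor = processor;
  }
}

const queues = {};
const processors = {
  scheduleJob: (job, done) => done()
};

// Create the queue on first use and reuse it afterwards.
const getQueue = queueName => {
  if (!queues[queueName]) {
    queues[queueName] = new FakeQueue(queueName);
  }
  return queues[queueName];
};

// Attach every processor to the queue that shares its name.
const initQueues = () => {
  Object.keys(processors).forEach(queueName => {
    getQueue(queueName).process(processors[queueName]);
  });
};

initQueues();
const first = getQueue('scheduleJob');
const second = getQueue('scheduleJob');
console.log(first === second);
console.log(first.processor === processors.scheduleJob);
```

Caching the instance this way means every module that calls getQueue with the same name shares one queue object instead of opening a new connection each time.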
Step 5
To initialize your queues
- Import initQueues in the server/index.js file
import { initQueues } from '@utils/queue';
- Invoke initQueues like so
...
export const init = () => {
...
if (!isTestEnv()) {
app.listen(9000);
initQueues();
}
...
}
- Set up the database and run the app using
./setup-local.sh
- If the database is already set up and you just need to run the application, use
yarn start:local
You should see the init queues and created queue log messages from server/utils/queue.js in your console.
Commit your code using the following git commands
git add .
git commit -m 'Local redis setup done'
Write mutation for job scheduling
In this step, we will expose a mutation called scheduleJob.
Step 1
Create a new file for the job scheduler
mkdir -p server/gql/custom
touch server/gql/custom/scheduleJobMutation.js
Step 2
Copy the snippet below in the newly created file.
import { GraphQLNonNull, GraphQLObjectType, GraphQLString, GraphQLInt, GraphQLBoolean } from 'graphql';
export const scheduleJob = {
type: new GraphQLObjectType({
name: 'ScheduleJob',
fields: () => ({
success: {
type: new GraphQLNonNull(GraphQLBoolean),
description: 'Returns true if the job was scheduled successfully'
}
})
}),
args: {
scheduleIn: {
type: new GraphQLNonNull(GraphQLInt),
description: 'Milliseconds from now that the job should be scheduled'
},
message: {
type: new GraphQLNonNull(GraphQLString),
description: 'Message that should be consoled in the scheduled job'
}
},
async resolve(source, args, context, info) {
// since currently there is no logic to schedule the job
// this will always return false
return { success: false };
},
description: 'Schedule a job that will be executed in ${scheduleIn} milliseconds. This job will console ${message}.'
};
This mutation will accept two input parameters:
- scheduleIn represents the time in milliseconds from now at which the job is scheduled to execute.
- message represents the message to be logged when the job is executed.
We will use the value of scheduleIn to delay the execution of the job added to the queue.
If the job is scheduled successfully, the mutation will return { success: true }; otherwise it will return { success: false }.
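Since scheduleIn is relative to the moment the mutation is called, it can help to see the absolute time it translates to. The helper below is hypothetical and only illustrates the arithmetic; it is not part of the mutation, which hands scheduleIn to the queue as a delay.

```javascript
// Hypothetical helper: converts a relative delay in milliseconds into the
// absolute time at which the job becomes eligible to run.
function expectedRunTime(scheduleIn, now = Date.now()) {
  if (!Number.isInteger(scheduleIn) || scheduleIn < 0) {
    throw new RangeError('scheduleIn must be a non-negative integer of milliseconds');
  }
  return new Date(now + scheduleIn);
}

// A fixed reference time keeps the example deterministic.
const referenceTime = Date.UTC(2024, 0, 1, 0, 0, 0);
const runAt = expectedRunTime(2000, referenceTime);
console.log(runAt.toISOString()); // 2024-01-01T00:00:02.000Z
```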
Step 3
Now we need to expose the mutation. We can do this by adding the following code to the gql/mutations.js file.
import { scheduleJob } from '@gql/custom/scheduleJobMutation';
...
...
export const addMutations = () => {
...
...
// custom mutations
mutations.scheduleJob = scheduleJob;
return mutations;
};
Now start the application using yarn start:local. Go to http://localhost:9000/graphql in the browser. Click on Docs on the top right. Search for ScheduleJob.
Step 4
Add the following snippet in the left pane.
mutation ScheduleJob {
scheduleJob(scheduleIn: 2000, message: "This message should be consoled at the scheduled time") {
success
}
}
Hit Play on the top left. You can also hit CMD + Enter to execute the request.
The API will respond with { success: false } since we haven't added the logic for scheduling the job. We'll get to that in a bit.
{
"data": {
"scheduleJob": {
"success": false
}
}
}
Step 5
Now we will add the logic to schedule the job when the mutation is invoked. Paste the snippet below in the resolve function
import moment from 'moment';
import { getQueue, QUEUE_NAMES } from '@utils/queue';
...
...
async resolve(source, args, context, info) {
// 1
return getQueue(QUEUE_NAMES.SCHEDULE_JOB)
.add({ message: args.message }, { delay: args.scheduleIn })
.then(job => {
// 2
console.log(`${moment()}::Job with id: ${job.id} scheduled in ${args.scheduleIn} milliseconds`);
return { success: true };
})
.catch(err => {
console.log(err);
return { success: false };
});
},
- (1) We get the queue by its name.
- (2) We return { success: true } if there are no errors while scheduling. Else we return { success: false }.
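The same resolver logic can also be written with async/await and exercised without Redis. This is a sketch, not a replacement for the code above: scheduleWithQueue is a hypothetical function that takes the queue as a parameter, and the two stub queues only imitate the promise returned by Bull's add method.

```javascript
// Hypothetical variant of the resolver body that receives the queue as an argument.
async function scheduleWithQueue(queue, args) {
  try {
    const job = await queue.add({ message: args.message }, { delay: args.scheduleIn });
    console.log(`Job with id: ${job.id} scheduled in ${args.scheduleIn} milliseconds`);
    return { success: true };
  } catch (err) {
    console.log(err.message);
    return { success: false };
  }
}

// Stub queues standing in for Bull: one accepts the job, one fails as if Redis were down.
const workingQueue = { add: async (data, opts) => ({ id: 1, data, opts }) };
const failingQueue = {
  add: async () => {
    throw new Error('Redis connection refused');
  }
};

const jobArgs = { message: 'hello', scheduleIn: 2000 };
const results = Promise.all([
  scheduleWithQueue(workingQueue, jobArgs),
  scheduleWithQueue(failingQueue, jobArgs)
]);
results.then(([accepted, rejected]) => console.log(accepted, rejected));
```

Passing the queue in as an argument is a design choice made here for testability; the tutorial's resolver looks the queue up with getQueue instead.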
Yay! You've created a mutation that schedules jobs to be executed after a set amount of time. Now test out your newly created scheduler by changing the values of scheduleIn and message.
Commit your code using the following git commands
git add .
git commit -m 'Add mutation to schedule Job'
Where to go from here
You now have the ability to schedule jobs so that they are executed in a specific amount of time!
I hope you enjoyed reading this article as much as I enjoyed writing it. If this piqued your interest, stay tuned for the next article in the series, where I will take you through how to schedule a CRON job that executes at 12 am every day in a multi-container environment.
If you have any questions or comments, please join the forum discussion below.
This blog was originally posted on https://wednesday.is. To know more about what it's like to work with Wednesday, follow us on Instagram, Twitter, and LinkedIn.