Add support for subscriptions
We will use RedisPubSub from graphql-redis-subscriptions to publish to subscription topics. We will publish a message when the user invokes the scheduleJob mutation. We will also register a new CRON that executes every minute, so that an automated message is published to the notifications topic every minute.
Let's start by installing all the dependencies.
Step 1
Install the necessary packages
yarn add graphql-subscriptions graphql-redis-subscriptions \
ioredis cors whatwg-fetch apollo-server-express \
http subscriptions-transport-ws@0.9.17
These are the required packages
- graphql-subscriptions
- graphql-redis-subscriptions
- ioredis
- cors
- whatwg-fetch
- apollo-server-express
- http (a Node.js core module, so it is available even without installing it)
- subscriptions-transport-ws
Step 2
Create a new file
touch server/utils/pubsub.js
Copy the snippet below into pubsub.js
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const options = {
  host: process.env.REDIS_DOMAIN,
  port: process.env.REDIS_PORT,
  connectTimeout: 10000,
  // wait 50 ms longer after each failed attempt before reconnecting, capped at 2000 ms
  retryStrategy: times => Math.min(times * 50, 2000)
};

// publishing and subscribing each need their own Redis connection
export const pubsub = new RedisPubSub({
  publisher: new Redis(options),
  subscriber: new Redis(options)
});
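To get a feel for what the retryStrategy above does, here is a minimal sketch that evaluates the same formula outside of ioredis. ioredis calls the function with the number of reconnection attempts made so far and waits the returned number of milliseconds before trying again. The attempt numbers chosen below are only for illustration.

```javascript
// Same formula as the retryStrategy in pubsub.js, evaluated on its own.
const retryStrategy = times => Math.min(times * 50, 2000);

// Delay in milliseconds before the 1st, 10th, 40th and 100th reconnection attempt.
const delays = [1, 10, 40, 100].map(retryStrategy);

console.log(delays); // [ 50, 500, 2000, 2000 ]
```

The delay grows linearly and stops growing once it hits the 2 second cap, so a long Redis outage never pushes the wait between attempts beyond 2 seconds.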
Step 3
Create a subscription topic. Add the following snippet to the utils/constants.js file
export const SUBSCRIPTION_TOPICS = {
  NOTIFICATIONS: 'notifications'
};
Create the subscription file
touch server/gql/subscriptions.js
Copy the following snippet
import { GraphQLNonNull, GraphQLObjectType, GraphQLString, GraphQLInt } from 'graphql';
import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';

export const SubscriptionRoot = new GraphQLObjectType({
  name: 'Subscription',
  fields: {
    notifications: {
      type: new GraphQLObjectType({
        name: 'ScheduleJobSubscription',
        fields: () => ({
          message: {
            type: new GraphQLNonNull(GraphQLString)
          },
          scheduleIn: {
            type: new GraphQLNonNull(GraphQLInt)
          }
        })
      }),
      // every payload published to the notifications topic is delivered to subscribers
      subscribe: () => pubsub.asyncIterator(SUBSCRIPTION_TOPICS.NOTIFICATIONS)
    }
  }
});
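The notifications field above has no resolve function, so GraphQL's default resolver reads the notifications property of each published payload. That is why payloads need to be wrapped as { notifications: { ... } }. The sketch below illustrates the publish and asyncIterator flow with a toy in-memory class. ToyPubSub is a hypothetical stand-in written for this example only; it is not RedisPubSub and does not talk to Redis.

```javascript
// Toy in-memory stand-in for RedisPubSub (hypothetical, for illustration only).
class ToyPubSub {
  constructor() {
    this.listeners = new Map(); // topic -> Set of listener callbacks
  }

  // Hand the payload to every listener registered for the topic.
  publish(topic, payload) {
    (this.listeners.get(topic) || []).forEach(listener => listener(payload));
  }

  // Return an async iterator that yields each payload published to the topic.
  asyncIterator(topic) {
    const queue = []; // payloads that arrived before anyone asked for them
    const waiting = []; // pending next() calls that are waiting for a payload
    const listener = payload => {
      const resolve = waiting.shift();
      if (resolve) resolve({ value: payload, done: false });
      else queue.push(payload);
    };
    if (!this.listeners.has(topic)) this.listeners.set(topic, new Set());
    this.listeners.get(topic).add(listener);
    return {
      next: () =>
        queue.length
          ? Promise.resolve({ value: queue.shift(), done: false })
          : new Promise(resolve => waiting.push(resolve)),
      return: () => {
        this.listeners.get(topic).delete(listener);
        return Promise.resolve({ value: undefined, done: true });
      },
      [Symbol.asyncIterator]() {
        return this;
      }
    };
  }
}

const toyPubsub = new ToyPubSub();
const iterator = toyPubsub.asyncIterator('notifications');

// The payload is keyed by the subscription field name, just like in the tutorial.
toyPubsub.publish('notifications', {
  notifications: { message: 'This message is from the CRON', scheduleIn: 0 }
});

// Mimic the default resolver: read the property named after the field.
const received = iterator.next().then(({ value }) => value.notifications);
received.then(notification => console.log(notification));
```

If the payload were published without the notifications wrapper, the default resolver would find nothing under that key and the subscription would deliver null for the field.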
Make the following changes in server/index.js
import cors from 'cors';
import { SubscriptionServer } from 'subscriptions-transport-ws/dist/server';
import { GraphQLSchema, execute, subscribe } from 'graphql';
import 'whatwg-fetch';
import { ApolloServer } from 'apollo-server-express';
import { createServer } from 'http';
import { SubscriptionRoot } from '@gql/subscriptions';
...
export const init = async () => {
  ...
  const schema = new GraphQLSchema({ query: QueryRoot, mutation: MutationRoot, subscription: SubscriptionRoot });
  ...
  app.use(rTracer.expressMiddleware());
  app.use(cors()); // 1
  ...
  if (!isTestEnv()) {
    const httpServer = createServer(app);
    const server = new ApolloServer({
      schema
    });
    await server.start();
    server.applyMiddleware({ app });
    // 2
    const subscriptionServer = SubscriptionServer.create(
      { schema, execute, subscribe },
      { server: httpServer, path: server.graphqlPath }
    );
    ['SIGINT', 'SIGTERM'].forEach(signal => {
      process.on(signal, () => subscriptionServer.close());
    });
    httpServer.listen(9000, () => {
      console.log(`Server is now running on http://localhost:9000/graphql`);
    });
    initQueues();
  }
  ...
};
1. Handle the CORS error thrown by studio.apollographql
2. Create a subscription server that exposes a websocket on the same pathname as the mutations and queries.
To test your subscriptions, go to https://studio.apollographql.com/sandbox/explorer. Add http://localhost:9000/graphql in the top left URL bar. Click the documentation tab in the top left pane header and filter by subscription → notifications, and you will see the newly added subscription.
Step 4
Copy the snippet below into server/utils/queues.js
import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';
...
...
const CRON_EXPRESSIONS = {
  MIDNIGHT: '0 0 * * *',
  EVERY_MINUTE: '* * * * *'
};
export const QUEUE_NAMES = {
  ...,
  EVERY_MINUTE_CRON: 'everyMinuteCron'
};
export const QUEUE_PROCESSORS = {
  ...,
  [QUEUE_NAMES.EVERY_MINUTE_CRON]: (job, done) => {
    console.log(`publishing to ${SUBSCRIPTION_TOPICS.NOTIFICATIONS}`);
    // the payload is keyed by the subscription field name (notifications)
    pubsub.publish(SUBSCRIPTION_TOPICS.NOTIFICATIONS, {
      notifications: {
        message: 'This message is from the CRON',
        scheduleIn: 0
      }
    });
    done();
  }
};
export const initQueues = () => {
  console.log('init queues');
  ...
  queues[QUEUE_NAMES.EVERY_MINUTE_CRON].add({}, { repeat: { cron: CRON_EXPRESSIONS.EVERY_MINUTE } });
};
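The two entries in CRON_EXPRESSIONS use the standard five-field cron layout: minute, hour, day of month, month, and day of week. As a rough sketch, the helper below labels each field so the expressions are easier to read. describeCron and CRON_FIELDS are hypothetical names made up for this example; they are not part of the tutorial's code or of any queue library.

```javascript
// Hypothetical helper (illustration only): label the five fields of a cron expression.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

const describeCron = expression => {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected ${CRON_FIELDS.length} fields, got ${parts.length}`);
  }
  // Pair each field name with the value found at the same position.
  return Object.fromEntries(CRON_FIELDS.map((name, index) => [name, parts[index]]));
};

// MIDNIGHT: minute 0 of hour 0, on every day of every month.
const midnight = describeCron('0 0 * * *');
// EVERY_MINUTE: a wildcard in every field, so it matches every minute.
const everyMinute = describeCron('* * * * *');

console.log(midnight);
console.log(everyMinute);
```

Reading the labels makes the difference clear: MIDNIGHT pins the minute and hour to 0, while EVERY_MINUTE leaves every field as a wildcard.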
This will create a CRON that runs every minute and publishes a message to the notifications topic.
Next, publish to the notifications topic when the scheduleJob mutation is invoked. Copy the snippet below into server/gql/custom/scheduleJobMutation.js
import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';
...
async resolve(source, args, context, info) {
  ...
  ...
  .then(job => {
    console.log(`${moment()}::Job with id: ${job.id} scheduled in ${args.scheduleIn} milliseconds`);
    // forward the mutation arguments (message, scheduleIn) to subscribers
    pubsub.publish(SUBSCRIPTION_TOPICS.NOTIFICATIONS, {
      notifications: args
    });
    return { success: true };
  })
  ...
},
This will add support for publishing to the newly created notifications topic whenever the scheduleJob mutation is invoked.
Step 5
Time to test out your subscription! Go to https://studio.apollographql.com/sandbox/explorer. Paste the snippet below in the top left pane
subscription Notifications {
  notifications {
    message
    scheduleIn
  }
}
Hit the Play button and you will see a subscription tab pop up in the bottom right.
Paste the snippet below in the left pane
mutation ScheduleJob($scheduleJobScheduleIn: Int!, $scheduleJobMessage: String!) {
  scheduleJob(scheduleIn: $scheduleJobScheduleIn, message: $scheduleJobMessage) {
    success
  }
}
Paste the snippet below in the variables pane
{
  "scheduleJobScheduleIn": 100,
  "scheduleJobMessage": "Scheduled job message"
}
Select ScheduleJob and hit the Play button.
Very soon you'll see another message come up in the subscriptions tab because of the EVERY_MINUTE CRON.
Commit your code using the following git commands
git add .
git commit -m 'Add support for graphql redis subscriptions!'
Where to go from here
You can find the complete code here: https://github.com/wednesday-solutions/node-express-batch-jobs
I would recommend going through the articles below
- https://www.apollographql.com/blog/backend/subscriptions/graphql-subscriptions-with-redis-pub-sub/
- https://www.apollographql.com/docs/apollo-server/data/subscriptions/
If this series piqued your interest, please stay tuned for the next tutorial, in which we will write a CD pipeline to deploy this application using ECS.
I hope you enjoyed reading this series on how to create container-aware CRONS, scheduled jobs, and GraphQL Subscriptions. If you have any questions or comments, please join the forum discussion below.
This blog was originally posted on https://wednesday.is