In this post we'll dive into what Novu is, why you'd want to use it, and then jump into the code behind it.
You don't need to know how it works to use it, but it will help you understand it at a deeper level, and will teach you how to use Redis BullMQ on your own projects.
If you prefer video, here's my video on the same topic:
Novu is open-source notification infrastructure. It simplifies integrating notifications into your app and supports channels like email, SMS, and push notifications. Novu has 31,000 stars on GitHub!
Before we begin: I also run a YouTube channel on open source, and I'm building an open-source email app called Inbox Zero that helps people clean up and automate their inbox.
So Why Novu?
Why use Novu? Why not send the email notifications to your users yourself using something like SES or Resend?
As your app grows, notifications become more complex.
If you're building an app like Twitter and you want to notify users when they receive messages, you wouldn't want to bombard the most popular user with individual messages every few seconds.
Novu offers a smarter solution. Instead of sending out every notification immediately, it can do the following:
- Aggregate notifications: group notifications together based on a specific timeframe (e.g., 30 minutes). This prevents users from being overwhelmed by a constant stream of individual messages.
- Optimize delivery: choose the most appropriate channel for each notification (email, push notification, etc.), ensuring users receive them in a way they prefer.
- Scale efficiently: Novu is built to handle high volumes of notifications without compromising performance.
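To make the aggregation idea concrete, here is a minimal sketch of grouping notifications per user within a time window. It is not Novu's implementation: the NotificationEvent and Digest types and the digestEvents function are hypothetical names, and it assumes fixed 30-minute windows for simplicity.

```typescript
// Hypothetical types for illustration only; these are not Novu's types.
interface NotificationEvent {
  subscriberId: string;
  message: string;
  timestamp: number; // milliseconds
}

interface Digest {
  subscriberId: string;
  windowStart: number;
  events: NotificationEvent[];
}

// Group events per subscriber into fixed windows of `windowMs` milliseconds.
function digestEvents(events: NotificationEvent[], windowMs: number): Digest[] {
  const digests = new Map<string, Digest>();
  for (const event of events) {
    const windowStart = Math.floor(event.timestamp / windowMs) * windowMs;
    const key = `${event.subscriberId}:${windowStart}`;
    let digest = digests.get(key);
    if (!digest) {
      digest = { subscriberId: event.subscriberId, windowStart, events: [] };
      digests.set(key, digest);
    }
    digest.events.push(event);
  }
  return Array.from(digests.values());
}

const THIRTY_MINUTES = 30 * 60 * 1000;
const result = digestEvents(
  [
    { subscriberId: 'alice', message: 'Bob messaged you', timestamp: 0 },
    { subscriberId: 'alice', message: 'Carol messaged you', timestamp: 60000 },
    { subscriberId: 'alice', message: 'Dan messaged you', timestamp: THIRTY_MINUTES + 1 },
  ],
  THIRTY_MINUTES
);
```

Here the first two events fall into the same window and would go out as one notification, while the third starts a new window.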
Basic Usage
To integrate Novu into your project, you call their API, for example through the Novu Node package:
import { Novu } from '@novu/node';
const novu = new Novu(process.env.NOVU_API_KEY);
await novu.trigger('<WORKFLOW_TRIGGER_ID>', {
  to: {
    subscriberId: '<UNIQUE_SUBSCRIBER_IDENTIFIER>',
    email: 'john@doemail.com',
    firstName: 'John',
    lastName: 'Doe',
  },
  payload: {
    name: 'Hello World',
    organization: {
      logo: 'https://happycorp.com/logo.png',
    },
  },
});
You trigger a notification by providing the workflow ID, along with an object containing the subscriber you want to send the message to and the payload, which represents the message to be sent to the user.
Once you've triggered the notification, you can easily display it in your project's UI. Novu provides packages for UI, as shown in the image above, although you have the option to build your own. This makes it simpler to integrate notifications directly into your project.
Providers
There are a lot of different providers you can connect to. For example, if you're sending email, you could use SendGrid, Mailgun, or SES. You could send messages via SMS using Twilio. For more details, you can check the list of Novu Providers here.
A Glimpse into Novu Cloud
This is what Novu's cloud-hosted offering looks like. Let's set up a new workflow for a mention in a comment.
The first step is the trigger. Right after the trigger is called, the user gets an in-app notification telling them that commenterName mentioned them in commentSnippet. Both are variables passed in the payload when the trigger is called.
{{commenterName}} has mentioned you in <b> "{{commentSnippet}}" </b>
The third step is creating a digest. A digest gathers notifications to use them later on. In this template, we will wait for 30 minutes to gather all received notifications.
The last step is sending an email. The email content varies depending on the number of mentions received.
{{#if step.digest}}
{{step.events.0.mentionedUser}} and {{step.total_count}} others mentioned you in a comment.
{{else}}
{{mentionedUser}} mentioned you in a comment.
{{/if}}
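The branching in this template can be sketched as a plain function. This is only an illustration of the logic, not how Novu renders templates: the MentionEvent type and renderMentionEmail function are hypothetical, and the sketch assumes "others" means every digested event except the first.

```typescript
// Hypothetical event shape, mirroring the `mentionedUser` variable in the template.
interface MentionEvent {
  mentionedUser: string;
}

// Build the email line for one or many digested mention events.
// Assumption: "others" counts every event except the first one.
function renderMentionEmail(events: MentionEvent[]): string {
  if (events.length === 0) {
    throw new Error('At least one mention event is required');
  }
  const [first, ...rest] = events;
  if (rest.length === 0) {
    return `${first.mentionedUser} mentioned you in a comment.`;
  }
  return `${first.mentionedUser} and ${rest.length} others mentioned you in a comment.`;
}

const single = renderMentionEmail([{ mentionedUser: 'Ana' }]);
const many = renderMentionEmail([
  { mentionedUser: 'Ana' },
  { mentionedUser: 'Ben' },
  { mentionedUser: 'Cy' },
]);
```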
The Power Behind Novu
Novu is built with NestJS, a Node.js framework that gives the codebase a solid structure. It uses MongoDB and Redis as its databases.
Novu serves major clients and operates at a large scale, handling hundreds of millions of events monthly. It's built for scalability, using Kubernetes and AWS auto-scaling so it can scale horizontally.
Introduction to BullMQ: A Core Component
A core piece of Novu's back-end is BullMQ, which is used as a message broker. It's a queue backed by Redis: you add jobs to it, and workers listen for those jobs and process them.
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
async function addJobs() {
  await myQueue.add('myJobName', { foo: 'bar' });
  await myQueue.add('myJobName', { qux: 'baz' });
}
await addJobs();
You might want to use queues if you have long-running operations. Imagine an API that compresses images, where each image takes one minute to compress. You don't want your API to take a minute to respond.
The solution?
Asynchronous processing:
- Someone calls your API to compress an image.
- The API immediately returns a success response.
- In the background, the job is added to a queue.
- The workers then pick up the job and do the image compression.
This also makes the system highly scalable. You can have many background workers handling the compression and can scale out these workers separately from the rest of your app.
Here, you can see a worker being set up, which is listening for events.
import { Worker } from 'bullmq';
const worker = new Worker('foo', async job => {
  // Will print { foo: 'bar' } for the first job
  // and { qux: 'baz' } for the second.
  console.log(job.data);
});
You can have a hundred workers running in parallel, each handling different items from the queue. This is a great way to scale your app and its background tasks.
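The pattern described above (respond right away, let several workers drain the queue in the background) can be sketched with a toy in-memory queue. This is not BullMQ, which keeps its jobs in Redis. Every name here (handleCompressRequest, runWorker, compress) is hypothetical, and the 10 ms timer stands in for the slow compression work.

```typescript
// A toy in-memory queue, only meant to illustrate the pattern.
type CompressJob = { id: number; image: string };

const pendingJobs: CompressJob[] = [];
const compressed: string[] = [];
let nextId = 1;

// The "API handler": enqueue the job and respond without waiting for the work.
function handleCompressRequest(image: string): { status: string; jobId: number } {
  const job = { id: nextId++, image };
  pendingJobs.push(job);
  return { status: 'accepted', jobId: job.id };
}

// Stand-in for slow work such as compressing an image.
async function compress(job: CompressJob): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, 10));
  compressed.push(`${job.image}.min`);
}

// A worker keeps taking jobs off the queue until none are left.
async function runWorker(): Promise<void> {
  let job = pendingJobs.shift();
  while (job) {
    await compress(job);
    job = pendingJobs.shift();
  }
}

async function main(): Promise<string[]> {
  ['a.png', 'b.png', 'c.png', 'd.png'].forEach((image) => handleCompressRequest(image));
  // Three workers drain the same queue concurrently.
  await Promise.all([runWorker(), runWorker(), runWorker()]);
  return compressed;
}

const done = main();
```

With BullMQ the queue lives in Redis, so the workers can run in separate processes or on separate machines instead of sharing memory as they do here.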
Deep Dive into Novu's Inner Workings
Every notification begins with a trigger. To understand this process, let's delve into the backend. Navigate to the events folder within Novu's repository and open the events controller file. There you'll find an endpoint that handles POST requests to /trigger.
Once a trigger request comes in, the trackEvent function handles it. It takes two parameters: user, injected by the @UserSession() decorator, and body, injected by @Body(). Inside, it builds a command object from details in the user's session and the trigger request, executes it, and returns the result as a TriggerEventResponseDto.
// novu/apps/api/src/app/events/events.controller.ts
@Post('/trigger')
@ApiResponse(TriggerEventResponseDto, 201)
async trackEvent(
  @UserSession() user: IJwtPayload,
  @Body() body: TriggerEventRequestDto
): Promise<TriggerEventResponseDto> {
  const result = await this.parseEventRequest.execute(
    ParseEventRequestMulticastCommand.create({
      // parameters
    })
  );
  return result as unknown as TriggerEventResponseDto;
}
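The Command.create(...) calls in this controller follow a common pattern: validate the input once when the command is built, so the use case that executes it can trust its contents. Here is a minimal hand-rolled sketch of that idea. TriggerCommand and its fields are hypothetical, and this is not how Novu's own command classes validate.

```typescript
// Hypothetical command; the field names are illustrative assumptions.
interface TriggerCommandProps {
  userId: string;
  organizationId: string;
  identifier: string; // workflow trigger identifier
  payload: Record<string, unknown>;
}

class TriggerCommand {
  readonly props: Readonly<TriggerCommandProps>;

  private constructor(props: TriggerCommandProps) {
    this.props = Object.freeze({ ...props });
  }

  // The only way to build a command: required fields are checked up front.
  static create(props: TriggerCommandProps): TriggerCommand {
    for (const key of ['userId', 'organizationId', 'identifier'] as const) {
      if (!props[key]) {
        throw new Error(`${key} is required`);
      }
    }
    return new TriggerCommand(props);
  }
}

const command = TriggerCommand.create({
  userId: 'user-1',
  organizationId: 'org-1',
  identifier: 'comment-mention',
  payload: { commenterName: 'Ana' },
});
```

Because the constructor is private, callers cannot skip the checks in create.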
The parseEventRequest class does all sorts of validation, such as making sure the workflow has steps and that some of them are active. Once validation succeeds, this is where the request is actually handled: it builds the job data and adds it to the workflow queue service.
// novu/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts
if (command.payload && Array.isArray(command.payload.attachments)) {
  this.modifyAttachments(command);
  await this.storageHelperService.uploadAttachments(command.payload.attachments);
  command.payload.attachments = command.payload.attachments.map(({ file, ...attachment }) => attachment);
}
const defaultPayload = this.verifyPayload.execute(
  VerifyPayloadCommand.create({
    payload: command.payload,
    template,
  })
);
command.payload = merge({}, defaultPayload, command.payload);
const jobData: IWorkflowDataDto = {
  ...command,
  actor: command.actor,
  transactionId,
};
await this.workflowQueueService.add({ name: transactionId, data: jobData, groupId: command.organizationId });
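The merge({}, defaultPayload, command.payload) line layers the trigger's payload on top of the template defaults. Assuming merge is a deep merge in the style of lodash's merge, the effect can be sketched with a simplified stand-in. deepMerge and the sample values below are hypothetical, and the sketch does not replicate all of lodash's behavior.

```typescript
type Payload = { [key: string]: unknown };

function isPlainObject(value: unknown): value is Payload {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Recursively merge `source` into a copy of `target`; values in `source` win.
function deepMerge(target: Payload, source: Payload): Payload {
  const result: Payload = { ...target };
  for (const key of Object.keys(source)) {
    const existing = result[key];
    const incoming = source[key];
    result[key] =
      isPlainObject(existing) && isPlainObject(incoming)
        ? deepMerge(existing, incoming)
        : incoming;
  }
  return result;
}

// Hypothetical defaults and trigger payload.
const defaultPayload = { name: 'there', organization: { logo: 'default.png', name: 'Acme' } };
const triggerPayload = { name: 'John', organization: { logo: 'https://happycorp.com/logo.png' } };

const merged = deepMerge(defaultPayload, triggerPayload);
```

Values from the trigger win, while defaults that the trigger did not provide (here organization.name) are kept.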
The workflow queue service itself is simple: it adds a job to the queue or, if it gets an array of jobs, bulk adds them. As mentioned earlier, this is where BullMQ comes in. When you call the Novu API to trigger a notification, it adds a job to a BullMQ queue.
// novu/packages/application-generic/src/services/queues/workflow-queue.service.ts
public async add(data: IWorkflowJobDto) {
  return await super.add(data);
}
public async addBulk(data: IWorkflowBulkJobDto[]) {
  return await super.addBulk(data);
}
After submitting events to the queue, the next step involves running a worker to process those events on the receiving end. createWorker constructs a BullMQ worker, specifying how to process tasks from a queue by providing the processing function, the topic name, and worker options. initWorker simplifies worker setup: it logs the initialization and calls createWorker to start processing tasks.
// novu/packages/application-generic/src/services/workers/worker-base.services.ts
public initWorker(processor: WorkerProcessor, options?: WorkerOptions): void {
  Logger.log(`Worker ${this.topic} initialized`, LOG_CONTEXT);
  this.createWorker(processor, options);
}
public createWorker(
  processor: WorkerProcessor,
  options: WorkerOptions
): void {
  this.instance.createWorker(this.topic, processor, options);
}
Back in the standard worker, you'll see that the constructor calls this.initWorker, and getWorkerProcessor() is what returns the processing function.
// novu/apps/worker/src/app/workflow/services/standard.worker.ts
export class StandardWorker extends StandardWorkerService {
  constructor(
    private runJob: RunJob,
    // constructor parameters
  ) {
    super(new BullMqService(workflowInMemoryProviderService));
    this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions());
    this.worker.on('failed', async (job: Job<IStandardDataDto, void, string>, error: Error): Promise<void> => {
      await this.jobHasFailed(job, error);
    });
  }
  private getWorkerOptions(): WorkerOptions {
    return {
      ...getStandardWorkerOptions(),
      settings: {
        backoffStrategy: this.getBackoffStrategies(),
      },
    };
  }
}
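The backoffStrategy setting in these worker options controls how long a failed job waits before it is retried. As far as I know, BullMQ lets you supply a function that receives the number of attempts made and returns a delay in milliseconds. The sketch below shows one common choice, exponential backoff with a cap. The function name and the delay values are hypothetical, and Novu's own getBackoffStrategies may work differently.

```typescript
// Hypothetical delays; Novu's own values may differ.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;

// Delay before the retry that follows attempt number `attemptsMade`:
// doubles with every attempt and is capped at MAX_DELAY_MS.
function exponentialBackoff(attemptsMade: number): number {
  const delay = BASE_DELAY_MS * Math.pow(2, attemptsMade - 1);
  return Math.min(delay, MAX_DELAY_MS);
}

const delays = [1, 2, 3, 4, 10].map((attempt) => exponentialBackoff(attempt));
// delays is [1000, 2000, 4000, 8000, 60000]
```

Capping the delay keeps a job that fails many times from waiting for hours between retries.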
Within the getWorkerProcessor definition, you'll see that it calls runJob.execute, passing a command built from the minimalJobData it received.
// novu/apps/worker/src/app/workflow/services/standard.worker.ts
storage.run(new Store(PinoLogger.root), () => {
  _this.runJob
    .execute(RunJobCommand.create(minimalJobData))
    .then(resolve)
    .catch((error) => {
      Logger.error(
        error,
        `Failed to run the job ${minimalJobData.jobId} during worker processing`,
        LOG_CONTEXT
      );
      return reject(error);
    })
    .finally(() => {
      transaction.end();
    });
});
If you look inside runJob, you'll see that it calls sendMessage.execute. This directs the sendMessage class to dispatch the message over the right channel, such as SMS or email.
// novu/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts
await this.sendMessage.execute(
  SendMessageCommand.create({
    // parameters
  })
);
We can jump to the sendMessageEmail class and check the important part, which is the sendMessage method. Inside, it creates a mailFactory and asks it for a handler, which gives us the provider we need.
// novu/apps/worker/src/app/workflow/usecases/send-message/send-message-email.usecase.ts
private async sendMessage(
  integration: IntegrationEntity,
  mailData: IEmailOptions,
  message: MessageEntity,
  command: SendMessageCommand
) {
  const mailFactory = new MailFactory();
  const mailHandler = mailFactory.getHandler(this.buildFactoryIntegration(integration), mailData.from);
  // ...rest of the code
}
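The factory-and-handler idea behind mailFactory.getHandler can be sketched as follows. This only illustrates the pattern and is not Novu's MailFactory: every interface and class name below is hypothetical, and the send methods return a string instead of calling a real provider API.

```typescript
// Hypothetical interfaces for illustration only.
interface EmailMessage {
  to: string;
  subject: string;
  html: string;
}

interface EmailHandler {
  canHandle(providerId: string): boolean;
  send(message: EmailMessage): string;
}

class MailgunLikeHandler implements EmailHandler {
  canHandle(providerId: string): boolean {
    return providerId === 'mailgun';
  }
  send(message: EmailMessage): string {
    // A real handler would call the provider's HTTP API here.
    return `mailgun -> ${message.to}`;
  }
}

class SesLikeHandler implements EmailHandler {
  canHandle(providerId: string): boolean {
    return providerId === 'ses';
  }
  send(message: EmailMessage): string {
    return `ses -> ${message.to}`;
  }
}

class EmailHandlerFactory {
  private readonly handlers: EmailHandler[] = [new MailgunLikeHandler(), new SesLikeHandler()];

  // Pick the first handler that supports the configured provider.
  getHandler(providerId: string): EmailHandler {
    const handler = this.handlers.find((candidate) => candidate.canHandle(providerId));
    if (!handler) {
      throw new Error(`No handler found for provider "${providerId}"`);
    }
    return handler;
  }
}

const factory = new EmailHandlerFactory();
const sent = factory
  .getHandler('ses')
  .send({ to: 'john@doemail.com', subject: 'New mention', html: '<p>Hi</p>' });
```

The calling code never needs to know which provider is configured. Supporting a new provider means adding one handler class to the list.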
In mail.factory.ts, you'll see different provider handlers. If we jump into one of the providers, specifically the Mailgun provider, you can see all it does is call the API that actually sends an email via Mailgun. The snippet below shows the provider setting up the Mailgun client it uses for that call. Once Mailgun successfully delivers the message to the user's inbox, the notification sending process ends.
// novu/providers/mailgun/src/lib/mailgun.provider.ts
this.mailgunClient = mailgun.client({
  username: config.username,
  key: config.apiKey,
  url: config.baseUrl || 'https://api.mailgun.net',
});
Final Thoughts
This exploration has scratched the surface of Novu's capabilities. While there's more to discover, you now grasp the essential flow: triggering events (via UI or API) creates DTOs, pushes them to a queue, and triggers workers to process and send notifications (SMS, email, etc.).
Remember, this is just a glimpse into Novu's broad range of functionalities. Hopefully, you've gained a solid understanding of the core workflow. If you're interested in learning more, I highly recommend exploring the project's GitHub repository.
Stay in touch
- Follow me on Twitter
- Subscribe to my YouTube channel
- Join my Learn from Open Source newsletter
- Star Inbox Zero on GitHub