Cover Photo by Fidel Fernando on Unsplash
So, I recently had to solve a problem with one of my NodeJS servers. I had to parse and process data from an Excel sheet, which turned out to be pretty CPU-intensive. It blocked the NodeJS event loop, effectively freezing my server until it was done. I wasn't surprised though; I expected this to happen. While there are several things you could do to resolve this, I'm going to present a solution using the NodeJS worker_threads module. Before we start, let's quickly talk about this thread thing. The code used in this article is available here.
NodeJS and Threads
A thread is a path of execution within a process.
Source geeksforgeeks
JavaScript is a single-threaded programming language, which means that only one set of instructions can be executed at a time. NodeJS applications are not really single-threaded, but we can't create threads the way we can in a language like Java. NodeJS runs certain tasks like I/O operations in parallel, but our JavaScript code, including any CPU-heavy work, runs on one thread.
What does this mean for us?
Your server will work fine if all the requests you receive only require I/O-heavy operations like database reads and writes. But if a request requires something CPU-intensive, like parsing a document or running a really long loop (as I had to do when processing an Excel sheet), your server won't be able to serve other requests because the only thread processing requests will be stuck.
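To see the effect for yourself, here is a minimal sketch (not taken from the article's repo). The blockFor helper is a made-up stand-in for CPU-heavy work like parsing a spreadsheet. The timer is asked to fire immediately, but it can only run once the busy loop gives the thread back.

```typescript
// Busy-wait for roughly `ms` milliseconds to simulate CPU-bound work.
// `blockFor` is a hypothetical helper used only for this illustration.
function blockFor(ms: number): number {
  const start = Date.now();
  let iterations = 0;
  while (Date.now() - start < ms) {
    iterations++;
  }
  return iterations;
}

const scheduledAt = Date.now();

// Ask for a callback "immediately"; it can only run once the call stack is empty.
setTimeout(() => {
  console.log(`timer fired after ${Date.now() - scheduledAt} ms`);
}, 0);

// Nothing else, including the timer above, runs during this call.
blockFor(200);
```

On a real server, every incoming request is in the same position as that timer: it waits until the long-running call returns.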
What is the "worker_threads" module?
The worker_threads module enables the use of threads that execute JavaScript in parallel.
Source NodeJS v14 docs
This allows us to build multi-threaded NodeJS applications, which is what we need right now.
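Before getting to the real design, here is about the smallest worker_threads example I can think of. It is only a sketch: the sumInWorker helper and the inline worker source are made up for illustration, and the inline source uses the eval option of the Worker constructor so everything fits in one file. The rest of this article loads the worker from its own file instead.

```typescript
import { Worker } from "worker_threads";

// Worker source kept inline (eval: true) so the sketch fits in one file.
const workerSource = `
  const { parentPort, workerData } = require("worker_threads");
  let sum = 0;
  for (let i = 1; i <= workerData.upTo; i++) sum += i;
  parentPort.postMessage(sum);
`;

// Hypothetical helper: sums 1..upTo on a separate thread.
function sumInWorker(upTo: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { upTo },
    });
    worker.once("message", resolve);
    worker.once("error", reject);
    worker.once("exit", (code) => {
      // Rejecting after a successful resolve is a no-op, so this is safe.
      if (code !== 0) reject(new Error(`worker exited with code ${code}`));
    });
  });
}

sumInWorker(1000000).then((sum) => console.log("sum from worker:", sum));
```

The main thread stays free to handle other work while the loop runs in the worker, and the result comes back as a message.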
Okay… So, how are we going to use this to solve our problem?
Let's pretend to be Senior Software Engineers for a moment and start by writing some sort of Spec!
The Spec
The idea is to give CPU-intensive jobs to another thread. As a job is received, it is immediately stored in a queue for processing. A worker pool, a collection of worker threads, regularly requests work from this queue. When a job is done, the main thread is notified and the result is stored in the DB. You can do anything you want with the result, but in my case, I instructed the system to send an email containing a link to the result to the user who created the job.
What happens if the process ends with jobs in the queue?
Well, the queue should also be saved to the DB. When the app starts, it should load all outstanding jobs from the DB and enqueue them for processing. We will save all requests to the DB before putting them in the queue, so all jobs in the queue are also in the DB.
What happens when the worker threads stop for some reason with incomplete jobs?
We can have exit event handlers on worker threads. This means that, if we keep track of who is doing what, we can tell that a worker has left a job unfinished and try to reassign it to a different worker! A Map will suffice to hold our assignments. Each worker will need to have a unique id we can use as a key for our Map. The uuid package will provide us with unique ids for our workers.
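The bookkeeping described here can be sketched in a few lines. This is an illustration rather than the code from the repo: the assign, complete and handleExit helpers are hypothetical names, and I use randomUUID from Node's built-in crypto module in place of the uuid package so the snippet has no dependencies.

```typescript
import { randomUUID } from "crypto";

// Hypothetical job shape; only the id matters for this sketch.
interface Job {
  _id: string;
}

// workerId -> job currently being processed by that worker
const assignments = new Map<string, Job>();

function assign(workerId: string, job: Job): void {
  assignments.set(workerId, job);
}

function complete(workerId: string): void {
  assignments.delete(workerId);
}

// Meant to be called from a worker's "exit" handler.
// Returns the job that was left unfinished, if any, so it can be re-queued.
function handleExit(workerId: string): Job | undefined {
  const unfinished = assignments.get(workerId);
  assignments.delete(workerId);
  return unfinished;
}

const workerA = randomUUID();
const workerB = randomUUID();
assign(workerA, { _id: "job-1" });
assign(workerB, { _id: "job-2" });
complete(workerA);

console.log(handleExit(workerA)); // undefined: worker A had finished its job
console.log(handleExit(workerB)); // { _id: 'job-2' }: this one should be re-queued
```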
What happens if an error occurred while processing a job?
A job status field should account for this. I recommend using the following statuses: pending, processing, completed and failed. You can also have a message field that contains useful information about a job.
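In TypeScript, the statuses fit nicely into a union type. The sketch below is an assumption about how a job record could look; only status and message come from the spec above, while the JobRecord name, the other fields and the isOutstanding helper are made up for illustration.

```typescript
// The four statuses suggested above, as a union type.
type JobStatus = "pending" | "processing" | "completed" | "failed";

// Hypothetical job record; fields other than `status` and `message` are assumptions.
interface JobRecord {
  _id: string;
  status: JobStatus;
  message?: string;
  data?: unknown;
}

// A job still needs work if it never started or was interrupted mid-run.
function isOutstanding(job: JobRecord): boolean {
  return job.status === "pending" || job.status === "processing";
}

const example: JobRecord = {
  _id: "job-1",
  status: "failed",
  message: "Unsupported file format",
};

console.log(isOutstanding(example)); // false
```

With the union type, a typo like 'complete' instead of 'completed' is caught at compile time instead of ending up in the DB.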
Now, we know what to do. Enough talk, let's code!
I'll be using TypeScript here, but the JavaScript equivalent shouldn't be too different. I also use Observables a lot, but there's nothing too special about them. I don't know exactly what an Observable is supposed to be, but for me (and as it is used here), it's just a mechanism to emit events and listen for them.
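To make that concrete, here is roughly what such a mechanism could look like. This is a minimal sketch under my own assumptions, not the class from the repo: the SimpleObservable name is made up, and all it offers is push to emit a value and subscribe to listen, with subscribe returning a function that removes the listener again.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical minimal observable: emit values with push, listen with subscribe.
class SimpleObservable<T> {
  private listeners = new Set<Listener<T>>();

  // Register a listener; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Deliver a value to every listener registered at the time of the call.
  push(value: T): void {
    // Copy first so listeners can unsubscribe while we iterate.
    for (const listener of Array.from(this.listeners)) {
      listener(value);
    }
  }
}

const events = new SimpleObservable<string>();
const received: string[] = [];

const unsubscribe = events.subscribe((value) => {
  received.push(value);
});

events.push("enqueue");
unsubscribe();
events.push("dequeue"); // not delivered, the listener is gone

console.log(received); // [ 'enqueue' ]
```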
Job Processor
This guy has a really simple job.
- Accept a job, save it to the DB then enqueue it.
async registerJob(job: any) {
  // save job to DB before it is added to queue
  const _id = await services.Job.create({
    ...job,
    status: 'pending'
  });
  this.queue.enqueue({ ...job, _id });
}
- Initialize the Worker pool and listen for messages from the pool.
- When a worker requests work, dequeue a job and pass it to the worker. Store the assignment in a map and update the job status to processing in the DB.
- When a worker announces that a job is complete, update the DB and the assignment map, then find another job for it.
async processJobs() {
  const workers = new WorkerPool(this.nWorkers);
  workers.init();
  workers.on('message', async ({ id, message, status, data }) => {
    if (message === WorkerMessage.job_complete) {
      const job = this.assignedJobs.get(id);
      this.assignedJobs.set(id, null);
      // update job status
      services.Job.updateOne(
        { status, data },
        { _id: job._id }
      );
    }
    // the worker is now idle: wait for the next job and hand it over
    const newJob: any = await this.queue.dequeue();
    workers.send(id, newJob);
    this.assignedJobs.set(id, newJob);
    // update job status
    services.Job.updateOne(
      { status: 'processing' },
      { _id: newJob._id }
    );
  });
  workers.on('exit', (id) => {
    const ongoingJob = this.assignedJobs.get(id);
    // the exited worker's id is never reused, so drop its entry
    this.assignedJobs.delete(id);
    if (!ongoingJob) return;
    // Re-queue the job that wasn't finished
    this.queue.enqueue(ongoingJob);
  });
}
The Queue
Nothing special here either, just an implementation of an async queue that clients can await until there's a new item.
// ... taken from Queue.ts
  enqueue(item: T) {
    this.items.push(item);
    this.observable.push(QueueEvents.enqueue);
  }
  async dequeue() {
    if (this.items.length > 0) {
      // shift() removes and returns the first item
      const currentItem = this.items.shift();
      this.observable.push(QueueEvents.dequeue);
      return currentItem;
    }
    // Queue is empty: wait for the next enqueue event, then try again
    return new Promise((resolve) => {
      const unsubscribe = this.observable.subscribe(async (message) => {
        if (message !== QueueEvents.enqueue) return;
        // Unsubscribe first so a later enqueue event cannot re-trigger this waiter
        unsubscribe();
        resolve(await this.dequeue());
      });
    })
  }
}
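If you don't have an Observable lying around, the same "await until there's an item" behaviour can be sketched with a plain list of waiting resolvers. This is an alternative design, not the Queue.ts from the repo, and the AsyncQueue name and size getter are my own choices for the illustration.

```typescript
// Alternative sketch: an async queue that parks waiting consumers in a list.
class AsyncQueue<T> {
  private items: T[] = [];
  private waiters: Array<(item: T) => void> = [];

  enqueue(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) {
      // Hand the item straight to the longest-waiting consumer.
      waiter(item);
    } else {
      this.items.push(item);
    }
  }

  dequeue(): Promise<T> {
    if (this.items.length > 0) {
      return Promise.resolve(this.items.shift() as T);
    }
    // Nothing available: resolve later, when enqueue is called.
    return new Promise<T>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Number of items that nobody has claimed yet.
  get size(): number {
    return this.items.length;
  }
}

const queue = new AsyncQueue<string>();
queue.dequeue().then((job) => console.log("got", job)); // waits: the queue is empty
queue.enqueue("job-1"); // resolves the pending dequeue above
```

Because each enqueue wakes at most one waiter, two idle workers can never end up competing for the same item.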
The Worker Pool
Initialize the required number of workers, assign them ids and manage communication between clients and the workers.
// ... taken from WorkerPool.ts
  private createWorker() {
    const worker = new Worker(`${__dirname}/worker.js`);
    const id = v4();
    this.workers.set(id, worker);
    worker.on("message", (value) => {
      this.observable.push({
        event: "message",
        data: { id, ...value }
      });
    });
    worker.on("exit", () => {
      // Pass the worker id along so listeners know which worker exited
      this.observable.push({ event: "exit", data: id });
      this.workers.delete(id);
      // Create another worker to replace the closing worker
      this.createWorker();
    })
  }
  send(id: string, data: any) {
    const worker = this.workers.get(id);
    worker?.postMessage(data);
  }
  on(evt: string, handler: Function) {
    this.observable.subscribe((value) => {
      const { event, data } = value;
      if (evt === event) {
        handler(data);
      }
    });
  }
}
The Worker
This guy is responsible for processing our CPU-intensive jobs. It requests work when it starts. As soon as it receives a job, it processes it and alerts the main thread that the job is complete, along with the result. There's no need to send another job request here because the main thread will automatically try to find something else for it to do.
import { parentPort } from "worker_threads";
import { WorkerMessage } from "./WorkerMessage";
parentPort.on('message', async (job) => {
  const { data } = job;
  try {
    // process job here; this step is expected to produce `resultId`
    parentPort.postMessage({
      message: WorkerMessage.job_complete,
      status: 'completed',
      data: { ...data, resultId }
    });
  } catch (error) {
    // report the failure so the job can be marked as failed
    parentPort.postMessage({
      message: WorkerMessage.job_complete,
      status: 'failed',
      data: { ...data, error: error.message }
    });
  }
});
// ask for the first job as soon as the worker starts
parentPort.postMessage({ message: WorkerMessage.request_job });
On Startup
All that's left is to call the Job Processor.
import { jobProcessor } from "./JobProcessor";
jobProcessor.loadOutstandingJobs();
jobProcessor.processJobs();
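The loadOutstandingJobs method isn't shown in this article, so here is a rough sketch of what it could do based on the spec: fetch every job that is still pending or processing and put it back on the queue. The JobStore and JobQueue interfaces, the findByStatus method and the in-memory stand-ins are all assumptions made so the snippet runs without a database. The real code in the repo may look different.

```typescript
type JobStatus = "pending" | "processing" | "completed" | "failed";

interface StoredJob {
  _id: string;
  status: JobStatus;
}

// Hypothetical storage and queue interfaces; the real service layer is not shown here.
interface JobStore {
  findByStatus(statuses: JobStatus[]): Promise<StoredJob[]>;
}

interface JobQueue {
  enqueue(job: StoredJob): void;
}

// Re-enqueue every job that was waiting or in flight when the process last stopped.
async function loadOutstandingJobs(store: JobStore, queue: JobQueue): Promise<number> {
  const outstanding = await store.findByStatus(["pending", "processing"]);
  for (const job of outstanding) {
    queue.enqueue(job);
  }
  return outstanding.length;
}

// In-memory stand-ins so the sketch runs without a database.
const savedJobs: StoredJob[] = [
  { _id: "a", status: "pending" },
  { _id: "b", status: "completed" },
  { _id: "c", status: "processing" },
];

const memoryStore: JobStore = {
  findByStatus: async (statuses) =>
    savedJobs.filter((job) => statuses.indexOf(job.status) !== -1),
};

const requeued: StoredJob[] = [];
const memoryQueue: JobQueue = {
  enqueue: (job) => {
    requeued.push(job);
  },
};

loadOutstandingJobs(memoryStore, memoryQueue).then((count) =>
  console.log(`re-queued ${count} job(s)`)
);
```

Jobs stuck in processing are included on purpose: if the process died mid-job, nobody is working on them anymore.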
Conclusion
That was a lot of code, but it's mostly simple stuff. I like that the code is mostly clean and reusable, and we managed to solve our problem here, but this solution is far from perfect. I can think of a few scenarios where you might want something different.
Your job might be too CPU intensive and you need to split each request among several workers.
You might have a lot of server instances set up and you don't want each of them to spin their own workers. Maybe you want a central worker pool for all your server instances to use.
I can't think of any more. If you have others, please drop them in the comments.
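For the first scenario (splitting one job among several workers), a starting point could be to cut the input into chunks and hand each chunk to a different worker. This is only a sketch of the splitting step, and splitIntoChunks is a made-up helper. Merging the partial results back together is left out because it depends entirely on the job.

```typescript
// Split `items` into at most `parts` contiguous chunks.
// Every chunk has ceil(length / parts) items, except possibly the last one.
function splitIntoChunks<T>(items: T[], parts: number): T[][] {
  if (!Number.isInteger(parts) || parts < 1) {
    throw new RangeError("parts must be a positive integer");
  }
  const chunks: T[][] = [];
  const size = Math.ceil(items.length / parts);
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

const rows = [1, 2, 3, 4, 5, 6, 7];
console.log(splitIntoChunks(rows, 3)); // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```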
Also, if you think there's something I missed or there's a flaw in my work please let me know in the comments. See the full code on github.
Thank you!
Top comments (9)
Split your app into producer and consumer services. Consider a queue like Kafka between the producer and consumer services. It will really help with scaling, and the failure of one service won't affect the other.
Definitely, we can use it with an approach where the input file is attached & data pushed to a PubSub or a messaging queue then we can push the job to a different system altogether.
But that really depends on the specific use case though.
I achieved the same result using Pub/Sub, specifically backed by Redis and Bull Queue -> check this out: github.com/OptimalBits/bull
Have you thought about using streams in here?
Not sure if this is relevant, but recently I had to write a script to save thousands of records to a database after reading them from a CSV file, and streams came in handy.
That's definitely right. Streams would work like a charm in this case. However, we have to be careful if we want to do something like a transaction and maintain consistency.
I had to work with Excel files that contained images. Parsing the Excel file itself was CPU-intensive. Streams might work well with CSV files, where you can just read each line and process it.
you may also use piscinajs
Does this work with the latest node.js compatibility improvements? And if there is a rust crate to assist, deno will integrate and take advantage of its native speed improvements.
I can guarantee that it works with NodeJS v14...