Node.js uses an event loop to process asynchronous tasks. The event loop handles the execution of asynchronous tasks, but it does so on a single thread. If some CPU-intensive or long-running (blocking) logic needs to be executed, it should not run on the main thread. This is where BullMQ can help us.
In this blog post, we are going to set up a basic queue, using BullMQ. If you'd like to jump right into developing with a queue, check out our starter.dev kit for ExpressJS, where a fully functioning queue is already provided for you.
Why and when to use a queue?
BullMQ is a library that can be used to implement message queues in Node.js applications. A message queue allows different parts of an application, or different applications, to communicate with each other asynchronously by sending and receiving messages. This can be useful in a variety of situations, such as when one part of the application needs to perform a task that could take a long time, or when different parts of the application need to be decoupled from each other for flexibility and scalability. If you need to process large numbers of messages in a distributed environment, a message queue is the right tool for the job.
One such scenario for using a queue is handling webhooks. Webhook handler endpoints usually need to respond with a 2xx status quickly; therefore, you cannot put long-running tasks inside that handler, but you still need to process the incoming data. A good way of mitigating this is to put the incoming data into the queue and respond quickly. The processing is then taken care of by a BullMQ worker, and if it is set up correctly, it will run inside a child process without blocking the main thread.
Prerequisites
BullMQ uses Redis to handle its message queue. In development mode, we use docker-compose to start up a Redis instance. We expose the Redis container's 6379 port so that it is reachable on the host machine. We also mount the ./misc/data and ./misc/conf folders to preserve data across restarts of our local development environment.
version: '3'
services:
  ## Other docker containers ...
  redis:
    image: 'redis:alpine'
    command: redis-server /usr/local/etc/redis/redis.conf
    ports:
      - '6379:6379'
    volumes:
      - ./misc/data:/var/lib/redis
      - ./misc/conf:/usr/local/etc/redis/
    environment:
      - REDIS_REPLICATION_MODE=master
We can start up our infrastructure with the docker-compose up -d command, and we can stop it with the docker-compose stop command. We also need to set up the connection information for BullMQ. We make it configurable by using environment variables.
// config.constants.ts
export const REDIS_QUEUE_HOST = process.env.REDIS_QUEUE_HOST || 'localhost';
export const REDIS_QUEUE_PORT = process.env.REDIS_QUEUE_PORT
  ? parseInt(process.env.REDIS_QUEUE_PORT, 10)
  : 6379;
To start working with BullMQ, we also need to install it in our Node project:
npm install bullmq
Setting up a queue
Creating a queue is pretty straightforward: we need to pass the queue name as a string along with the connection information.
// queue.ts
import { Queue } from 'bullmq';
import {
  REDIS_QUEUE_HOST,
  REDIS_QUEUE_PORT,
} from './config.constants';

export const myQueue = new Queue('my-queue', {
  connection: {
    host: REDIS_QUEUE_HOST,
    port: REDIS_QUEUE_PORT,
  },
});
We also create a function that can be used to add jobs to the queue from an endpoint handler. I suggest setting up a rule that removes completed and failed jobs in a timely manner. In this example, we remove completed jobs from Redis after an hour, and we keep failed jobs for a day. These values depend on what you want to achieve; in a production app, it can very well happen that you would keep the jobs for weeks.
// queue.ts
import { Queue, Job } from 'bullmq';

// ...

const DEFAULT_REMOVE_CONFIG = {
  removeOnComplete: {
    age: 3600,
  },
  removeOnFail: {
    age: 24 * 3600,
  },
};

export async function addJobToQueue<T>(data: T): Promise<Job<T>> {
  return myQueue.add('job', data, DEFAULT_REMOVE_CONFIG);
}
We call this function when a specific endpoint gets hit.
// main.ts
import { NextFunction, Request, Response } from 'express';
import { addJobToQueue } from './queue';

// ...

app.post('/', async (req: Request, res: Response, next: NextFunction) => {
  const job = await addJobToQueue(req.body);
  res.json({ jobId: job.id });
  return next();
});
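Once the server is running, we can exercise this endpoint and get the job id back. Here is a minimal sketch using Node 18+'s built-in fetch; the port and the payload are assumptions (not from the starter), and the app needs the express.json() middleware for req.body to be populated.

// try-endpoint.ts — a quick manual test; port 3000 and the payload are assumptions
async function main(): Promise<void> {
  const response = await fetch('http://localhost:3000/', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ userId: 42 }), // any JSON body; it becomes job.data
  });
  console.log(await response.json()); // e.g. { jobId: '1' }
}

main();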
The queue can now store jobs, but in order for us to be able to process those jobs, we need to set up a worker.
Put the processing into a thread
To start with, we set up the worker with an inline async function. The worker needs the same name as the queue in order to start consuming that queue's jobs, and it needs the same connection information as the queue we set up before.
// worker.ts
import { Job, Worker } from 'bullmq';
import {
  REDIS_QUEUE_HOST,
  REDIS_QUEUE_PORT,
} from './config.constants';

let worker: Worker;

export function setUpWorker(): void {
  worker = new Worker('my-queue', async () => {/** ... */}, {
    connection: {
      host: REDIS_QUEUE_HOST,
      port: REDIS_QUEUE_PORT,
    },
    autorun: true,
  });

  worker.on('completed', (job: Job, returnvalue: 'DONE') => {
    console.debug(`Completed job with id ${job.id}`, returnvalue);
  });

  worker.on('active', (job: Job<unknown>) => {
    console.debug(`Started job with id ${job.id}`);
  });

  worker.on('error', (failedReason: Error) => {
    console.error(`Job encountered an error`, failedReason);
  });
}
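Besides the listeners above, it is often useful to know when a job itself fails. A minimal sketch of an optional failed listener, which is not part of the starter:

// worker.ts — an optional extra listener inside setUpWorker()
worker.on('failed', (job: Job | undefined, error: Error) => {
  console.error(`Job ${job?.id ?? 'unknown'} failed`, error);
});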
// we call the method after we set up the queue in queue.ts
import { Queue } from 'bullmq';
import { setUpWorker } from './worker';
// ...
setUpWorker();
After we create the worker and set up the event listeners, we call the setUpWorker() method in the queue.ts file after the Queue gets created. Let's set up the job processor function.
// job-processor.ts
import { Job } from 'bullmq';

module.exports = async function jobProcessor(job: Job): Promise<'DONE'> {
  await job.log(`Started processing job with id ${job.id}`);
  console.log(`Job with id ${job.id}`, job.data);
  // TODO: do your CPU-intensive logic here
  await job.updateProgress(100);
  return 'DONE';
};
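If the work can be split into steps, the processor can report incremental progress instead of jumping straight to 100. Here is a hypothetical sketch that assumes the job payload carries an items array; the payload shape is an assumption, not part of the starter.

// job-processor.ts — a hypothetical chunked variant; job.data.items is assumed
import { Job } from 'bullmq';

module.exports = async function jobProcessor(job: Job): Promise<'DONE'> {
  const items: unknown[] = job.data.items ?? [];
  for (let i = 0; i < items.length; i++) {
    // ... do the expensive work for items[i] here ...
    await job.updateProgress(Math.round(((i + 1) / items.length) * 100));
  }
  return 'DONE';
};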
These example processors don't do any real work yet, but if we had a long-running job, like complex database update operations, or sending data towards a third-party API, we would do it here. Let's make sure our worker runs these jobs off the main thread.
// worker.ts
import path from 'path';
// ...

let worker: Worker;
const processorPath = path.join(__dirname, 'job-processor.js');

export function setUpWorker(): void {
  worker = new Worker('my-queue', processorPath, {
    connection: {
      host: REDIS_QUEUE_HOST,
      port: REDIS_QUEUE_PORT,
    },
    autorun: true,
  });

  // ...
}
If you provide a file path to the worker as the second parameter, BullMQ will run the function exported from that file in a separate, sandboxed process. That way, the main thread is not tied up by the CPU-intensive work the processor does.
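BullMQ also exposes a useWorkerThreads worker option to run sandboxed processors on worker threads instead of child processes. A sketch, assuming your BullMQ version supports the option (check your version's docs):

// worker.ts — a sketch; useWorkerThreads availability depends on your BullMQ version
worker = new Worker('my-queue', processorPath, {
  connection: {
    host: REDIS_QUEUE_HOST,
    port: REDIS_QUEUE_PORT,
  },
  useWorkerThreads: true, // use worker_threads instead of child processes
});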
The compiled .js processor path works if you run the TypeScript compiler (tsc) on your back-end code, but if you prefer keeping your code in TypeScript and running the logic with ts-node, then you should use the TypeScript file as your processor path:
const processorPath = path.join(__dirname, 'job-processor.ts');
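If the same codebase sometimes runs through ts-node and sometimes as compiled JavaScript, one possible approach (a sketch, not from the original starter) is to derive the extension at runtime:

// worker.ts — a sketch; assumes ts-node executes .ts files and tsc emits .js files
const extension = __filename.endsWith('.ts') ? 'ts' : 'js';
const processorPath = path.join(__dirname, `job-processor.${extension}`);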
The problem with bundlers
Sometimes your back-end code gets bundled with webpack. For example, if you use Nx to keep your front-end and back-end code together in one repository, your back-end build gets bundled with webpack. In order to be able to run your processors in a separate process, you need to tweak your project.json configuration:
{
  "targets": {
    "build": {
      "options": {
        "outputPath": "dist/apps/api",
        "main": "apps/api/src/main.ts",
        "tsConfig": "apps/api/tsconfig.app.json",
        "assets": [],
        "additionalEntryPoints": [
          {
            "entryName": "sync.processor",
            "entryPath": "apps/api/src/app/queue/job-processor.ts"
          }
        ]
      }
    }
  }
}
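With this configuration, webpack emits the processor as its own bundle, named after the entryName property. The worker's processor path then needs to point at that emitted file; a minimal sketch, assuming the entry bundle lands next to the main output (verify the exact file name in your dist folder):

// worker.ts — a sketch; 'sync.processor.js' follows the "entryName" set above
const processorPath = path.join(__dirname, 'sync.processor.js');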
Conclusion
In an ExpressJS application, running CPU-intensive tasks on the main thread can cause your endpoints to become unresponsive and/or slow. Moving these tasks off the main thread can help alleviate performance issues, and using BullMQ can help you out greatly.
If you want to learn more about NodeJS, check out node.framework.dev for a curated list of libraries and resources. If you are looking to start a new ExpressJS project, check out our starter kit resources at starter.dev.