Rishabh Rawat for AppSignal

Posted on • Originally published at blog.appsignal.com

Throttling a Kafka Queue in Node.js

Coming from an HTTP-only background, message queues look fascinating. Unlike with HTTP, you can just push events to the queue and move on. After all, they're a big part of what makes async communication possible between microservices.

In this article, we'll take a look at Apache Kafka and how you can build a durable throttling layer with it.

Let's get started!

What We'll Cover

Here's what we'll go through in this post:

  • Why use Kafka?
  • The problem with normal consumption
  • How to enable manual commits to throttle consumption
  • Granular control with async queue
  • Putting it all together
  • Real-world applications
  • Wrapping up

Prerequisites

To follow along with this tutorial, you will need:

  1. Working knowledge of building REST APIs with Express.js (Node.js version >= 16)
  2. Basic knowledge of Apache Kafka
  3. (Optional) AppSignal account (for application performance monitoring)
  4. A running MongoDB cluster on your Atlas account

This article contains a few code snippets. You can refer to the complete working code along with the getting started guide on GitHub.

Installing Kafka

You can skip this section if you already have Apache Kafka installed on your machine. The complete quickstart is available in the official Kafka documentation. Here's what we need to do:

  1. Download Kafka as indicated in step 1 of the documentation above.

  2. Once installed, go to the downloaded Kafka folder. Your Kafka installation version might differ.

cd kafka_2.13-3.3.1/
  3. Run ZooKeeper and the Kafka server by running these commands in separate terminal tabs. The commands must be run in the given order.
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

If either process stops or exits, restart both. Only proceed once both scripts are running and there isn't an expiring session.

  4. Open a new tab in your terminal and create a test topic. We'll use this topic throughout the article.
bin/kafka-topics.sh --create --topic testTopic2 --bootstrap-server localhost:9092

To confirm that the topic has been created successfully, you can run the following command:

bin/kafka-topics.sh --describe --topic testTopic2 --bootstrap-server localhost:9092

You should have Apache Kafka running on localhost:9092 after following these steps. We can proceed to the next steps.

Setting up AppSignal for Node.js

This is an optional step. We're going to integrate AppSignal to monitor and learn more about our Node.js application. You need to set up an AppSignal account to follow this section.

We need two things to integrate AppSignal into our project:

  1. AppSignal application name. You can find this in the config.js file under the "appName" field. I've created an app called "throttle-kafka-nodejs".
  2. Your AppSignal API key, sourced as "APPSIGNAL_PUSH_API_KEY" from the ENV file.

You can find the AppSignal SDK in the src/appsignal.cjs file.

Let's now turn to why we should use Kafka in the first place.

Why Use Kafka for Node.js?

Apache Kafka provides a durable queue for your events with built-in reliability. It becomes pretty evident how Kafka can handle billions of events once you look at its architecture.

Durability

Many other event queues discard events once they're consumed; Kafka doesn't. Unsurprisingly, that's one of its biggest selling points. Having access to the events even after consumption gives a sense of safety that is unparalleled.

The events are stored on disk and have a default retention period of 7 days. The retention period is configurable, and increasing it comes at the cost of a bigger disk.
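
As a rough illustration, retention can be set per topic through the `retention.ms` topic config. The sketch below only builds a topic definition that you could hand to an admin client's topic-creation call (for example, KafkaJS's `admin.createTopics`). The helper name `buildTopicDefinition` and the 14-day value are assumptions for illustration and are not part of this project.

```javascript
// Hypothetical helper: builds a topic definition with a custom retention period.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

function buildTopicDefinition(topic, retentionDays) {
  return {
    topic,
    numPartitions: 1,
    replicationFactor: 1,
    // Topic-level configs are passed as string name/value pairs
    configEntries: [
      { name: "retention.ms", value: String(retentionDays * MS_PER_DAY) },
    ],
  };
}

// Illustrative value: keep events for 14 days instead of the default 7
const definition = buildTopicDefinition("testTopic2", 14);
console.log(definition.configEntries[0]);
```

Doubling the retention period roughly doubles the amount of data the brokers have to keep on disk for a steady event rate.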

Robustness

Kafka is designed with robustness in mind. Here's a simplistic generalized overview:

Apache Kafka architecture overview

Brokers are nodes, a topic is a queue, and you can have various topics. Each topic is divided into a predetermined number of partitions (think database partitioning).

Writes go to a partition's leader and are replicated to followers on separate brokers. The number of copies is configured via a replication factor setting, and this replication is what provides high availability (HA).

When a broker dies, new leaders have to be elected on the live brokers for the partitions it was leading. For instance, if broker 3 dies in the cluster shown above, the leader for partition 3 will be elected from either broker 1 or broker 2. The cluster handles this automatically, so it's not something you need to worry about.
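
To make the failover idea a bit more tangible, here's a toy model in plain JavaScript. It is not Kafka's real election logic (Kafka picks new leaders from the in-sync replicas via its controller); the `electLeaders` function and the broker and partition layout are assumptions made up for this sketch.

```javascript
// Toy model: each partition has a leader and replicas spread across brokers.
function electLeaders(partitions, liveBrokers) {
  return partitions.map((partition) => {
    // Leader is still alive: nothing to do
    if (liveBrokers.includes(partition.leader)) return partition;

    // Otherwise pick the first replica that lives on a live broker
    const newLeader = partition.replicas.find((b) => liveBrokers.includes(b));
    return { ...partition, leader: newLeader ?? null };
  });
}

const partitions = [
  { id: 1, leader: 1, replicas: [1, 2, 3] },
  { id: 2, leader: 2, replicas: [2, 3, 1] },
  { id: 3, leader: 3, replicas: [3, 1, 2] },
];

// Broker 3 dies: only partition 3 needs a new leader
const afterFailure = electLeaders(partitions, [1, 2]);
console.log(afterFailure.map((p) => `partition ${p.id} -> broker ${p.leader}`));
```

Partitions 1 and 2 keep their leaders, while partition 3 moves to one of the surviving brokers that already holds a copy of its data.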

The Problem with Normal Consumption

Now that we know what makes Kafka one of the best candidates for the pub-sub mechanism, we should give it a run. We won't change any default configs just yet. We have a basic producer and a batch consumer set up in our Node.js application.

Let's push some events and see how our application performs:

router.post("/produce", async (req, res) => {
  try {
    const { events } = req.body;
    const producer = await kafkaProducer;
    const promises = events.map(async (event) => {
      const payload = {
        topic: config.kafka.topic,
        messages: [{ value: JSON.stringify(event) }],
      };

      return producer.send(payload);
    });

    await Promise.all(promises);

    return res
      .status(200)
      .json({ status: true, message: "Produced event successfully" });
  } catch (err) {
    return res
      .status(500)
      .json({ status: false, message: `Could not produce event: ${err}` });
  }
});

Since nothing pauses the consumption cycle, our consumer will keep fetching events in batches of up to the default size of 10 MB. However, our application may be unable to process the events at the same rate. Events then accumulate in memory, making the application prone to unexpected downtime due to resource starvation.

Moreover, autocommit is turned on by default in various Kafka Node.js client libraries, including KafkaJS. As the name suggests, all the events you fetch are committed automatically after a batch is successfully processed. But what if an application faces downtime in the middle of batch execution? The same events will be fetched again from Kafka in the next cycle, causing duplicate events.

But this isn't a bug, it's a feature. It ensures "at least once" semantics — see the documentation. Your use case determines your approach (e.g., autocommit in a financial system should be a no-go).

In this article, we want to get as close to "exactly once" processing as we can, so we'll avoid autocommit and commit offsets ourselves.
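
To see where the duplicates come from, here's a toy simulation with no Kafka involved. The `consume` function, its `crashAfter` option, and the in-memory `log` are hypothetical stand-ins: `log` plays the role of a partition and `committed` the stored offset.

```javascript
// Toy simulation: `log` stands in for a partition, `committed` for the stored offset.
function consume(log, startOffset, { crashAfter = Infinity, commitEveryMessage }) {
  const processed = [];
  let committed = startOffset;

  for (let offset = startOffset; offset < log.length; offset++) {
    // Simulate the process dying before it handles the next event
    if (processed.length === crashAfter) return { processed, committed };

    processed.push(log[offset]);
    if (commitEveryMessage) committed = offset + 1; // next offset to read
  }

  // Whole batch handled: everything up to the end is committed
  return { processed, committed: log.length };
}

const log = ["e0", "e1", "e2", "e3", "e4"];

// Commit once per batch: a crash after 3 events leaves nothing committed
const batchCrash = consume(log, 0, { crashAfter: 3, commitEveryMessage: false });
const batchRetry = consume(log, batchCrash.committed, { commitEveryMessage: false });
console.log("per-batch commit, processed after restart:", batchRetry.processed);

// Commit after every event: the restart resumes where the crash happened
const eachCrash = consume(log, 0, { crashAfter: 3, commitEveryMessage: true });
const eachRetry = consume(log, eachCrash.committed, { commitEveryMessage: true });
console.log("per-message commit, processed after restart:", eachRetry.processed);
```

With per-batch commits, the first three events are handled twice. Committing after every event shrinks that window to a single event (a crash between processing and committing can still replay it), which is why the consumer's writes should ideally be idempotent as well.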

Monitoring the Anomalies in Our Node.js Application

The default batch limit of 10 MB might be too much (or too little) depending on the size of one event. Fetching events in excess can lead to an unbearable load on the consuming application. To observe and look out for anomalies, we're using AppSignal for Node.js.

We can analyze slow API endpoints and database queries for our application without setting up any configs or enabling integrations.

Slow queries dashboard

It's evident that our database is not having a good time. Let's improve this — time to reduce the batch size and switch to manual Kafka commits.

Manual Kafka Commits to the Rescue

Let's turn some knobs to ensure our application doesn't get overwhelmed.

We'll update our Kafka consumer to incorporate batching. We'll fetch a manageable batch size from Kafka and keep autocommit turned off, instead performing manual commits.
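
For illustration, the fetch limits might live in a config object along these lines. The field names mirror how `config.kafka` is used in this article, but the values and the consumer group ID are assumptions, so tune them for your own event sizes.

```javascript
// Hypothetical config sketch; values are illustrative, not recommendations.
const config = {
  kafka: {
    topic: "testTopic2",
    consumer: "throttle-kafka-group", // hypothetical consumer group ID
    maxBytesPerPartition: 1024, // 1 KB per partition per fetch (KafkaJS default: 1 MB)
    maxBytes: 1024, // 1 KB per fetch response (KafkaJS default: 10 MB)
  },
};

console.log(`Fetching at most ${config.kafka.maxBytes} bytes per request`);
```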

Here's our consumer function:

initConsumer: async (mongoClient) => {
  const consumer = KafkaJS.consumer({
    groupId: config.kafka.consumer,
    topic: config.kafka.topic,
    maxBytesPerPartition: config.kafka.maxBytesPerPartition,
    maxBytes: config.kafka.maxBytes,
    allowAutoTopicCreation: true,
  });

  await consumer.connect();

  consumer.on("consumer.commit_offsets", (event) => {
    console.info("✅ offset is committed");
  });

  await consumer.subscribe({ topic: config.kafka.topic, fromBeginning: false });

  await consumer.run({
    autoCommit: false,
    eachBatchAutoResolve: false,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
      pause,
      resume,
    }) => {
      console.log("🛑 Pausing consumer group");
      pause([{ topic: config.kafka.topic }]);

      console.log(`🚧 Processing ${batch.messages.length} messages 🚧`);
      for (let message of batch.messages) {
        if (!isRunning() || isStale()) break;

        // Processing the event
        // Await the span callback so the offset is only resolved after the insert completes
        await tracer.startActiveSpan(
          "insert | kafka event | SUCCEEDED",
          async (span) => {
            const Item = mongoClient.collection("items");

            // Send query data to AppSignal
            setCategory("query.mongodb");
            setBody(`Item.insertOne()`);
            setTag("operationType", "insert");

            // Save event data to DB
            await Item.insertOne(message);

            span.end();
          }
        );

        resolveOffset(message.offset);
        await heartbeat();

        // Commit what has been resolved so far; KafkaJS expects a { topics: [...] } object here
        await commitOffsetsIfNecessary(uncommittedOffsets());
      }

      console.log("🟢 Resuming consumption for next batch");
      consumer.resume([{ topic: config.kafka.topic }]);
    },
  });
};

Let's run through this step-by-step:

  • We've explicitly set maxBytes and maxBytesPerPartition to cap the size of each fetch, and turned off autocommit.
  • After an event is processed successfully, we mark it as processed with resolveOffset and commit it by invoking commitOffsetsIfNecessary manually (this was automated earlier).
  • The commits happen after every message is processed. This way, we don't wait for the whole batch to finish processing before a commit can happen.
  • Once the whole batch has been processed and its offsets committed, we resume consumption.
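
One detail worth knowing when you commit an explicit position yourself (for example, with KafkaJS's `consumer.commitOffsets`): the offset you commit is the next message to read, not the last one you processed. Here's a minimal sketch of that arithmetic; the helper name `offsetsToCommit` is hypothetical.

```javascript
// Hypothetical helper: builds the payload for committing one processed message.
function offsetsToCommit(topic, partition, lastProcessedOffset) {
  // Kafka offsets are 64-bit integers carried as strings, hence BigInt
  const next = (BigInt(lastProcessedOffset) + 1n).toString();
  return [{ topic, partition, offset: next }];
}

// After processing offset "41" on partition 0, commit "42"
console.log(offsetsToCommit("testTopic2", 0, "41"));
```

Committing the processed offset itself (instead of offset + 1) would make the consumer re-read that message after a restart.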

With these changes, we've improved the reliability of the pub-sub setup through manual commits and smaller batches. Let's explore how we can improve this further in the next section.

Granular Control with Async Queue in Kafka

As you might've noticed, we specify the batch size in bytes, not the number of events. In fact, the number of events we get in a batch depends entirely on the size of each individual event. This leaves room for unwanted surprises. We don't want our application to get burdened with more events than it can process.
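
A quick back-of-the-envelope sketch shows how much the event count can swing. The `estimateEventsPerFetch` helper and the sample events are assumptions for illustration, and the estimate ignores Kafka's per-record and per-batch overhead, so treat the numbers as rough upper bounds.

```javascript
// Rough estimate only: ignores Kafka's per-record and per-batch overhead.
function estimateEventsPerFetch(maxBytes, sampleEvent) {
  const eventBytes = Buffer.byteLength(JSON.stringify(sampleEvent), "utf8");
  return Math.floor(maxBytes / eventBytes);
}

const tinyEvent = { id: 1 };
const largerEvent = { id: 1, description: "x".repeat(500) };

// Same byte limit, very different event counts
console.log("tiny events per 1 KB fetch:", estimateEventsPerFetch(1024, tinyEvent));
console.log("larger events per 1 KB fetch:", estimateEventsPerFetch(1024, largerEvent));
```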

Welcome, async queue!

We will build a second throttling layer with a queue from the async library. With this queue, our application will only process a set number of events at once. The number is specified by the concurrency parameter.

Let's understand with an example.

Throttling Kafka queue with async queue

The second throttling layer lets us specify the exact number of events we want to process concurrently. Here's the updated consumer code:

initConsumer: async (mongoClient) => {
  const concurrentQueue = async.queue(async (queueData) => {
    const data = JSON.parse(queueData.value);
    console.time(`${data.hash} took`);
    console.log("processing in queue: ", data);

    // AppSignal event capturing; can be commented out if throttling behaves unexpectedly.
    // Awaited so the queue worker only finishes once the insert completes.
    await tracer.startActiveSpan("insert | kafka event | SUCCEEDED", async (span) => {
      const Item = mongoClient.collection("items");

      // Send query data to AppSignal
      setCategory("query.mongodb");
      setBody(`Item.insertOne()`);
      setTag("operationType", "insert");

      // Save event data to DB
      await Item.insertOne(data);

      console.timeEnd(`${data.hash} took`);

      span.end();
    });
  }, config.kafka.queueConcurrency);

  const consumer = KafkaJS.consumer({
    groupId: config.kafka.consumer,
    topic: config.kafka.topic,
    maxBytesPerPartition: config.kafka.maxBytesPerPartition,
    maxBytes: config.kafka.maxBytes,
    allowAutoTopicCreation: true,
  });

  // Connect to the consumer group
  await consumer.connect();

  consumer.on("consumer.commit_offsets", (event) => {
    console.info("✅ offset is committed");
  });

  // Subscribe to the topic
  await consumer.subscribe({ topic: config.kafka.topic, fromBeginning: false });

  // Run the consumer
  await consumer.run({
    autoCommit: false, // This is turned off since we're doing manual commits
    eachBatchAutoResolve: false,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
      pause,
      resume,
    }) => {
      console.log("🛑 Pausing consumer group");
      pause([{ topic: config.kafka.topic }]);

      console.log(`⬇️ Fetched ${batch.messages.length} messages from batch`);
      for (let message of batch.messages) {
        if (!isRunning() || isStale()) break;

        // Process the event
        concurrentQueue.push(message);

        resolveOffset(message.offset);
        await heartbeat();
      }

      // Handler for draining queue
      // Resumes the consumer to fetch next batch only when concurrentQueue is empty
      concurrentQueue.drain(async function () {
        // Commit the resolved offsets; KafkaJS expects a { topics: [...] } object,
        // which uncommittedOffsets() provides
        await commitOffsetsIfNecessary(uncommittedOffsets());
        console.log("🟢 Resuming consumption for next batch");
        consumer.resume([{ topic: config.kafka.topic }]);
      });
    },
  });

  process.on("drainQueue", concurrentQueue.drain);
};

We are now fetching a batch from Kafka and pushing it to the queue. The queue makes sure only a predetermined number of events get processed concurrently.
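
To see the concurrency limit in isolation, here's a dependency-free sketch of what such a queue does. It's a simplified stand-in, not the async library's implementation: `createQueue`, `stats`, and the 10 ms fake workload are all assumptions made up for this example.

```javascript
// Minimal stand-in for a concurrency-limited queue (not the async library itself).
function createQueue(worker, concurrency) {
  const waiting = [];
  let running = 0;
  let onDrain = () => {};

  function next() {
    // Start tasks until the concurrency limit is reached
    while (running < concurrency && waiting.length > 0) {
      const task = waiting.shift();
      running++;
      Promise.resolve()
        .then(() => worker(task))
        .catch((err) => console.error("task failed:", err))
        .finally(() => {
          running--;
          if (running === 0 && waiting.length === 0) onDrain();
          else next();
        });
    }
  }

  return {
    push(task) {
      waiting.push(task);
      next();
    },
    drain(fn) {
      onDrain = fn;
    },
    stats: () => ({ running, waiting: waiting.length }),
  };
}

// Usage: 5 events, at most 2 in flight at any time
let inFlight = 0;
let peak = 0;
const queue = createQueue(async () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10)); // fake work
  inFlight--;
}, 2);

for (let i = 0; i < 5; i++) queue.push({ id: i });
queue.drain(() => console.log(`drained, peak concurrency: ${peak}`));
```

Right after the pushes, two tasks are running and three are waiting. Each time a task finishes, the next waiting one starts, and the drain handler fires once nothing is left, which is the point where our consumer resumes fetching.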

Putting It All Together

With our updated pub-sub setup, let's analyze how the events are processed. We're producing a total of 10 events where the batch size is set to 1 KB and queue concurrency is 2.

initConsumer: async (mongoClient) => {
  const concurrentQueue = async.queue(async (queueData) => {
    const data = JSON.parse(queueData.value);
    console.time(`${data.hash} took`);
    console.log('processing in queue: ', data);

    // AppSignal event capturing; can be commented out if throttling behaves unexpectedly.
    // Awaited so the queue worker only finishes once the insert completes.
    await tracer.startActiveSpan('insert | kafka event | SUCCEEDED', async (span) => {
      const Item = mongoClient.collection('items');

      // Send query data to AppSignal
      setCategory('query.mongodb');
      setBody(`Item.insertOne()`);
      setTag('operationType', 'insert');

      // Save event data to DB
      await Item.insertOne(data);

      console.timeEnd(`${data.hash} took`);

      span.end();
    });
  }, config.kafka.queueConcurrency);

  const consumer = KafkaJS.consumer({
    groupId: config.kafka.consumer,
    topic: config.kafka.topic,
    maxBytesPerPartition: config.kafka.maxBytesPerPartition,
    maxBytes: config.kafka.maxBytes,
    allowAutoTopicCreation: true,
  });

  // Connect to the consumer group
  await consumer.connect();

  consumer.on('consumer.commit_offsets', (event) => {
    console.info('✅ offset is committed');
  });

  // Subscribe to the topic
  await consumer.subscribe({ topic: config.kafka.topic, fromBeginning: false });

  // Run the consumer
  await consumer.run({
    autoCommit: false, // This is turned off since we're doing manual commits
    eachBatchAutoResolve: false,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
      pause,
      resume,
    }) => {
      console.log('🛑 Pausing consumer group');
      pause([{ topic: config.kafka.topic }]);

      console.log(`⬇️ Fetched ${batch.messages.length} messages from batch`);
      for (let message of batch.messages) {
        if (!isRunning() || isStale()) break;

        // Process the event
        concurrentQueue.push(message);

        resolveOffset(message.offset);
        await heartbeat();
      }

      // Handler for draining queue
      // Resumes the consumer to fetch next batch only when concurrentQueue is empty
      concurrentQueue.drain(async function() {
        // Commit the resolved offsets; KafkaJS expects a { topics: [...] } object,
        // which uncommittedOffsets() provides
        await commitOffsetsIfNecessary(uncommittedOffsets());
        console.log('🟢 Resuming consumption for next batch');
        consumer.resume([{ topic: config.kafka.topic }]);
      });
    },
  });


  process.on('drainQueue', concurrentQueue.drain);
},


getProducer: async () => {
  const producer = KafkaJS.producer({
    allowAutoTopicCreation: false
  });

  producer.on('producer.connect', (event) => {
    console.info('Kafkajs producer connection', { extra: event });
  });
  producer.on('producer.disconnect', () => {
    console.error(new Error('Kafkajs producer disconnected'));
  });
  await producer.connect();
  return producer;
}

Here's the output of the above code:

Terminal screenshot showing throttled Kafka queue output

Note that we produced 10 events, but we're only focusing on one of the fetch cycles that pulled 7 events. We can confirm a few things by studying the above output:

  1. Batched consumption is working as expected. Consumption is paused after a batch is fetched and is only resumed once the batch is completely processed.
  2. Our second throttling layer (i.e., queue) only processes two events at once. As soon as one of the two is successfully processed, another event is picked from the fetched batch for processing.

Real-World Kafka for Node Applications

Kafka pub-sub is widely used for a variety of use cases across the industry, such as:

  • In an e-commerce store, to update inventory and send stock refresh notifications to users.
  • To send user events to a post-processor for lazy insertions into an analytics database.
  • To build an audit trail pipeline where any modifications to an entity will be pushed to a topic and consumed by an audit service.

This versatility is one of the reasons that companies like Uber, LinkedIn, Twitter, Netflix, PayPal, and many more use Apache Kafka.

Wrapping Up

In this post, we explored how to batch Kafka events with manual pause-resume functionality to build a scalable Kafka pub-sub pipeline. We talked about autocommit and how keeping it on isn't good for "exactly once" semantics. We also added a queue for granular control over a batch of events and consumed it at our own pace.

This mechanism of throttling Kafka enables your Node.js application to handle incoming load more gracefully and prevents choking of resources, thus promoting scalability. There's no one-size-fits-all solution; you can always customize the pipeline according to your needs and goals.

Happy coding!

P.S. If you liked this post, subscribe to our JavaScript Sorcery list for a monthly deep dive into more magical JavaScript tips and tricks.

P.P.S. If you need an APM for your Node.js app, go and check out the AppSignal APM for Node.js.
