Kafka Can’t Breathe? Queue It, Don’t Kill It!

Ravi ThakurRavi Thakur
4 min read

🛠️ Taming Kafka Overload with BullMQ Queues and Smart Consumer Control

When Kafka is producing messages like there's no tomorrow, things can spiral quickly. A high-throughput Kafka consumer doing too much can end up hogging shared resources like the database or Elasticsearch—especially if it’s living alongside critical services in the same pods or nodes. This post walks through a pattern that helps decouple processing, apply back-pressure, and keep systems healthy.

😵 The Problem

In a typical setup:

  • A REST service triggers events.

  • A Kafka producer sends those events to a topic.

  • A Kafka consumer picks up the messages and immediately does all the work—like calling DB, making API calls, indexing in Elasticsearch, logging, etc.

This works… until it doesn’t. Under load, the consumer overwhelms the system. Shared pods or nodes suffer. The REST APIs start failing, and things break down fast.

🧠 The Fix: Decouple Heavy Work Using a Queue

To manage this cleanly, we can split responsibilities into pods and delegate heavy operations to background workers.

🔧 New Architecture

REST Service --> Kafka Producer --> Kafka Consumer --> Redis Queue (BullMQ) --> Workers --> DB/ES/APIs

📦 Pod Layout

Pod 1: REST + Kafka Producer

Pod 2: Kafka Consumer (lightweight)

Pod 3: BullMQ Worker (handles actual business logic)

✅ Key Design Decisions

  • Kafka Consumer only pushes the payload to a Redis queue.

  • Redis Queue is powered by BullMQ.

  • Worker picks 10 messages at a time using BullMQ's concurrency feature.

  • Added a small delay between batches to avoid spiking the database.

  • Back-pressure: If the queue is overloaded, we pause the Kafka consumer; resume once things cool down.

🧠 How Many Jobs Should a Worker Pick?

  • Choosing the right concurrency value (concurrency: N) in BullMQ is crucial:

  • Higher concurrency (e.g., 10 or more) is good for systems with high DB throughput or async I/O.

  • Lower concurrency (e.g., 1–5) is better when dealing with sensitive databases or rate-limited APIs.

    💡 Recommendation:

Start with a safe default like concurrency: 5 or 10, monitor CPU, memory, and DB usage. Then tune accordingly.

💻 Code Snippet: Kafka Consumer with BullMQ and Pause Logic

// kafkaConsumer.js
const { Kafka } = require('kafkajs');
const { Queue } = require('bullmq');
const redis = { connection: { host: 'localhost', port: 6379 } };

const taskQueue = new Queue('task-queue', redis);

const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });

let isPaused = false;

async function runConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: false });

await consumer.run({
    eachMessage: async ({ message }) => {
      if (isPaused) return;

      await taskQueue.add('process-event', JSON.parse(message.value.toString()));

      // Back-pressure check
      const jobCounts = await taskQueue.getJobCounts();
      if (jobCounts.waiting > 1000) {
        await consumer.pause();
        isPaused = true;
        console.log('Consumer paused due to high queue load');
      }
    }
  });

  // Resume logic
  setInterval(async () => {
    if (isPaused) {
      const jobCounts = await taskQueue.getJobCounts();
      if (jobCounts.waiting < 500) {
        await consumer.resume();
        isPaused = false;
        console.log('Consumer resumed');
      }
    }
  }, 5000);
}
runConsumer().catch(console.error);

💼 Worker Code: Process 10 Jobs at a Time

// worker.js
const { Worker } = require('bullmq');
const redis = { connection: { host: 'localhost', port: 6379 } };

const worker = new Worker(
  'task-queue',
  async (job) => {
console.log('Processing job:', job.id);
await doHeavyWork(job.data);
  },
  {
    concurrency: 10, // Pick 10 jobs in parallel
    ...redis
  }
);

async function doHeavyWork(data) {
  // Simulate DB/ES/API calls
  await new Promise((resolve) => setTimeout(resolve, 100));
  console.log('Processed:', data);
}

🤔 Can Different Pods Share a BullMQ Queue?

Yes—because BullMQ uses Redis as the queue backend, multiple pods can interact with it as long as they point to the same Redis instance. So the consumer and the worker can be entirely separate services or pods.

🎯 Final Thoughts

If you’re running Kafka consumers that perform heavy operations, decouple the execution from the consumption. Let Kafka consumers act as message forwarders. Queue up the real work using BullMQ or any reliable queue system. Then let rate-controlled workers handle processing—without overwhelming shared infra.

This setup adds resilience, scalability, and breathing room.

🏷️ Summary Points

  • Don’t let Kafka consumers hog resources—queue it instead.

  • Use BullMQ to control load and parallelism.

  • Choose concurrency value based on your infra’s ability. Start with 5–10.

  • Add pause/resume logic to Kafka to avoid queue flooding.

  • Monitor. Adjust. Repeat.

1
Subscribe to my newsletter

Read articles from Ravi Thakur directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Ravi Thakur
Ravi Thakur