Easy Methods for Adding Kafka to Your JS App

Venusai

Hello! In this article I give a quick, concise demonstration of how to use Kafka for user analytics. I'll walk through a small project I developed to track the number of users currently logged in.

This mini app is part of my learning journey. If you have any suggestions for improving the implementation or design, please let me know, as it can help me enhance my skills.

Prerequisite: basic knowledge of Kafka. You can refer to this resource.

You can check out the GitHub repo for the source code.

Technologies used

  • NodeJS

  • Express JS

  • KafkaJS (the kafkajs package)

  • Docker

  • Redis

  • MongoDB

Project structure

The app is divided into three modules:

  • Authentication service (also contains the Kafka producer)

  • Kafka Admin

  • Kafka Consumer

Design

Implementation

Docker commands for ZooKeeper and Kafka

# zookeeper
docker run -it -p 2181:2181 zookeeper
# kafka (replace <your-ip> with your machine's IP address)
docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=<your-ip>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<your-ip>:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
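
Every client in this post has to connect to the same address that Kafka advertises in the command above. As a small sketch (not part of the original project), that address can be read from an environment variable instead of being hard-coded in each file. The variable name KAFKA_BROKERS and the helper parseBrokers are assumptions made for illustration.

```javascript
// Hypothetical helper: turn a comma-separated value into the
// `brokers` array that the KafkaJS constructor expects.
function parseBrokers(value, fallback = "localhost:9092") {
  return (value || fallback)
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
}

// KAFKA_BROKERS is an assumed variable name, e.g. "192.168.1.4:9092".
const brokers = parseBrokers(process.env.KAFKA_BROKERS);
console.log("Using Kafka brokers:", brokers);
```

The resulting array can then be passed as the brokers option, for example new Kafka({ clientId: "kafka-admin", brokers }).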

Kafka Admin: The admin's main job is to create topics and their partitions.

  • Initialize a Kafka instance and create an admin client
// kafka-admin/index.js
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "kafka-admin",
  brokers: ["your-ip:9092"], //! add ip address
});

const admin = kafka.admin();

const connectAdmin = async () => {
  try {
    await admin.connect();
  } catch (error) {
    throw new Error("Failed to connect admin: " + error.message);
  }
};
  • A function to create a topic and initialize the kafka admin setup

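      const createTopic = async (topicName) => {
        try {
          // createTopics resolves to false when the topic already exists
          const created = await admin.createTopics({
            topics: [{ topic: topicName }],
          });
          console.log(
            created
              ? `Topic ${topicName} created successfully`
              : `Topic ${topicName} already exists`
          );
        } catch (error) {
          throw new Error("Failed to create topic: " + error.message);
        }
      };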
    
      const init = async () => {
        try {
          await connectAdmin();
          console.log("Kafka admin connected successfully");
          await createTopic("user-activity");
        } finally {
          // Always release the connection; errors propagate to the catch below
          await admin.disconnect();
        }
      };
    
      init()
        .then(() => {
          console.log("Kafka admin initialization complete");
        })
        .catch((error) => {
          console.error("Error during Kafka admin initialization:", error.message);
        });
    
  • Now run the file

      node index.js
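
The createTopic function above leaves the partition count to the broker default. A minimal sketch of setting it explicitly follows. numPartitions and replicationFactor are fields of the KafkaJS topic config, while buildTopicConfig, ensureTopic, and the default of 3 partitions are hypothetical choices for illustration. The admin client is passed in as an argument, so the snippet does not open a connection by itself.

```javascript
// Hypothetical helper: build one entry for admin.createTopics({ topics: [...] }).
function buildTopicConfig(topic, { numPartitions = 3, replicationFactor = 1 } = {}) {
  if (!Number.isInteger(numPartitions) || numPartitions < 1) {
    throw new Error("numPartitions must be a positive integer");
  }
  return { topic, numPartitions, replicationFactor };
}

// Create the topic only if it is missing.
// `admin` is assumed to be a connected KafkaJS admin client.
async function ensureTopic(admin, config) {
  const existing = await admin.listTopics();
  if (existing.includes(config.topic)) {
    return false; // topic is already there, nothing to do
  }
  await admin.createTopics({ topics: [config] });
  return true;
}

console.log(buildTopicConfig("user-activity"));
```

With the admin from kafka-admin/index.js, this could be called as await ensureTopic(admin, buildTopicConfig("user-activity")). A replication factor of 1 matches the single-broker Docker setup used here.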
    

Kafka Consumer: Consumers read messages from topics, and consumers that share a groupId split the topic's partitions between them.

Important: Kafka consumers need to have a groupId.

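// consumer/index.js
import { Kafka } from "kafkajs";
// Assumption: the Redis client is ioredis, whose lowercase
// sadd/srem/scard methods match the calls used below.
import Redis from "ioredis";

const kafka = new Kafka({
  clientId: "consumer-service",
  brokers: ["your-ip:9092"],
});

// With no arguments, ioredis connects to 127.0.0.1:6379
const redis = new Redis();

const consumer = kafka.consumer({
  groupId: "user-activity-group",
});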
  • Now the consumer subscribes to the ‘user-activity’ topic and, based on the event type, adds the user to or removes the user from the active-users set in Redis
const run = async () => {
  try {
    await consumer.connect();
    await consumer.subscribe({
      topics: ["user-activity"],
      fromBeginning: false,
    });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const result = JSON.parse(message.value.toString());
        if (result.eventType === "user.login") {
          await redis.sadd("active-users", result.metaData.userId);
          console.log(`User ${result.metaData.email} logged in`);
        }
        if (result.eventType === "user.logout") {
          await redis.srem("active-users", result.metaData.userId);
          console.log(`User ${result.metaData.email} logged out`);
        }

        const activeUsersCount = await redis.scard("active-users");
        console.log(`Active users count: ${activeUsersCount}`);
      },
    });
  } catch (error) {
    console.error("Error in consumer run:", error);
  }
};
  • Run the function
run().catch((error) => {
  console.log("Failed to run consumer", error.message);
});
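
The branching inside eachMessage can be read as a small state update on a set of user IDs. The sketch below reproduces that logic with a plain JavaScript Set standing in for the Redis set (an assumption for illustration; applyUserActivity is a hypothetical name). Because a set ignores duplicate members, a login event that is delivered twice still counts the user once.

```javascript
// Hypothetical pure version of the consumer's branching.
// `activeUsers` is a Set standing in for the Redis "active-users" set.
function applyUserActivity(activeUsers, rawValue) {
  const event = JSON.parse(rawValue);
  const userId = event.metaData && event.metaData.userId;
  if (!userId) {
    return activeUsers.size; // ignore events without a user id
  }
  if (event.eventType === "user.login") {
    activeUsers.add(userId); // plays the role of SADD
  } else if (event.eventType === "user.logout") {
    activeUsers.delete(userId); // plays the role of SREM
  }
  return activeUsers.size; // plays the role of SCARD
}

const activeUsers = new Set();
const login = (userId) =>
  JSON.stringify({ eventType: "user.login", metaData: { userId } });
const logout = (userId) =>
  JSON.stringify({ eventType: "user.logout", metaData: { userId } });

applyUserActivity(activeUsers, login("u1"));
applyUserActivity(activeUsers, login("u1")); // duplicate delivery, no double count
applyUserActivity(activeUsers, login("u2"));
const count = applyUserActivity(activeUsers, logout("u1"));
console.log(`Active users count: ${count}`); // prints "Active users count: 1"
```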

Kafka Producer: It publishes data (messages) to a Kafka topic.

  • Here is a function that sends a message after a user logs in

      //producer/index.js
      import { Kafka } from "kafkajs";
    
      const kafka = new Kafka({
        clientId: "auth-service",
        brokers: ["your-ip:9092"], // replace your-ip with the broker's IP address
      });
    
      export const producer = kafka.producer();
    
      export const connectProducer = async () => {
        try {
          await producer.connect();
          console.log("Producer connected successfully");
          return producer;
        } catch (error) {
          throw new Error("Failed to connect producer: " + error.message);
        }
      };
    
      export const sendLoginEvent = async (userData) => {
        try {
          const message = {
            key: userData.userId,
            value: JSON.stringify({
              eventType: "user.login",
              metaData: userData,
            }),
          };
          await producer.send({
            topic: "user-activity",
            messages: [message],
          });
          console.log("Login event sent successfully");
        } catch (error) {
          throw new Error("Failed to send the login event: " + error.message);
        }
      };
    

    Now the Kafka setup is complete, and sendLoginEvent(userData) can be called from the login handler

      try {
        await sendLoginEvent({
          userId: user._id.toString(),
          email: user.email,
          userName: user.userName,
          loginTime: new Date().toISOString(),
          ipAddress: req.ip,
        });
      } catch (kafkaError) {
        console.error("Failed to send login event to Kafka:", kafkaError.message);
      }
    

    Don’t forget to initialize the Kafka producer when the service starts

      const initializeServices = async () => {
        try {
          await connectDB();
          await connectProducer();
          console.log("All services connected successfully");
          app.listen(5004, () => {
            console.log("Auth service is running on port 5004");
          });
        } catch (error) {
          console.error("Failed to initialize services:", error.message);
          process.exit(1); // Exit if critical services fail
        }
      };
    
      // Initialize all services
      initializeServices();
    
  • Create the endpoints

      //express js routes
      router.post("/login", login);
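
Only the login route is shown above, while the consumer also reacts to user.logout events. A logout route could publish the counterpart event roughly as follows. This is a sketch under assumptions: buildActivityMessage and sendLogoutEvent are hypothetical names that are not in the original project, and the producer is passed in as an argument instead of being imported.

```javascript
// Hypothetical helper: build a Kafka message in the same shape
// that sendLoginEvent uses ({ eventType, metaData }).
function buildActivityMessage(eventType, userData) {
  return {
    // With the default partitioner, messages that share a key
    // are routed to the same partition.
    key: String(userData.userId),
    value: JSON.stringify({ eventType, metaData: userData }),
  };
}

// `producer` is assumed to be a connected KafkaJS producer.
async function sendLogoutEvent(producer, userData) {
  await producer.send({
    topic: "user-activity",
    messages: [buildActivityMessage("user.logout", userData)],
  });
}

const sample = buildActivityMessage("user.logout", {
  userId: "42",
  email: "user@example.com",
});
console.log(sample.key, sample.value);
```

A logout handler would call sendLogoutEvent(producer, { userId, email }) before responding, wrapped in the same kind of try/catch used for the login event.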
    

    The logged-in users count

    So, this was the mini app I built. Please let me know in the comments how it can be improved.

Thank you!!!

Written by

Venusai

I am a developer from Hyderabad, India, and a CSE undergrad at IIITDM Kurnool. I love building web applications that are useful.