Easy Methods for Adding Kafka to Your JS App

Hello! In this article I will share a quick demonstration of how to use Kafka for user analytics, using a small project I developed to track the number of users currently logged in.
This mini app is part of my learning journey. If you have any suggestions for improving the implementation or design, please let me know, as it can help me enhance my skills.
Prerequisite: basic knowledge of Kafka. You can refer to this resource.
You can check out the GitHub repo for the source code.
Technologies used
NodeJS
Express JS
kafkajs package
Docker
Redis
MongoDB
Project structure
The app is divided into three modules
Authentication service (contains Kafka producer too)
Kafka Admin
Kafka Consumer
Design
Implementation
Docker commands for zookeeper and kafka
docker run -it -p 2181:2181 zookeeper #zookeeper
#kafka
docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=<your-ip>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<your-ip>:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
Kafka Admin : The main job of the admin is to create topics and partitions.
- Initialize a Kafka class and create an admin
// kafka-admin/index.js
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "kafka-admin",
  brokers: ["your-ip:9092"], //! add ip address
});

const admin = kafka.admin();

const connectAdmin = async () => {
  try {
    await admin.connect();
  } catch (error) {
    throw new Error("Failed to connect admin: " + error.message);
  }
};
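The broker address above is hard-coded and has to be edited by hand. One possible alternative, sketched here, is to read it from an environment variable. The variable name `KAFKA_BROKERS`, the helper `parseBrokers`, and the `localhost:9092` fallback are assumptions made for illustration; they are not part of the original project or of kafkajs.

```javascript
// Hypothetical helper: turns a comma-separated string such as
// "192.168.1.4:9092,192.168.1.5:9092" into the array that the
// `brokers` option of `new Kafka({ ... })` expects.
const parseBrokers = (raw, fallback = ["localhost:9092"]) => {
  if (typeof raw !== "string") return fallback;
  const brokers = raw
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // An empty or whitespace-only value falls back to the default list
  return brokers.length > 0 ? brokers : fallback;
};

// KAFKA_BROKERS is an assumed variable name, not something the project defines.
const brokers = parseBrokers(process.env.KAFKA_BROKERS);
console.log("Using brokers:", brokers);
```

The resulting array could then be passed as `brokers` when constructing the Kafka client, so the same file works on machines with different IP addresses.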
A function to create a topic and initialize the kafka admin setup
const createTopic = async (topicName) => {
  try {
    await admin.createTopics({
      topics: [{ topic: topicName }],
    });
    console.log(`Topic ${topicName} created successfully`);
  } catch (error) {
    throw new Error("Failed to create topic: " + error.message);
  }
};

const init = async () => {
  try {
    await connectAdmin();
    console.log("Kafka admin connected successfully");
    await createTopic("user-activity");
  } catch (error) {
    console.error("Error initializing Kafka admin:", error.message);
  } finally {
    await admin.disconnect();
  }
};

init()
  .then(() => {
    console.log("Kafka admin initialization complete");
  })
  .catch((error) => {
    console.error("Error during Kafka admin initialization:", error.message);
  });
Now run the file
node index.js
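The createTopic function passes only the topic name, so the partition count falls back to a default. Since the admin is also responsible for partitions, here is a small sketch of building a topic entry with explicit settings. `numPartitions` and `replicationFactor` are fields that kafkajs accepts in `createTopics`; the helper name `buildTopicConfig` and its default values are assumptions chosen to match the single-broker Docker setup.

```javascript
// Hypothetical helper that builds one entry for the `topics` array
// passed to admin.createTopics({ topics: [...] }).
const buildTopicConfig = (topic, { numPartitions = 1, replicationFactor = 1 } = {}) => {
  if (typeof topic !== "string" || topic.length === 0) {
    throw new Error("Topic name must be a non-empty string");
  }
  if (!Number.isInteger(numPartitions) || numPartitions < 1) {
    throw new Error("numPartitions must be a positive integer");
  }
  return { topic, numPartitions, replicationFactor };
};

// Three partitions is an arbitrary illustrative value.
const userActivityTopic = buildTopicConfig("user-activity", { numPartitions: 3 });
console.log(userActivityTopic);

// Inside createTopic this entry could be used as:
//   await admin.createTopics({ topics: [userActivityTopic] });
```

With a single broker the replication factor has to stay at 1, which is why it is the default in this sketch.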
Kafka Consumer : Consumers, organized into consumer groups, read the messages published to topics
Important : Kafka consumers need to have a groupId
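//consumer/index.js
import { Kafka } from "kafkajs";
// Assumption: the lowercase sadd/srem/scard calls used below match the ioredis
// client; adjust this import if the project uses a different Redis client.
import Redis from "ioredis";

const kafka = new Kafka({
  clientId: "consumer-service",
  brokers: ["your-ip:9092"],
});
const consumer = kafka.consumer({
  groupId: "user-activity-group",
});
// With no arguments ioredis connects to 127.0.0.1:6379
const redis = new Redis();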
- Now the consumer subscribes to the topic ‘user-activity’ and, based on the event type, adds the user to or removes the user from the ‘active-users’ set in Redis
const run = async () => {
  try {
    await consumer.connect();
    await consumer.subscribe({
      topics: ["user-activity"],
      fromBeginning: false,
    });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const result = JSON.parse(message.value.toString());
        if (result.eventType === "user.login") {
          await redis.sadd("active-users", result.metaData.userId);
          console.log(`User ${result.metaData.email} logged in`);
        }
        if (result.eventType === "user.logout") {
          await redis.srem("active-users", result.metaData.userId);
          console.log(`User ${result.metaData.email} logged out`);
        }
        const activeUsersCount = await redis.scard("active-users");
        console.log(`Active users count: ${activeUsersCount}`);
      },
    });
  } catch (error) {
    console.error("Error in consumer run:", error);
  }
};
- Run the function
run().catch((error) => {
  console.log("Failed to run consumer", error.message);
});
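The consumer relies on Redis set semantics: adding the same user twice leaves the set unchanged, so a repeated login does not inflate the count. The sketch below mirrors that logic with a plain JavaScript Set so it can be run without Kafka or Redis. The function name `applyUserEvent` and the choice to skip malformed payloads are assumptions, not part of the original consumer.

```javascript
// In-memory stand-in for the Redis set "active-users".
// Set#add, Set#delete and Set#size play the roles of SADD, SREM and SCARD.
const applyUserEvent = (activeUsers, rawValue) => {
  let event;
  try {
    event = JSON.parse(rawValue);
  } catch {
    return activeUsers.size; // skip malformed payloads instead of crashing
  }
  const userId = event?.metaData?.userId;
  if (!userId) return activeUsers.size;

  if (event.eventType === "user.login") activeUsers.add(userId);
  if (event.eventType === "user.logout") activeUsers.delete(userId);
  return activeUsers.size;
};

// Small helpers that build payloads shaped like the ones the producer sends.
const loginPayload = (userId) =>
  JSON.stringify({ eventType: "user.login", metaData: { userId } });
const logoutPayload = (userId) =>
  JSON.stringify({ eventType: "user.logout", metaData: { userId } });

const activeUsers = new Set();
applyUserEvent(activeUsers, loginPayload("u1"));
applyUserEvent(activeUsers, loginPayload("u1")); // duplicate login, still counted once
applyUserEvent(activeUsers, loginPayload("u2"));
applyUserEvent(activeUsers, "not json"); // ignored
console.log(applyUserEvent(activeUsers, logoutPayload("u1"))); // prints 1
```

Keeping the event handling in a small function like this also makes it easy to unit test separately from the Kafka and Redis connections.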
Kafka Producer: It publishes data (messages) to a Kafka topic.
Here I wrote a function that sends a message to the topic after a user logs in
//producer/index.js
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "auth-service",
  brokers: ["192.168.1.4:9092"], // replace with your broker address
});

export const producer = kafka.producer();

export const connectProducer = async () => {
  try {
    await producer.connect();
    console.log("Producer connected successfully");
    return producer;
  } catch (error) {
    throw new Error("Failed to connect producer: " + error.message);
  }
};

export const sendLoginEvent = async (userData) => {
  try {
    const message = {
      key: userData.userId,
      value: JSON.stringify({
        eventType: "user.login",
        metaData: userData,
      }),
    };
    await producer.send({
      topic: "user-activity",
      messages: [message],
    });
    console.log("Login event sent successfully");
  } catch (error) {
    // Error takes a single message string, so the cause is appended to it
    throw new Error("Failed to send the login event: " + error.message);
  }
};
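The consumer also handles a "user.logout" event, while the producer file only shows the login case. One way to cover both, sketched under assumptions, is a small builder that produces the message object for any event type. The helper name `buildUserEvent` and the sample user data are hypothetical; the `{ key, value }` shape follows what the login function already sends.

```javascript
// Hypothetical helper: builds the { key, value } message object that
// producer.send({ topic, messages: [...] }) expects.
const buildUserEvent = (eventType, userData) => {
  if (!userData || !userData.userId) {
    throw new Error("userData.userId is required");
  }
  return {
    // Keyed messages with the same key are routed to the same partition by
    // the default partitioner, which keeps one user's events in order.
    key: String(userData.userId),
    value: JSON.stringify({ eventType, metaData: userData }),
  };
};

// Sample data for illustration only.
const logoutMessage = buildUserEvent("user.logout", {
  userId: "42",
  email: "user@example.com",
});
console.log(logoutMessage);

// A logout counterpart to sendLoginEvent could then call:
//   await producer.send({ topic: "user-activity", messages: [logoutMessage] });
```

Using one builder for both event types keeps the payload shape identical, which matters because the consumer reads `eventType` and `metaData` from every message.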
Now the Kafka setup is complete, and sendLoginEvent(userData) can be called when logging in
try {
  await sendLoginEvent({
    userId: user._id.toString(),
    email: user.email,
    userName: user.userName,
    loginTime: new Date().toISOString(),
    ipAddress: req.ip,
  });
} catch (kafkaError) {
  console.error("Failed to send login event to Kafka:", kafkaError.message);
}
Don’t forget to initialize the Kafka producer
const initializeServices = async () => {
  try {
    await connectDB();
    await connectProducer();
    console.log("All services connected successfully");
    app.listen(5004, () => {
      console.log("Auth service is running on port 5004");
    });
  } catch (error) {
    console.error("Failed to initialize services:", error.message);
    process.exit(1); // Exit if critical services fail
  }
};

// Initialize all services
initializeServices();
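The startup code connects the producer but never disconnects it. A possible complement, sketched here, is a shutdown handler that closes the HTTP server and then disconnects the producer when the process receives a termination signal. The factory name `createShutdownHandler` and the injected `exit` parameter are assumptions for illustration; `server` stands for the value returned by app.listen, and `producer.disconnect()` is the kafkajs call for closing a producer.

```javascript
// Hypothetical factory: returns an async function that closes the HTTP
// server first, then disconnects the Kafka producer, then exits.
const createShutdownHandler = ({ server, producer, exit = process.exit }) => {
  let shuttingDown = false;
  return async (signal) => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    console.log(`Received ${signal}, shutting down`);
    try {
      // server.close stops accepting new connections and calls back when done
      await new Promise((resolve) => server.close(resolve));
      await producer.disconnect();
      exit(0);
    } catch (error) {
      console.error("Error during shutdown:", error.message);
      exit(1);
    }
  };
};

// Wiring it up in the auth service might look like this:
//   const server = app.listen(5004);
//   const shutdown = createShutdownHandler({ server, producer });
//   process.on("SIGINT", () => shutdown("SIGINT"));
//   process.on("SIGTERM", () => shutdown("SIGTERM"));
```

Passing `exit` in as a parameter is only there so the handler can be exercised in a test without ending the process.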
Create the endpoints
// Express JS routes
router.post("/login", login);
The logged-in users count
So, this was the mini app I built. Please let me know in the comments how it can be improved.
Thank you!!!
Written by

Venusai
I am a developer from Hyderabad, India. I am a CSE undergrad at IIITDM Kurnool, and I love building useful web applications.