EDA using kafka in node js

Pratik VermaPratik Verma
2 min read

new to kafka ? click here - Why Kafka?

Steps to follow

  • Setup the Kafka Config

  • We need Producer to produce events

  • We need Consumer to consume events in a specific topic

Setup Kafka:

  • install kafkajs use npm i kafkajs or bun add kafkajs

  • run zookeeper in background using docker docker run -p 2181:2181 zookeeper

  • create a config folder and a file inside named kafka.config.ts

  • now initialize unsing below code ๐Ÿ‘‡

import { Kafka, Producer, Admin, Message } from "kafkajs";

export class KafkaConfig {
  private kafka: Kafka;
  private producer: Producer;
  private admin: Admin;

  constructor(brokers: string[]) {
    this.kafka = new Kafka({
      clientId: "producer-service",
      brokers: brokers,
    });
    this.producer = this.kafka.producer();
    this.admin = this.kafka.admin();
  }
}

Producer :

Producer creates the topics. using which you can send events later.

Mainly Producer can do 4 task connect, create Topics, produce messages in a specific topic, disconnect.

connect / disconnect : using the Kafka constructor you can do ๐Ÿ‘‡

import { Kafka, Producer, Admin, Message } from "kafkajs";
async connect() {
    try {
      await this.producer.connect();
      await this.admin.connect();
    } catch (error) {
      throw new Error("Something went wrong");
    }
  } 
async disconnect() {
    try {
      await this.producer.disconnect();
      await this.admin.disconnect();
    } catch (error) {
      throw new Error(`${error}`);
    }
  }

create Topics to publish events: using the admin.listTopics() you can now see the topics which already created. You can also put a condition for ignoring the ducplicate topic names ๐Ÿ‘‡

async createTopics(topic: string) {
    try {
      const topicExists = await this.admin.listTopics();

      console.log(topicExists);

      if (!topicExists.includes(topic)) {
        await this.admin.createTopics({
          topics: [{ topic }],
        });

        console.log("topic created ");
      } else {
        console.log("topic already created");
      }
    } catch (error) {
      throw new Error(`${error}`);
    }
  }

Produce Messages in Topics : now you can push messages using topic names ๐Ÿ‘‡

async produceMessages(topic: string, messages: Message[]) {
    try {
      await this.producer.send({
        topic,
        messages,
      });
    } catch (error) {
      console.log("sdasdksdajnds k asdnkasdn ");
      throw new Error(`${error}`);
    }

  }

Start the Service

import { KafkaConfig } from "../config/kafka.config";

export const kafka = new KafkaConfig(["localhost:9092"]);
export const startServices = async () => {
  try {
    await kafka.connect();
    await kafka.createTopics("test-topic");
  } catch (error) {
    console.log(error);
    process.exit(1);
  }
};
0
Subscribe to my newsletter

Read articles from Pratik Verma directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Pratik Verma
Pratik Verma