EDA using kafka in node js
new to kafka ? click here - Why Kafka?
Steps to follow
Setup the Kafka Config
We need Producer to produce events
We need Consumer to consume events in a specific topic
Setup Kafka:
install
kafkajs
usenpm i kafkajs
orbun add kafkajs
run zookeeper in background using docker
docker run -p 2181:2181 zookeeper
create a config folder and a file inside named
kafka.config.ts
now initialize unsing below code ๐
import { Kafka, Producer, Admin, Message } from "kafkajs";
export class KafkaConfig {
private kafka: Kafka;
private producer: Producer;
private admin: Admin;
constructor(brokers: string[]) {
this.kafka = new Kafka({
clientId: "producer-service",
brokers: brokers,
});
this.producer = this.kafka.producer();
this.admin = this.kafka.admin();
}
}
Producer :
Producer creates the topics. using which you can send events later.
Mainly Producer can do 4 task connect, create Topics, produce messages in a specific topic, disconnect.
connect / disconnect : using the Kafka constructor you can do ๐
import { Kafka, Producer, Admin, Message } from "kafkajs";
async connect() {
try {
await this.producer.connect();
await this.admin.connect();
} catch (error) {
throw new Error("Something went wrong");
}
}
async disconnect() {
try {
await this.producer.disconnect();
await this.admin.disconnect();
} catch (error) {
throw new Error(`${error}`);
}
}
create Topics to publish events: using the admin.listTopics()
you can now see the topics which already created. You can also put a condition for ignoring the ducplicate topic names ๐
async createTopics(topic: string) {
try {
const topicExists = await this.admin.listTopics();
console.log(topicExists);
if (!topicExists.includes(topic)) {
await this.admin.createTopics({
topics: [{ topic }],
});
console.log("topic created ");
} else {
console.log("topic already created");
}
} catch (error) {
throw new Error(`${error}`);
}
}
Produce Messages in Topics : now you can push messages using topic names ๐
async produceMessages(topic: string, messages: Message[]) {
try {
await this.producer.send({
topic,
messages,
});
} catch (error) {
console.log("sdasdksdajnds k asdnkasdn ");
throw new Error(`${error}`);
}
}
Start the Service
import { KafkaConfig } from "../config/kafka.config";
export const kafka = new KafkaConfig(["localhost:9092"]);
export const startServices = async () => {
try {
await kafka.connect();
await kafka.createTopics("test-topic");
} catch (error) {
console.log(error);
process.exit(1);
}
};
Subscribe to my newsletter
Read articles from Pratik Verma directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by