Modular event driven architectural like a Pro

What is an even driven architecture, well in plain terms it means sending a message to another service to handle without taking note of the response (failure or success).

In this article i will cover the following

  1. Why would you want to use and event driven architecture

  2. Why modular approach

  3. We will build a simple express app to show this approach in a clean form

Why would you want to use and event driven architecture

πŸ”„ 1. Decoupling of Services

  • Components (e.g., microservices) can operate independently and communicate via events.

  • This reduces tight coupling and makes the system more maintainable and scalable.

βš™οΈ 2. Scalability and Performance

  • Events can be processed asynchronously, allowing parts of the system to scale independently based on load.

  • Ideal for high-throughput systems where not every component needs to react immediately.

⏱️ 3. Real-Time Capabilities

  • Enables real-time reactions to events

  • Critical for applications like trading platforms, e-commerce, and messaging apps.

🧱 4. Flexibility and Extensibility

  • You can add new consumers or subscribers to events without changing the producer.

  • Encourages faster iteration and evolution of the system.

🚫 5. Failure Isolation

  • Failures in one consumer don’t affect others or the producer.

  • Easier to retry or handle failures gracefully via queues or dead-letter topics.

πŸ” 6. Audit and Traceability

  • Every event is a record of something that happened in the system.

  • Useful for building audit logs, debugging, or even replaying events to rebuild state.

πŸ“€ 7. Loose Coupling for Distributed Systems

  • Ideal for systems distributed across services or regions.

  • Communication via a message broker (like Kafka, RabbitMQ, event Emitters or Redis Streams) instead of direct API calls.

Why modular approach

1. 🧱 Separation of Concerns (SoC)

  • Modular design encapsulates related functionality within individual units (modules).

  • Each module focuses on a single responsibility (e.g., Users, Orders, Payments).

  • Event-driven architecture (EDA) allows these modules to communicate without being directly dependent on each other.


2. πŸ”Œ Loose Coupling

  • With EDA, modules don’t call each other directly β€” they emit or consume events.

  • This reduces tight interdependencies and makes it easier to change or replace parts of the system.


3. ♻️ Reusability & Composability

  • Modular design encourages reusable code components

  • EDA allows those reusable modules to be composed dynamically based on emitted events.


4. πŸ”„ Asynchronous Communication

  • Event-driven systems allow modules to react to changes after the fact rather than waiting for synchronous responses.

5. πŸ§ͺ Scalable Development & Testing

  • Teams can build and test modules independently.

  • You can simulate events during unit or integration testing without needing the full system.

πŸ“ Directory Structure

/src
  /modules
    /user
        users.service.ts
    /email
        email.service.ts
  /cores
    /events
      dlq.model.ts
      dlq.ts
      event.bus.ts
      event.types.ts
server.ts

dlq.model.ts

import { Schema, model } from 'mongoose';

const dlqSchema = new Schema({
    eventType: { type: String, required: true },
    payload: { type: Schema.Types.Mixed, required: true },
    error: {
        message: String,
        stack: String,
    },
    service: { type: String, required: true },
    retryCount: { type: Number, default: 0 },
    timestamp: { type: Date, default: Date.now },
});

export const DLQEntry = model('DLQEntry', dlqSchema);

dlq.ts

import { EventType } from "./event.types";
import { DLQEntry } from "./dlq.model";
import { eventBus } from "./event.bus";

export class DeadLetterQueue {
    private maxRetries = 10;
    private retryDelay = 5000; // 5 seconds initial delay

    async addFailedEvent<T extends EventType>(eventType: T, payload: any, error: Error, context: { service: string; retryCount: number; }): Promise<void> {
        await DLQEntry.create({eventType, payload, error: {
                message: error.message,
                stack: error.stack,
            },
            service: context.service,
            retryCount: context.retryCount,
            timestamp: new Date(),
        });

        // Start retrying if within retry limit
        if (context.retryCount < this.maxRetries) {
            this.retryFailedEvents(context.service);
        }
    }

    async getFailedEvents(service: string): Promise<typeof DLQEntry[]> {
        return DLQEntry.find({ service });
    }

    async retryFailedEvents(service: string): Promise<void> {
        const failedEvents = <any>await this.getFailedEvents(service);

        for (const event of failedEvents)
        {
            if (event.retryCount >= this.maxRetries) {
                console.error(`❌ Event ${event.eventType} permanently failed after max retries`);
                continue; // Skip permanently failed events
            }

            setTimeout(async () => {
                try {
                    console.log(`πŸ”„ Retrying event ${event.eventType} (Attempt: ${event.retryCount + 1})`);

                    // Re-publish event
                    eventBus.publish(event.eventType, event.payload);

                    // Remove event from DLQ after successful retry
                    await DLQEntry.deleteOne({ _id: event._id });
                }
                catch (error) {
                    console.error(`⚠️ Retry failed for event ${event.eventType}:`, error);

                    // Increment retry count and update timestamp
                    await DLQEntry.updateOne(
                        { _id: event._id },
                        { $inc: { retryCount: 1 }, $set: { timestamp: new Date() } }
                    );
                }
            }, this.retryDelay * (event.retryCount + 1)); // Exponential Backoff
        }
    }
}

// Singleton instance
export const dlq = new DeadLetterQueue();

event.bus.ts

import { EventEmitter } from 'events';
import { EventType, EventPayloads } from './event.types';
import { dlq } from './dlq';

export class RestaurantEventBus {

    private emitter = new EventEmitter();
    private subscriptions = new Map<EventType, Function[]>();

    async publish<T extends EventType>(type: T, payload: EventPayloads[T]): Promise<void> {
        this.emitter.emit(type, payload);
    }

    async subscribe<T extends EventType>(type: T, handler: (payload: EventPayloads[T]) => Promise<void>, serviceName: string): Promise<void> {
        const wrappedHandler = async (payload: EventPayloads[T]) => {
            try { await handler(payload) }
            catch (error: any) {
                await dlq.addFailedEvent(type, payload, error, {
                    service: serviceName,
                    retryCount: 0,
                });
            }
        };

        this.emitter.on(type, wrappedHandler);
        this.subscriptions.set(type, [
            ...(this.subscriptions.get(type) || []),
            wrappedHandler,
        ]);
    }
}

export const eventBus = new RestaurantEventBus();

event.types.ts

import {IOrder, IOrderStatus} from "../../Modules/Order/entity/order.entity";
import {CartStatus, ICartItems } from "../../Modules/Carts/entity/carts.entity";
import {PaymentMethod} from "../../Modules/Sales/entity/sale.entity";
import {StockType} from "../../Modules/Inventory/entity/inventory.entity";

export enum EventType {
    // Email Events
    SEND_ACCOUNT_VERIFICATION_MAIL = 'SEND_ACCOUNT_VERIFICATION_MAIL',
    SEND_PASSWORD_RESET_MAIL = 'SEND_PASSWORD_RESET_MAIL',
}

export type EventPayloads = {
    // Emails Module Payloads
    [EventType.SEND_ACCOUNT_VERIFICATION_MAIL]: {
        email: string;
        otp: string;
    };

    [EventType.SEND_PASSWORD_RESET_MAIL]: {
        email: string;
        otp: string;
    };
};

email.service.ts

import {Errors} from "../../../Core/helpers/error.handler";
import {eventBus} from "../../../Core/events/event.bus";
import {EventType} from "../../../Core/events/event.types";
import { IEmailService, IEmailTemplate, IVerificationDto} from "../entity/emails.entity";
import {Notify} from "../action/notify";

export class EmailsService implements IEmailService {

    constructor() {
        eventBus.subscribe(EventType.SEND_ACCOUNT_VERIFICATION_MAIL, this.accountVerification, 'NotificationsService');
        eventBus.subscribe(EventType.SEND_PASSWORD_RESET_MAIL, this.passwordReset, 'NotificationsService');
    }

    accountVerification(data: IVerificationDto): Promise<void> {
       return new Promise(async (resolve, reject) => {
           try {
                await new Notify({
                    subject: 'Account Verification',
                    template: IEmailTemplate.ACCOUNT_ACTIVATION,
                    data: { otp: data.otp }
                }).
                send(data.email);
                return resolve();
           }
           catch (e) {
               return reject({message: Errors(e)});
           }
       });
    }

    passwordReset(data: IVerificationDto): Promise<void> {
        return new Promise(async (resolve, reject) => {
            try {
                await new Notify({
                    subject: 'Password Reset',
                    template: IEmailTemplate.PASSWORD_RESET,
                    data: { otp: data.otp }})
                    .send(data.email);
                return resolve();
            }
            catch (e) {
                return reject({message: Errors(e)});
            }
        });
    }

}

user.service.ts

class UsersService implements IUsersService {

    private static instance: UsersService;
    public readonly repository: IUsersRepository = UsersRepository;

    constructor() {
        //subscribe event for other service to publish to
        eventBus.subscribe(EventType.UPDATE_RIDER_STATE, this.E_UpdateUser.bind(this), 'UsersService');
    }

    public static getInstance(): UsersService {
        if (!UsersService.instance) {
            UsersService.instance = new UsersService();
        }
        return UsersService.instance;
    }

    async CreateAccessToken(user: IUsers): Promise<string> {
        return Promise.resolve(await this.repository.GenerateAccessToken(user));
    }

    async CreateRefreshToken(user: IUsers): Promise<string> {
        return Promise.resolve(await this.repository.GenerateRefreshToken(user));
    }

    async CreateSignIN(user: IUsers, res: Response, req: Request): Promise<any> {
        switch (user.status) {
            case 'active':
                const accessToken = await this.CreateAccessToken(user);
                const refreshToken = await this.CreateRefreshToken(user);

                const roleInfo = await RoleResource.getRole(user.role);

                Cookies.set(res, refreshToken);

                return res.status(200).json({
                    success: true,
                    message: 'Login successful',
                    data: {
                        user: await UserResource.single(user, roleInfo),
                        accessToken,
                    }
                });

            case 'suspended':
                return response(res).error(
                    'Account is suspended. Please contact support',
                    401
                );

            case 'pending':
                const otp = GenerateOtp(4);
                if (otp) {
                    await this.repository.Update(user.id, {otp, verifyAccountRequest: true});
                    eventBus.publish(EventType.SEND_ACCOUNT_VERIFICATION_MAIL, {email: user.email, otp});
                }

                return response(res).success(
                    200,
                    'An OTP has been sent to your email address',
                )

            default:
                return response(res).error('Status not recognized');
        }
    }

    Signup(data: IRegisterDto): Promise<IUsers> {
        return new Promise(async (resolve, reject) => {
            try {
                const createNewCustomer = await this.repository.Register({...data, password: Password.hash(data.password)});
                return resolve(createNewCustomer);
            }
            catch (e) {
                return reject({ message: Errors(e) })
            }
        })
    }

    ForgotPassword(email: string): Promise<IForgetPassword> {
        return new Promise(async (resolve, reject) => {
            try {
                const user = await this.repository.FindByEmail(email);
                if (!user) return reject({message: 'Account not found'});

                if (user.status !== 'active') return reject({message: 'Account not authorized'});

                const newOtp = GenerateOtp(4);
                await this.repository.Update(user.id, {
                    otp: newOtp,
                    resetPassword: true
                });

                //emit an event to the email service module
                eventBus.publish(EventType.SEND_PASSWORD_RESET_MAIL, {email: user.email, otp: newOtp});

                return resolve({email});
            }
            catch (e) {
                return reject({ message: Errors(e) })
            }
        })
    }

    async E_UpdateUser(payload: any): Promise<void> {
        const { userId, state } = payload;
        await this.repository.Update(userId, {state});
    }

}

// Export a default instance
export default UsersService.getInstance();
0
Subscribe to my newsletter

Read articles from Ezumah Jeremiah Kalu directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Ezumah Jeremiah Kalu
Ezumah Jeremiah Kalu

Experience software engineer with 6 years of experience. I specialize in software engineering, application security, secure coding practices, scalable applications, distributed systems, and microservices architecture. My expertise lies in crafting robust backend solutions using Node.js, while also demonstrating proficiency in front-end development with React and React Native.