Modular Event-Driven Architecture Like a Pro


What is an event-driven architecture? In plain terms, it means sending a message to another service to handle without waiting for that service's response (failure or success).
In this article I will cover the following:
Why you would want to use an event-driven architecture
Why a modular approach
Building a simple Express app to show this approach in a clean form
Why would you want to use an event-driven architecture?
1. Decoupling of Services
Components (e.g., microservices) can operate independently and communicate via events.
This reduces tight coupling and makes the system more maintainable and scalable.
2. Scalability and Performance
Events can be processed asynchronously, allowing parts of the system to scale independently based on load.
Ideal for high-throughput systems where not every component needs to react immediately.
3. Real-Time Capabilities
Enables real-time reactions to events.
Critical for applications like trading platforms, e-commerce, and messaging apps.
4. Flexibility and Extensibility
You can add new consumers or subscribers to events without changing the producer.
Encourages faster iteration and evolution of the system.
5. Failure Isolation
Failures in one consumer don't affect others or the producer.
Easier to retry or handle failures gracefully via queues or dead-letter topics.
6. Audit and Traceability
Every event is a record of something that happened in the system.
Useful for building audit logs, debugging, or even replaying events to rebuild state.
7. Loose Coupling for Distributed Systems
Ideal for systems distributed across services or regions.
Communication goes through a message broker (such as Kafka, RabbitMQ, or Redis Streams) or, inside a single process, an event emitter, instead of direct API calls.
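As a rough illustration of points 1, 4, and 5, here is a minimal sketch using Node's built-in EventEmitter. The event name `order.placed`, the `isolate` helper, and the two consumers are hypothetical names chosen for this example. Note that a bare EventEmitter rethrows a listener's exception into the producer's `emit()` call, so the sketch wraps each consumer to keep failures contained.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical event payload, used only for this illustration.
type OrderPlaced = { orderId: string; total: number };

const bus = new EventEmitter();
const log: string[] = [];

// Wrap a consumer so that an exception stays inside that consumer.
// Without this, EventEmitter would rethrow it into the producer's emit() call.
function isolate(name: string, handler: (e: OrderPlaced) => void) {
  return (e: OrderPlaced): void => {
    try {
      handler(e);
    } catch (err) {
      log.push(`${name} failed: ${(err as Error).message}`);
    }
  };
}

// Producer: knows the event name and nothing about who is listening.
function placeOrder(orderId: string, total: number): void {
  bus.emit("order.placed", { orderId, total });
}

// Consumers can be added later without touching placeOrder().
bus.on("order.placed", isolate("billing", () => {
  throw new Error("card declined");
}));
bus.on("order.placed", isolate("email", (e) => {
  log.push(`email sent for ${e.orderId}`);
}));

placeOrder("A-1", 42);
console.log(log); // [ 'billing failed: card declined', 'email sent for A-1' ]
```

The billing consumer fails, yet the email consumer still runs and the producer never sees the error.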
Why a modular approach?
1. Separation of Concerns (SoC)
Modular design encapsulates related functionality within individual units (modules).
Each module focuses on a single responsibility (e.g., Users, Orders, Payments).
Event-driven architecture (EDA) allows these modules to communicate without being directly dependent on each other.
2. Loose Coupling
With EDA, modules don't call each other directly; they emit or consume events.
This reduces tight interdependencies and makes it easier to change or replace parts of the system.
3. Reusability & Composability
Modular design encourages reusable code components.
EDA allows those reusable modules to be composed dynamically based on emitted events.
4. Asynchronous Communication
Event-driven systems allow modules to react to changes after the fact rather than waiting for synchronous responses.
5. Scalable Development & Testing
Teams can build and test modules independently.
You can simulate events during unit or integration testing without needing the full system.
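To make the testing point concrete, the sketch below simulates an event against a single module in isolation. `MailModule`, `testBus`, and `outbox` are hypothetical names; the module receives its bus and its send function as constructor arguments, which is an assumption of this example rather than how the services later in this article are wired.

```typescript
import { EventEmitter } from "node:events";

type VerificationMail = { email: string; otp: string };

// Hypothetical mail module: it takes its bus and its sender as arguments
// so that a test can pass in fakes for both.
class MailModule {
  private send: (to: string, body: string) => void;

  constructor(bus: EventEmitter, send: (to: string, body: string) => void) {
    this.send = send;
    bus.on("SEND_ACCOUNT_VERIFICATION_MAIL", (p: VerificationMail) => this.onVerification(p));
  }

  private onVerification(p: VerificationMail): void {
    this.send(p.email, `Your code is ${p.otp}`);
  }
}

// Test setup: a private bus and a fake sender that records what it was asked to send.
const testBus = new EventEmitter();
const outbox: Array<{ to: string; body: string }> = [];
new MailModule(testBus, (to, body) => {
  outbox.push({ to, body });
});

// Simulate the event; no users module, database, or mail server is involved.
testBus.emit("SEND_ACCOUNT_VERIFICATION_MAIL", { email: "ada@example.com", otp: "1234" });
console.log(outbox); // [ { to: 'ada@example.com', body: 'Your code is 1234' } ]
```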
Directory Structure
/src
  /modules
    /user
      users.service.ts
    /email
      email.service.ts
  /cores
    /events
      dlq.model.ts
      dlq.ts
      event.bus.ts
      event.types.ts
  server.ts
dlq.model.ts
import { Schema, model } from 'mongoose';

const dlqSchema = new Schema({
  eventType: { type: String, required: true },
  payload: { type: Schema.Types.Mixed, required: true },
  error: {
    message: String,
    stack: String,
  },
  service: { type: String, required: true },
  retryCount: { type: Number, default: 0 },
  timestamp: { type: Date, default: Date.now },
});

export const DLQEntry = model('DLQEntry', dlqSchema);
dlq.ts
import { EventType } from "./event.types";
import { DLQEntry } from "./dlq.model";
import { eventBus } from "./event.bus";

export class DeadLetterQueue {
  private maxRetries = 10;
  private retryDelay = 5000; // base delay of 5 seconds

  // Persist a failed delivery, then schedule retries for the owning service.
  async addFailedEvent<T extends EventType>(
    eventType: T,
    payload: any,
    error: Error,
    context: { service: string; retryCount: number }
  ): Promise<void> {
    await DLQEntry.create({
      eventType,
      payload,
      error: {
        message: error.message,
        stack: error.stack,
      },
      service: context.service,
      retryCount: context.retryCount,
      timestamp: new Date(),
    });
    // Start retrying if within retry limit
    if (context.retryCount < this.maxRetries) {
      // Not awaited on purpose; log instead of leaving a rejection unhandled.
      this.retryFailedEvents(context.service).catch((err) =>
        console.error('Could not schedule DLQ retries:', err)
      );
    }
  }

  async getFailedEvents(service: string) {
    return DLQEntry.find({ service });
  }

  async retryFailedEvents(service: string): Promise<void> {
    const failedEvents = await this.getFailedEvents(service);
    for (const event of failedEvents) {
      if (event.retryCount >= this.maxRetries) {
        console.error(`Event ${event.eventType} permanently failed after max retries`);
        continue; // Skip permanently failed events
      }
      setTimeout(async () => {
        try {
          console.log(`Retrying event ${event.eventType} (Attempt: ${event.retryCount + 1})`);
          // Re-publish event. publish() only emits: a handler that fails again is caught
          // by the bus wrapper, so this catch runs only if publish or deleteOne throws.
          await eventBus.publish(event.eventType as EventType, event.payload);
          // Remove the entry once the event has been re-published
          await DLQEntry.deleteOne({ _id: event._id });
        } catch (error) {
          console.error(`Retry failed for event ${event.eventType}:`, error);
          // Increment retry count and update timestamp
          await DLQEntry.updateOne(
            { _id: event._id },
            { $inc: { retryCount: 1 }, $set: { timestamp: new Date() } }
          );
        }
      }, this.retryDelay * (event.retryCount + 1)); // Linear backoff: delay grows with each retry
    }
  }
}

// Singleton instance
export const dlq = new DeadLetterQueue();
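The delay passed to setTimeout in retryFailedEvents is `retryDelay * (retryCount + 1)`, which grows by a fixed step per attempt. If doubling delays are preferred, the calculation could look like the sketch below. The function names and the 60-second cap are assumptions made for this example, not part of the listing above.

```typescript
const baseDelayMs = 5000; // same 5-second base as retryDelay in dlq.ts

// Linear backoff: one extra base unit per attempt,
// i.e. what `retryDelay * (retryCount + 1)` computes.
function linearBackoff(retryCount: number): number {
  return baseDelayMs * (retryCount + 1);
}

// Exponential backoff: the delay doubles per attempt and is capped
// (hypothetical 60-second ceiling) so it cannot grow without bound.
function exponentialBackoff(retryCount: number, capMs = 60_000): number {
  return Math.min(capMs, baseDelayMs * 2 ** retryCount);
}

for (let n = 0; n < 5; n++) {
  console.log(n, linearBackoff(n), exponentialBackoff(n));
}
// 0 5000 5000
// 1 10000 10000
// 2 15000 20000
// 3 20000 40000
// 4 25000 60000
```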
event.bus.ts
import { EventEmitter } from 'events';
import { EventType, EventPayloads } from './event.types';
import { dlq } from './dlq';

export class RestaurantEventBus {
  private emitter = new EventEmitter();
  private subscriptions = new Map<EventType, Function[]>();

  // Fire and forget: emit() hands the payload to every subscriber and returns
  // without waiting for the async handlers to finish.
  async publish<T extends EventType>(type: T, payload: EventPayloads[T]): Promise<void> {
    this.emitter.emit(type, payload);
  }

  async subscribe<T extends EventType>(
    type: T,
    handler: (payload: EventPayloads[T]) => Promise<void>,
    serviceName: string
  ): Promise<void> {
    // Wrap the handler so a failure is routed to the dead-letter queue
    // instead of surfacing as an unhandled rejection.
    const wrappedHandler = async (payload: EventPayloads[T]) => {
      try {
        await handler(payload);
      } catch (error: any) {
        await dlq.addFailedEvent(type, payload, error, {
          service: serviceName,
          retryCount: 0,
        });
      }
    };
    this.emitter.on(type, wrappedHandler);
    this.subscriptions.set(type, [
      ...(this.subscriptions.get(type) || []),
      wrappedHandler,
    ]);
  }
}

export const eventBus = new RestaurantEventBus();
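One thing to watch in the bus and DLQ listings: a retried event re-enters the bus through publish, and the wrapper records any new failure with `retryCount: 0`, so the attempt number is not carried forward and every subscriber receives the event again. A possible variation is sketched below. It is an assumption-laden sketch rather than the author's implementation: `SketchBus` is a hypothetical class, an in-memory array stands in for the Mongoose-backed DLQEntry collection, and retries are triggered by calling `retry()` manually instead of from a timer.

```typescript
import { EventEmitter } from "node:events";

type Handler = (payload: unknown) => Promise<void>;
type DeadLetter = { type: string; payload: unknown; service: string; retryCount: number; error: string };

// Hypothetical bus that remembers which service failed and on which attempt.
class SketchBus {
  private emitter = new EventEmitter();
  private handlers = new Map<string, Handler>(); // key: `${service}:${type}`
  readonly deadLetters: DeadLetter[] = []; // in-memory stand-in for the DLQ collection

  subscribe(type: string, service: string, handler: Handler): void {
    this.handlers.set(`${service}:${type}`, handler);
    this.emitter.on(type, (payload: unknown) => void this.deliver(type, service, payload, 0));
  }

  publish(type: string, payload: unknown): void {
    this.emitter.emit(type, payload);
  }

  // Run one service's handler; on failure, record a dead letter that carries the attempt number.
  private async deliver(type: string, service: string, payload: unknown, retryCount: number): Promise<void> {
    const handler = this.handlers.get(`${service}:${type}`);
    if (!handler) return;
    try {
      await handler(payload);
    } catch (err) {
      this.deadLetters.push({ type, payload, service, retryCount, error: String(err) });
    }
  }

  // Redeliver only to the service that failed, with retryCount + 1.
  async retry(maxRetries: number): Promise<void> {
    const pending = this.deadLetters.splice(0);
    for (const d of pending) {
      if (d.retryCount >= maxRetries) {
        this.deadLetters.push(d); // keep permanently failed events for inspection
        continue;
      }
      await this.deliver(d.type, d.service, d.payload, d.retryCount + 1);
    }
  }
}

// Usage: a handler that fails twice, then succeeds.
const sketchBus = new SketchBus();
let attempts = 0;
sketchBus.subscribe("SEND_PASSWORD_RESET_MAIL", "EmailService", async () => {
  attempts += 1;
  if (attempts < 3) throw new Error("smtp unavailable");
});
sketchBus.publish("SEND_PASSWORD_RESET_MAIL", { email: "ada@example.com", otp: "4821" });
// Later, for example from a timer: await sketchBus.retry(10);
```

Delivering directly to the failed service avoids re-running handlers that already succeeded, at the cost of the bus keeping its own handler registry.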
event.types.ts
export enum EventType {
  // Email Events
  SEND_ACCOUNT_VERIFICATION_MAIL = 'SEND_ACCOUNT_VERIFICATION_MAIL',
  SEND_PASSWORD_RESET_MAIL = 'SEND_PASSWORD_RESET_MAIL',
  // Users Events (subscribed to in the users service)
  UPDATE_RIDER_STATE = 'UPDATE_RIDER_STATE',
}

export type EventPayloads = {
  // Emails Module Payloads
  [EventType.SEND_ACCOUNT_VERIFICATION_MAIL]: {
    email: string;
    otp: string;
  };
  [EventType.SEND_PASSWORD_RESET_MAIL]: {
    email: string;
    otp: string;
  };
  // Users Module Payloads (field types are assumed from how the handler uses them)
  [EventType.UPDATE_RIDER_STATE]: {
    userId: string;
    state: string;
  };
};
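The payload map is what lets the compiler check that each event is published with the right shape. The standalone sketch below shows the idea with string literal keys instead of an enum; `Payloads` and `TypedBus` are hypothetical names for this example and only mirror the structure of EventPayloads and the bus above.

```typescript
import { EventEmitter } from "node:events";

// Payload map keyed by event name, mirroring the shape of EventPayloads.
type Payloads = {
  SEND_ACCOUNT_VERIFICATION_MAIL: { email: string; otp: string };
  SEND_PASSWORD_RESET_MAIL: { email: string; otp: string };
};

class TypedBus {
  private emitter = new EventEmitter();

  // K is inferred from the event name, so Payloads[K] pins down the payload type.
  publish<K extends keyof Payloads>(type: K, payload: Payloads[K]): void {
    this.emitter.emit(type, payload);
  }

  subscribe<K extends keyof Payloads>(type: K, handler: (payload: Payloads[K]) => void): void {
    this.emitter.on(type, handler);
  }
}

const typedBus = new TypedBus();
const received: string[] = [];

// `p` is typed as { email: string; otp: string } without any annotation.
typedBus.subscribe("SEND_PASSWORD_RESET_MAIL", (p) => {
  received.push(`${p.email}:${p.otp}`);
});
typedBus.publish("SEND_PASSWORD_RESET_MAIL", { email: "ada@example.com", otp: "4821" });

// The next line would be rejected by the compiler because `otp` is missing:
// typedBus.publish("SEND_PASSWORD_RESET_MAIL", { email: "ada@example.com" });

console.log(received); // [ 'ada@example.com:4821' ]
```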
email.service.ts
import { Errors } from "../../../Core/helpers/error.handler";
import { eventBus } from "../../../Core/events/event.bus";
import { EventType } from "../../../Core/events/event.types";
import { IEmailService, IEmailTemplate, IVerificationDto } from "../entity/emails.entity";
import { Notify } from "../action/notify";

export class EmailsService implements IEmailService {
  constructor() {
    // Bind the handlers so `this` stays the service instance when the bus calls them.
    eventBus.subscribe(EventType.SEND_ACCOUNT_VERIFICATION_MAIL, this.accountVerification.bind(this), 'NotificationsService');
    eventBus.subscribe(EventType.SEND_PASSWORD_RESET_MAIL, this.passwordReset.bind(this), 'NotificationsService');
  }

  async accountVerification(data: IVerificationDto): Promise<void> {
    try {
      await new Notify({
        subject: 'Account Verification',
        template: IEmailTemplate.ACCOUNT_ACTIVATION,
        data: { otp: data.otp },
      }).send(data.email);
    } catch (e) {
      // Rethrow in the same { message } shape so the event bus can route it to the DLQ.
      throw { message: Errors(e) };
    }
  }

  async passwordReset(data: IVerificationDto): Promise<void> {
    try {
      await new Notify({
        subject: 'Password Reset',
        template: IEmailTemplate.PASSWORD_RESET,
        data: { otp: data.otp },
      }).send(data.email);
    } catch (e) {
      throw { message: Errors(e) };
    }
  }
}
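The bus wrapper calls each handler as a plain function, so a class method that reads `this` has to be bound before it is handed to subscribe (the users service does this with `.bind(this)`). The sketch below shows what happens otherwise. `Counter` and the local `subscribe` helper are hypothetical stand-ins written for this example.

```typescript
import { EventEmitter } from "node:events";

const emitter = new EventEmitter();

// Stand-in for the bus's subscribe(): the wrapper calls `handler(n)` as a plain function.
function subscribe(type: string, handler: (n: number) => void): void {
  emitter.on(type, (n: number) => handler(n));
}

class Counter {
  count = 0;
  increment(n: number): void {
    this.count += n; // relies on `this` being the Counter instance
  }
}

const counter = new Counter();
subscribe("unbound", counter.increment); // detached method: `this` will be undefined
subscribe("bound", counter.increment.bind(counter)); // `this` fixed with bind
subscribe("arrow", (n) => counter.increment(n)); // same effect with an arrow function

let unboundError = "";
try {
  emitter.emit("unbound", 1);
} catch (err) {
  unboundError = (err as Error).name;
}
emitter.emit("bound", 2);
emitter.emit("arrow", 3);

console.log(unboundError, counter.count); // TypeError 5
```

A handler that never touches `this` works either way, which is why an unbound method can go unnoticed until someone adds a field access to it.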
users.service.ts
import { eventBus } from "../../../Core/events/event.bus";
import { EventType } from "../../../Core/events/event.types";
// The remaining imports (IUsersService, UsersRepository, Cookies, and so on) are omitted from this listing.

class UsersService implements IUsersService {
  private static instance: UsersService;
  public readonly repository: IUsersRepository = UsersRepository;

  constructor() {
    // Subscribe to an event that other services can publish to
    eventBus.subscribe(EventType.UPDATE_RIDER_STATE, this.E_UpdateUser.bind(this), 'UsersService');
  }

  public static getInstance(): UsersService {
    if (!UsersService.instance) {
      UsersService.instance = new UsersService();
    }
    return UsersService.instance;
  }

  async CreateAccessToken(user: IUsers): Promise<string> {
    return this.repository.GenerateAccessToken(user);
  }

  async CreateRefreshToken(user: IUsers): Promise<string> {
    return this.repository.GenerateRefreshToken(user);
  }

  async CreateSignIN(user: IUsers, res: Response, req: Request): Promise<any> {
    switch (user.status) {
      // Braces give each case its own scope for the const declarations.
      case 'active': {
        const accessToken = await this.CreateAccessToken(user);
        const refreshToken = await this.CreateRefreshToken(user);
        const roleInfo = await RoleResource.getRole(user.role);
        Cookies.set(res, refreshToken);
        return res.status(200).json({
          success: true,
          message: 'Login successful',
          data: {
            user: await UserResource.single(user, roleInfo),
            accessToken,
          },
        });
      }
      case 'suspended':
        return response(res).error(
          'Account is suspended. Please contact support',
          401
        );
      case 'pending': {
        const otp = GenerateOtp(4);
        if (otp) {
          await this.repository.Update(user.id, { otp, verifyAccountRequest: true });
          // Emit an event to the email service module; no reply is awaited
          eventBus.publish(EventType.SEND_ACCOUNT_VERIFICATION_MAIL, { email: user.email, otp });
        }
        return response(res).success(
          200,
          'An OTP has been sent to your email address',
        );
      }
      default:
        return response(res).error('Status not recognized');
    }
  }

  async Signup(data: IRegisterDto): Promise<IUsers> {
    try {
      return await this.repository.Register({ ...data, password: Password.hash(data.password) });
    } catch (e) {
      throw { message: Errors(e) };
    }
  }

  async ForgotPassword(email: string): Promise<IForgetPassword> {
    let user;
    try {
      user = await this.repository.FindByEmail(email);
    } catch (e) {
      throw { message: Errors(e) };
    }
    // Validation failures are thrown as-is, without passing through Errors()
    if (!user) throw { message: 'Account not found' };
    if (user.status !== 'active') throw { message: 'Account not authorized' };
    try {
      const newOtp = GenerateOtp(4);
      await this.repository.Update(user.id, {
        otp: newOtp,
        resetPassword: true,
      });
      // Emit an event to the email service module
      eventBus.publish(EventType.SEND_PASSWORD_RESET_MAIL, { email: user.email, otp: newOtp });
      return { email };
    } catch (e) {
      throw { message: Errors(e) };
    }
  }

  async E_UpdateUser(payload: any): Promise<void> {
    const { userId, state } = payload;
    await this.repository.Update(userId, { state });
  }
}

// Export a default instance
export default UsersService.getInstance();
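Each service registers its subscriptions in its constructor, and EventEmitter does not queue events for listeners that arrive later. The modules therefore need to be constructed before anything is published, for example at application startup. The sketch below illustrates the ordering issue; `appBus` and `EmailModule` are hypothetical names, and the article's server.ts is not shown, so this is not a reconstruction of it.

```typescript
import { EventEmitter } from "node:events";

const appBus = new EventEmitter();
const delivered: string[] = [];

// Published before any module has subscribed: nobody is listening, so the event is dropped.
// emit() returns false when the event had no listeners.
const hadListenerEarly = appBus.emit("SEND_PASSWORD_RESET_MAIL", { email: "early@example.com" });

// Hypothetical bootstrap step: constructing the module registers its subscription.
class EmailModule {
  constructor(bus: EventEmitter) {
    bus.on("SEND_PASSWORD_RESET_MAIL", (p: { email: string }) => {
      delivered.push(p.email);
    });
  }
}
new EmailModule(appBus);

// Published after bootstrap: the module receives it.
const hadListenerLate = appBus.emit("SEND_PASSWORD_RESET_MAIL", { email: "late@example.com" });

console.log(hadListenerEarly, hadListenerLate, delivered); // false true [ 'late@example.com' ]
```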
Written by

Ezumah Jeremiah Kalu
Experienced software engineer with 6 years of experience. I specialize in software engineering, application security, secure coding practices, scalable applications, distributed systems, and microservices architecture. My expertise lies in crafting robust backend solutions using Node.js, while also demonstrating proficiency in front-end development with React and React Native.