Why & How to Implement NestJS event listeners with cronjob?
What’s the need for cronjobs in project?
The project focused on scheduling appointments between doctors and patients. The scheduling pattern considered is stream scheduling. Since it was a backend project, the chosen technology stack included:
NestJS ( For backend server )
Typeorm ( For Object Relation Mapping )
GraphQL ( For apis )
PostgreSQL ( For database )
Docker ( For deployment & development purpose )
Yarn ( As a package manager instead of npm )
Cronjobs were needed in the cases where let’s say there is a status parameter for appointment which is actively looked upon to see which appointment is scheduled or rescheduled. There are some other statuses such as on-hold, in-process, reschedule_needed & completed. Whenever free appointments are fetched, appointments with status such as reschedule_needed & completed are needed to be ignored.
The status on-hold is for first five minutes when the appointment starts but the doctor doesn’t change the status. This scenario itself calls for cronjobs and event based automation. If you are also encountering such a scenario then consider implementing event listeners and handlers.
Here is the diagram what events how look like in the process considering patient side. you can also check out figjam link.
How it is implemented in this project?
Every table in postgres has its own module. So obviously the event implementation along with cronjobs is in appointment module. The node libraries which we will be using are EventEmitter2 from nestjs & node-cron.
The cronjob is implemented in class where it is called in constructor. What are we exactly doing here? We are fetching all the appointments which are 2 hours after current time. Then we filter them accordingly. We filter to get appointments that have start before 5 mins and appointments that are in between 5 to 15 mins according to that status. Status scheduled and rescheduled are considered for 5 mins & status on-hold is considered for 5 to 15 mins. We also filter on-going appointments in that timeline!
Here is the code, please take a look for yourself
// appointment.service.ts
import { Injectable } from '@nestjs/common';
import { Appointment } from './appointment.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import {
bulkUpdateDTO,
createAppointmentDTO,
updateAppointmentDTO,
} from './dtos/appointment.dto';
import { DoctorService } from '../doctor/doctor.service';
import { EventEmitter2 } from '@nestjs/event-emitter';
import * as cron from 'node-cron';
import { Between } from 'typeorm';
@Injectable()
export class AppointmentService {
minsForCron = '5';
constructor(
@InjectRepository(Appointment)
private readonly appointmentRepository: Repository<Appointment>,
private readonly doctorService: DoctorService,
private eventEmitter: EventEmitter2,
) {
this.scheduleTableCleanUp();
}
async scheduleTableCleanUp() {
const limit = 10;
const currentDate = new Date();
cron.schedule(`${this.minsForCron} * * * *`, async () => {
const twoHoursAhead = new Date(
currentDate.getTime() + 2 * 60 * 60 * 1000,
);
const statuses = ['scheduled', 'rescheduled', 'on-hold', 'on-going'];
const twoHoursAheadAppointment = await this.appointmentRepository
.createQueryBuilder('appointment')
.where('appointment.appointment_date_time <= :twoHoursAhead', {
twoHoursAhead,
})
.andWhere('appointment.status IN (:...statuses)', { statuses })
.limit(limit)
.getMany();
const newStatusForOnHold = ['scheduled', 'rescheduled'];
const fiveMinAhead = new Date(currentDate.getTime() + 5 * 60 * 1000);
const fifteenMinAhead = new Date(currentDate.getTime() + 15 * 60 * 1000);
const inprogressAppointment = twoHoursAheadAppointment.filter(
(appointment) => appointment.status === 'on-going',
);
const fiveMinsBelowAppointment: Appointment[] = [];
const fiveToFifteenAppointment: Appointment[] = [];
const otherAppointments = twoHoursAheadAppointment.filter(
(appointment) => {
let addToOthers = true;
if (
currentDate <= appointment.appointment_date_time &&
appointment.appointment_date_time <= fiveMinAhead &&
newStatusForOnHold.includes(appointment.status)
) {
addToOthers = false;
fiveMinsBelowAppointment.push(appointment);
} else if (
appointment.appointment_date_time > fiveMinAhead &&
appointment.appointment_date_time <= fifteenMinAhead &&
appointment.status === 'on-hold'
) {
addToOthers = false;
fiveToFifteenAppointment.push(appointment);
}
return appointment.status !== 'on-going' && addToOthers;
},
);
this.eventEmitter.emit(
'cron.job.update.two.hrs.ahead.appointment',
otherAppointments,
);
this.eventEmitter.emit(
'cron.job.update.two.hrs.ahead.ongoing.appointment',
inprogressAppointment,
);
this.eventEmitter.emit(
'cron.job.update.five.minutes.below.ongoing.appointment',
fiveMinsBelowAppointment,
);
this.eventEmitter.emit(
'cron.job.update.five.to.fifteen.minutes.below.ongoing.appointment',
fiveToFifteenAppointment,
);
});
}
// Other methods in appointment.service.ts
}
Four events we currently emit. These are 'cron.job.update.two.hrs.ahead.appointment', 'cron.job.update.two.hrs.ahead.ongoing.appointment', 'cron.job.update.five.minutes.below.ongoing.appointment' and 'cron.job.update.five.to.fifteen.minutes.below.ongoing.appointment'. We also pass appointment data with the events. For each event we bulk update appointments status
The event listening part is where we are handling all the events which we receive from appointment service & we bulk update the status accordingly! Here is how we are updating according to events. There is a separate file for it, it is named as ‘appointmentcron.service.ts‘.
Event | What status we update it to in bulk update |
'cron.job.update.five.minutes.below.ongoing.appointment' | on-hold |
'cron.job.update.five.to.fifteen.minutes.below.ongoing.appointment' | reschedule_needed |
'cron.job.update.two.hrs.ahead.appointment' | reschedule_needed |
'cron.job.update.two.hrs.ahead.ongoing.appointment', | completed |
// appointmentcron.service.ts
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Appointment } from './appointment.entity';
import { Repository } from 'typeorm';
import { OnEvent } from '@nestjs/event-emitter';
@Injectable()
export class AppointmentCronService {
constructor(
@InjectRepository(Appointment)
private readonly appointmentRepository: Repository<Appointment>,
) {}
@OnEvent('cron.job.update.two.hrs.ahead.appointment')
async handleCronJobForTwoHrsAheadAppointment(payload: [Appointment]) {
const appointmentIds = payload.map((appointment) => appointment.id);
await this.appointmentRepository
.createQueryBuilder()
.update(Appointment)
.set({ status: 'reschedule_needed' })
.whereInIds(appointmentIds)
.execute();
}
@OnEvent('cron.job.update.five.minutes.below.ongoing.appointment')
async handleCronJobForFiveMinsAheadAppointments(payload: [Appointment]) {
const appointmentIds = payload.map((appointment) => appointment.id);
await this.appointmentRepository
.createQueryBuilder()
.update(Appointment)
.set({ status: 'on-hold' })
.whereInIds(appointmentIds)
.execute();
}
@OnEvent('cron.job.update.two.hrs.ahead.ongoing.appointment')
async handleCronForTwoHrsAheadOnGoingAppointment(payload: [Appointment]) {
const appointmentIds = payload.map((appointment) => appointment.id);
await this.appointmentRepository
.createQueryBuilder()
.update(Appointment)
.set({ status: 'completed' })
.whereInIds(appointmentIds)
.execute();
}
@OnEvent('cron.job.update.five.to.fifteen.minutes.below.ongoing.appointment')
async handleCronJobForFiveToFifteenMinsAheadAppointments(
payload: [Appointment],
) {
const appointmentIds = payload.map((appointment) => appointment.id);
await this.appointmentRepository
.createQueryBuilder()
.update(Appointment)
.set({ status: 'reschedule_needed' })
.whereInIds(appointmentIds)
.execute();
}
}
Here is a brief diagram of how it is happening. You can also view it in the same figjam.
Now we have to make sure that appointment module has AppointmentCronService in providers
// appointment.module.ts
import { Module } from '@nestjs/common';
import { Appointment } from './appointment.entity';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppointmentResolver } from './appointment.resolver';
import { AppointmentService } from './appointment.service';
import { DoctorModule } from '../doctor/doctor.module';
import { AppointmentCronService } from './appointmentcron.service';
@Module({
imports: [TypeOrmModule.forFeature([Appointment]), DoctorModule],
controllers: [],
providers: [AppointmentService, AppointmentResolver, AppointmentCronService],
exports: [AppointmentService],
})
export class AppointmentModule {}
For the final change we need to ensure that appointment module and Event Emitter Module is imported in app module
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ConfigModule, ConfigService } from '@nestjs/config';
import configuration from './configuration/configuration';
import { LoggerModule } from 'nestjs-rollbar';
import { TypeOrmModule } from '@nestjs/typeorm';
import { typeOrmAsyncConfig } from './configs';
import { UsersModule } from './modules/users/users.module';
import { AuthModule } from './modules/auth/auth.module';
import { JwtModule } from '@nestjs/jwt';
import { APP_GUARD } from '@nestjs/core';
import { JwtAuthGuard } from './modules/auth/jwt-auth.guard';
import { GraphqlModule } from './graphql/graphql.module';
import { RolesModule } from './modules/roles/roles.module';
import { PatientModule } from './modules/Patient/patient.module';
import { TimingModule } from './modules/timings/timings.module';
import { SupportTicketsModule } from './modules/SupportTickets/supporttickets.module';
import { FeedbackModule } from './modules/feedback/feedback.module';
import { AppointmentModule } from './modules/appointment/appointment.module';
import { ChatModule } from './modules/Chat/chat.module';
import { EventEmitterModule } from '@nestjs/event-emitter';
@Module({
imports: [
ConfigModule.forRoot({
load: [configuration],
isGlobal: true,
}),
LoggerModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
accessToken: configService.get('rollbar.accessToken'),
environment: configService.get('rollbar.environment'),
captureUncaught: true,
captureUnhandledRejections: true,
ignoreDuplicateErrors: false,
}),
inject: [ConfigService],
}),
TypeOrmModule.forRootAsync(typeOrmAsyncConfig),
EventEmitterModule.forRoot({
maxListeners: 10,
verboseMemoryLeak: true,
}),
AuthModule,
UsersModule,
GraphqlModule,
RolesModule,
PatientModule,
TimingModule,
SupportTicketsModule,
FeedbackModule,
AppointmentModule,
ChatModule,
JwtModule.register({
secret: 'x&92Kv^Zc7b9@JN5Q',
signOptions: { expiresIn: '1h' },
}),
],
controllers: [AppController],
providers: [
AppService,
{
provide: APP_GUARD,
useClass: JwtAuthGuard,
},
],
})
export class AppModule {}
Project Github Link: https://github.com/Daily-Utils/schedular
Api Documentation for the project: https://c1jfsswjgp.apidog.io/
Database Diagram / Schema for the project: https://dbdocs.io/ssiddharthsutar/schedular
Quickie For Those Who Don’t want to read all
Here are the step that we followed
Write cron in a service along with event emitter of nest ( check appointment service code for it)
Call the function in constructor of the service
Write an another service in the same module for event handling ( check appointmentcron service code for it)
Import both services as providers in the respective module
Add that module and event emitter module in app module
Final Section
It is all possible because of Ravi Teja Akella my teammate during this project. A huge thanks to him
I’m also making a project related to authentication using rust so stay tuned for that
Thank you for your time!
Reference
Nestjs events documentation: https://docs.nestjs.com/techniques/events#dispatching-events
Typeorm docs: https://orkhan.gitbook.io/typeorm/docs/update-query-builder
Subscribe to my newsletter
Read articles from Siddharth Sutar directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Siddharth Sutar
Siddharth Sutar
A dev ready with dev tools