Building a Cron Scheduler with RabbitMQ in Go


Introduction
This project is a cron scheduler service that schedules cron jobs created through a dashboard interface. When a job is due, the scheduler publishes it to a consumer service through a RabbitMQ job queue. The consumer acknowledges successful executions and retries failed jobs up to 3 times, updating the job status in a PostgreSQL database so that it can be tracked from the dashboard.
Some Theory
Cron jobs: These are jobs/tasks that are scheduled to run at a specific time. They can be configured to run once at a given time, repeat every day at a specific time, repeat every x hours, and so on, by using cron expressions.
RabbitMQ: It is an open-source message broker that allows two services to communicate asynchronously. It provides a queue through which two separate services can communicate by sending messages. This is a widely used communication method in distributed systems. In this project we will use AMQP (Advanced Message Queuing Protocol), which RabbitMQ supports.
AMQP: The Advanced Message Queuing Protocol is an open standard protocol in which messages (data) are published through exchanges to queues. Queues are connected to exchanges by bindings, which define the queue to which an exchange should route the data. Another service can then consume the messages from the queue.
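To make the exchange, binding and queue vocabulary concrete, here is a toy in-memory model. It is only a sketch: the broker type and its methods are hypothetical names invented for illustration, it models a single direct exchange, and it leaves out everything a real broker does (acknowledgements, persistence, networking). It does mirror one real detail used later in this post: with the AMQP default exchange, a queue is reachable by using its own name as the routing key.

```go
package main

import "fmt"

// broker is a toy, in-memory stand-in for an AMQP broker.
// It models only one direct exchange; real brokers are far richer.
type broker struct {
	bindings map[string]string   // routing key -> queue name
	queues   map[string][]string // queue name -> pending messages
}

func newBroker() *broker {
	return &broker{bindings: map[string]string{}, queues: map[string][]string{}}
}

// declareQueue creates the queue if needed and binds it under its own name,
// mirroring how the AMQP default exchange routes by queue name.
func (b *broker) declareQueue(name string) {
	if _, ok := b.queues[name]; !ok {
		b.queues[name] = nil
	}
	b.bindings[name] = name
}

// publish routes a message to the queue bound to routingKey.
// It reports false when no binding matches and the message is dropped.
func (b *broker) publish(routingKey, body string) bool {
	q, ok := b.bindings[routingKey]
	if !ok {
		return false
	}
	b.queues[q] = append(b.queues[q], body)
	return true
}

// consume pops the oldest message from the queue, if there is one.
func (b *broker) consume(queue string) (string, bool) {
	msgs := b.queues[queue]
	if len(msgs) == 0 {
		return "", false
	}
	b.queues[queue] = msgs[1:]
	return msgs[0], true
}

func main() {
	b := newBroker()
	b.declareQueue("cron_events")
	b.publish("cron_events", `{"job":"ping"}`)
	msg, ok := b.consume("cron_events")
	fmt.Println(msg, ok)
}
```

Declaring the queue twice is harmless in this model, which is the same reason both the scheduler and the consumer can declare cron_events without coordinating.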
Project Architecture
This project will have two main components:
scheduler: This will host our dashboard and schedule the cron jobs. At the scheduled cron times, the jobs will be published to the queue.
consumer: This will consume messages from the queue and process them based on the job type. On success, it will record the result in the PostgreSQL DB. On failure, it will retry up to a maximum of 3 times. If the job still fails after 3 retries, it will be recorded as permanently_failed in the DB. It will also ACK (acknowledge) or NACK (negatively acknowledge and put back on the queue) the jobs depending on the results.
Find this project here:
Note:
The dashboard in this project has pages that display basic stats, list all of the scheduled jobs, and list job run entries with their status (running/completed/failed). These pages are not explained in this blog.
The main purpose of this blog is to explain only the core logic: scheduling cron jobs, publishing to and consuming from RabbitMQ, processing each job based on its type (ping, email, slack, webhook) with acknowledgement and retries, and recording everything in the PostgreSQL database.
To understand the project structure used here, please refer to this blog: Effective Project Structure for Backend Projects in Go written by me!
Scheduling cron job & Publishing to RabbitMQ
The following code is from the scheduler service's apiService.go file, which is located at /scheduler/internal/services/apiService.go:
type ApiService struct {
	db        *sql.DB
	cron      *cron.Cron
	rabbitmq  *amqp091.Connection
	mqChannel *amqp091.Channel
}

func (s *ApiService) CreateNewJob(job *models.Job) error {
	// first putting this job in db
	query := `INSERT INTO jobs(cron_expr, type, payload)
		VALUES ($1, $2, $3) RETURNING id;
	`
	payloadJSON, _ := json.Marshal(job.Payload)
	var jobId int
	if err := s.db.QueryRow(query, job.CronExpr, job.Type, payloadJSON).Scan(&jobId); err != nil {
		return err
	}
	job.Id = jobId

	// scheduling a cron job
	id, err := s.cron.AddFunc(job.CronExpr, func() {
		log.Println("running cron job: publishing to rabbitmq")
		q, err := s.mqChannel.QueueDeclare("cron_events", false, false, false, false, nil)
		if err != nil {
			log.Println("failed creating a queue for rabbitmq: ", err.Error())
			return
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		jsonBody, err := json.Marshal(map[string]any{
			"job":  job,
			"time": time.Now().Format("2006-01-02T15:04:05.000000-07:00"),
		})
		if err != nil {
			log.Println("failed creating payload for rabbitmq: ", err.Error())
			return
		}

		if err := s.mqChannel.PublishWithContext(ctx, "", q.Name, false, false, amqp091.Publishing{
			ContentType: "application/json",
			Body:        jsonBody,
		}); err != nil {
			log.Println("failed publishing to rabbitmq: ", err.Error())
			return
		}
		log.Println("event published to rabbitmq!")
	})
	if err != nil {
		// on cron scheduling error, deleting the job created in db
		delQuery := `DELETE FROM jobs WHERE id = $1`
		s.db.Exec(delQuery, jobId)
		return err
	}

	updateQuery := `UPDATE jobs SET cron_id = $1 WHERE id = $2`
	s.db.Exec(updateQuery, id, jobId)
	job.CronId = int(id)
	return nil
}
This is a service function that we have created on our ApiService and that is called from the /api/create API's handler.
First, we insert the job into the jobs table:
query := `INSERT INTO jobs(cron_expr, type, payload)
	VALUES ($1, $2, $3) RETURNING id;
`
payloadJSON, _ := json.Marshal(job.Payload)
var jobId int
if err := s.db.QueryRow(query, job.CronExpr, job.Type, payloadJSON).Scan(&jobId); err != nil {
	return err
}
job.Id = jobId
After inserting, we set the Id field of the job variable to the id returned by the database.
Then we schedule the cron job using the robfig/cron package. We have loaded the cron instance from this package into our ApiService.
The cron job is scheduled using the package's AddFunc() function, which takes two parameters: the cron expression and the function to run on that schedule.
The cron expression we use is:
x y * * *
Here x is the minute and y is the hour, so the job repeats every day at that time. This cron expression is stored in our job model, which we access as job.CronExpr.
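As a small illustration of the field order, the sketch below builds such a daily expression from an hour and a minute. The dailyCronExpr helper is a hypothetical name and is not part of the project; the dashboard may build job.CronExpr in a different way.

```go
package main

import "fmt"

// dailyCronExpr builds a five-field cron expression that fires once a day
// at the given hour and minute. The helper name is hypothetical.
func dailyCronExpr(hour, minute int) (string, error) {
	if hour < 0 || hour > 23 {
		return "", fmt.Errorf("hour out of range: %d", hour)
	}
	if minute < 0 || minute > 59 {
		return "", fmt.Errorf("minute out of range: %d", minute)
	}
	// Field order: minute hour day-of-month month day-of-week.
	return fmt.Sprintf("%d %d * * *", minute, hour), nil
}

func main() {
	expr, err := dailyCronExpr(9, 30)
	if err != nil {
		fmt.Println("invalid time:", err)
		return
	}
	fmt.Println(expr) // prints "30 9 * * *": every day at 09:30
}
```

Validating the ranges up front gives the dashboard user a clearer error than waiting for the scheduler to reject the expression.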
// scheduling a cron job
id, err := s.cron.AddFunc(job.CronExpr, func() {
	log.Println("running cron job: publishing to rabbitmq")
	q, err := s.mqChannel.QueueDeclare("cron_events", false, false, false, false, nil)
	if err != nil {
		log.Println("failed creating a queue for rabbitmq: ", err.Error())
		return
	}
Inside the function that is invoked at the scheduled time, we declare a RabbitMQ queue using the QueueDeclare() function from the rabbitmq/amqp091-go package.
We have stored the RabbitMQ channel in our ApiService as mqChannel, which is of type *amqp091.Channel.
We declare the queue by passing its name, which in this case is cron_events, and then check for any error in declaring it.
This creates the queue (if it does not already exist) from which the consumer will consume the messages. Then we prepare the payload to send in the message over the queue.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

jsonBody, err := json.Marshal(map[string]any{
	"job":  job,
	"time": time.Now().Format("2006-01-02T15:04:05.000000-07:00"),
})
if err != nil {
	log.Println("failed creating payload for rabbitmq: ", err.Error())
	return
}
Here, we first create a context with a 5-second timeout to use when publishing the message to the queue. The timeout is meant to keep the cron callback from waiting indefinitely if publishing takes longer than 5 seconds.
Then we build a JSON body with a job key, which holds our job model's JSON data, and a time key, which holds the time at which this run of the cron job fired.
Then we check for any error in creating the payload body and go on to publish the message:
if err := s.mqChannel.PublishWithContext(ctx, "", q.Name, false, false, amqp091.Publishing{
	ContentType: "application/json",
	Body:        jsonBody,
}); err != nil {
	log.Println("failed publishing to rabbitmq: ", err.Error())
	return
}
We publish the message to the queue using the PublishWithContext() function on mqChannel. We pass the timeout context, keep the default "" exchange, and provide the queue's name as the routing key.
We also pass our payload using amqp091.Publishing{}, where we set the ContentType and Body.
We then check for any error in publishing the message to the queue.
After scheduling the cron job and defining the function to run for it, we check for any error returned by AddFunc():
if err != nil {
	// on cron scheduling error, deleting the job created in db
	delQuery := `DELETE FROM jobs WHERE id = $1`
	s.db.Exec(delQuery, jobId)
	return err
}
If there were errors, then we delete the DB entry we made for this job earlier.
updateQuery := `UPDATE jobs SET cron_id = $1 WHERE id = $2`
s.db.Exec(updateQuery, id, jobId)
job.CronId = int(id)
return nil
If there were no errors, we finally update the cron_id of our job's entry in the DB.
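Note that CreateNewJob() discards the errors returned by the DELETE and UPDATE statements. One possible variation, sketched below, returns them to the caller. Everything here is an assumption made for illustration: jobStore and scheduler are hypothetical interfaces standing in for *sql.DB and *cron.Cron, the in-memory fakes exist only so the flow runs without PostgreSQL, and errors.Join needs Go 1.20 or newer.

```go
package main

import (
	"errors"
	"fmt"
)

// jobStore and scheduler are hypothetical interfaces standing in for
// *sql.DB and *cron.Cron so the flow can run without external services.
type jobStore interface {
	Insert(cronExpr string) (int, error)
	Delete(id int) error
	SetCronID(id, cronID int) error
}

type scheduler interface {
	Add(cronExpr string, fn func()) (int, error)
}

// createJob inserts the job, schedules it, and undoes the insert when
// scheduling fails. Unlike the original, it reports every storage error.
func createJob(st jobStore, sc scheduler, cronExpr string, fn func()) (int, error) {
	id, err := st.Insert(cronExpr)
	if err != nil {
		return 0, fmt.Errorf("insert job: %w", err)
	}
	cronID, err := sc.Add(cronExpr, fn)
	if err != nil {
		if delErr := st.Delete(id); delErr != nil {
			return 0, errors.Join(err, fmt.Errorf("rollback job %d: %w", id, delErr))
		}
		return 0, err
	}
	if err := st.SetCronID(id, cronID); err != nil {
		return 0, fmt.Errorf("store cron id: %w", err)
	}
	return id, nil
}

// memStore and fakeScheduler are in-memory fakes used only by this demo.
type memStore struct {
	next int
	rows map[int]int // job id -> cron id
}

func (m *memStore) Insert(string) (int, error) { m.next++; m.rows[m.next] = 0; return m.next, nil }
func (m *memStore) Delete(id int) error        { delete(m.rows, id); return nil }
func (m *memStore) SetCronID(id, c int) error  { m.rows[id] = c; return nil }

type fakeScheduler struct{ fail bool }

func (f fakeScheduler) Add(string, func()) (int, error) {
	if f.fail {
		return 0, errors.New("bad cron expression")
	}
	return 7, nil
}

func main() {
	st := &memStore{rows: map[int]int{}}
	_, err := createJob(st, fakeScheduler{fail: true}, "bad", func() {})
	// The failed job leaves no row behind.
	fmt.Println("error:", err, "rows left:", len(st.rows))
}
```

Keeping the store and scheduler behind small interfaces is a design choice that also makes the rollback path easy to unit test.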
Listening on the RabbitMQ’s Job Queue & Consuming Jobs
The following code is from the consumer service's rabbitmqService.go file, which is located at /consumer/internal/services/rabbitmqService.go.
We first create an RMQService struct and a NewRMQService() constructor.
type RMQService struct {
	dbService *DBService
	conn      *amqp091.Connection
	channel   *amqp091.Channel
}

// NewRMQService wires the DB service, connection and channel together.
func NewRMQService(db *sql.DB, conn *amqp091.Connection, channel *amqp091.Channel) *RMQService {
	return &RMQService{
		dbService: NewDBService(db),
		conn:      conn,
		channel:   channel,
	}
}
This struct holds the database service dbService, the RabbitMQ connection conn, and the RabbitMQ channel channel.
In main.go's main function, we create a new instance of the RabbitMQ service:
rabbitmqService := services.NewRMQService(cfg.Db, cfg.RabbitMQ, cfg.MQChannel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rabbitmqService.Start(ctx)
After creating the RabbitMQ service instance, we create a WithCancel context and defer its cancel() function in main, so that the consumer is told to stop gracefully when the consumer service shuts down. Then we call the Start() function.
func (s *RMQService) Start(ctx context.Context) {
	q, err := s.channel.QueueDeclare("cron_events", false, false, false, false, nil)
	if err != nil {
		log.Println("error in creating rabbitmq queue: ", err.Error())
		return
	}

	msgs, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {
		log.Println("error in creating a consume channel for rabbitmq: ", err.Error())
		return
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Println("Stopping rabbitmq...")
				return
			case msg, ok := <-msgs:
				if !ok {
					log.Println("RabbitMQ message channel is closed.")
					return
				}
				processMessage(&msg, s.dbService)
			}
		}
	}()

	log.Println("RabbitMQ Consumer Running: Waiting for messages...")
	<-ctx.Done()
}
In the Start function, we first declare the queue using QueueDeclare() and then call the Consume() function.
msgs, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {
	log.Println("error in creating a consume channel for rabbitmq: ", err.Error())
	return
}
Here we pass the queue's name and set autoAck to false, so that we can manually ACK and NACK the job messages to support retries.
go func() {
	for {
		select {
		case <-ctx.Done():
			log.Println("Stopping rabbitmq...")
			return
		case msg, ok := <-msgs:
			if !ok {
				log.Println("RabbitMQ message channel is closed.")
				return
			}
			processMessage(&msg, s.dbService)
		}
	}
}()

log.Println("RabbitMQ Consumer Running: Waiting for messages...")
<-ctx.Done()
Then we start a goroutine that runs an infinite for loop. The loop contains a select statement that waits on both the cancel context's Done channel and the RabbitMQ consume channel.
When the application shuts down and the context's cancel() function is called, the channel returned by ctx.Done() is closed. The <-ctx.Done() case then fires and the goroutine returns, which stops the consumer.
When the scheduler service publishes a message to the queue, that message arrives on the msgs channel, is received by the <-msgs case, and is stored in msg.
Then we call the processMessage() function to parse the job's payload and process the job.
Processing the Job
func processMessage(msg *amqp091.Delivery, dbService *DBService) {
	log.Println("Received message on RabbitMQ channel: ", string(msg.Body))

	// parsing message body which has keys "job"[Job json] and "time"[Schedule time string]
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		log.Println("error in extracting message payload: ", err.Error())
		msg.Ack(false)
		return
	}

	// parsing job json from the message body json
	jobBody, ok := body["job"].(map[string]any)
	if !ok {
		log.Println("error in extracting job json: ", body["job"])
		msg.Ack(false)
		return
	}

	// parsing scheduled time from the message body json
	sTime, ok := body["time"].(string)
	if !ok {
		log.Println("error in extracting scheduled time: ", body["time"])
		msg.Ack(false)
		return
	}

	// converting the job json to bytes, to convert it to models.Job
	jobBodyByte, err := json.Marshal(jobBody)
	if err != nil {
		log.Println("error in parsing job json: ", err.Error())
		msg.Ack(false)
		return
	}
	var job *models.Job
	if err := json.Unmarshal(jobBodyByte, &job); err != nil {
		log.Println("error in extracting job: ", err.Error())
		msg.Ack(false)
		return
	}

	// get retries of any existing job entry for this job id, scheduled time which was not failed
	var jobEntry *models.JobEntry
	j, err := dbService.getExistingJobEntry(job, sTime)
	if err != nil {
		log.Println("error in getting a job entry: ", err.Error())
		msg.Ack(false)
		return
	}
	if j != nil {
		// if a job entry already exists, update that in jobEntry variable
		jobEntry = j
	} else {
		createdEntry, err := dbService.createNewJobEntry(job, sTime)
		if err != nil {
			log.Println("error creating new entry in db: ", err.Error())
			msg.Ack(false)
			return
		}
		jobEntry = createdEntry
	}

	// checking if the retry count reached max retries
	if jobEntry.Retries >= MaxJobRetries {
		dbService.markJobAsPermanentlyFailed(jobEntry)
		msg.Ack(false)
		return
	}

	switch job.Type {
	case "ping":
		if err := processPingJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "email":
		if err := processEmailJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "slack":
		if err := processSlackJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "webhook":
		if err := processWebhookJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	default:
		handleJobError(dbService, fmt.Errorf("invalid event type: %s", job.Type), msg, jobEntry)
		// handleJobError already Nacked the message, so don't fall through to Ack
		return
	}

	msg.Ack(false)
}
In this function, we first decode the JSON body from the message's payload. Then we extract the job's JSON stored under the job key and the scheduled time stored under the time key.
Then we re-encode the job's JSON to bytes and decode it into a Job value stored in job *models.Job.
If any parsing step fails, we acknowledge the message using Ack(false). We pass multiple as false so that only this delivery is acknowledged, and not any earlier unacknowledged deliveries.
We do this because we can't process the job any further, so we have to remove the message from the queue by acknowledging it.
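The map, re-marshal and unmarshal sequence can also be collapsed into a single decode step with a typed envelope. This is an alternative sketch and not the project's code: the envelope type and decodeEnvelope function are hypothetical, and the Job struct and its JSON tags are assumptions that would have to match what the scheduler's models.Job actually emits.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Job mirrors the fields used in this post; the JSON tags are assumptions
// and must match whatever the scheduler's models.Job actually emits.
type Job struct {
	Id       int            `json:"id"`
	CronExpr string         `json:"cron_expr"`
	Type     string         `json:"type"`
	Payload  map[string]any `json:"payload"`
}

// envelope is a hypothetical typed view of the message body.
type envelope struct {
	Job  *Job   `json:"job"`
	Time string `json:"time"`
}

// decodeEnvelope parses and validates the message body in one step.
func decodeEnvelope(body []byte) (*envelope, error) {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	if env.Job == nil {
		return nil, errors.New("message has no job")
	}
	if env.Time == "" {
		return nil, errors.New("message has no scheduled time")
	}
	return &env, nil
}

func main() {
	body := []byte(`{"job":{"id":1,"cron_expr":"30 9 * * *","type":"ping"},"time":"2025-01-01T09:30:00.000000+00:00"}`)
	env, err := decodeEnvelope(body)
	if err != nil {
		fmt.Println("bad message:", err)
		return
	}
	fmt.Println(env.Job.Type, env.Time)
}
```

With this shape, processMessage() would need a single error branch for malformed messages, each of which would still be acknowledged and dropped as described above.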
// get retries of any existing job entry for this job id, scheduled time which was not failed
var jobEntry *models.JobEntry
j, err := dbService.getExistingJobEntry(job, sTime)
if err != nil {
	log.Println("error in getting a job entry: ", err.Error())
	msg.Ack(false)
	return
}
if j != nil {
	// if a job entry already exists, update that in jobEntry variable
	jobEntry = j
} else {
	createdEntry, err := dbService.createNewJobEntry(job, sTime)
	if err != nil {
		log.Println("error creating new entry in db: ", err.Error())
		msg.Ack(false)
		return
	}
	jobEntry = createdEntry
}
Then we call the getExistingJobEntry() function from dbService to check whether there was an earlier execution attempt for this job at that specific scheduled time. If there was one and this is a retry, we assign the existing entry to the jobEntry variable.
If there was no earlier attempt, we create a new job entry with running status for that scheduled time in the database using the createNewJobEntry() function and store it in the jobEntry variable.
// checking if the retry count reached max retries
if jobEntry.Retries >= MaxJobRetries {
	dbService.markJobAsPermanentlyFailed(jobEntry)
	msg.Ack(false)
	return
}
Then, for a job that is being retried, we check whether the retry count has reached MaxJobRetries, i.e. 3.
If it has, we update that job entry to permanently_failed status in the database for that scheduled time using the markJobAsPermanentlyFailed() function, acknowledge the message to remove it from the queue, and return from the function.
switch job.Type {
case "ping":
	if err := processPingJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "email":
	if err := processEmailJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "slack":
	if err := processSlackJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "webhook":
	if err := processWebhookJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
default:
	handleJobError(dbService, fmt.Errorf("invalid event type: %s", job.Type), msg, jobEntry)
	// handleJobError already Nacked the message, so don't fall through to Ack
	return
}
Then we switch on job.Type and call the matching task processing function for that job type.
If a task processing function returns an error, we call the handleJobError() function and return.
func handleJobError(dbService *DBService, err error, msg *amqp091.Delivery, jobEntry *models.JobEntry) {
	log.Println("error in processing job: ", err.Error(), ", retries: ", jobEntry.Retries)
	time.Sleep(2 * time.Second)
	dbService.markJobAsFailed(err, jobEntry.Retries+1, jobEntry)
	msg.Nack(false, true)
}
In the handleJobError() function, we sleep for 2 seconds before retrying.
We mark the job run entry as failed in the database using the markJobAsFailed() function, storing the incremented retry count. The job entry's status will move to permanently_failed if it fails all 3 retries.
Then we negatively acknowledge the message with Nack(), passing requeue as true to put the message back on the queue.
When a job is processed successfully inside its task processing function, we use the markJobAsCompleted() function from dbService to update the job entry's status to completed.
msg.Ack(false)
Finally, at the end of the processMessage() function, we Ack() the message.
This completes the processing of a job after it has been consumed!
Conclusion
In this blog, we learned how to schedule cron jobs and publish them to a RabbitMQ queue, and how these messages can be consumed from the queue.
We also learned how to acknowledge messages after consuming them and how to retry on job failures.
This blog covered parts of the full project I created, which also provides a dashboard for creating jobs, listing all jobs, and viewing job run statuses: running/completed/failed (for retries)/permanently_failed.
Make sure to give it a go:
If you find this article helpful, don't forget to hit the ❤️ button.
Check out my website here and feel free to connect.
Happy Coding! 👨💻
Written by Aniket Yadav
