A Distributed Task Scheduler for 1M+ Daily Tasks

Introduction
This article is about how I upgraded my previous cron scheduler service that I built in Go for scaling it to around 1 million daily scheduled jobs. This lays out the overall system design, approach and considerations I took in building this system.
The Problem
I had created a cron scheduler service which uses robfig/cron
package to schedule cron jobs on the machine. This is good for limited number of jobs, but when the job numbers are increased to thousands and even millions, this system will not work.
As for scheduling the crons, the package uses goroutines and for this huge amount of jobs, it can easily overwhelm the hardware resource. Now there can be a solution to run multiple scheduler instances and divide the overall crons on these instances. But this brings another issue. When crons are scheduled on multiple machines, what if an instance suddenly crashes? The jobs scheduled on that machine will be lost!. We will then have to build some tracking mechanism to track each cron respective to the machine, which will bring further complexities. So have the job’s state on a server instance would not work.
So in this article, we discuss how to make a system that can have million jobs scheduled daily!
The Solution
I have referred multiple system design blogs & videos(Mayil’s Medium, Algomaster, Jordan’s YT) to design a distributed scheduler service, and came up with following points:
A job submission service(UI/API) will create new jobs and store them in a job store.
The job store will persist all of the job related data and the run history of the jobs with their status.
A scheduling service periodically queries the jobs table to get all tasks to run at current time.
These jobs will be then pushed to a distributed queue.
These jobs then will be consumed & processed by workers, which will update their job run status in job run history store.
The idea is that, instead of my approach of scheduling jobs on a machine, we can just poll the jobs table every minute to check all jobs that are required to run at that minute.
This is the big change! Instead of actually scheduling hardware cron jobs, we can just fetch from the job store every minute and process them.
After polling and fetching required jobs from DB, we can just publish them to the queue.
For this, the blogs have suggested using the NoSQL Cassandra DB due to it’s low latency and high read-write throughputs that fits well when we have to periodically poll on the DB.
For queueing jobs, we can use Kafka.
My Approach & Considerations
To keep our infra simple and considering limited resources, we will use:
PostgreSQL DB for storing jobs data and job run history.
Redis for queuing jobs in redis streams, managing distributed locks and storing job in sorted sets.
Since we are using PostgreSQL DB, we should consider the limitations of polling the DB every minute. Polling periodically can put heavy load on the DB and can increase the I/O ops thus affecting our single point of source for other operations.
To reduce load on DB, we can process jobs in batches. In this implementation, we will query the DB every 10 minutes to load all of the jobs for next 10 minutes temporarily. After 10 minutes, they will be removed from this temporary storage.
Now next question is where should we store the next 10 minute jobs efficiently?
Answer is Redis’s sorted sets!
Redis ZSET
First i had considered Redis lists, but going through the list for specific timestamp can take O(N) time. Which can be huge, considering a large number of jobs for a minute(e.g thousands of jobs per minute at spike).
Using Redis sorted set brings the advantage of scores! In sorted sets, the values are by default ordered based on scores. So we can use the timestamps as scores and this can bring faster access to the jobs when we try to fetch for specific timestamp.
For reading, it will take O(log N + M), where N is the total number of elements in set and M is the total elements returned. So internally, Redis first searches for the first element which satisfies range condition i.e >=min which takes O(log N) time like binary search. And once the node position is found, it takes O(M) time to linearly go through the M items and return them until the range is <= max.
Redis Stream & Consumer Group
For queuing jobs, we will use Redis streams. The workers will be a part of consumer group listening over this stream. Using consumer groups solves the problem of duplicate job execution, since the consumer group itself acts as a load balancer for multiple consumers in the group and distributes the job coming from the stream properly ensuring a job will be delivered to only one consumer.
Redis streams also allows to acknowledge the job messages using XACK, using which we can build our acknowledge and retry mechanism.
When a consumer gets a jobs and suddenly crashes without acknowledging, Redis allows us to claim back that job from a pending entries list. So any other consumer can claim, process and ack that job if the job has been there for too long in the pending list.
Architecture
This system consists of following components:
Dashboard
: The dashboard lets users create a new job, list all jobs and view job run entries with their status (completed, failed, permanently_failed). The dashboard interface services are served with a load balancer. On creation of a new job, an entry is made in thejobs
table.Scheduler
: The scheduler service has two sub-service - Batch processor and Publisher.
The Batch Processor polls the PostgreSQL DB every 10 mins with a Redis lock to query jobs for next 10 min range. After getting the jobs, it adds them to Redis’s sorted set (ZSET).
The Publisher runs every minute to fetch all jobs from ZSET for current time. It then publishes these jobs to Redis stream.Worker
: The worker services are part of a consumer group which gets jobs from the Redis stream and process them further. On success, it makes entry in thejob_runs
table as completed. On failure, it puts back the jobs to the stream for further retrying. On retrying for max 3 times, it marks the job as permanently failed and creates entry in thejob_runs
table.PostgreSQL DB
: PostgreSQL is used to store the job details injobs
table which is partitioned using the hour time of jobs. e.g a partition for storing jobs whose scheduled hour is 00 to 06, another for 07 to 12, etc. It stores the hour, minute, payload and retry count.
It has another tablejob_runs
to store the job run entries with their statuses (completed, failed, permanently_failed). It also stores the output (for completed runs) and error (for failed runs).Redis
: Redis is used for letting the Batch Processor of scheduler service to create a lock so that only one instance polls the DB.
Redis’s sorted set is used to store jobs with their timestamp as scores for fast access during every minute polling of the Publisher of scheduler service.
Redis Stream is used to queue the jobs.
As this is a time sensitive system, storing and processing of jobs are done at their UTC times across all services and DB.
Dashboard
The dashboard lets users create a new job, list all of the jobs and view run entries of a job.
Creating & Viewing jobs
: When a job is created from dashboard interface, the timezone of the client is also passed with the request. The schedule time is then converted to UTC and the job is inserted intojobs
table. Thejobs
table hasid
,hour
UTC converted,minute
UTC converted,type
(ping, email, slack, webhook),payload
,retries
count,created_at
andupdated_at
times.
Thejobs
table is partitioned using thehour
column in different hour range partitions. e.gjobs_00_to_06
for storing jobs whose UTC hour ranges between 00 and 06,jobs_07_to_12
for storing jobs whose UTC hour ranges between 07 and 12, etc.Viewing job run entries
: Job run entries are viewed using thejob_runs
table which stores theid
,job_id
,status
(completed, failed, permanently_failed),output
for success runs,error
for failed runs,scheduled_at
andcompleted_at
time.
Scheduler service
The Scheduler service has two main components: Batch Processor
and Publisher
:
Batch Processor
:
It polls the PostgreSQL DB every 10 mins to read jobs from the
jobs
table.For multiple scheduler service instances, it first tries to create a Redis lock so that only one scheduler instance polls the DB.
If the instance is unable to get the lock, it will exit and wait for next timer period.
The batch processor fetches jobs of next 10 mins at every 10s rounded time i.e 12:00, 12:10, 12:20,… it fetches jobs of next 10 min range i.e at 12:10, it fetches jobs for 12:11 to 12:20 range, at 12:20, it fetches jobs for 12:21 to 12:30, etc.On getting the Redis lock, jobs are queried. It pushes these jobs to the Redis’s sorted set (ZSET).
After pushing the jobs, it also clears all previous jobs if they were not processed in past 10 mins from ZSET. These lost jobs can be logged as lost/failed.
Here, ZSET is used because of its score mechanism. The jobs are stored in a sorted manner using their timestamps as score(sorting key). So, the insertion becomes O(log N) and fetching is O(log N + M).
In case of errors, the locked batch processor can run for maximum 3 times for retrying.
Publisher
:
It runs for every minute to get jobs from the ZSET for that specific time.
All jobs of current time are fetched in batches via all of the scheduler instances until there are no jobs available in ZSET for that current time.
The jobs are fetched and deleted at same time using a single lua script which runs on the redis server end to avoid same jobs getting fetched from different instances.
Then it publishes all of the jobs to Redis stream using
XAdd
.
Worker service
The worker services are part of a consumer group listening over this stream. Using consumer groups, we can solve the problem of duplicate job execution, since the consumer group ensures that a single job message is delivered to only one consumer.
When a worker is started, it first check for the consumer group, and if it is not present, it creates it.
Then we get the job from the stream in a batch of N using
XREADGROUP
. On every read, we wait for 5 seconds until there are N jobs in the stream. As soon as N jobs becomes available in the stream it fetches them. After 5 seconds, if there are less than N jobs, it still fetches them.These jobs are then processed based on their type which is present in their payload. For
ping
type, we make a GET call, foremail
type, we try to send an email usingmailtrap.io
service, forslack
type, we make a POST call to given slack url and forwebhook
type, we make the POST call from given url and body.If the job execution was success, we make an entry in the
job_runs
table withcompleted
status and reset theretries
count of that job to 0 injobs
table. Then that job message is acknowledged usingXAck
and deleted from the stream usingXDel
.If the job execution fails, we first get the
retries
count of that job fromjobs
table. If theretries
count is less than 3, we make an entry injob_runs
table withfailed
status and increment theretries
count of that job injobs
table. Then the job message is acknowledged and deleted from stream. Then a new entry of that job is published to the stream for further retries usingXAdd
.If on failed job execution, we get the
retries
count >= 3, we make an entry in thejob_runs
table withpermanently_failed
status and reset theretries
count to 0 for that job injobs
table. Then the job message is acknowledged and deleted from stream.
The worker can get very slow if there is a single process running on the worker machine. So we can make use of workerpools here. e.g running 10 goroutines in the workerpool to keep processing the jobs coming in redis stream concurrently.
Some Calculations, Testing & Observations
Since we are now targeting around a million scheduled jobs daily, so average tasks per minute would be
(1,000,000)/24×60 ≈ 695 jobs/minute.
i.e 6950 jobs on average in our 10 mins of batch processing time. But this is uniform distribution of tasks and in real system, the tasks would not be evenly distributed across time.
Let’s consider a single busiest 10 min range can have around 75K jobs burst where some mins can have spike of around 15K also.
So with these numbers, I tested the system deployed on ECS + Fargate:
I generated a python script using GPT - generate_insert.py
which gives me batch insertion query to create a load in the DB for testing. It takes the hour, start minute, end minute, a list of heavy minutes which will have N jobs and N - the spike count that the heavy minutes will have. It is well customizable.
Configurations:
1 dashboard instance
2 scheduler instances
4 worker instances
Fragate specs: .25 vCPU and .5 GB Memory.
Observations:
The batch processing of jobs from DB to ZSET and publishing of jobs from ZSET every minute to the stream was very quick. all happened within milliseconds.
On worker instances, the consuming of jobs was really quick at the start i.e around 1000 - 1500 jobs per second. But as it continued, due to requeuing of failed jobs(whose execution had 5s timeout) and waiting for 5 seconds until it’s failure and with overall 3 max retries for a failed job, the job execution time on worker machines exponentially grew.
The overall processing of this 10 min batch took around 13 mins where 10 goroutines were working in the workerpool on each worker machine, considering there were sudden spikes of 15k to 20k for some mins.
Without workerpools, it took more than 30 mins. So yes, increasing the hardware resource of a worker machine and adding more goroutines can definitely reduce the overall job execution time. Or else, we can also just horizontally scale the worker machines.
Improvements
Batch Processing failures
Considering a case when a scheduler instance fails after getting the lock, the batch of that 10 min range will never be processed.
We have set the lock expiration time for the batch to 2 mins. So to handle this, on completion of a batch processing, we can put a key in redis to mark the completion of that batch (e.g batch_completed_11_20 - i.e for 12:11 to 12:20) with expiration of 10 mins.
So In the batch processor, we can periodically check for every 2 mins if the batch of that 10 min range was processed or not.
Worker instance crashes
When a job is claimed by a consumer in the consumer group, it is also registered in a Pending Entries List (PEL) to keep track what jobs are not yet acknowledged using Xack
. So even if the consumer crashes or is gone from the group, those jobs remain in the PEL.
For handling this, we can run a periodic goroutine to get jobs from PEL whose time has crossed a certain threshold time, then claim them using XCLAIM
, process them, ack and remove them from the PEL.
Conclusion
So with this approach, we can definitely process thousands of jobs every minute by making the batch processor pick jobs for every 10 mins and publishing them to the redis stream every minute using ZSET’s fast insertion and retrieval.
It is really amazing to design and build a system that can handle such a huge workload. Still this is not fully reliable and fault tolerant, but additions can be built on this to make it one!
Find this project here:
If you find this article helpful, don't forget to hit the ❤️ button.
Check out my website here and feel free to connect.
Happy Coding! 👨💻
Subscribe to my newsletter
Read articles from Aniket Yadav directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
