Using a MongoDB Collection as a Job Queue for Cron Jobs: Dart Implementation

Dinko Marinac
8 min read

Introduction

During my experimentation with cloud functions in Dart, I had the need to implement cron jobs, also known as scheduled functions in Firebase. The result of this is my new package, called mongo_cron.

So, what are cron jobs and why do we use them?

Cron jobs are scheduled tasks that run automatically at specified intervals or times. They are commonly used in Unix-like operating systems for system maintenance or administration tasks. In the context of web applications and distributed systems, cron jobs can be used to perform periodic tasks such as sending emails, push notifications, generating reports, or creating database backups.

There are several ways cron jobs can be implemented in a backend:

  1. Operating system cron: Using the built-in cron daemon on Unix-like systems.
  2. Application-level scheduling: Implementing a scheduler within your application using a package like cron.
  3. Distributed job queues: Using message brokers like RabbitMQ or Redis to manage distributed cron jobs.
  4. Cloud schedulers: Using a managed scheduler service offered by a cloud provider such as Google Cloud Platform.
  5. Database-backed job queues: Utilizing a database table to store and manage cron jobs.

In this article, we'll focus on the last approach, using a database table as a job queue for cron jobs.

I'm using MongoDB in my experiment, and I found a Node.js library called mongodb-cron. It inspired me to take the same approach instead of relying on an external scheduling mechanism.

If you want to jump straight into action and start using the library, go to pub.dev.

Cron expressions

Cron expressions are used to define when a job should run. The standard crontab syntax consists of five fields, each representing a different time unit:

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of the month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
* * * * *

Each field can contain:

  • A specific value (e.g., 5)
  • A range (e.g., 1-5)
  • A list of values (e.g., 1,3,5)
  • An asterisk (*) to represent any value
  • A step value (e.g., */15 for "every 15 units")

Here are some examples:

  • 0 0 * * *: Run at midnight every day
  • */15 * * * *: Run every 15 minutes
  • 0 9 * * 1-5: Run at 9 AM every weekday (Monday to Friday)
  • 0 0 1 1 *: Run at midnight on January 1st (New Year's Day)
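
To make the field syntax concrete, here is a minimal sketch that expands a single cron field into the values it matches. The function name expandCronField is hypothetical and this is not how mongo_cron or easy_cron parse expressions; it only handles numbers, ranges, lists, asterisks and steps, and it assumes the caller passes the valid bounds for the field.

```dart
/// Expands one cron field (e.g. "*/15", "1-5", "1,3,5") into the sorted
/// list of values it matches within [min]..[max].
/// Hypothetical helper for illustration only.
List<int> expandCronField(String field, int min, int max) {
  final values = <int>{};
  for (final part in field.split(',')) {
    // Split off an optional step: "*/15" -> base "*", step 15.
    final pieces = part.split('/');
    if (pieces.length > 2) {
      throw FormatException('Too many "/" in cron field part', part);
    }
    final base = pieces[0];
    final step = pieces.length == 2 ? int.parse(pieces[1]) : 1;
    if (step < 1) {
      throw FormatException('Step must be positive', part);
    }

    int start;
    int end;
    if (base == '*') {
      // An asterisk covers the whole range of the field.
      start = min;
      end = max;
    } else if (base.contains('-')) {
      final bounds = base.split('-');
      if (bounds.length != 2) throw FormatException('Bad range', part);
      start = int.parse(bounds[0]);
      end = int.parse(bounds[1]);
    } else {
      start = int.parse(base);
      // A plain value matches only itself; with a step ("5/15") this
      // sketch treats it as "from 5 up to max".
      end = pieces.length == 2 ? max : start;
    }
    if (start < min || end > max || start > end) {
      throw RangeError('Cron field part "$part" is outside $min-$max');
    }
    for (var v = start; v <= end; v += step) {
      values.add(v);
    }
  }
  return values.toList()..sort();
}
```

For example, calling expandCronField('*/15', 0, 59) for the minute field yields 0, 15, 30 and 45, which is why */15 * * * * runs every 15 minutes.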

💡 Tip: If you are new to cron expressions, I recommend using a tool like Cronitor to build and check them.

Model definition

I knew from the get-go that I wanted to create a scheduler that would enable me to just assign a function to a cron expression like this:

scheduler.schedule('* * * * *', () {
  //do something
});

Since we are using a MongoDB collection to save the data about the job, we must define a model which we will use.

Let's think about what properties a cron job must have:

  • id - required for the database
  • cron expression - required to know how frequently to run the job
  • next run time - when to run the job
  • a function to run

There is a problem here. There isn't a way to save a function to the database, so we must think of another way to solve this. I chose the simplest one: giving the job a name. The name can be used to find the function assigned to the expression.

The name is useful for another reason: the id can change when a document is deleted and inserted again. As long as the name stays the same, the appropriate function can still be executed.
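
The name-to-function lookup can be sketched as a small in-memory registry. This is an illustration under my own assumptions, not the package's actual code: HandlerRegistry, JobHandler, register and run are hypothetical names.

```dart
import 'dart:async';

/// Signature of a job callback in this sketch (an assumption, not the
/// package's type). Both sync and async closures fit it.
typedef JobHandler = FutureOr<void> Function();

/// Maps job names to the functions that should run for them.
class HandlerRegistry {
  final Map<String, JobHandler> _handlers = {};

  /// Associates [handler] with [name]; registering the same name again
  /// replaces the previous handler.
  void register(String name, JobHandler handler) {
    _handlers[name] = handler;
  }

  /// Runs the handler registered under [name].
  /// Returns false if nothing is registered, so the caller can decide
  /// whether that is an error.
  Future<bool> run(String name) async {
    final handler = _handlers[name];
    if (handler == null) return false;
    await handler();
    return true;
  }
}
```

Only the name is stored in the database. The functions live in process memory, so every application instance has to register its handlers again on startup.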

Taking inspiration from the Node library I've also decided to add the following properties:

  • repeatUntil - a timestamp telling us when to stop executing the job
  • autoRemove - a boolean telling us whether to remove the job from the collection after we stop using it
  • data - map containing any data that the job might need for the next iteration

The result is the following model:

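import 'package:mongo_dart/mongo_dart.dart';

class Job {
  Job({
    required this.id,
    required this.name,
    required this.cronExpression,
    required this.sleepUntil,
    this.repeatUntil,
    required this.autoRemove,
    required this.data,
  });

  final ObjectId id; // MongoDB document id
  final String name; // key used to look up the function to run
  final String cronExpression; // how often the job runs
  final DateTime sleepUntil; // next time the job is due
  final DateTime? repeatUntil; // stop rescheduling after this time
  final bool autoRemove; // delete the document once the job is finished
  final Map<String, dynamic> data; // payload kept between runs
}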

ObjectId is the type mongo_dart uses to represent MongoDB document ids. You can find more info about it in the MongoDB documentation.
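
Documents coming back from the collection are plain maps, so the model needs a way to convert to and from them. Here is a rough sketch of what that conversion could look like. To keep it runnable without the driver it uses a hypothetical JobRecord class with Object in place of ObjectId. The document keys and the fallbacks for missing fields are my assumptions, not something the package guarantees.

```dart
/// Pure-Dart stand-in for the Job model, for illustration only.
class JobRecord {
  JobRecord({
    required this.id,
    required this.name,
    required this.cronExpression,
    required this.sleepUntil,
    this.repeatUntil,
    this.autoRemove = false,
    this.data = const {},
  });

  /// Builds a record from a document map. Missing optional fields fall
  /// back to defaults (an assumption made for this sketch).
  factory JobRecord.fromMap(Map<String, dynamic> map) {
    return JobRecord(
      id: map['_id'] as Object,
      name: map['name'] as String,
      cronExpression: map['cronExpression'] as String,
      sleepUntil: map['sleepUntil'] as DateTime,
      repeatUntil: map['repeatUntil'] as DateTime?,
      autoRemove: (map['autoRemove'] as bool?) ?? false,
      data: Map<String, dynamic>.from((map['data'] as Map?) ?? const {}),
    );
  }

  final Object id;
  final String name;
  final String cronExpression;
  final DateTime sleepUntil;
  final DateTime? repeatUntil;
  final bool autoRemove;
  final Map<String, dynamic> data;

  /// Converts the record back into a document map.
  Map<String, dynamic> toMap() => {
        '_id': id,
        'name': name,
        'cronExpression': cronExpression,
        'sleepUntil': sleepUntil,
        'repeatUntil': repeatUntil,
        'autoRemove': autoRemove,
        'data': data,
      };
}
```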

Algorithm

The core of the library is the algorithm that uses a timer to periodically check for and execute jobs.

Here's how it works:

  1. Initialise a timer that triggers at regular intervals.
  2. When the timer fires, attempt to lock the next available job in the MongoDB collection.
  3. If a job is successfully locked, execute it.
  4. After execution, reschedule the job based on its cron expression.
  5. Repeat the process.

Here's a simplified version of the main logic:

Future<void> _tick() async {
  if (!_running) return;

  _processing = true;
  try {
    // Claim the next due job, if there is one.
    final doc = await _lockNext();
    if (doc == null) {
      // No job available, wait and try again
      await Future.delayed(config.idleDelay);
    } else {
      final job = Job.fromMap(doc);
      await _executeJob(job);
      await _reschedule(job);
    }
  } catch (e) {
    // Handle errors
  } finally {
    // Always clear the flag and queue the next tick, even after an error.
    _processing = false;
    _scheduleTick();
  }
}

This _tick method is the heart of our cron job scheduler. It attempts to lock and execute the next available job, then reschedules itself to run again.
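
The self-rescheduling loop can be sketched on its own with a one-shot Timer from dart:async. The Poller class and its members are hypothetical names for this illustration, and the real package may structure its loop differently.

```dart
import 'dart:async';

/// Minimal polling loop: waits [interval], runs [onTick], then schedules
/// the next run. Hypothetical class for illustration only.
class Poller {
  Poller(this.interval, this.onTick);

  final Duration interval;
  final Future<void> Function() onTick;

  Timer? _timer;
  bool _running = false;

  bool get isRunning => _running;

  /// Starts the loop; calling it while already running does nothing.
  void start() {
    if (_running) return;
    _running = true;
    _scheduleTick();
  }

  /// Stops the loop and cancels any pending timer.
  void stop() {
    _running = false;
    _timer?.cancel();
    _timer = null;
  }

  void _scheduleTick() {
    if (!_running) return;
    // One-shot timer: the next tick is scheduled only after the current
    // one has finished, so ticks never overlap.
    _timer = Timer(interval, _tick);
  }

  Future<void> _tick() async {
    if (!_running) return;
    try {
      await onTick();
    } catch (e) {
      // A failing tick should not kill the loop; real code would log this.
    } finally {
      _scheduleTick();
    }
  }
}
```

Using a one-shot timer instead of Timer.periodic is a deliberate choice in this sketch: a job that runs longer than the interval simply delays the next tick rather than piling up concurrent ticks.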

Rescheduling

Rescheduling is the most interesting part of this system.

When a job is picked up by the system and successfully executed, it needs to be rescheduled for its next run. This happens in the _reschedule method:

Future<void> _reschedule(Job job) async {
  final now = DateTime.now();
  final nextRun = _getNextRun(job, now);

  if (nextRun == null && job.autoRemove) {
    // The job is finished and should not be kept around.
    await _collection.deleteOne({'_id': job.id});
    _handlers.remove(job.name);
  } else if (nextRun == null) {
    // The job is finished but stays in the collection, deactivated.
    await _collection.updateOne(
      {'_id': job.id},
      {
        '\$set': {'sleepUntil': null}
      },
    );
  } else {
    // Put the job back to sleep until its next run.
    await _collection.updateOne(
      {'_id': job.id},
      {
        '\$set': {'sleepUntil': nextRun}
      },
    );
  }
}

The system calculates the next run time based on the job's cron expression:

DateTime? _getNextRun(Job job, DateTime fromTime) {
  try {
    final schedule = UnixCronParser().parse(job.cronExpression);
    final next = schedule.next(fromTime).time;
    if (job.repeatUntil != null && next.isAfter(job.repeatUntil!)) {
      return null;
    }
    return next;
  } catch (e) {
    print('Invalid cron expression: ${job.cronExpression}');
    return null;
  }
}

I use the easy_cron package to parse the job's cron expression and calculate the next run time.

The next run time calculation takes the repeatUntil property into account and returns null if the job should not be rescheduled. It also returns null when the cron expression cannot be parsed.

Based on the calculated next run time, the job is updated in the MongoDB collection:

  • If there's a valid next run time, the job's sleepUntil field is updated with this new time.
  • If there's no valid next run time (e.g., the job has reached its repeatUntil date) and autoRemove is true, the job is deleted from the collection.
  • If there's no valid next run time and autoRemove is false, the job's sleepUntil is set to null, effectively deactivating the job.
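
The three outcomes above boil down to a small decision that can be written as pure functions with no database involved. This is only a restatement of the rules under hypothetical names (RescheduleAction, decideReschedule, applyRepeatUntil), not code from the package.

```dart
/// What to do with a job after it has run.
enum RescheduleAction { update, deactivate, remove }

/// Picks the action from the computed next run time and the autoRemove flag.
RescheduleAction decideReschedule({
  required DateTime? nextRun,
  required bool autoRemove,
}) {
  if (nextRun != null) return RescheduleAction.update;
  return autoRemove ? RescheduleAction.remove : RescheduleAction.deactivate;
}

/// Returns [next] unless it falls after [repeatUntil], in which case the
/// job has no further runs and null is returned.
DateTime? applyRepeatUntil(DateTime next, DateTime? repeatUntil) {
  if (repeatUntil != null && next.isAfter(repeatUntil)) return null;
  return next;
}
```

Keeping the decision separate from the database calls makes the rules easy to unit test without a running MongoDB instance.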

Job Selection

When the system looks for the next job to run, it uses a query which selects jobs where sleepUntil is less than or equal to the current time, ensuring that only jobs that are due (or overdue) are selected:

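Future<Map<String, dynamic>?> _lockNext() async {
  final now = DateTime.now();
  // Pushing sleepUntil into the future acts as the lock: other workers
  // will not see the job as due until the lock expires.
  final lockUntil = now.add(config.lockDuration);

  final result = await _collection.findAndModify(
    query: {
      // Only jobs that are due (or overdue) match.
      'sleepUntil': {'\$exists': true, '\$lte': now},
    },
    update: {
      '\$set': {'sleepUntil': lockUntil}
    },
    // Return the document as it was before the update.
    returnNew: false,
  );

  return result;
}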

This comes with a useful side effect: the ability to handle system restarts gracefully. Since job information is persisted in the database, your application can recover its state after a restart.

If the system has been down or unable to process jobs for a period, it will naturally pick up any overdue jobs when it resumes operation. This is because the _lockNext method selects any job where sleepUntil is in the past.
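
To illustrate how locking by postponing sleepUntil behaves, here is an in-memory simulation. InMemoryJobStore is a hypothetical stand-in for the collection. A real deployment relies on MongoDB applying the find-and-update as a single atomic operation, which this sketch only mimics.

```dart
/// In-memory stand-in for the jobs collection, for illustration only.
class InMemoryJobStore {
  final List<Map<String, dynamic>> _docs = [];

  /// Adds a job; a null [sleepUntil] means the job is deactivated.
  void insert(String name, DateTime? sleepUntil) {
    _docs.add({'name': name, 'sleepUntil': sleepUntil});
  }

  /// Finds the first job that is due at [now], pushes its sleepUntil
  /// forward by [lockDuration], and returns a copy of the job as it was
  /// before locking. Returns null if no job is due.
  Map<String, dynamic>? lockNext(DateTime now, Duration lockDuration) {
    for (final doc in _docs) {
      final sleepUntil = doc['sleepUntil'] as DateTime?;
      if (sleepUntil != null && !sleepUntil.isAfter(now)) {
        final before = Map<String, dynamic>.from(doc);
        doc['sleepUntil'] = now.add(lockDuration);
        return before;
      }
    }
    return null;
  }
}
```

In this model, a job that was due three hours ago is still picked up by the first lockNext call, a second call at the same moment finds nothing, and if the job is never rescheduled (for example because the worker crashed) it becomes available again once the lock duration has passed.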

Conclusion

Using a MongoDB collection as a job queue for cron jobs in Dart offers a robust and flexible solution for scheduling and executing periodic tasks in your applications. This approach provides several advantages:

  1. Persistence: Job information is stored in the database, allowing for easy recovery after system restarts or failures.
  2. Scalability: Multiple application instances (or isolates) can share the same job queue, enabling distributed processing of cron jobs.
  3. Flexibility: Jobs can be easily added, modified, or removed by interacting with the MongoDB collection.

The mongo_cron package encapsulates this functionality, providing a simple and intuitive API for scheduling and managing cron jobs in your Dart applications.

Whether you're building a small application or a large-scale distributed system, this approach to cron job management can provide a solid foundation for your scheduled task needs. As you implement this in your own projects, remember to consider factors such as database indexing, error handling, and monitoring to ensure optimal performance and reliability.

If you have found this useful, make sure to like and follow for more content like this. To know when the new articles are coming out, follow me on Twitter and LinkedIn.

Until next time, happy coding, and may your cron jobs always run on time!

