RabbitMQ Delay Scheduler and TTL Implementation
Muhammad Sufiyan
This document outlines the steps and concepts involved in implementing a delay scheduler and Time-To-Live (TTL) for messages in RabbitMQ. By the end of this guide, you will understand how to configure RabbitMQ to handle delayed message scheduling and TTL effectively.
Prerequisites
RabbitMQ Installed:
Ensure RabbitMQ is installed locally or running as a Docker container.
Enable the RabbitMQ Management Plugin for monitoring and debugging.
RabbitMQ Dockerfile (for custom plugins):
FROM rabbitmq:management
COPY ./rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins/
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Docker Compose Setup:
version: '3.8'
services:
  rabbitmq:
    build: .
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
Concepts
1. Delay Scheduler in RabbitMQ
RabbitMQ does not support delayed message scheduling out of the box. The rabbitmq_delayed_message_exchange plugin adds it by introducing a new exchange type, x-delayed-message. An exchange of this type holds each message for the number of milliseconds given in the message's x-delay header before routing it to a queue.
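As a minimal sketch of what a publisher hands to such an exchange: the delay travels in the x-delay header, in milliseconds. The helper names below (delayedPublishOptions, delayUntil) are hypothetical, and the upper bound assumes the limit of 2^32 - 1 ms mentioned in the plugin's README, so check the README for the version you run.

```typescript
// Hypothetical helpers for illustration; not part of amqplib or the plugin.
// Assumed upper bound on x-delay (from the plugin README): 2^32 - 1 milliseconds.
const MAX_DELAY_MS = 2 ** 32 - 1;

interface DelayedPublishOptions {
  headers: { 'x-delay': number };
}

// Build the options object passed as the fourth argument to channel.publish().
function delayedPublishOptions(delayMs: number): DelayedPublishOptions {
  if (!Number.isInteger(delayMs) || delayMs < 0 || delayMs > MAX_DELAY_MS) {
    throw new RangeError(`x-delay must be an integer in [0, ${MAX_DELAY_MS}], got ${delayMs}`);
  }
  return { headers: { 'x-delay': delayMs } };
}

// Convert an absolute delivery time into a relative delay.
// Times already in the past are clamped to a delay of 0.
function delayUntil(deliverAt: Date, now: Date = new Date()): DelayedPublishOptions {
  const delayMs = Math.max(0, deliverAt.getTime() - now.getTime());
  return delayedPublishOptions(delayMs);
}
```

With an open amqplib channel, the result would be used as channel.publish(exchange, routingKey, content, delayedPublishOptions(10000)).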
2. Time-To-Live (TTL)
TTL defines the lifespan of a message in RabbitMQ. If a message exceeds its TTL without being consumed, it is either dead-lettered (if a dead-letter exchange is configured) or dropped.
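To make the dead-letter branch concrete, here is a small sketch of the queue arguments involved. x-message-ttl, x-dead-letter-exchange and x-dead-letter-routing-key are standard RabbitMQ queue arguments; the helper name ttlQueueArguments and the names 'dlx', 'work' and 'expired' in the usage comment are made up for illustration.

```typescript
interface TtlQueueArguments {
  'x-message-ttl': number;
  'x-dead-letter-exchange'?: string;
  'x-dead-letter-routing-key'?: string;
}

// Hypothetical helper: builds the `arguments` object for channel.assertQueue().
// Without a dead-letter exchange, expired messages are simply dropped.
function ttlQueueArguments(
  ttlMs: number,
  deadLetterExchange?: string,
  deadLetterRoutingKey?: string,
): TtlQueueArguments {
  if (!Number.isInteger(ttlMs) || ttlMs < 0) {
    throw new RangeError(`TTL must be a non-negative integer, got ${ttlMs}`);
  }
  const args: TtlQueueArguments = { 'x-message-ttl': ttlMs };
  if (deadLetterExchange !== undefined) {
    args['x-dead-letter-exchange'] = deadLetterExchange;
    if (deadLetterRoutingKey !== undefined) {
      args['x-dead-letter-routing-key'] = deadLetterRoutingKey;
    }
  }
  return args;
}

// Usage with amqplib (assumes an open channel):
//   await channel.assertExchange('dlx', 'direct');
//   await channel.assertQueue('work', { arguments: ttlQueueArguments(60000, 'dlx', 'expired') });
```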
Implementation Steps
Step 1: Enable rabbitmq_delayed_message_exchange
(In our case this is already handled by Docker: just run docker-compose up --build.)
Without Docker:
Copy the plugin file rabbitmq_delayed_message_exchange-4.0.2.ez into the /opt/rabbitmq/plugins/ directory of your RabbitMQ installation.
Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Restart the RabbitMQ service.
or
If you use Docker:
Copy the plugin file rabbitmq_delayed_message_exchange-4.0.2.ez into the project directory, next to the Dockerfile. The COPY instruction in the Dockerfile above expects it at the root of the build context.
Build and start the container, which enables the plugin:
docker-compose up --build
Step 2: Create a Custom Service for RabbitMQ in Your Application
Below is an example RabbitMQService implementation using Node.js and NestJS:
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as amqp from 'amqplib';

@Injectable()
export class RabbitMQService implements OnModuleInit, OnModuleDestroy {
  // Depending on your @types/amqplib version, the value returned by
  // amqp.connect() may be typed as amqp.ChannelModel instead.
  private connection: amqp.Connection;
  private channel: amqp.Channel;
  private ready = false;

  async onModuleInit() {
    await this.connect();
  }

  private async connect() {
    try {
      this.connection = await amqp.connect('amqp://localhost:5672');
      // Mark the service as not ready when the connection drops,
      // so that the next send triggers a reconnect.
      this.connection.on('close', () => {
        this.ready = false;
      });
      this.connection.on('error', (error) => {
        console.error('RabbitMQ connection error', error);
      });
      this.channel = await this.connection.createChannel();
      this.ready = true;
      console.log('RabbitMQ connection and channel established');
    } catch (error) {
      this.ready = false;
      console.error('Error connecting to RabbitMQ', error);
    }
  }

  private async checkConnection() {
    if (!this.ready) {
      console.log('Connection lost. Reconnecting...');
      await this.reconnect();
    }
  }

  private async reconnect() {
    try {
      await this.connect();
    } catch (error) {
      console.error('Reconnection failed', error);
    }
  }

  // delayMs is sent as the x-delay header; it defaults to 10 seconds.
  public async sendDelayedMessage(queue: string, message: string, delayMs = 10000) {
    await this.checkConnection(); // Reconnect first if the connection was lost
    if (!this.ready) {
      throw new Error('RabbitMQ channel is not initialized.');
    }

    // Delayed delivery goes through an exchange of type x-delayed-message.
    const exchange = 'delayed_exchange';
    await this.channel.assertExchange(exchange, 'x-delayed-message', {
      arguments: {
        'x-delayed-type': 'direct',
      },
    });
    await this.channel.assertQueue(queue, { durable: false });

    // Bind the queue and publish to the exchange. sendToQueue would bypass
    // the delayed exchange, so it is not used here. The queue name doubles as
    // the routing key so that only this queue receives the message.
    await this.channel.bindQueue(queue, exchange, queue);
    this.channel.publish(exchange, queue, Buffer.from(message), {
      headers: { 'x-delay': delayMs },
    });
    console.log(`Message published for queue ${queue} with a ${delayMs} ms delay: ${message}`);
  }

  async close() {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }

  async onModuleDestroy() {
    await this.close();
  }
}
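The reconnect logic above makes a single attempt per call. One possible refinement, sketched here on the assumption that a few retries with growing pauses suit your application, is a generic retry wrapper. retryWithBackoff and its parameters are hypothetical names, not part of amqplib or NestJS.

```typescript
// Hypothetical helper: retries an async task with exponential backoff.
// The `sleep` parameter is injectable so the wait can be replaced in tests.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 500,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Wait baseDelayMs, then 2x, 4x, ... before the next attempt.
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}

// Possible usage inside connect() (the wrapped call must throw on failure):
//   this.connection = await retryWithBackoff(() => amqp.connect('amqp://localhost:5672'));
```

The wrapper only helps if the wrapped call is allowed to throw, so it belongs around amqp.connect itself rather than around a method that already catches and logs the error.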
Step 3: Configure TTL in RabbitMQ Queues
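TTL can be set when declaring a queue. Note that RabbitMQ rejects a redeclaration of an existing queue with different arguments (PRECONDITION_FAILED), so set the TTL the first time the queue is declared or use a new queue name. Example:
await this.channel.assertQueue('my_queue', {
  durable: true,
  arguments: { 'x-message-ttl': 60000 }, // TTL of 60 seconds
});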
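TTL can also be set per message. As a hedged sketch: amqplib's publish options accept an expiration field (the AMQP expiration property, sent as a string of milliseconds), and the RabbitMQ documentation states that when both a queue TTL and a message TTL apply, the lower value is used. The helper names below are hypothetical.

```typescript
// Hypothetical helper: options for channel.publish() or channel.sendToQueue()
// that give a single message its own TTL.
function perMessageTtlOptions(ttlMs: number): { expiration: string } {
  if (!Number.isInteger(ttlMs) || ttlMs < 0) {
    throw new RangeError(`TTL must be a non-negative integer, got ${ttlMs}`);
  }
  return { expiration: String(ttlMs) };
}

// Hypothetical helper: the TTL expected to apply when a queue TTL,
// a message TTL, both, or neither are present (the lower value wins).
function effectiveTtlMs(queueTtlMs?: number, messageTtlMs?: number): number | undefined {
  if (queueTtlMs === undefined) return messageTtlMs;
  if (messageTtlMs === undefined) return queueTtlMs;
  return Math.min(queueTtlMs, messageTtlMs);
}

// Usage with amqplib (assumes an open channel):
//   channel.sendToQueue('my_queue', Buffer.from('hello'), perMessageTtlOptions(5000));
```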
Step 4: Run RabbitMQ and Application
Start RabbitMQ with Docker Compose:
docker-compose up --build
Start your Node.js application.
Testing the Implementation
Delay Scheduler
Send a delayed message:
await rabbitMQService.sendDelayedMessage('test_queue', 'Hello after 10 seconds!', 10000);
Observe the message appearing in the queue after the specified delay.
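One way to check the delay without watching the management UI is to stamp each message with its send time and compare on receipt. The sketch below assumes the publisher adds a custom sent-at header holding Date.now(); that header name is an assumption for this example, not something the plugin sets, and observedDelayMs and withinTolerance are likewise illustrative.

```typescript
// Minimal structural type so the sketch compiles without amqplib installed.
interface ReceivedMessage {
  properties: { headers?: Record<string, unknown> };
}

// Returns how long the message took to arrive, or undefined when the
// (hypothetical) sent-at header is missing or not a number.
function observedDelayMs(msg: ReceivedMessage, receivedAtMs: number = Date.now()): number | undefined {
  const sentAt = msg.properties.headers?.['sent-at'];
  return typeof sentAt === 'number' ? receivedAtMs - sentAt : undefined;
}

// Delivery timing is not exact, so compare against the expected delay with some slack.
function withinTolerance(observedMs: number, expectedMs: number, toleranceMs = 500): boolean {
  return Math.abs(observedMs - expectedMs) <= toleranceMs;
}

// Usage inside an amqplib consumer (assumes an open channel):
//   await channel.consume('test_queue', (msg) => {
//     if (!msg) return;
//     console.log('observed delay (ms):', observedDelayMs(msg));
//     channel.ack(msg);
//   });
```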
TTL
Set up a queue with a short TTL:
await this.channel.assertQueue('ttl_queue', { arguments: { 'x-message-ttl': 5000 }, });
Publish a message to the queue and observe it being removed after 5 seconds.
Debugging Tips
Use the RabbitMQ Management UI (http://localhost:15672) to monitor queues, exchanges, and messages.
Check logs for errors related to exchange or queue declarations.
Conclusion
By enabling the rabbitmq_delayed_message_exchange plugin and configuring TTL, RabbitMQ can effectively handle delayed message scheduling and message expiration. This setup ensures more control over message routing and processing in your applications.