Debezium Server with RabbitMQ

Manish AgrawalManish Agrawal
3 min read

Overview

Debezium Server: Debezium Server is a lightweight runtime that can stream changes from various databases to a range of downstream systems like message brokers, databases, or other services. It is an alternative to Kafka Connect and allows the use of different messaging systems.

Configuration

Debezium Server uses Micro-Profile Configuration for configuration. This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.

The main configuration file is conf/application.properties.

# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=localhost
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=vhost
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.exchange=test_topic_exchange
debezium.sink.rabbitmq.routingKey=test_topic_routing_key

# Source connector config - PostgreSQL
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.plugin.name=pgoutput
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5433
debezium.source.database.user=postgres
debezium.source.database.password=root
debezium.source.database.dbname=source_db
debezium.source.topic.prefix=test_topic_prefix

# Format config
debezium.format.key=json
debezium.format.value=json

# Quarkus
quarkus.log.console.json=false

Docker compose:

services:

  debezium:
    image: quay.io/debezium/server:2.5
    container_name: debezium
    healthcheck:
      test: curl http://debezium:8997/q/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
    ports:
      - "8997:8997"
    volumes:
      - ./debezium_conf:/debezium/conf

Running RabbitMQ as a Docker container

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

Consumer Service:

A separate application/service that reads changes from RabbitMQ and writes them to the destination database.

To achieve Change Data Capture (CDC) when the destination is another database using Debezium Server and RabbitMQ, you need to set up the process where the changes captured from the source database (PostgreSQL) are published to RabbitMQ, and then a consumer service reads these changes from RabbitMQ and writes them to the destination database.

Here's a sample Java code for a consumer service that reads change events from RabbitMQ and writes them to a destination PostgreSQL database.

import com.rabbitmq.client.*;
import org.json.JSONObject;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConsumer {
    private static final String RABBITMQ_HOST = "localhost";
    private static final int RABBITMQ_PORT = 5672;
    private static final String RABBITMQ_USERNAME = "guest";
    private static final String RABBITMQ_PASSWORD = "guest";
    private static final String RABBITMQ_VIRTUAL_HOST = "vhost";
    private static final String EXCHANGE_NAME = "test_topic_exchange";
    private static final String ROUTING_KEY = "test_topic_routing_key";
    private static final String QUEUE_NAME = "cdc_queue";

    private static final String DB_URL = "jdbc:postgresql://localhost:5432/destination_db";
    private static final String DB_USER = "postgres";
    private static final String DB_PASSWORD = "root";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received: " + message);

            JSONObject jsonObject = new JSONObject(message);
            long id = jsonObject.getLong("id");
            String firstName = jsonObject.getString("first_name");
            String lastName = jsonObject.getString("last_name");

            try (Connection dbConnection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
                String sql = "INSERT INTO destination_table (id, first_name, last_name) VALUES (?, ?, ?) ON CONFLICT (id) DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name";
                try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
                    statement.setLong(1, id);
                    statement.setString(2, firstName);
                    statement.setString(3, lastName);
                    statement.executeUpdate();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

Conclusion

This guide provides an overview of using Debezium Server with RabbitMQ for CDC, highlighting key concepts, configuration properties, and use cases to help you get started with this powerful combination for real-time data streaming and processing.

References

0
Subscribe to my newsletter

Read articles from Manish Agrawal directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Manish Agrawal
Manish Agrawal

Over 15 Years of Expertise in Software Development and Engineering I specialize in delivering innovative solutions across diverse programming languages, platforms, and architectures. 💡 Technical Expertise Backend: Node.js (Nest.js, Express.js), Java (Spring Boot), PHP (Laravel, CodeIgniter, YII, Phalcon, Symphony, CakePHP) Frontend: React, Angular, Vue, TypeScript, JavaScript, Bootstrap, Material design, Tailwind CMS: WordPress, MediaWiki, Moodle, Strapi Headless, Drupal, Magento, Joomla DevOps & Cloud: AWS, Azure, GCP, OpenShift, CI/CD, Docker, Kubernetes, Terraform, Ansible, GitHub Actions, Gitlab CI/CD, GitOps, Argo CD, Jenkins, Shell Scripting, Linux Observability & Monitoring: Datadog, Prometheus, Grafana, ELK Stack, PowerBI, Tableau Databases: MySQL, MariaDB, MongoDB, PostgreSQL, Elasticsearch Caching: Redis, Mamcachad Data Engineering & Streaming: Apache NiFi, Apache Flink, Kafka, RabbitMQ API Design: REST, gRPC, GraphQL Principles & Practices: SOLID, DRY, KISS, TDD Architectural Patterns: Microservices, Monolithic, Microfronend, Event-Driven, Serverless, OOPs Design Patterns: Singleton, Factory, Observer, Repository, Service Mesh, Sidecar Pattern Project Management: Agile, JIRA, Confluence, MS Excel Testing & Quality: Postman, Jest, SonarQube, Cucumber Architectural Tools: Draw.io, Lucid, Excalidraw 👥 Versatile Professional From small-scale projects to enterprise-grade solutions, I have excelled both as an individual contributor and as part of dynamic teams. 🎯 Lifelong Learner Beyond work, I’m deeply committed to personal and professional growth, dedicating my spare time to exploring new technologies. 🔍 Passionate about Research & Product Improvement & Reverse Engineering I’m dedicated to exploring and enhancing existing products, always ready to take on challenges to identify root causes and implement effective solutions. 🧠 Adaptable & Tech-Driven I thrive in dynamic environments and am always eager to adapt and work with new and emerging technologies. 🌱 Work Culture I Value I thrive in environments that foster autonomy, respect, and innovation — free from micromanagement, unnecessary bureaucracy. I value clear communication, open collaboration, self organizing teams,appreciation, rewards and continuous learning. 🧠 Core Belief I believe every problem has a solution—and every solution uncovers new challenges to grow from. 🌟 Let's connect to collaborate, innovate, and build something extraordinary together!