Embedded Debezium using Java + PostgreSQL

Manish Agrawal

Overview

This guide walks through setting up Change Data Capture (CDC) with Debezium running in embedded mode inside a Java application. The application captures changes from a source table in PostgreSQL and, without running a Kafka cluster, writes them into a destination table in the same database.

Prerequisites

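  1. PostgreSQL database (version 10 or later, which ships the built-in pgoutput logical decoding plugin)

  2. Java Development Kit (JDK) 17 or later

  3. Apache Maven

  4. Debezium dependencies (resolved by Maven from the pom.xml below)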

PostgreSQL Configuration

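Enable logical replication in postgresql.conf. A single Debezium connector uses one replication slot and one WAL sender, so the values below leave some headroom. Changing wal_level requires a PostgreSQL restart:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4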
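Before starting the connector, it can help to confirm these settings. The class below is a hypothetical pre-flight check (ReplicationSettingsCheck is not part of Debezium or PostgreSQL). To keep it runnable without a database, it parses postgresql.conf-style "key = value" text; against a live server you would read the same values with SHOW wal_level and similar queries. It assumes that a missing max_replication_slots or max_wal_senders line means PostgreSQL's default of 10 (the default since PostgreSQL 10), which is enough for one connector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the logical-replication settings used in this guide
final class ReplicationSettingsCheck {

    private ReplicationSettingsCheck() {
    }

    // Parses "key = value" lines, ignoring comments (#) and blank lines; later lines win
    static Map<String, String> parse(String conf) {
        Map<String, String> settings = new HashMap<>();
        for (String line : conf.split("\\R")) {
            int hash = line.indexOf('#');
            String content = (hash >= 0 ? line.substring(0, hash) : line).trim();
            int eq = content.indexOf('=');
            if (content.isEmpty() || eq < 0) {
                continue;
            }
            String key = content.substring(0, eq).trim();
            String value = content.substring(eq + 1).trim().replace("'", "");
            settings.put(key, value);
        }
        return settings;
    }

    // Returns the problems found; an empty list means the settings look usable
    static List<String> problems(Map<String, String> settings) {
        List<String> problems = new ArrayList<>();
        if (!"logical".equalsIgnoreCase(settings.get("wal_level"))) {
            problems.add("wal_level must be 'logical' (restart PostgreSQL after changing it)");
        }
        // One connector needs one slot and one WAL sender; absent keys keep the default of 10
        for (String key : List.of("max_replication_slots", "max_wal_senders")) {
            String value = settings.get(key);
            if (value != null && Integer.parseInt(value) < 1) {
                problems.add(key + " must be at least 1");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String conf = "wal_level = logical\nmax_replication_slots = 4\nmax_wal_senders = 4";
        System.out.println(problems(parse(conf))); // prints []
    }
}
```

The parser is deliberately minimal: it does not handle include directives or the "key value" form without an equals sign, which postgresql.conf also accepts.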

Debezium Configuration

Create the file src/main/resources/embedded_debezium.properties (the application loads it from the classpath) with the following content:

name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=localhost
database.port=5433
database.user=postgres
database.password=root
database.dbname=postgres
# Logical decoding plugin built into PostgreSQL 10+
plugin.name=pgoutput
# Required in Debezium 2.x (replaces the old database.server.name property)
topic.prefix=embedded-debezium
schema.include.list=public
# Capture only the source table, so rows written to destination_db are not captured again
table.include.list=public.source_db
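Restricting table.include.list to the source table is what keeps the connector from re-capturing the rows the application writes into destination_db. Debezium documents this property as a comma-separated list of regular expressions matched against the entire schema.table identifier. The class below is a simplified, hypothetical model of that rule (IncludeListFilter is not Debezium's code, and details such as case sensitivity may differ from the real implementation):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical, simplified model of how an include list such as table.include.list is evaluated
final class IncludeListFilter {
    private final List<Pattern> patterns;

    IncludeListFilter(String includeList) {
        // Comma-separated regular expressions, e.g. "public.source_db, inventory.*"
        this.patterns = Arrays.stream(includeList.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    // matches() requires the whole "schema.table" identifier to match, not just a substring
    boolean isIncluded(String schema, String table) {
        String id = schema + "." + table;
        return patterns.stream().anyMatch(p -> p.matcher(id).matches());
    }

    public static void main(String[] args) {
        IncludeListFilter filter = new IncludeListFilter("public.source_db");
        System.out.println(filter.isIncluded("public", "source_db"));      // true
        System.out.println(filter.isIncluded("public", "destination_db")); // false
    }
}
```

Because the values are regular expressions, the unescaped dot in public.source_db matches any character; writing public\.source_db makes the filter stricter.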

Maven Project Setup

Create a pom.xml file with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <version.debezium>2.3.1.Final</version.debezium>
        <logback-core.version>1.2.11</logback-core.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-storage-jdbc</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback-core.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback-core.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Merge META-INF/services files so ServiceLoader lookups keep working in the fat jar -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.Application</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Project Structure

Create the following directory structure for your project:

mkdir -p src/main/java/org/example
mkdir -p src/main/resources

Java Code

Create the main application class Application.java in src/main/java/org/example:

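package org.example;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Application {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();

        // Load the connector configuration from src/main/resources (the classpath)
        try (InputStream stream = Application.class.getClassLoader()
                .getResourceAsStream("embedded_debezium.properties")) {
            if (stream == null) {
                throw new IllegalStateException("embedded_debezium.properties not found on the classpath");
            }
            properties.load(stream);
        }
        // FileOffsetBackingStore records the last processed position in this file
        properties.setProperty("offset.storage.file.filename", new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();

        // The engine is a Runnable that blocks while streaming, so run it on its own thread
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        // Stop the engine cleanly when the JVM shuts down (e.g. on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                engine.close();
                executor.shutdown();
                executor.awaitTermination(30, TimeUnit.SECONDS);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
}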

Create the change consumer CustomChangeConsumer.java in src/main/java/org/example:

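package org.example;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Applies change events captured from public.source_db to destination_db.
 * Assumes both tables have an id primary key plus first_name and last_name columns.
 */
public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
    private static final String DB_URL = "jdbc:postgresql://localhost:5433/postgres";
    private static final String DB_USER = "postgres";
    private static final String DB_PASSWORD = "root";

    // Upsert, so updates and events replayed after a restart do not fail on a duplicate id
    private static final String UPSERT_SQL =
            "INSERT INTO destination_db (id, first_name, last_name) VALUES (?, ?, ?) "
            + "ON CONFLICT (id) DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name";
    private static final String DELETE_SQL = "DELETE FROM destination_db WHERE id = ?";

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
            throws InterruptedException {
        try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
            for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
                SourceRecord record = recordChangeEvent.record();
                Struct valueStruct = (Struct) record.value();

                // Tombstones (emitted after a delete) have a null value: nothing to apply
                if (valueStruct != null) {
                    // Works for both INTEGER (int32) and BIGINT (int64) keys
                    long id = ((Number) ((Struct) record.key()).get("id")).longValue();
                    String op = valueStruct.getString("op");
                    Struct afterStruct = valueStruct.getStruct("after");

                    if ("d".equals(op)) {
                        try (PreparedStatement statement = connection.prepareStatement(DELETE_SQL)) {
                            statement.setLong(1, id);
                            statement.executeUpdate();
                        }
                    } else if (afterStruct != null) {
                        // "c" = insert, "u" = update, "r" = row read during the initial snapshot
                        try (PreparedStatement statement = connection.prepareStatement(UPSERT_SQL)) {
                            statement.setLong(1, id);
                            statement.setString(2, afterStruct.getString("first_name"));
                            statement.setString(3, afterStruct.getString("last_name"));
                            statement.executeUpdate();
                        }
                    }
                }

                committer.markProcessed(recordChangeEvent);
            }
        } catch (SQLException e) {
            // Fail instead of silently skipping: the failed event is never marked processed,
            // so it is read again after a restart
            throw new RuntimeException("Failed to apply change events to destination_db", e);
        }

        committer.markBatchFinished();
    }
}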
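The consumer hard-codes three columns. If the destination table gains columns, the upsert statement can be generated instead. UpsertSqlBuilder below is a hypothetical helper (not part of Debezium or the JDBC API) that builds a PostgreSQL INSERT ... ON CONFLICT statement for any column list; it assumes the key column has a primary key or unique constraint in the destination table.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds an idempotent PostgreSQL upsert for a list of columns
final class UpsertSqlBuilder {

    private UpsertSqlBuilder() {
    }

    // Identifiers are concatenated, not escaped: only pass trusted names,
    // such as the field names from the change event's schema
    static String build(String table, String keyColumn, List<String> columns) {
        if (!columns.contains(keyColumn)) {
            throw new IllegalArgumentException("columns must include the key column " + keyColumn);
        }
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        // On a duplicate key, overwrite every non-key column with the incoming (EXCLUDED) value
        String updates = columns.stream()
                .filter(c -> !c.equals(keyColumn))
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        String onConflict = updates.isEmpty() ? "DO NOTHING" : "DO UPDATE SET " + updates;
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + keyColumn + ") " + onConflict;
    }

    public static void main(String[] args) {
        System.out.println(build("destination_db", "id", List.of("id", "first_name", "last_name")));
    }
}
```

For the three columns used in this guide, main prints the same upsert the consumer would need. Inside handleBatch, the column names could be taken from afterStruct.schema().fields() (each Field exposes name()), and the values bound in the same order.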

Building and Running the Application

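Build and package the project. mvn clean package compiles the code and, through the maven-shade-plugin, produces a single runnable jar that bundles all dependencies:

mvn clean package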

Run the application:

java -cp target/debezium-embedded-1.0-SNAPSHOT.jar org.example.Application

Conclusion

This guide showed how to run Debezium in embedded mode inside a Java application to capture changes from a PostgreSQL table and apply them to another table without running a Kafka cluster. The PostgreSQL settings, connector properties, Maven build, and consumer code above are enough to start experimenting with CDC using Debezium and Java.
