Embedded Debezium using Java + PostgreSQL


Overview
This guide provides a step-by-step process to set up Change Data Capture (CDC) using Debezium in embedded mode within a Java application, capturing changes from a PostgreSQL database source table and processing them without the need for Kafka and inserting the data to the destination table of the same database.
Prerequisites
PostgreSQL database
Java Development Kit (JDK)
Apache Maven
Debezium dependencies
PostgreSQL Configuration
Ensure that your PostgreSQL configuration supports logical replication:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Debezium Configuration
Create a properties file embedded_debezium.properties
with the following content:
name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=localhost
database.port=5433
database.user=postgres
database.password=root
database.dbname=postgres
database.server.name=embedded-debezium
debezium.source.plugin.name=pgoutput
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=public
table.include.list=public.source_db
Maven Project Setup
Create a pom.xml
file with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.3.1.Final</version.debezium>
<logback-core.version>1.2.11</logback-core.version>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-jdbc</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-core.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback-core.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.Application</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Project Structure
Create the following directory structure for your project:
mkdir -p src/main/java/org/example mkdir -p src/main/resources
Java Code
Create the main application class Application.java
in src/main/java/org/example
:
package org.example;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Application {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
properties.load(stream);
}
properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath());
var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(properties)
.notifying(new CustomChangeConsumer())
.build();
engine.run();
}
}
CustomChangeConsumer.java
package org.example;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.kafka.connect.data.Struct;
public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
private static final String DB_URL = "jdbc:postgresql://localhost:5433/postgres";
private static final String DB_USER = "postgres";
private static final String DB_PASSWORD = "root";
@Override
public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
SourceRecord record = recordChangeEvent.record();
Struct keyStruct = (Struct) record.key();
long id = keyStruct.getInt64("id");
Struct valueStruct = (Struct) record.value();
Struct afterStruct = valueStruct.getStruct("after");
String firstName = afterStruct.getString("first_name");
String lastName = afterStruct.getString("last_name");
String sql = "INSERT INTO destination_db (id, first_name, last_name) VALUES (?, ?, ?)";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setLong(1, id);
statement.setString(2, firstName);
statement.setString(3, lastName);
statement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
committer.markProcessed(recordChangeEvent);
}
} catch (SQLException e) {
e.printStackTrace();
}
committer.markBatchFinished();
}
}
Building and Running the Application
Build the project:
mvn clean install
Package the application:
mvn clean package
Run the application:
java -cp target/debezium-embedded-1.0-SNAPSHOT.jar org.example.Application
Conclusion:
This guide demonstrates the use of Debezium's embedded mode within a Java application for capturing and processing changes from a PostgreSQL database. The provided example includes all necessary configurations and code snippets to get you started with CDC using Debezium and Java.
References
Subscribe to my newsletter
Read articles from Manish Agrawal directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by

Manish Agrawal
Manish Agrawal
Over 15 Years of Expertise in Software Development and Engineering I specialize in delivering innovative solutions across diverse programming languages, platforms, and architectures. 💡 Technical Expertise Backend: Node.js (Nest.js, Express.js), Java (Spring Boot), PHP (Laravel, CodeIgniter, YII, Phalcon, Symphony, CakePHP) Frontend: React, Angular, Vue, TypeScript, JavaScript, Bootstrap, Material design, Tailwind CMS: WordPress, MediaWiki, Moodle, Strapi Headless, Drupal, Magento, Joomla DevOps & Cloud: AWS, Azure, GCP, OpenShift, CI/CD, Docker, Kubernetes, Terraform, Ansible, GitHub Actions, Gitlab CI/CD, GitOps, Argo CD, Jenkins, Shell Scripting, Linux Observability & Monitoring: Datadog, Prometheus, Grafana, ELK Stack, PowerBI, Tableau Databases: MySQL, MariaDB, MongoDB, PostgreSQL, Elasticsearch Caching: Redis, Mamcachad Data Engineering & Streaming: Apache NiFi, Apache Flink, Kafka, RabbitMQ API Design: REST, gRPC, GraphQL Principles & Practices: SOLID, DRY, KISS, TDD Architectural Patterns: Microservices, Monolithic, Microfronend, Event-Driven, Serverless, OOPs Design Patterns: Singleton, Factory, Observer, Repository, Service Mesh, Sidecar Pattern Project Management: Agile, JIRA, Confluence, MS Excel Testing & Quality: Postman, Jest, SonarQube, Cucumber Architectural Tools: Draw.io, Lucid, Excalidraw 👥 Versatile Professional From small-scale projects to enterprise-grade solutions, I have excelled both as an individual contributor and as part of dynamic teams. 🎯 Lifelong Learner Beyond work, I’m deeply committed to personal and professional growth, dedicating my spare time to exploring new technologies. 🔍 Passionate about Research & Product Improvement & Reverse Engineering I’m dedicated to exploring and enhancing existing products, always ready to take on challenges to identify root causes and implement effective solutions. 🧠 Adaptable & Tech-Driven I thrive in dynamic environments and am always eager to adapt and work with new and emerging technologies. 🌱 Work Culture I Value I thrive in environments that foster autonomy, respect, and innovation — free from micromanagement, unnecessary bureaucracy. I value clear communication, open collaboration, self organizing teams,appreciation, rewards and continuous learning. 🧠 Core Belief I believe every problem has a solution—and every solution uncovers new challenges to grow from. 🌟 Let's connect to collaborate, innovate, and build something extraordinary together!