CDC & ETL With Apache NiFi & Kafka

Table of contents
- Overview
- Components Overview
- Docker Compose Configuration
- Use Cases
- 1. CDC Replication Through Sink Connector and Source Connector Using Kafka Connect and Debezium
- 2. ETL Use Case Example - Two Separate Tables in Same PostgreSQL DB for Source and Destination After Transforming Data
- 3. Apache NiFi Consumes Kafka Topic from Source Connector and Dumps Data to Table After Transformation
- 4. Expose NestJS API and Use NiFi to Consume Kafka and Dump Data to Another Table in DB for Replication
- 5. Apache NiFi Webhook Call through NodeJS
- PostgreSQL Configuration
- Running the Setup
- Kafka-UI
- Shutting Down
- Conclusion
- Architecture/Diagrams
- References

Overview
This document outlines the setup and implementation of a comprehensive data pipeline involving Change Data Capture (CDC), ETL processes, data transformation, and API exposure. The solution leverages Kafka, Kafka Connect, Debezium, JDBC connectors, PostgreSQL, Kafka-UI, Apache NiFi, and NestJS. Docker Compose orchestrates all components for seamless integration and deployment.
Components Overview
Kafka with KRaft: Distributed event streaming platform (KRaft mode, no ZooKeeper required).
Kafka Connect: Framework for integrating Kafka with external systems.
Debezium Connector: Captures changes from PostgreSQL for CDC.
JDBC Connector: Integrates with relational databases.
PostgreSQL: Relational database management system.
Kafka-UI: User interface for managing Kafka clusters.
Apache NiFi: ETL Tool for data routing and transformation.
NestJS: Framework for building scalable server-side applications.
Docker Compose Configuration
The docker-compose.yml file sets up all required services:
version: '3.8'
services:
  nifi:
    image: apache/nifi:latest
    container_name: nifi
    ports:
      - "9090:8080"
    volumes:
      - ../nifi_data/database_repository:/opt/nifi/nifi-current/database_repository
      - ../nifi_data/flowfile_repository:/opt/nifi/nifi-current/flowfile_repository
      - ../nifi_data/content_repository:/opt/nifi/nifi-current/content_repository
      - ../nifi_data/provenance_repository:/opt/nifi/nifi-current/provenance_repository
      - ../postgresql-42.7.3.jar:/opt/nifi-registry/libs/postgresql-42.7.3.jar
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=false
      - NIFI_ELECTION_MAX_CANDIDATES=1
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_VARIABLE_REGISTRY_PROPERTIES=../conf/variables.properties

  kafka:
    image: docker.io/bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - BITNAMI_DEBUG=yes
      - KAFKA_CFG_NUM_PARTITIONS=2
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka:9092"]
      interval: 10s
      timeout: 10s
      retries: 5

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8082:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_JMXPORT: "9999"

  kafka_connect:
    container_name: kafka_connect
    image: debezium/connect:1.8
    ports:
      - '8083:8083'
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=medium_debezium
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_REPLICATION_FACTOR=1
    volumes:
      - ../kafka_connect/jars:/kafka/connect/jars

  postgresdb:
    image: postgres
    container_name: local_pgdb
    restart: always
    ports:
      - "5433:5432"
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - ../local_pgdata:/var/lib/postgresql/data
    command: ["postgres", "-c", "wal_level=logical"]

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin4_container
    restart: always
    ports:
      - "8889:80"
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PG_ADMIN_USER}
      PGADMIN_DEFAULT_PASSWORD: ${PG_ADMIN_PASSWORD}
      PGADMIN_CONFIG_SERVER_MODE: 'False'
    volumes:
      - ../pgadmin-data:/var/lib/pgadmin
- Note: two JAR files are required: the PostgreSQL JDBC driver for Apache NiFi and the Confluent JDBC connector JAR for the sink connector configuration.
Use Cases
1. CDC Replication Through Sink Connector and Source Connector Using Kafka Connect and Debezium
Configure Kafka Connect with Debezium connectors for CDC replication and JDBC connectors for integration with PostgreSQL.
Debezium Source Connector Configuration:
{
  "name": "source-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "host.docker.internal",
    "database.port": "5433",
    "database.user": "postgres",
    "database.password": "root",
    "database.dbname": "postgres",
    "database.server.name": "my-postgres-server",
    "table.include.list": "public.source_db",
    "plugin.name": "pgoutput",
    "slot.name": "debezium"
  }
}
JDBC Sink Connector Configuration:
{
  "name": "sink-postgres-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://host.docker.internal:5433/postgres",
    "connection.user": "postgres",
    "connection.password": "root",
    "topics": "my-postgres-server.public.source_db",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "table.name.format": "public.destination_db",
    "transforms": "extractFields",
    "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractFields.field": "after",
    "transforms.extractFields.delimiter": ".",
    "transforms.extractFields.whitelist": "id,first_name,last_name"
  }
}
Commands to Register, Restart, and Delete Connectors:
# Register the source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @connectors/source-postgres-connector.json
# Register the sink connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @connectors/sink-postgres-connector.json
# Restart the sink connector
curl -X POST http://localhost:8083/connectors/sink-postgres-connector/restart
# Delete the sink connector
curl -X DELETE http://localhost:8083/connectors/sink-postgres-connector
Verify Kafka Connect:
Verify Kafka Connect is running: http://localhost:8083/
List all connectors: http://localhost:8083/connectors
Check connector status: http://localhost:8083/connectors/{connector_name}/status
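These status checks can also be scripted. Below is a minimal TypeScript sketch (assuming Node.js 18+ for the built-in fetch API and Kafka Connect reachable on localhost:8083) that lists the registered connectors and prints the state of each one; the file name is just a suggestion:

// check-connectors.ts - minimal status-check sketch against the Kafka Connect REST API
const CONNECT_URL = 'http://localhost:8083';

async function checkConnectors(): Promise<void> {
  // GET /connectors returns the names of all registered connectors
  const names: string[] = await (await fetch(`${CONNECT_URL}/connectors`)).json();
  console.log('Registered connectors:', names);

  // GET /connectors/{name}/status returns the connector state and its task states
  for (const name of names) {
    const status = await (await fetch(`${CONNECT_URL}/connectors/${name}/status`)).json();
    const tasks = status.tasks.map((t: { id: number; state: string }) => `task ${t.id}=${t.state}`);
    console.log(`${name}: ${status.connector.state}`, tasks.join(', '));
  }
}

checkConnectors().catch(console.error);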
2. ETL Use Case Example - Two Separate Tables in Same PostgreSQL DB for Source and Destination After Transforming Data
This use case involves creating two separate tables in the same database for source and destination, with data transformation applied during the process.
The Apache NiFi flow includes:
GenerateFlowFile: Generates a new FlowFile.
ExecuteSQL: Executes a SQL query to fetch data from the source table.
ConvertRecord: Converts the fetched data to a different format if needed.
UpdateRecord: Applies transformations to the data.
PutDatabaseRecord: Writes the transformed data to the destination table in PostgreSQL.
The GenerateFlowFile processor is used only as a trigger here (any trigger would do); it prevents the ExecuteSQL processor from running in a continuous loop.
The ConvertRecord processor requires the following controller services to be enabled: AvroReader, JsonTreeReader, and JsonRecordSetWriter.
The UpdateRecord processor handles the transformation here, but other processors such as JoltTransformJSON can be used instead.
The PutDatabaseRecord processor requires an enabled and configured DBCPConnectionPool controller service with the following properties:
Database Connection URL: jdbc:postgresql://host.docker.internal:5433/postgres
Database Driver Class Name: org.postgresql.Driver
Database Driver Location(s): /opt/nifi-registry/libs/postgresql-42.7.3.jar
Postgres User: *****
Postgres Password: *****
PutDatabaseRecord Properties Configuration:
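For reference, the extract-transform-load step that this NiFi flow performs can be expressed in a few lines of TypeScript. This is only an illustrative sketch, not part of the NiFi setup; it assumes the node-postgres (pg) package, the connection details shown above (localhost instead of host.docker.internal when run outside Docker), the table names from the PostgreSQL Configuration section, and that id is the primary key:

// etl-sketch.ts - illustrative equivalent of ExecuteSQL -> UpdateRecord -> PutDatabaseRecord
import { Client } from 'pg';

async function runEtl(): Promise<void> {
  const client = new Client({
    host: 'localhost',
    port: 5433,
    user: 'postgres',
    password: 'root', // use values from your .env in practice
    database: 'postgres',
  });
  await client.connect();

  // Extract: read all rows from the source table
  const { rows } = await client.query('SELECT id, first_name, last_name FROM source_db');

  // Transform + Load: build full_name and upsert into the transformed destination table
  for (const row of rows) {
    const fullName = `${row.first_name} ${row.last_name}`;
    await client.query(
      'INSERT INTO destination_db_transformed (id, full_name) VALUES ($1, $2) ' +
        'ON CONFLICT (id) DO UPDATE SET full_name = EXCLUDED.full_name',
      [row.id, fullName],
    );
  }

  await client.end();
}

runEtl().catch(console.error);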
3. Apache NiFi Consumes Kafka Topic from Source Connector and Dumps Data to Table After Transformation
Configure Apache NiFi to consume Kafka topics, transform the data, and route it to the destination table in PostgreSQL.
The Apache NiFi flow includes:
ConsumeKafka Processor: Consumes messages from the Kafka topic.
ConvertRecord Processor: Converts the data format if needed.
JoltTransformJSON Processor: Applies necessary transformations to the data.
PutDatabaseRecord Processor: Writes the transformed data to the destination table in PostgreSQL.
ConsumeKafka Properties Configuration:
JoltTransformJSON Properties Configuration:
Jolt Specification:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "payload": {
          "after": {
            "id": "id",
            "first_name": "first_name",
            "last_name": "last_name"
          }
        }
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "full_name": "=concat(@(1,first_name),' ',@(1,last_name))"
    }
  },
  {
    "operation": "remove",
    "spec": {
      "first_name": "",
      "last_name": ""
    }
  }
]
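To make the Jolt specification concrete, here is a simplified TypeScript illustration of what the three operations produce: the nested payload.after fields of a Debezium change event are flattened, first_name and last_name are concatenated into full_name, and the original name fields are dropped. The sample values reuse the producer example from later in this article:

// jolt-equivalent.ts - simplified illustration of the shift/modify/remove operations above
interface DebeziumEvent {
  payload: {
    after: { id: number; first_name: string; last_name: string };
  };
}

function transform(event: DebeziumEvent): { id: number; full_name: string } {
  const { id, first_name, last_name } = event.payload.after;  // "shift"
  return { id, full_name: `${first_name} ${last_name}` };     // "modify-overwrite-beta" + "remove"
}

// Prints: { id: 31, full_name: 'test1 test1_' }
console.log(transform({ payload: { after: { id: 31, first_name: 'test1', last_name: 'test1_' } } }));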
4. Expose NestJS API and Use NiFi to Consume Kafka and Dump Data to Another Table in DB for Replication
Build a NestJS application that exposes the data via an API. The application will produce data to a Kafka topic using the KafkaJS library. Apache NiFi will then consume this data from the Kafka topic, apply necessary transformations, and write it to another similar table in the database for replication purposes.
NestJS API Configuration:
Kafka Producer: Configured to produce messages to the Kafka topic using KafkaJS.
API Endpoints: Provides endpoints to produce messages to the Kafka topic.
// kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { KafkaController } from './kafka.controller';

@Module({
  providers: [KafkaService],
  controllers: [KafkaController],
})
export class KafkaModule {}

// contract/message-object.ts
// snake_case field names to match the source/destination table columns and the sample payloads
export interface IMessageObject {
  id: number;
  first_name: string;
  last_name: string;
}

// kafka.controller.ts
import { Controller, Post, Body } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { IMessageObject } from './contract/message-object';

@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaService: KafkaService) {}

  @Post('produce')
  async produceMessage(
    @Body('topic') topic: string,
    @Body('message') message: IMessageObject,
  ) {
    await this.kafkaService.produce(topic, message);
    return { status: 'Message sent' };
  }
}

// kafka.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka, Producer } from 'kafkajs';
import { IMessageObject } from './contract/message-object';

@Injectable()
export class KafkaService implements OnModuleInit {
  private kafka: Kafka;
  private producer: Producer;

  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_ID || 'nestjs-kafka',
      brokers: process.env.KAFKA_BROKERS
        ? process.env.KAFKA_BROKERS.split(',')
        : ['localhost:29092'],
    });
    this.producer = this.kafka.producer();
  }

  async onModuleInit() {
    // Connect the producer once when the module starts
    await this.producer.connect();
  }

  async produce(topic: string, message: IMessageObject) {
    await this.producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }],
    });
  }
}
Postman API Client:
{
  "topic": "kafka_consume_nifi",
  "message": {
    "id": 31,
    "first_name": "test1",
    "last_name": "test1_"
  }
}
curl --location 'http://localhost:3000/kafka/produce' \
--header 'Content-Type: application/json' \
--data '{
  "topic": "kafka_consume_nifi",
  "message": {
    "id": 31,
    "first_name": "test1",
    "last_name": "test1_"
  }
}'
Apache NiFi Flow:
ConsumeKafka Processor: Consumes messages from the Kafka topic produced by the NestJS API.
ConvertRecord Processor: Converts the data format if needed.
PutDatabaseRecord Processor: Writes the transformed data to the destination table in PostgreSQL.
5. Apache NiFi Webhook Call through NodeJS
In this use case, the NestJS application exposes an endpoint that forwards incoming payloads to an HTTP listener in NiFi, effectively invoking the NiFi flow as a webhook.
// nifi.service.ts
import { Injectable } from '@nestjs/common';
import { Observable } from 'rxjs';
import { AxiosResponse } from 'axios';
import { HttpService } from '@nestjs/axios';

@Injectable()
export class NiFiService {
  // Base URL where NiFi listens for incoming HTTP requests (e.g. a ListenHTTP processor)
  private readonly nifiBaseUrl = 'http://localhost:9095';

  constructor(private readonly httpService: HttpService) {}

  postDataToNiFi(data: any): Observable<AxiosResponse<any>> {
    const url = `${this.nifiBaseUrl}/contentListener`;
    return this.httpService.post(url, data);
  }
}

// nifi.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { firstValueFrom } from 'rxjs';
import { NiFiService } from './nifi.service';

@Controller('nifi')
export class NifiController {
  constructor(private readonly nifiService: NiFiService) {}

  @Post('send-data')
  async sendDataToNiFi(@Body() data: any) {
    try {
      // firstValueFrom replaces the deprecated Observable.toPromise()
      const response = await firstValueFrom(this.nifiService.postDataToNiFi(data));
      return response.data;
    } catch (error) {
      throw new Error(`Failed to send data to NiFi: ${error.message}`);
    }
  }
}

// nifi.module.ts
import { Module } from '@nestjs/common';
import { NifiController } from './nifi.controller';
import { NiFiService } from './nifi.service';
import { HttpModule } from '@nestjs/axios';

@Module({
  imports: [HttpModule],
  providers: [NiFiService],
  controllers: [NifiController],
})
export class NifiModule {}
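For a quick end-to-end test of this webhook path, a small client script can post a sample record to the NestJS endpoint, which in turn forwards it to NiFi. This is a sketch with made-up field values; it assumes the NestJS app listens on localhost:3000 (as in the earlier examples) and that NiFi exposes an HTTP listener (for example a ListenHTTP processor) on port 9095 at /contentListener, matching nifiBaseUrl above:

// test-nifi-webhook.ts - posts a sample record through the NestJS /nifi/send-data endpoint
async function sendSample(): Promise<void> {
  const response = await fetch('http://localhost:3000/nifi/send-data', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ id: 32, first_name: 'test2', last_name: 'test2_' }),
  });
  console.log('NiFi webhook response:', response.status, await response.text());
}

sendSample().catch(console.error);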
PostgreSQL Configuration
Use pgAdmin to create the database and tables:
Database Name: postgres
Table Names:
source_db (id, first_name, last_name)
destination_db (id, first_name, last_name)
destination_db_transformed (id, full_name)
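If you prefer to script the table creation rather than clicking through pgAdmin, the sketch below uses the node-postgres (pg) package. The column types are assumptions based on the data used throughout this article, so adjust them to match your actual schema:

// create-tables.ts - optional sketch for creating the three tables without pgAdmin
import { Client } from 'pg';

const ddl = [
  `CREATE TABLE IF NOT EXISTS source_db (
     id INTEGER PRIMARY KEY,
     first_name TEXT,
     last_name TEXT
   )`,
  `CREATE TABLE IF NOT EXISTS destination_db (
     id INTEGER PRIMARY KEY,
     first_name TEXT,
     last_name TEXT
   )`,
  `CREATE TABLE IF NOT EXISTS destination_db_transformed (
     id INTEGER PRIMARY KEY,
     full_name TEXT
   )`,
];

async function createTables(): Promise<void> {
  const client = new Client({
    host: 'localhost',
    port: 5433,
    user: process.env.POSTGRES_USER, // same credentials as in the .env used by docker-compose
    password: process.env.POSTGRES_PASSWORD,
    database: 'postgres',
  });
  await client.connect();
  for (const statement of ddl) {
    await client.query(statement);
  }
  await client.end();
}

createTables().catch(console.error);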
- Set the PostgreSQL WAL level to logical, for example via the container command:
wal_level=logical
[ "postgres", "-c", "wal_level=logical" ]
Running the Setup
- Start Docker Compose: Run docker-compose up -d to start all services, then verify that every container is running without errors. If any errors occur, debug them using the container logs.
- Verify Kafka Connect: Use Kafka-UI to confirm that the connectors are running and data is flowing correctly. Kafka Connect itself can be checked at http://localhost:8083.
- Verify Connectors: Access http://localhost:8083/connectors to list the registered connectors. If the list is empty, register the connectors using the curl commands shown earlier.
- Verify Connector Status: Access http://localhost:8083/connectors/{connector_name}/status to check the status of a specific connector. Both connectors should report a RUNNING state.
- Configure Apache NiFi: Access the NiFi UI at http://localhost:9090/nifi to set up the data flows.
- Run the NestJS Application: Ensure the NestJS application is running and properly configured to produce messages to Kafka. The relevant CLI commands are:
npm i kafkajs
npm i dotenv
npm install
npm run build
npm run start
npm run start:dev
- Create Database and Tables: Use pgAdmin to create the postgres database and the necessary tables (source_db, destination_db, destination_db_transformed).
- Create a .env file for secret configuration values using the dotenv npm package, as sketched below.
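The environment variables referenced in docker-compose.yml and in the NestJS code can live in a single .env file loaded by dotenv. A minimal sketch follows; every value is a placeholder, and KAFKA_CLIENT_ID / KAFKA_BROKERS are the variables read by the KafkaService shown earlier:

// config.ts - sketch: load secrets from .env with the dotenv package
// Example .env (placeholder values):
//   POSTGRES_USER=postgres
//   POSTGRES_PASSWORD=********
//   PG_ADMIN_USER=admin@example.com
//   PG_ADMIN_PASSWORD=********
//   KAFKA_CLIENT_ID=nestjs-kafka
//   KAFKA_BROKERS=localhost:29092
import 'dotenv/config'; // loads .env into process.env at startup

console.log('Kafka brokers:', process.env.KAFKA_BROKERS);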
Kafka-UI
UI for Apache Kafka is a versatile, fast, and lightweight web UI for managing Apache Kafka clusters. It is free and open source, and in this setup it is used to monitor topics, messages, and connector activity.
Shutting Down
To stop and remove all containers, networks, and volumes, run:
docker-compose down -v
Conclusion
This setup demonstrates a complete data pipeline for CDC replication, ETL processes, and data exposure using a combination of Kafka, Kafka Connect, Debezium, Apache NiFi, and NestJS. Docker Compose orchestrates the deployment, ensuring all components are integrated seamlessly and can scale effectively. This solution provides a robust framework for real-time data processing and transformation.
Architecture/Diagrams
References
KafkaJS npm package: https://www.npmjs.com/package/kafkajs?activeTab=readme
KafkaJS official site: https://kafka.js.org/
Kafka docs: https://kafka.apache.org/documentation/
Apache NiFi docs: https://nifi.apache.org/documentation/v2/
PostgreSQL docs: https://www.postgresql.org/docs/16/index.html
NestJS docs: https://docs.nestjs.com/
Kafka UI docs: https://docs.kafka-ui.provectus.io/
Practical explanation: CDC + Debezium + Kafka + Docker + MySQL: https://medium.com/nagoya-foundation/simple-cdc-with-debezium-kafka-a27b28d8c3b8
JDBC JAR file download: https://jdbc.postgresql.org/download/
Debezium Postgres JAR download (if needed): https://debezium.io/documentation/reference/stable/connectors/postgresql.html
dotenv package: https://www.npmjs.com/package/dotenv
Data ingestion using Apache NiFi article: https://pratikbarjatya.medium.com/building-data-ingestion-system-using-apache-nifi-76e90765ac43
Debezium article: https://zakir-hossain.medium.com/debezium-source-connector-on-confluent-platform-d00494c29d17
Debezium/Connect article: https://medium.com/@rayane.gouda/making-debezium-connect-support-confluent-schema-registry-b9510ed45f4b
CDC with Apache NiFi article: https://nikhil-suthar-bigdata.medium.com/cdc-with-apache-nifi-65020c748ff5