CDC & ETL With Apache NiFi & Kafka

Manish Agrawal

Overview

This document outlines the setup and implementation of a comprehensive data pipeline involving Change Data Capture (CDC), ETL processes, data transformation, and API exposure. The solution leverages Kafka, Kafka Connect, Debezium, JDBC connectors, PostgreSQL, Kafka-UI, Apache NiFi, and NestJS. Docker Compose orchestrates all components for seamless integration and deployment.

Components Overview

  • Kafka with Kraft: Distributed event streaming platform.

  • Kafka Connect: Framework for integrating Kafka with external systems.

  • Debezium Connector: Captures changes from PostgreSQL for CDC.

  • JDBC Connector: Integrates with relational databases.

  • PostgreSQL: Relational database management system.

  • Kafka-UI: User interface for managing Kafka clusters.

  • Apache NiFi: ETL Tool for data routing and transformation.

  • NestJS: Framework for building scalable server-side applications.

Docker Compose Configuration

The docker-compose.yml file sets up all required services:

version: '3.8'

services:
  nifi:
    image: apache/nifi:latest
    container_name: nifi
    ports:
      - "9090:8080"
    volumes:
      - ../nifi_data/database_repository:/opt/nifi/nifi-current/database_repository
      - ../nifi_data/flowfile_repository:/opt/nifi/nifi-current/flowfile_repository
      - ../nifi_data/content_repository:/opt/nifi/nifi-current/content_repository
      - ../nifi_data/provenance_repository:/opt/nifi/nifi-current/provenance_repository
      - ../postgresql-42.7.3.jar:/opt/nifi-registry/libs/postgresql-42.7.3.jar
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=false
      - NIFI_ELECTION_MAX_CANDIDATES=1
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_VARIABLE_REGISTRY_PROPERTIES=../conf/variables.properties

  kafka:
    image: docker.io/bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - BITNAMI_DEBUG=yes
      - KAFKA_CFG_NUM_PARTITIONS=2

    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka:9092"]
      interval: 10s
      timeout: 10s
      retries: 5

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8082:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_JMXPORT: "9999"

  kafka_connect:
    container_name: kafka_connect
    image: debezium/connect:1.8
    ports:
      - '8083:8083'
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=medium_debezium
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_REPLICATION_FACTOR=1
    volumes:
      - ../kafka_connect/jars:/kafka/connect/jars

  postgresdb:
    image: postgres
    container_name: local_pgdb
    restart: always
    ports:
      - "5433:5432"
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - ../local_pgdata:/var/lib/postgresql/data
    command: ["postgres", "-c", "wal_level=logical"]

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin4_container
    restart: always
    ports:
      - "8889:80"
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PG_ADMIN_USER}
      PGADMIN_DEFAULT_PASSWORD: ${PG_ADMIN_PASSWORD}
      PGADMIN_CONFIG_SERVER_MODE: 'False'
    volumes:
      - ../pgadmin-data:/var/lib/pgadmin

  • Two JAR files are required: the PostgreSQL JDBC driver for Apache NiFi, and the JDBC connector JAR for the Kafka Connect sink connector configuration (see the sketch below).
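
A rough sketch of staging those JARs on the host before bringing the stack up (the PostgreSQL driver path matches the NiFi volume mount above; the Confluent JDBC connector JAR has to be obtained separately, e.g. from Confluent Hub, and its exact file name depends on the version you download):

# PostgreSQL JDBC driver, mounted into the NiFi container above
curl -L -o ../postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

# Directory mounted into Kafka Connect for the JDBC sink connector
mkdir -p ../kafka_connect/jars
# copy the kafka-connect-jdbc JAR (and the PostgreSQL driver) into ../kafka_connect/jars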

Use Cases

1. CDC Replication Through Sink Connector and Source Connector Using Kafka Connect and Debezium

Configure Kafka Connect with Debezium connectors for CDC replication and JDBC connectors for integration with PostgreSQL.

Debezium Source Connector Configuration:

{
    "name": "source-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "host.docker.internal",
        "database.port": "5433",
        "database.user": "postgres",
        "database.password": "root",
        "database.dbname": "postgres",
        "database.server.name": "my-postgres-server",
        "table.include.list": "public.source_db",
        "plugin.name": "pgoutput",
        "slot.name": "debezium"
    }
}
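
Once this source connector is registered, Debezium should create the logical replication slot named in slot.name. A quick way to confirm it on the PostgreSQL side (using the database container and superuser from the docker-compose file and connector config):

docker exec -it local_pgdb psql -U postgres -d postgres \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"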

JDBC Sink Connector Configuration:

{
    "name": "sink-postgres-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://host.docker.internal:5433/postgres",
        "connection.user": "postgres",
        "connection.password": "root",
        "topics": "my-postgres-server.public.source_db",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "table.name.format": "public.destination_db",
        "transforms": "extractFields",
        "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractFields.field": "after",
        "transforms.extractFields.delimiter": ".",
        "transforms.extractFields.whitelist": "id,first_name,last_name"
    }
}
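
Because the JDBC sink connector is not bundled with the Debezium Connect image, it is worth confirming that Kafka Connect picked up the JARs mounted under /kafka/connect/jars before registering this configuration. The Connect REST API lists the available connector plugins:

curl -s http://localhost:8083/connector-plugins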

Commands to Register, Restart, and Delete Connectors:

# Register the source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @connectors/source-postgres-connector.json

# Register the sink connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @connectors/sink-postgres-connector.json

# Restart the sink connector
curl -X POST http://localhost:8083/connectors/sink-postgres-connector/restart

# Delete the sink connector
curl -X DELETE http://localhost:8083/connectors/sink-postgres-connector

Verify Kafka Connect: once the connectors are registered, they can be checked through the Kafka Connect REST API, as shown below.
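
These are standard Kafka Connect REST endpoints (the same ones referenced in the Running the Setup section below):

# List registered connectors
curl -s http://localhost:8083/connectors

# Check that each connector reports a RUNNING state
curl -s http://localhost:8083/connectors/source-postgres-connector/status
curl -s http://localhost:8083/connectors/sink-postgres-connector/status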

2. ETL Use Case Example - Two Separate Tables in Same PostgreSQL DB for Source and Destination After Transforming Data

This use case involves creating two separate tables in the same database for source and destination, with data transformation applied during the process.

The Apache NiFi flow includes:

  1. GenerateFlowFile: Generates a new FlowFile.

  2. ExecuteSQL: Executes a SQL query to fetch data from the source table.

  3. ConvertRecord: Converts the fetched data to a different format if needed.

  4. UpdateRecord: Applies transformations to the data.

  5. PutDatabaseRecord: Writes the transformed data to the destination table in PostgreSQL.

image-20240614-070302.png
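
The ExecuteSQL processor here only needs a simple select over the source table. You can preview what it will pull directly against the database (the query and column names are assumptions based on the table definitions in the PostgreSQL Configuration section below):

docker exec -it local_pgdb psql -U postgres -d postgres \
  -c "SELECT id, first_name, last_name FROM public.source_db;"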

  • The GenerateFlowFile processor is only a trigger here, so it could be replaced by anything; it is used to prevent the ExecuteSQL processor from looping.

  • The ConvertRecord processor requires the following controller services to be enabled: AvroReader, JsonTreeReader, and JsonRecordSetWriter.

image-20240614-112500.png

image-20240614-112414.png

  • The UpdateRecord processor is used here for the transformation, but other processors such as JoltTransformJSON can be used as well.

image-20240614-112544.png

  • The PutDatabaseRecord processor requires a DBCPConnectionPool controller service to be enabled and configured with the following properties:

image-20240614-111057.png

Database Connection URL:     jdbc:postgresql://host.docker.internal:5433/postgres
Database Driver Class Name:  org.postgresql.Driver
Database Driver Location(s):  /opt/nifi-registry/libs/postgresql-42.7.3.jar
Database User:  *****
Password:  *****

PutDatabaseRecord Properties Configuration:

image-20240614-112639.png

3. Apache NiFi Consumes Kafka Topic from Source Connector and Dumps Data to Table After Transformation

Configure Apache NiFi to consume Kafka topics, transform the data, and route it to the destination table in PostgreSQL.

The Apache NiFi flow includes:

  1. ConsumeKafka Processor: Consumes messages from the Kafka topic.

  2. ConvertRecord Processor: Converts the data format if needed.

  3. JoltTransformJSON Processor: Applies necessary transformations to the data.

  4. PutDatabaseRecord Processor: Writes the transformed data to the destination table in PostgreSQL.

image-20240614-070419.png
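
To sanity-check what the ConsumeKafka processor will receive, you can tail the CDC topic with the console consumer bundled in the Kafka container (a sketch; it assumes the Bitnami image keeps the Kafka CLI scripts on the PATH):

docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic my-postgres-server.public.source_db \
  --from-beginning --max-messages 1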

ConsumeKafka Properties Configuration:

image-20240614-112853.png

JoltTransformJSON Properties Configuration:

image-20240614-113120.png

Jolt Specification:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "payload": {
          "after": {
            "id": "id",
            "first_name": "first_name",
            "last_name": "last_name"
          }
        }
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "full_name": "=concat(@(1,first_name),' ',@(1,last_name))"
    }
  },
  {
    "operation": "remove",
    "spec": {
      "first_name": "",
      "last_name": ""
    }
  }
]
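
With this flow running, an end-to-end check is to insert a row into the source table and confirm that a concatenated full_name shows up in the transformed table (table and column names come from the PostgreSQL Configuration section below; the sample values are arbitrary):

docker exec -it local_pgdb psql -U postgres -d postgres \
  -c "INSERT INTO public.source_db (id, first_name, last_name) VALUES (101, 'Jane', 'Doe');"

docker exec -it local_pgdb psql -U postgres -d postgres \
  -c "SELECT id, full_name FROM public.destination_db_transformed;"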

4. Expose NestJS API and Use NiFi to Consume Kafka and Dump Data to Another Table in DB for Replication

Build a NestJS application that exposes the data via an API. The application will produce data to a Kafka topic using the KafkaJS library. Apache NiFi will then consume this data from the Kafka topic, apply necessary transformations, and write it to another similar table in the database for replication purposes.

NestJS API Configuration:

  • Kafka Producer: Configured to produce messages to the Kafka topic using KafkaJS.

  • API Endpoints: Provides endpoints to produce messages to the Kafka topic.

// kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { KafkaController } from './kafka.controller';

@Module({
  providers: [KafkaService],
  controllers: [KafkaController],
})
export class KafkaModule {}

// contract/message-object.ts
export interface IMessageObject {
  id: number;
  firstName: string;
  lastName: string;
}

// kafka.controller.ts
import { Controller, Post, Body } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { IMessageObject } from './contract/message-object';

@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaService: KafkaService) {}

  @Post('produce')
  async produceMessage(
    @Body('topic') topic: string,
    @Body('message') message: IMessageObject,
  ) {
    await this.kafkaService.produce(topic, message);
    return { status: 'Message sent' };
  }
}

// kafka.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka, Producer } from 'kafkajs';
import { IMessageObject } from './contract/message-object';

@Injectable()
export class KafkaService implements OnModuleInit {
  private kafka: Kafka;
  private producer: Producer;

  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_ID || 'nestjs-kafka',
      brokers: process.env.KAFKA_BROKERS
        ? process.env.KAFKA_BROKERS.split(',')
        : ['localhost:29092'],
    });
    this.producer = this.kafka.producer();
  }

  async onModuleInit() {
    await this.producer.connect();
  }

  async produce(topic: string, message: IMessageObject) {
    await this.producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }],
    });
  }
}
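
KafkaService reads its client id and broker list from the environment, falling back to the defaults shown in the constructor. A matching .env file (loaded via the dotenv package mentioned in the setup steps) would look like:

KAFKA_CLIENT_ID=nestjs-kafka
KAFKA_BROKERS=localhost:29092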

Postman API Client:

image-20240614-071627.png

{
  "topic": "kafka_consume_nifi",
  "message": {
      "id" : 31,
      "first_name": "test1",
      "last_name": "test1_"
  }
}

Equivalent curl request:

curl --location 'http://localhost:3000/kafka/produce' \
--header 'Content-Type: application/json' \
--data '{
  "topic": "kafka_consume_nifi",
  "message": {
      "id" : 31,
      "first_name": "test1",
      "last_name": "test1_"
  }
}'

Apache NiFi Flow:

  1. ConsumeKafka Processor: Consumes messages from the Kafka topic produced by the NestJS API.

  2. ConvertRecord Processor: Converts the data format if needed.

  3. PutDatabaseRecord Processor: Writes the transformed data to the destination table in PostgreSQL.

image-20240614-070512.png

5. Apache NiFi Webhook Call Through NestJS

In this use case, a NestJS service posts JSON over HTTP to an HTTP listener endpoint exposed by NiFi (the /contentListener path used in the code below), and the NiFi flow then processes the incoming payload.

image-20240618-110053.png

image-20240618-110121.png

// nifi.service.ts
import { Injectable } from '@nestjs/common';
import { Observable } from 'rxjs';
import { AxiosResponse } from 'axios';
import { HttpService } from '@nestjs/axios';

@Injectable()
export class NiFiService {
  private readonly nifiBaseUrl = 'http://localhost:9095';

  constructor(private readonly httpService: HttpService) {}

  postDataToNiFi(data: any): Observable<AxiosResponse<any>> {
    const url = `${this.nifiBaseUrl}/contentListener`;

    return this.httpService.post(url, data);
  }
}

// nifi.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { NiFiService } from './nifi.service';

@Controller('nifi')
export class NifiController {
  constructor(private readonly nifiService: NiFiService) {}

  @Post('send-data')
  async sendDataToNiFi(@Body() data: any) {
    try {
      const response = await this.nifiService.postDataToNiFi(data).toPromise();
      return response.data;
    } catch (error) {
      throw new Error(`Failed to send data to NiFi: ${error.message}`);
    }
  }
}

// nifi.module.ts
import { Module } from '@nestjs/common';
import { NifiController } from './nifi.controller';
import { NiFiService } from './nifi.service';
import { HttpModule } from '@nestjs/axios';

@Module({
  imports: [HttpModule],
  providers: [NiFiService],
  controllers: [NifiController],
})
export class NifiModule {}

image-20240618-110325.png
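
A quick way to exercise this path end to end is to call the NestJS endpoint and let it forward the payload to NiFi (the port and route come from the code and examples above; the payload shape mirrors the earlier sample message):

curl --location 'http://localhost:3000/nifi/send-data' \
--header 'Content-Type: application/json' \
--data '{
  "id": 31,
  "first_name": "test1",
  "last_name": "test1_"
}'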

PostgreSQL Configuration

Use pgAdmin to create the database and tables:

Database Name: postgres

Table Names:

  • source_db (id, first_name, last_name)

  • destination_db (id, first_name, last_name)

  • destination_db_transformed (id, full_name)

image-20240614-073225.png
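
If you prefer SQL over the pgAdmin UI, here is a minimal DDL sketch for these tables (only the column names come from the list above; the column types are assumptions):

docker exec -i local_pgdb psql -U postgres -d postgres <<'SQL'
CREATE TABLE IF NOT EXISTS public.source_db (
  id INTEGER PRIMARY KEY,
  first_name TEXT,
  last_name TEXT
);
CREATE TABLE IF NOT EXISTS public.destination_db (
  id INTEGER PRIMARY KEY,
  first_name TEXT,
  last_name TEXT
);
CREATE TABLE IF NOT EXISTS public.destination_db_transformed (
  id INTEGER PRIMARY KEY,
  full_name TEXT
);
SQL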

  • Change the WAL level to logical, for example:

wal_level=logical

[ "postgres", "-c", "wal_level=logical" ]

Running the Setup

  1. Start Docker Compose: Run docker-compose up -d to start all services and verify that they are all running without errors. If any errors occur, debug them using the container logs.

image-20240614-114707.png

  2. Verify Kafka Connect: Use Kafka-UI (http://localhost:8082) to verify that the connectors are running and data is flowing correctly. The Kafka Connect REST API itself is available at http://localhost:8083.

image-20240614-114146.png

  3. Verify Connectors: Access http://localhost:8083/connectors to list the registered connectors. If the list is empty, register them from the CLI using the curl commands shown earlier.

image-20240614-114254.png

  4. Verify Connector Status: Access http://localhost:8083/connectors/{connector_name}/status to check the status of a specific connector. Both connectors should be in the RUNNING state, as shown below.

image-20240614-114413.png

image-20240614-114434.png

  5. Configure Apache NiFi: Access the NiFi UI at http://localhost:9090/nifi to set up the data flow.

image-20240614-115007.png

  6. Run NestJS Application: Ensure the NestJS application is running and properly configured to produce messages to Kafka. Install its dependencies and start it from the CLI with the following commands:

npm i kafkajs
npm i dotenv
npm install
npm run build
npm run start
npm run start:dev

  7. Create Database and Tables: Use pgAdmin to create the postgres database and the necessary tables (source_db, destination_db, destination_db_transformed).

  8. Create a .env file for secret configuration values, loaded with the dotenv npm package.

Kafka-UI

UI for Apache Kafka (Kafka-UI) is a free, open-source, lightweight web UI for monitoring and managing Apache Kafka clusters.

image-20240614-103146.png

Shutting Down

To stop and remove all containers, networks, and volumes, run:

docker-compose down -v

Conclusion

This setup demonstrates a complete data pipeline for CDC replication, ETL processes, and data exposure using a combination of Kafka, Kafka Connect, Debezium, Apache NiFi, and NestJS. Docker Compose orchestrates the deployment, ensuring all components are integrated seamlessly and can scale effectively. This solution provides a robust framework for real-time data processing and transformation.

Architecture/Diagrams

[Architecture diagrams: source-to-Kafka-to-sink connector replication flow and NiFi pipeline views]
