Apache Flink Change Data Capture (CDC)

Manish Agrawal

Overview

This page provides a step-by-step guide to setting up and running a Change Data Capture (CDC) pipeline using Flink CDC to capture changes in a PostgreSQL database and replicate them to another PostgreSQL database.

Prerequisites

  • PostgreSQL: Ensure you have PostgreSQL installed and configured.

  • Flink: Set up a Flink environment.

  • Docker: Required for running services in containers.

PostgreSQL Setup

  1. Configure PostgreSQL for Logical Replication

    • Edit the postgresql.conf file and add the following configurations:

      wal_level = logical
      max_replication_slots = 4
      max_wal_senders = 4

    • Restart PostgreSQL for the changes to take effect.

    • Monitoring PostgreSQL Settings

      • You can monitor relevant PostgreSQL settings using the following queries:

          SELECT name, setting FROM pg_settings WHERE name = 'wal_level';
        
          SELECT name, setting FROM pg_settings WHERE name = 'max_connections';
        
          SELECT name, setting FROM pg_settings;
        
          SELECT name, setting FROM pg_settings WHERE name = 'max_replication_slots';
        
          SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'max_connections', 'max_replication_slots');
        
  2. Create Publication

    • Run the following SQL command to create a publication that captures all table changes:

      CREATE PUBLICATION nifi_publication FOR ALL TABLES;
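    • To confirm the publication was created and which tables it covers, you can optionally query the PostgreSQL catalog (a quick sanity check, not required for the pipeline):

      SELECT * FROM pg_publication WHERE pubname = 'nifi_publication';

      SELECT * FROM pg_publication_tables WHERE pubname = 'nifi_publication';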

  3. Provide Required Permissions

    • Run the following SQL command on the source database to grant replication permission to the user (replace <user> with the actual username). NOTE: the rds_replication role exists on AWS RDS for PostgreSQL; on a self-managed instance, grant the REPLICATION attribute to the user instead.

      GRANT rds_replication TO <user>;

    • Run the following SQL command so that update and delete events carry the full row image (replace <source_table> with a valid table name):

      ALTER TABLE <source_table> REPLICA IDENTITY FULL;
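    • Optionally verify the replica identity setting; a value of 'f' means FULL (replace <source_table> as above):

      SELECT relname, relreplident FROM pg_class WHERE relname = '<source_table>';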

  4. Create Logical Replication Slot

    • Execute the following SQL command to create a replication slot named flink_slot that uses the pgoutput plugin:

      SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
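    • Optionally confirm the slot exists and check its status (the active flag becomes true once the Flink job connects):

      SELECT slot_name, plugin, slot_type, active, restart_lsn
      FROM pg_replication_slots
      WHERE slot_name = 'flink_slot';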

Flink CDC Setup

  1. Define Source and Sink Tables

    • Create the source and sink tables in Flink SQL. The source table captures changes from the source PostgreSQL database, and the sink table writes the captured data to the target PostgreSQL database.

    • Go to your container terminal and open the Flink SQL client from the path below:

      cd /opt/flink/bin
      ./sql-client.sh

    • -- Source Table (captures changes from the source_db table)
      CREATE TABLE source_db (
        id BIGINT NOT NULL,
        first_name STRING,
        last_name STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'postgres-cdc',
        'debezium-json.schema-include' = 'false',
        'decoding.plugin.name' = 'pgoutput',
        'hostname' = 'host.docker.internal',
        'port' = '5433',
        'username' = 'postgres',
        'password' = 'root',
        'database-name' = 'postgres',
        'schema-name' = 'public',
        'table-name' = 'source_db',
        'slot.name' = 'flink_slot',
        'scan.incremental.snapshot.enabled' = 'true'
      );
    
      -- Sink Table (writes data to destination_db table)
      CREATE TABLE destination_db (
        id BIGINT NOT NULL,
        first_name STRING,
        last_name STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
        'username' = 'postgres',
        'password' = 'root',
        'table-name' = 'destination_db'
      );
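    • After running both statements, you can optionally confirm that the tables are registered and inspect their schemas from the SQL client:

      SHOW TABLES;

      DESCRIBE source_db;
      DESCRIBE destination_db;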
    
  2. Create a Job to Insert Data from Source to Sink

    • Submit a continuous INSERT job that reads change events from the source table and writes them to the sink table:

      INSERT INTO destination_db SELECT id, first_name, last_name FROM source_db;
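    • Optionally confirm the job was submitted and is running. A minimal check from the SQL client (SHOW JOBS is available in Flink 1.17 and later); the job also appears in the Flink Web UI on port 8081:

      SHOW JOBS;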

Running Flink in Docker

  1. Docker Compose Configuration

    • Create a docker-compose.yml file to define the Flink services:

        version: '3.8'
      
        services:
           jobmanager:
              user: flink:flink
              build: .
              ports:
                 - "8081:8081"
              command: jobmanager
              environment:
                 - |
                    FLINK_PROPERTIES=
                    jobmanager.rpc.address: jobmanager
                    parallelism.default: 2 
                    execution.checkpointing.interval: 10s
                    execution.checkpointing.mode: EXACTLY_ONCE
                    execution.checkpointing.timeout: 60s
                    execution.checkpointing.min-pause: 500
                     execution.checkpointing.max-concurrent-checkpoints: 1
                     execution.checkpointing.tolerable-failed-checkpoints: 3
                    state.backend: filesystem
                    state.checkpoints.dir: file:///opt/flink/state/checkpoints
                    state.savepoints.dir: file:///opt/flink/state/savepoints
              volumes:
                 - ./flink_state:/opt/flink/state
      
           taskmanager:
              user: flink:flink
              build: .
              depends_on:
                 - jobmanager
              command: taskmanager
              environment:
                 - |
                    FLINK_PROPERTIES=
                    jobmanager.rpc.address: jobmanager
                    taskmanager.numberOfTaskSlots: 2
                    parallelism.default: 2
              volumes:
                 - ./flink_state:/opt/flink/state
      
           sql-client:
            image: flink:latest
            command: bin/sql-client.sh
            depends_on:
              - jobmanager
            environment:
              - |
                FLINK_PROPERTIES=
                jobmanager.rpc.address: jobmanager
                rest.address: jobmanager
            volumes:
              - ./flink_state:/opt/flink/state
      
        volumes:
          flink_state:
      
  2. Dockerfile for Flink

    • Create a Dockerfile to build a Flink image with required dependencies:

        FROM flink:1.18.0-scala_2.12
        COPY ./lib /opt/flink/lib
      
        RUN apt-get update && apt-get install -y tree
      
        RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.6/flink-connector-jdbc_2.12-1.14.6.jar; \
            wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar
      

Testing the CDC Pipeline

  1. Insert Data into Source Table

    • Insert some data into the source table (source_db) in the PostgreSQL database.

      INSERT INTO source_db (id, first_name, last_name) VALUES (1, 'John', 'Doe');

  2. Verify Data in Sink Table

    • Use a PostgreSQL client to connect to the target database and check the contents of the sink table (destination_db). You should see the data you inserted into the source table replicated there.

      psql -h host.docker.internal -U postgres -d postgres

      SELECT * FROM destination_db;
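    • To confirm that updates and deletes are also replicated (this relies on the REPLICA IDENTITY FULL setting applied earlier), modify the source row and re-run the query above. A small example:

      UPDATE source_db SET last_name = 'Smith' WHERE id = 1;

      DELETE FROM source_db WHERE id = 1;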

Clean Up

  • Drop the tables if you no longer need them:

    DROP TABLE source_db; DROP TABLE destination_db;

Additional Commands

  • Connect to the source PostgreSQL database:

    psql -h host.docker.internal -U postgres -d postgres

  • Run PostgreSQL with specific configurations (for troubleshooting purposes):

    postgres -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10

Important Notes

  • Ensure all dependencies, including JDBC and PostgreSQL drivers, are correctly placed in the Flink lib directory.

  • Modify the Docker Compose file and Dockerfile as needed to fit your specific environment and requirements.

Fault Tolerance

To achieve fault tolerance for Apache Flink CDC (Change Data Capture) jobs running in containers, especially in scenarios such as container restarts, database failures, or job crashes, you can follow these strategies:

  • Enable Checkpointing:

    • Configure Flink to use checkpoints, which periodically save the state of your streaming application to a durable storage (e.g., file system, distributed file system like HDFS, cloud storage).

    • Upon container restart or job crash, Flink can restore the state from the last successful checkpoint. This ensures that processing can resume from where it left off, minimizing data loss and maintaining consistency.

  • Use Savepoints:

    • Savepoints are similar to checkpoints but are triggered manually. They capture the state of the entire application at a specific point in time.

    • In case of failures, you can restart your Flink job from a savepoint, providing a more controlled recovery mechanism than automatic checkpoints.

  • Externalized State Backends:

    • Choose a state backend that supports external storage, such as RocksDB or a filesystem-based backend.

    • Externalizing state ensures that even if the container or job fails, the state can be recovered from the external storage, maintaining application integrity.

  • Handle Restart Policies:

    • Configure your container orchestration tool (e.g., Kubernetes) with appropriate restart policies to automatically restart failed containers.

    • Use Flink job management features to define how jobs should behave upon failure, including retry policies and backoff strategies; a minimal restart-strategy example is shown after this list.
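For example, a per-job restart strategy can be set from the Flink SQL client before submitting the INSERT job. This is a minimal sketch using standard Flink configuration keys; the attempt count and delay below are assumptions to tune for your environment:

    -- retry a failed job up to 3 times, waiting 10 s between attempts
    SET 'restart-strategy' = 'fixed-delay';
    SET 'restart-strategy.fixed-delay.attempts' = '3';
    SET 'restart-strategy.fixed-delay.delay' = '10 s';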

By implementing these strategies, you can enhance the fault tolerance of your Apache Flink CDC jobs running in containers, ensuring reliable and consistent data processing even in the face of failures.

Useful commands

# Example job ID: f3f932276256a287e1942f72313bfffb

# Trigger a savepoint using docker-compose
docker-compose exec jobmanager ./bin/flink savepoint f3f932276256a287e1942f72313bfffb

# Restore from a savepoint using the Flink SQL Client
SET 'execution.savepoint.path' = '/opt/flink/state/savepoints/savepoint-f3f932-55d8bd90aff7';

# Restore from a retained checkpoint (Flink uses the same execution.savepoint.path option)
SET 'execution.savepoint.path' = '/opt/flink/state/checkpoints/80ac0af942199f5de847b7ff611572fd';

# List all jobs
docker-compose exec jobmanager ./bin/flink list

Transformation example

The SQL script below sets up a Flink job that performs Change Data Capture (CDC) from a PostgreSQL source to a PostgreSQL destination with some transformations.

Run the script with the command: ./bin/sql-client.sh -f ./bin/job-sql.sql

SET 'execution.checkpointing.interval' = '10s';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'file:///opt/flink/state/checkpoints';
SET 'state.savepoints.dir' = 'file:///opt/flink/state/savepoints';

-- set sync mode
SET 'table.dml-sync' = 'true';

-- set the job name
SET 'pipeline.name' = 'SqlJob';

-- set the queue that the job submits to (only relevant when running on YARN)
SET 'yarn.application.queue' = 'root';

-- set the job parallelism
SET 'parallelism.default' = '100';


CREATE TABLE source_db (
  id BIGINT NOT NULL,
  first_name STRING,
  last_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'decoding.plugin.name' = 'pgoutput',
  'hostname' = 'host.docker.internal',
  'port' = '5433',
  'username' = 'postgres',
  'password' = 'root',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'source_db',
  'slot.name' = 'flink_slot'
);

-- CREATE TABLE destination_db (
--   id BIGINT NOT NULL,
--   first_name STRING,
--   last_name STRING,
--   PRIMARY KEY (id) NOT ENFORCED
-- ) WITH (
--   'connector' = 'jdbc',
--   'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
--   'username' = 'postgres',
--   'password' = 'root',
--   'table-name' = 'destination_db'
-- );

CREATE TABLE destination_db_transformed (
  id BIGINT NOT NULL,
  full_name STRING,
  created_at TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
  'username' = 'postgres',
  'password' = 'root',
  'table-name' = 'destination_db_transformed'
);

-- INSERT INTO destination_db
-- SELECT id, first_name, last_name
-- FROM source_db;

INSERT INTO destination_db_transformed
SELECT
  id,
  CONCAT(first_name, ' ', last_name) AS full_name,
  NOW() AS created_at
FROM
  source_db;

Conclusion

This page provides a comprehensive guide to setting up a CDC pipeline using Flink CDC to capture changes from a PostgreSQL database and replicate them to another PostgreSQL database. By following these steps, you can establish a robust mechanism for keeping your target database in sync with your source data.
