Apache Flink Change Data Capture (CDC)


Overview
This page provides a step-by-step guide to setting up and running a Change Data Capture (CDC) pipeline using Flink CDC to capture changes in a PostgreSQL database and replicate them to another PostgreSQL database.
Prerequisites
PostgreSQL: Ensure you have PostgreSQL installed and configured.
Flink: Set up a Flink environment.
Docker: Required for running services in containers.
PostgreSQL Setup
Configure PostgreSQL for Logical Replication
Edit the postgresql.conf file and add the following configurations:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Restart PostgreSQL for the changes to take effect.
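How you restart depends on your installation; for example, on a systemd-based host or for a containerized instance (service and container names below are examples):

# systemd-managed installation
sudo systemctl restart postgresql

# PostgreSQL running in Docker
docker restart postgres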
Monitoring PostgreSQL Settings
You can monitor relevant PostgreSQL settings using the following queries:
SELECT name, setting FROM pg_settings WHERE name = 'wal_level';
SELECT name, setting FROM pg_settings WHERE name = 'max_connections';
SELECT name, setting FROM pg_settings;
SELECT name, setting FROM pg_settings WHERE name = 'max_replication_slots';
SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'max_connections', 'max_replication_slots');
Create Publication
Run the following SQL command to create a publication that captures all table changes:
CREATE PUBLICATION nifi_publication FOR ALL TABLES;
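To confirm the publication was created, you can query the pg_publication catalog:

SELECT pubname, puballtables FROM pg_publication;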
Provide Required Permissions
Run the SQL command below on the source database to grant replication permission to the user:
GRANT rds_replication TO <user>;
NOTE: Replace <user> with the username. The rds_replication role is specific to AWS RDS; on self-managed PostgreSQL, grant the replication attribute instead (ALTER USER <user> WITH REPLICATION;).
Run the SQL command below so that updates and deletes emit the full row image, which the CDC connector needs to capture changes:
ALTER TABLE <source_table> REPLICA IDENTITY FULL;
NOTE: Replace <source_table> with a valid table name.
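To verify the replica identity was applied, check pg_class (a value of 'f' in relreplident means FULL):

SELECT relname, relreplident FROM pg_class WHERE relname = '<source_table>';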
Create Logical Replication Slot
Execute the following SQL command to create a replication slot named flink_slot that uses the pgoutput plugin:

SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
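You can confirm the slot exists and check whether it is active via pg_replication_slots:

SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;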
Flink CDC Setup
- Define Source and Sink Tables
- Create the source and sink tables in Flink SQL. The source table captures changes from the PostgreSQL database, and the sink table writes the captured data to the target PostgreSQL database.
# Go to your container's terminal and navigate to the path below to open the Flink SQL client
cd /opt/flink/bin
./sql-client.sh
-- Source Table (captures changes from the source_db table)
CREATE TABLE source_db (
  id BIGINT NOT NULL,
  first_name STRING,
  last_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'debezium-json.schema-include' = 'false',
  'decoding.plugin.name' = 'pgoutput',
  'hostname' = 'host.docker.internal',
  'port' = '5433',
  'username' = 'postgres',
  'password' = 'root',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'source_db',
  'slot.name' = 'flink_slot',
  'scan.incremental.snapshot.enabled' = 'true'
);

-- Sink Table (writes data to the destination_db table)
CREATE TABLE destination_db (
  id BIGINT NOT NULL,
  first_name STRING,
  last_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
  'username' = 'postgres',
  'password' = 'root',
  'table-name' = 'destination_db'
);
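Note that these CREATE TABLE statements only register table metadata in Flink; the physical tables must already exist in PostgreSQL, since neither the postgres-cdc source nor the JDBC sink creates them. A minimal sketch of the underlying tables, with column names matching the Flink DDL above:

-- Run in PostgreSQL, not in the Flink SQL client
CREATE TABLE source_db (
  id BIGINT PRIMARY KEY,
  first_name TEXT,
  last_name TEXT
);

CREATE TABLE destination_db (
  id BIGINT PRIMARY KEY,
  first_name TEXT,
  last_name TEXT
);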
Creating a Job to Insert Data from Source to Sink
Submit the following continuous INSERT job, which streams captured changes from the source table into the sink table. You can then insert data into the source table to verify the pipeline functionality (see Testing the CDC Pipeline below).
INSERT INTO destination_db SELECT id, first_name, last_name FROM source_db;
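Once submitted, the job runs continuously. You can confirm it was accepted from the SQL client (SHOW JOBS is available in Flink 1.17 and later) or in the web UI at http://localhost:8081:

SHOW JOBS;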
Running Flink in Docker
Docker Compose Configuration
Create a docker-compose.yml file to define the Flink services:

version: '3.8'

services:
  jobmanager:
    user: flink:flink
    build: .
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
        execution.checkpointing.interval: 10s
        execution.checkpointing.mode: EXACTLY_ONCE
        execution.checkpointing.timeout: 60s
        execution.checkpointing.min-pause: 500
        execution.checkpointing.max-concurrent: 1
        execution.fail-on-checkpointing.errors: false
        state.backend: filesystem
        state.checkpoints.dir: file:///opt/flink/state/checkpoints
        state.savepoints.dir: file:///opt/flink/state/savepoints
    volumes:
      - ./flink_state:/opt/flink/state

  taskmanager:
    user: flink:flink
    build: .
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2
    volumes:
      - ./flink_state:/opt/flink/state

  sql-client:
    image: flink:latest
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    volumes:
      - ./flink_state:/opt/flink/state

volumes:
  flink_state:
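Once the Dockerfile described in the next section is in place, the stack can be built and started with commands along these lines (a sketch; service names match the compose file above):

# Build the custom image and start the JobManager and TaskManager in the background
docker-compose up -d --build jobmanager taskmanager

# Start an interactive Flink SQL client attached to the cluster
docker-compose run --rm sql-client

# Follow the JobManager logs if something goes wrong
docker-compose logs -f jobmanager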
Dockerfile for Flink
Create a Dockerfile to build a Flink image with the required dependencies:

FROM flink:1.18.0-scala_2.12
COPY ./lib /opt/flink/lib
RUN apt-get update && apt-get install -y tree
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.6/flink-connector-jdbc_2.12-1.14.6.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar
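The COPY ./lib step assumes a local ./lib directory that already contains the Flink CDC connector JAR for the postgres-cdc connector used earlier. As an example only (the artifact and version shown are assumptions; pick a CDC release compatible with your Flink version), the PostgreSQL CDC SQL connector can be fetched like this:

wget -P ./lib https://repo.maven.apache.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.2/flink-sql-connector-postgres-cdc-2.4.2.jar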
Testing the CDC Pipeline
Insert Data into Source Table
Insert some data into the source table (source_db) in the PostgreSQL database:

INSERT INTO source_db (id, first_name, last_name) VALUES (1, 'John', 'Doe');
Verify Data in Sink Table
Use a PostgreSQL client to connect to the target database and check the contents of the sink table (destination_db). You should see the data you inserted into the source table replicated there.

psql -h host.docker.internal -U postgres -d postgres
SELECT * FROM destination_db;
Clean Up
Drop the tables if you no longer need them:
DROP TABLE source_db; DROP TABLE destination_db;
Additional Commands
Connect to the source PostgreSQL database:
psql -h host.docker.internal -U postgres -d postgres
Run PostgreSQL with specific configurations (for troubleshooting purposes):
postgres -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10
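If PostgreSQL itself runs in Docker (the host port 5433 and password root used throughout this guide suggest such a setup), the same flags can be passed to the container; the container name, image tag, and credentials below are examples:

docker run -d --name postgres-cdc \
  -p 5433:5432 \
  -e POSTGRES_PASSWORD=root \
  postgres:16 \
  -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10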
Important Notes
Ensure all dependencies, including the JDBC and PostgreSQL drivers, are correctly placed in the Flink lib directory.
Modify the Docker Compose file and Dockerfile as needed to fit your specific environment and requirements.
Fault Tolerance
To achieve fault tolerance for Apache Flink CDC (Change Data Capture) jobs running in containers, especially when the container restarts, the database fails, or the job crashes, you can follow these strategies:
Enable Checkpointing:
Configure Flink to use checkpoints, which periodically save the state of your streaming application to a durable storage (e.g., file system, distributed file system like HDFS, cloud storage).
Upon container restart or job crash, Flink can restore the state from the last successful checkpoint. This ensures that processing can resume from where it left off, minimizing data loss and maintaining consistency.
Use Save-points:
Save-points are similar to checkpoints but are triggered manually. They capture the state of the entire application at a specific point in time.
In case of failures, you can restart your Flink job from a save-point, providing a more controlled recovery mechanism than automatic checkpoints.
Externalized State Backends:
Choose a state backend that supports external storage, like RocksDB or filesystem-based backends.
Externalizing state ensures that even if the container or job fails, the state can be recovered from the external storage, maintaining application integrity.
Handle Restart Policies:
Configure your container orchestration tool (e.g., Kubernetes) with appropriate restart policies to automatically restart failed containers.
Use Flink job management features to define how jobs should behave upon failure, including retry policies and backoff strategies.
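As a sketch, the corresponding settings can go into flink-conf.yaml or the FLINK_PROPERTIES block in the Docker Compose file above; the values shown are examples, not recommendations:

# Restart the job up to 3 times, waiting 10s between attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s

# Keep state in RocksDB and checkpoints/savepoints on a durable path
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/state/checkpoints
state.savepoints.dir: file:///opt/flink/state/savepoints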
By implementing these strategies, you can enhance the fault tolerance of your Apache Flink CDC jobs running in containers, ensuring reliable and consistent data processing even in the face of failures.
Useful commands
# Example if Job ID: f3f932276256a287e1942f72313bfffb
# Make savepoint using docker-compose
docker-compose exec jobmanager ./bin/flink savepoint f3f932276256a287e1942f72313bfffb
# Restore savepoint using Flink SQL Client
SET 'execution.savepoint.path' = '/opt/flink/state/savepoints/savepoint-f3f932-55d8bd90aff7';
SET 'execution.checkpoint.path' = '/opt/flink/state/checkpoints/80ac0af942199f5de847b7ff611572fd';
# List all jobs
docker-compose exec jobmanager ./bin/flink list
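# Stop a job gracefully while taking a final savepoint (sketch; job ID is the example one from above)
docker-compose exec jobmanager ./bin/flink stop --savepointPath /opt/flink/state/savepoints f3f932276256a287e1942f72313bfffb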
Transformation example
The SQL script below sets up a Flink job that performs Change Data Capture (CDC) from a PostgreSQL source to a PostgreSQL destination with some transformations.
Run the script with the command: ./bin/sql-client.sh -f ./bin/job-sql.sql
SET 'execution.checkpointing.interval' = '10s';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'file:///opt/flink/state/checkpoints';
SET 'state.savepoints.dir' = 'file:///opt/flink/state/savepoints';
-- set sync mode
SET 'table.dml-sync' = 'true';
-- set the job name
SET 'pipeline.name' = 'SqlJob';
-- set the queue that the job submit to
SET 'yarn.application.queue' = 'root';
-- set the job parallelism
SET 'parallelism.default' = '100';
CREATE TABLE source_db (
id BIGINT NOT NULL,
first_name STRING,
last_name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'hostname' = 'host.docker.internal',
'port' = '5433',
'username' = 'postgres',
'password' = 'root',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'source_db',
'slot.name' = 'flink_slot'
);
-- CREATE TABLE destination_db (
-- id BIGINT NOT NULL,
-- first_name STRING,
-- last_name STRING,
-- PRIMARY KEY (id) NOT ENFORCED
-- ) WITH (
-- 'connector' = 'jdbc',
-- 'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
-- 'username' = 'postgres',
-- 'password' = 'root',
-- 'table-name' = 'destination_db'
-- );
CREATE TABLE destination_db_transformed (
id BIGINT NOT NULL,
full_name STRING,
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://host.docker.internal:5433/postgres',
'username' = 'postgres',
'password' = 'root',
'table-name' = 'destination_db_transformed'
);
-- INSERT INTO destination_db
-- SELECT id, first_name, last_name
-- FROM source_db;
INSERT INTO destination_db_transformed
SELECT
id,
  CONCAT(first_name, ' ', last_name) AS full_name,
  NOW() AS created_at
FROM
source_db;
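As with the earlier sink, the destination_db_transformed table must already exist in the target PostgreSQL database. A minimal sketch matching the Flink DDL above, plus a query to verify the results once the job is running:

-- Run in PostgreSQL
CREATE TABLE destination_db_transformed (
  id BIGINT PRIMARY KEY,
  full_name TEXT,
  created_at TIMESTAMP
);

-- After inserting rows into source_db
SELECT * FROM destination_db_transformed;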
Conclusion
This page provides a comprehensive guide to setting up a CDC pipeline using Flink CDC to capture changes from a PostgreSQL database and replicate them to another PostgreSQL database. By following these steps and referencing the provided resources, you can establish a robust mechanism for keeping your target database in sync with your source data.
References
https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/
https://sap1ens.com/blog/2022/07/10/flink-cdc-for-postgres-lessons-learned/
https://stackoverflow.com/questions/76324459/data-persistence-for-apache-flink-sql-streaming-queries
https://gordonmurray.com/data/2023/10/25/using-checkpoints-in-apache-flink-jobs.html
https://github.com/gordonmurray/apache_flink_using_checkpoints