Change Data Capture (CDC) - Various Approaches

Manish AgrawalManish Agrawal
9 min read

Overview

Change Data Capture (CDC) is a design pattern used to track and respond to data changes. This document provides an overview and practical approaches to implement CDC without using Kafka and NiFi. The methods discussed include:

  • Custom Application Logic

  • AWS DMS (Database Migration Service)

  • Querying/Polling

  • Database Triggers/Event Sourcing

  • Database Change Streams (DB Pub/Sub) / Embedded CDC

  • CDC as a Service

  • Apache Nifi

  • WebSocket-based CDC

  • Talend

  • Apache Flink CDC

  • Airbyte

Custom Application Logic

  • Custom Middleware: Develop middleware to listen to database changes and process them according to business needs. Implement CDC logic within the application layer to track changes and push them to the target system.

  • Polling and Timestamp-based Queries: Utilize native APIs (e.g., JDBC for Java, SQLAlchemy for Python) to programmatically extract and manipulate data changes.

  • Database Log Reading: Read database transaction logs to capture changes.

  • Debezium also offers an embedded mode, allowing developers to embed Debezium’s change data capture capabilities directly into their Java applications without relying on Kafka Connect.Some CDC tools, like Debezium, offer options for direct integration with databases without requiring Kafka. For example, Debezium can be used in embedded mode within Java applications to capture changes from databases and process them without Kafka.

Create Middleware for CDC:

// Middleware to log changes
app.use((req, res, next) => {
  if (['POST', 'PUT', 'DELETE'].includes(req.method)) {
    // Log change
    const changeLog = {
      method: req.method,
      body: req.body,
      timestamp: new Date(),
    };
    // Save changeLog to a separate system or apply it directly to MySQL
  }
  next();
});

Process and Apply Changes:

  • Create a script or service to process these logs and apply changes to MySQL.
const { Client } = require('pg');
const mysql = require('mysql2/promise');

// PostgreSQL client setup
const pgClient = new Client({
  user: 'your_pg_user',
  host: 'your_pg_host',
  database: 'your_pg_db',
  password: 'your_pg_password',
  port: 5432,
});

pgClient.connect();

// MySQL client setup
const mysqlConnection = await mysql.createConnection({
  host: 'your_mysql_host',
  user: 'your_mysql_user',
  database: 'your_mysql_db',
  password: 'your_mysql_password',
});

// Function to process changes from PostgreSQL to MySQL
async function processChanges() {
  const res = await pgClient.query('SELECT * FROM change_log WHERE processed = false');
  for (let row of res.rows) {
    let query = '';
    const data = JSON.parse(row.data);

    if (row.method === 'POST') {
      query = `INSERT INTO ${row.table_name} SET ?`;
    } else if (row.method === 'PUT') {
      query = `UPDATE ${row.table_name} SET ? WHERE id = ?`;
    } else if (row.method === 'DELETE') {
      query = `DELETE FROM ${row.table_name} WHERE id = ?`;
    }

    await mysqlConnection.execute(query, [data, data.id]);
    await pgClient.query('UPDATE change_log SET processed = true WHERE id = $1', [row.id]);
  }
}

// Polling interval (e.g., every 5 seconds)
setInterval(processChanges, 5000);

AWS DMS (Database Migration Service)

  • Continuous Data Replication: AWS DMS reads changes from the source database's transaction logs and replicates them to the target database or stream.

  • Amazon Kinesis: Stream database changes into Kinesis for real-time processing and analytics.

  • Amazon EMR with Apache Spark: Use Spark Streaming for real-time data processing from various data sources.

  • AWS Glue: For data cataloging and ETL processes.

  • AWS Lambda: Process database change events in real-time, applying transformations or actions.

  • Main Components:

    • Replication Task: Defines migration and transformation rules.

    • Endpoint: Connection details for source and target databases.

    • Transformation Rules: Modify data during replication.

    • Table Mapping: Map source to target tables and columns.

    • Validation and Monitoring: Ensure data integrity and monitor replication.

Example Transformation Rule:

{
  "rules": [
    {
      "rule-type": "transformation",
      "rule-id": "1",
      "rule-name": "CombineNames",
      "rule-action": "combine",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "public",
        "table-name": "employees"
      },
      "value": {
        "source-columns": ["first_name", "last_name"],
        "separator": " ",
        "target-column": "full_name"
      }
    }
  ]
}

Querying/Polling

  • Periodically query the source database to detect changes using SQL queries based on timestamps or version numbers.

  • Example: Develop a script to read changes from pg_logical, transform them into MySQL-compatible SQL statements, and apply them to MySQL.

  • Downside: Frequent querying can impact database performance.

DB Trigger/Event Sourcing

  • Database Triggers: Log changes to a separate table (CDC log table) using triggers for INSERT, UPDATE, and DELETE operations.

  • Event Sourcing: Capture every change to the state of an application as an event, stored in an event store for state reconstruction.

Set Up Change Log Table:

CREATE TABLE audit_table (
    id SERIAL PRIMARY KEY,
    operation VARCHAR(10),
    table_name VARCHAR(50),
    data JSON,
    processed BOOLEAN DEFAULT false,
    timestamp TIMESTAMPTZ DEFAULT NOW()
);

PostgreSQL Triggers:

CREATE OR REPLACE FUNCTION log_changes() RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_table (operation, table_name, data, changed_at)
        VALUES ('INSERT', TG_TABLE_NAME, row_to_json(NEW), current_timestamp);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_table (operation, table_name, data, changed_at)
        VALUES ('UPDATE', TG_TABLE_NAME, row_to_json(NEW), current_timestamp);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_table (operation, table_name, data, changed_at)
        VALUES ('DELETE', TG_TABLE_NAME, row_to_json(OLD), current_timestamp);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER my_table_changes
AFTER INSERT OR UPDATE OR DELETE ON my_table
FOR EACH ROW EXECUTE FUNCTION log_changes();

Node.js Polling:

const { Client } = require('pg');
const mysql = require('mysql2/promise');

// PostgreSQL client setup
const pgClient = new Client({
  user: 'your_pg_user',
  host: 'your_pg_host',
  database: 'your_pg_db',
  password: 'your_pg_password',
  port: 5432,
});

pgClient.connect();

// MySQL client setup
const mysqlConnection = await mysql.createConnection({
  host: 'your_mysql_host',
  user: 'your_mysql_user',
  database: 'your_mysql_db',
  password: 'your_mysql_password',
});

// Function to process changes from PostgreSQL to MySQL
async function processChanges() {
  const res = await pgClient.query('SELECT * FROM audit_table WHERE processed = false');
  for (let row of res.rows) {
    let query = '';
    const data = JSON.parse(row.data);

    if (row.operation === 'INSERT') {
      query = `INSERT INTO ${row.table_name} SET ?`;
    } else if (row.operation === 'UPDATE') {
      query = `UPDATE ${row.table_name} SET ? WHERE id = ?`;
    } else if (row.operation === 'DELETE') {
      query = `DELETE FROM ${row.table_name} WHERE id = ?`;
    }

    await mysqlConnection.execute(query, [data, data.id]);
    await pgClient.query('UPDATE audit_table SET processed = true WHERE id = $1', [row.id]);
  }
}

// Polling interval (e.g., every 5 seconds)
setInterval(processChanges, 5000);

Database Change Streams/Embedded CDC

  • Utilize native support for change streams or publication/subscription models provided by modern databases (e.g., PostgreSQL logical replication, MongoDB change streams).

  • Example: Use pg_logical, wal2json, or pgoutput on PostgreSQL to capture changes from WAL logs.

-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
SELECT pg_reload_conf();

-- Create a Replication Slot and Publication:
CREATE PUBLICATION my_publication FOR TABLE your_table;

SELECT * FROM pg_create_logical_replication_slot('slot_name', 'pgoutput');




Nodejs subscriber : 


const { Client } = require('pg');
const mysql = require('mysql2');

const pgClient = new Client({
  user: 'your_pg_user',
  host: 'your_pg_host',
  database: 'your_pg_database',
  password: 'your_pg_password',
  port: 5432,
});

const mysqlConnection = mysql.createConnection({
  host: 'your_mysql_host',
  user: 'your_mysql_user',
  database: 'your_mysql_database',
  password: 'your_mysql_password',
});

pgClient.connect();

async function getChanges() {
  const query = `START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '1', publication_names 'my_publication')`;
  const res = await pgClient.query(query);
  res.on('data', async (data) => {
    const change = parseChange(data);
    if (change) {
      await handleInsert(change);
    }
  });
}

function parseChange(data) {
  // Logic to parse the logical replication change
  // Example for an insert:
  if (data.startsWith('BEGIN')) {
    return null;
  }
  if (data.startsWith('COMMIT')) {
    return null;
  }
  if (data.startsWith('table public.my_table: INSERT:')) {
    const matches = data.match(/first_name\[text\]:'(.*?)' last_name\[text\]:'(.*?)'/);
    if (matches) {
      return {
        firstName: matches[1],
        lastName: matches[2],
      };
    }
  }
  return null;
}

async function handleInsert(change) {
  const fullName = `${change.firstName} ${change.lastName}`;
  const query = 'INSERT INTO your_mysql_table (full_name) VALUES (?)';
  await mysqlConnection.promise().query(query, [fullName]);
}

getChanges().catch(console.error);

CDC as a Service

  • Managed CDC Services: AWS DMS, Azure Data Factory, Google Cloud Dataflow, Attunity (Qlik) Replicate, HVR, Talend Open Studio, Striim, Oracle GoldenGate, Debezium on Red Hat OpenShift, Confluent Cloud, Fivetran, StreamSets.

  • Oracle GoldenGate replicates database transactions in real time within and across data centers to keep Oracle and non-Oracle data highly available, to prepare and propagate data for analytics and data science, and to ingest and analyze data while “in-motion.” GoldenGate offers a unique approach to connect heterogeneous data sources and targets to align operational continuity and analytic insights into a single data fabric. The list below illustrates all the connectivity.This solution by Oracle specifically targets Oracle databases and offers real-time data integration and replication capabilities.

WebSocket-based CDC

  • Use WebSocket connections to stream real-time changes from databases to client applications, suitable for real-time dashboards and monitoring.

Kafka and NiFi Alternatives

  • Kafka Alternatives: Apache Pulsar, Redpanda.

  • NiFi Alternatives: StreamSets Data Collector, Apache Airflow.

Talend

  • Talend: Perform data integration and ETL processes.Talend Change Data Capture is a complete Data Event Processing solution. It captures the transactions (or events) of a database (mainly insert, update, delete) in real time, in order to work on them and send them to targets.Talend Change Data Capture provides a comprehensive solution for online backup, distribution to eBusiness Internet/Intranet servers, decision support system data loading (real-time ETL), datawarehouse and big data incremental or real-time loading, inter-application exchange (EAI, ESB) requirements.
  • Use Apache Flink for CDC, ingesting and processing changes in real-time.Flink CDC is a distributed data integration tool for real time data and batch data. Flink CDC brings the simplicity and elegance of data integration via YAML to describe the data movement and transformation.

  • Flink CDC supports distributed scanning of historical data of database and then automatically switches to change data capturing. The switch uses the incremental snapshot algorithm which ensure the switch action does not lock the database.

Airbyte

  • An open-source data integration platform supporting CDC for various databases and APIs.

  • Many popular databases may be configured to create an on-disk log known as a transaction log which is also known as a write-ahead log (WAL). This log stores a record of changes that are made to the database. Examples of events that are written into this transaction log are inserts, updates, or deletes.

    When using CDC replication, the source database is configured to enable transaction logging. Then Airbyte is configured to read this log file and transmit the logged changes to a destination system. These changes are applied on the destination to keep it in sync with the source.

  • Airbyte supports log-based CDC from Postgres, MySQL, and Microsoft SQL Server to a large number of destinations.

  • To support CDC, Airbyte uses Debezium internally.

Terminology

  • Transaction Logs/Redo Logs: Database log reading.

  • Write-Ahead Log (WAL): E.g., PostgreSQL's WAL.

  • Logical Replication: Enable logical replication in PostgreSQL.

Conclusion

This guide provides a comprehensive overview and practical steps to implement CDC from PostgreSQL to MySQL without using Kafka and NiFi. Each method offers different benefits and can be chosen based on specific use cases and requirements.

References

0
Subscribe to my newsletter

Read articles from Manish Agrawal directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Manish Agrawal
Manish Agrawal

Over 15 Years of Expertise in Software Development and Engineering I specialize in delivering innovative solutions across diverse programming languages, platforms, and architectures. 💡 Technical Expertise Backend: Node.js (Nest.js, Express.js), Java (Spring Boot), PHP (Laravel, CodeIgniter, YII, Phalcon, Symphony, CakePHP) Frontend: React, Angular, Vue, TypeScript, JavaScript, Bootstrap, Material design, Tailwind CMS: WordPress, MediaWiki, Moodle, Strapi Headless, Drupal, Magento, Joomla DevOps & Cloud: AWS, Azure, GCP, OpenShift, CI/CD, Docker, Kubernetes, Terraform, Ansible, GitHub Actions, Gitlab CI/CD, GitOps, Argo CD, Jenkins, Shell Scripting, Linux Observability & Monitoring: Datadog, Prometheus, Grafana, ELK Stack, PowerBI, Tableau Databases: MySQL, MariaDB, MongoDB, PostgreSQL, Elasticsearch Caching: Redis, Mamcachad Data Engineering & Streaming: Apache NiFi, Apache Flink, Kafka, RabbitMQ API Design: REST, gRPC, GraphQL Principles & Practices: SOLID, DRY, KISS, TDD Architectural Patterns: Microservices, Monolithic, Microfronend, Event-Driven, Serverless, OOPs Design Patterns: Singleton, Factory, Observer, Repository, Service Mesh, Sidecar Pattern Project Management: Agile, JIRA, Confluence, MS Excel Testing & Quality: Postman, Jest, SonarQube, Cucumber Architectural Tools: Draw.io, Lucid, Excalidraw 👥 Versatile Professional From small-scale projects to enterprise-grade solutions, I have excelled both as an individual contributor and as part of dynamic teams. 🎯 Lifelong Learner Beyond work, I’m deeply committed to personal and professional growth, dedicating my spare time to exploring new technologies. 🔍 Passionate about Research & Product Improvement & Reverse Engineering I’m dedicated to exploring and enhancing existing products, always ready to take on challenges to identify root causes and implement effective solutions. 🧠 Adaptable & Tech-Driven I thrive in dynamic environments and am always eager to adapt and work with new and emerging technologies. 🌱 Work Culture I Value I thrive in environments that foster autonomy, respect, and innovation — free from micromanagement, unnecessary bureaucracy. I value clear communication, open collaboration, self organizing teams,appreciation, rewards and continuous learning. 🧠 Core Belief I believe every problem has a solution—and every solution uncovers new challenges to grow from. 🌟 Let's connect to collaborate, innovate, and build something extraordinary together!