CDC with YugabyteDB, Debezium, Kafka and Parseable
In the previous blog, CDC Pipeline with Debezium, Kafka and Parseable, we covered the concept of CDC, its use cases, and some of the components involved in the pipeline, such as Debezium and Kafka.
In this blog, we will use YugabyteDB as a source database and send the tracked changes to Parseable via Debezium and Kafka. We'll set up a test environment using Docker Compose. We'll then configure the Debezium connectors to capture changes from YugabyteDB and send them to Parseable for real-time analytics.
YugabyteDB
YugabyteDB is a PostgreSQL-compatible, high-performance, cloud-native, distributed SQL database. Unlike traditional database replication systems, YugabyteDB uses sharding to ensure high availability and fault tolerance. Sharding distributes data across multiple nodes in a cluster, where each node stores a portion of the data.
By splitting the data into smaller pieces and distributing them among multiple nodes, YugabyteDB achieves parallelism and load balancing. In the event of a node failure, the shard-based design ensures that the remaining nodes can seamlessly take over serving the data, maintaining uninterrupted availability.
It supports transactional CDC guaranteeing changes across tables are captured together. This enables use cases like real-time analytics, data warehousing, operational data replication, and event-driven architectures.
YugabyteDB supports the following methods for reading change events.
PostgreSQL Replication Protocol: This method uses the PostgreSQL replication protocol, ensuring compatibility with PostgreSQL CDC systems. Logical replication operates through a publish-subscribe model. It replicates data objects and their changes based on the replication identity. It works as follows:
Create publications in the YugabyteDB cluster, just as you would in PostgreSQL (see the example after this list).
Deploy the YugabyteDB Connector in your preferred Kafka Connect environment.
The connector uses replication slots to capture change events and publishes them directly to a Kafka topic.
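For example, a publication covering the tables you want to stream can be created with standard PostgreSQL syntax. This is a minimal sketch; the publication and table names are placeholders, and the replication slot itself is normally created by the connector:
CREATE PUBLICATION cdc_publication FOR TABLE users, orders;
-- or include every table in the database:
CREATE PUBLICATION cdc_publication_all FOR ALL TABLES;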
YugabyteDB gRPC Replication Protocol: This method involves setting up a change stream in YugabyteDB that uses the native gRPC replication protocol to publish change events. It works as follows:
Establish a change stream in the YugabyteDB cluster using the yb-admin CLI commands.
Deploy the YugabyteDB gRPC Connector in your preferred Kafka Connect environment.
The connector captures change events using YugabyteDB's native gRPC replication and directly publishes them to a Kafka topic.
Setup
Run the following commands to set up the Docker Compose environment:
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/docker-compose.yaml
sudo docker compose -f docker-compose.yaml up -d
sudo docker ps
Setup YugabyteDB
We will use the YugabyteDB gRPC replication protocol for this demo.
YugabyteDB CDC captures changes made to data in the database and streams those changes to external processes, applications, or other databases. CDC allows you to track and propagate changes in a YugabyteDB database to downstream consumers based on its Write-Ahead Log (WAL). YugabyteDB CDC captures row-level changes resulting from INSERT, UPDATE, and DELETE operations in the configured database and publishes them to be consumed by downstream applications.
To set up the components in the YugabyteDB server, follow these steps:
sudo docker exec -it yugabyte bash
bin/yugabyted status
Verify that the server status shows Running and copy the IP address; you will need it in the next steps.
Next, we need to create a CDC stream ID. In YugabyteDB, a CDC (change data capture) stream ID identifies a specific stream that tracks changes in a database.
A CDC stream in YugabyteDB is a mechanism to capture real-time changes (inserts, updates, deletes) from a database or a specific set of tables. This stream is used to replicate data changes to downstream systems. When a new CDC stream is created, it is assigned a unique stream ID. This identifier is used to reference and manage the CDC stream for tasks such as configuring connectors, monitoring changes, or deleting the stream.
To create the stream ID, run the following command in the YugabyteDB container:
./bin/yb-admin --master_addresses <ip-address>:7100 create_change_data_stream ysql.yugabyte
You can fetch the CDC stream ID at any point by running the following command:
yb-admin --master_addresses <ip-address>:7100 list_change_data_streams
Capture the CDC stream ID; you will need it in the next steps.
Next, download the script that creates the tables and inserts some records into YugabyteDB. In a different terminal, run the following command:
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/scripts.sql
Switch back to the YugabyteDB terminal and run the following command to start ysqlsh:
bin/ysqlsh --host=<ip-address>
Copy the contents of scripts.sql and paste them into the terminal to execute all statements. This creates four tables in the database (a sketch of the script's shape follows the list below):
users
reviews
products
orders
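For reference, scripts.sql roughly follows this shape for each table, shown here for users only. The column names are hypothetical; the actual definitions are in the script you just downloaded:
CREATE TABLE users (
    id SERIAL PRIMARY KEY,   -- SERIAL creates the users_id_seq sequence seen below
    name TEXT NOT NULL,
    email TEXT
);
INSERT INTO users (name, email) VALUES ('alice', 'alice@example.com');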
Verify by running the following command:
\d
You should see output like this:
List of relations
Schema | Name | Type | Owner
--------+-----------------+----------+----------
public | orders | table | yugabyte
public | orders_id_seq | sequence | yugabyte
public | products | table | yugabyte
public | products_id_seq | sequence | yugabyte
public | reviews | table | yugabyte
public | reviews_id_seq | sequence | yugabyte
public | users | table | yugabyte
public | users_id_seq | sequence | yugabyte
(8 rows)
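You can also confirm that the script inserted some rows into the tables, for example:
SELECT count(*) FROM users;
SELECT count(*) FROM orders;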
Setup Kafka Connectors
Now we need to create the source and sink connectors in the kafka-connect service, which is running on localhost:8083. Run the commands below to create the YugabyteDB CDC connectors and the Parseable sink connectors in kafka-connect.
The Debezium YugabyteDB connectors capture changes from all the tables in YugabyteDB and publish the events to different Kafka topics. The Parseable sink connectors consume the events from each of these topics and send them to different streams in Parseable.
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/debezium-connector-yugabyte-orders.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/debezium-connector-yugabyte-products.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/debezium-connector-yugabyte-users.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/debezium-connector-yugabyte-reviews.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/parseable-connector-orders.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/parseable-connector-products.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/parseable-connector-users.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/cdc-pipeline-yugabyte-kafka/parseable-connector-reviews.json
In all the Debezium connector JSON files, replace the IP address in the database.hostname and database.master.addresses properties with the IP address you captured when you ran bin/yugabyted status in the YugabyteDB shell. Also replace the CDC stream ID in the database.streamid property with the stream ID you captured when you ran yb-admin --master_addresses <ip-address>:7100 list_change_data_streams.
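For reference, each Debezium connector file looks roughly like the sketch below. The connector name, connector class, credentials, and table.include.list value are assumptions based on a typical YugabyteDB gRPC connector configuration; only the properties called out above are taken from the actual files:
{
  "name": "ybconnector-orders",
  "config": {
    "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
    "database.hostname": "<ip-address>",
    "database.master.addresses": "<ip-address>:7100",
    "database.streamid": "<cdc-stream-id>",
    "database.dbname": "yugabyte",
    "database.user": "yugabyte",
    "database.password": "yugabyte",
    "table.include.list": "public.orders"
  }
}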
Then run the commands below to create the source and sink connectors in kafka-connect.
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-connector-yugabyte-orders.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-connector-yugabyte-products.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-connector-yugabyte-users.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-connector-yugabyte-reviews.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-orders.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-products.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-users.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-reviews.json
Run the commands below to list the connectors and verify that all of them were set up successfully:
curl -X GET http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/ybconnector1/status | jq .
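If a connector is healthy, the status response looks roughly like this, with the connector and its tasks in the RUNNING state (the worker_id value will differ in your environment):
{
  "name": "ybconnector1",
  "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083" } ],
  "type": "source"
}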
Parseable
Parseable is a cloud-native log analytics platform built for high-speed log ingestion, querying, and analysis.
You can access the Parseable console at http://localhost:8000 with the username admin and password admin.
Explore and Query Logs
Once logged in, you will see the streams on the landing page. Navigate to each stream to see the log events for all inserts, updates, and deletes in the corresponding YugabyteDB table. You can apply filters, search for a particular log event to view the changes, save queries, and do a lot more to enhance your debugging experience with the events.
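To watch new events arrive, you can make a change in the ysqlsh session and then refresh the corresponding stream. For example, the following statement (with a hypothetical id value) produces a delete event in the reviews stream:
DELETE FROM reviews WHERE id = 1;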
Dashboards for Analytics
Parseable has built-in dashboard features that let you visualize the log data and perform richer analytics. For this use case, you can use Parseable dashboards to:
perform real-time analytics
audit all record-level changes in the tables, along with schema changes happening in the database
detect fraud
Conclusion
In this tutorial, you built a CDC pipeline that captures changes from YugabyteDB, streams them through Kafka, and ingests them into Parseable. This setup provides real-time data synchronization and opens up a world of possibilities for data analysis and system integration.