Row-level transformations in Postgres CDC using Lua
Earlier this week, we launched PeerDB Streams, our latest product offering for real-time replication from Postgres to queues and message brokers such as Kafka, Redpanda, Google PubSub, Azure Event Hubs, and others.
Today, we are announcing one of the flagship features of this offering — support for row-level transformations as part of Postgres Change Data Capture (CDC). You can write simple Lua scripts to define a transformation and add it as part of the replication (MIRROR). With this feature, users can seamlessly perform in-flight row-level transformations on Postgres data before it is streamed to the target.
In this blog, we will cover various use cases that require row-level transformations and how they can be accomplished using PeerDB. We will also walk through example use cases using sample Lua scripts. Toward the end, we will delve a bit deeper into why we chose Lua as the scripting language and how we implemented this feature.
Row-Level Transformation in Postgres CDC: Use Cases
There are multiple use cases that require row-level transformations during Postgres CDC. A few of the common scenarios include:
Masking PII Data: Replace sensitive PII with tokens or pseudonyms before data enters Kafka, obfuscating it from other micro-services in transactional outbox scenarios, thus enhancing privacy and compliance.
Changing Data Format: Transform data into required formats like Protobuf, JSON, MsgPack, Avro and so on for seamless integration and optimized handling across systems.
Generated Columns: Calculate new column values based on transformations of existing data, such as aggregations or derived metrics, and stream these new columns to enhance real-time data analysis and reporting.
Unnesting JSONs: Extract elements from JSON objects and flatten them into separate fields within Kafka messages, improving the accessibility and usability of data across different consumer applications.
Topic Routing: Distribute Change Data Capture (CDC) events to specific Kafka topics based on rules or conditions, facilitating targeted data streaming and processing.
Data Encryption: Apply encryption to sensitive data before it is written to Kafka, enhancing security and preventing unauthorized access as data moves between systems.
Now, let's see how some of the above use cases can be accomplished using PeerDB, through examples and sample Lua scripts.
Sample Schema
I will be using the users table shown below to demonstrate the above use cases in PeerDB. This table includes various fields relevant to our testing scenarios.
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  first_name VARCHAR(255),
  last_name VARCHAR(255),
  ssn CHAR(11),
  payload JSONB,
  salary_in_usd NUMERIC(10, 2)
);
Masking the SSN Column
You can create a simple Lua script to mask the SSN column in the users table and add it as part of the MIRROR. See below:
local json = require 'json'

local function maskSSN(ssn)
  if not ssn then
    return nil
  end
  -- Replace all but the last four digits of the SSN with "XXX-XX-"
  return string.gsub(ssn, "^(.-)(%d%d%d%d)$", "XXX-XX-%2")
end

local function RowToMap(row)
  local map = peerdb.RowTable(row)
  for col, val in pairs(map) do
    local kind = peerdb.RowColumnKind(row, col)
    if col == 'ssn' then
      -- Apply the maskSSN function to the SSN column
      map[col] = maskSSN(val)
    elseif kind == 'bytes' or kind == 'bit' then
      map[col] = json.bin(val)
    end
  end
  return map
end

local RKINDMAP = {
  insert = string.byte('i', 1),
  update = string.byte('u', 1),
  delete = string.byte('d', 1),
}

function onRecord(r)
  local kind = RKINDMAP[r.kind]
  if not kind then
    return
  end
  local record = {
    action = kind,
    lsn = r.checkpoint,
    time = r.commit_time,
    source = r.source,
  }
  if r.old then
    record.old = RowToMap(r.old)
  end
  if r.new then
    record.new = RowToMap(r.new)
  end
  return json.encode(record)
end
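Because maskSSN is plain Lua, the masking pattern can be sanity-checked outside of PeerDB. A small standalone snippet (the extra parentheses around string.gsub drop its second return value, the substitution count):

```lua
-- Standalone check of the SSN-masking helper used in the script above.
local function maskSSN(ssn)
  if not ssn then
    return nil
  end
  -- Keep only the last four digits; replace the rest with "XXX-XX-"
  return (string.gsub(ssn, "^(.-)(%d%d%d%d)$", "XXX-XX-%2"))
end

-- "123-45-6789" becomes "XXX-XX-6789"; nil passes through unchanged
local masked = maskSSN("123-45-6789")
```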
PeerDB offers a straightforward script editor for creating the Lua script that defines the transformation. After that, you can add the transformation through the UI while creating the MIRROR. See the demo below for reference:
You can try this yourself in just 10 minutes by following this Quickstart guide.
Changing Data Format to MsgPack
For the users table mentioned above, you can use this example Lua script to change the data format to MsgPack. A few of our customers use MsgPack because its compact binary format is more efficient than JSON, reducing data size and speeding up both data transmission and parsing.
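The linked script follows the same onRecord shape as the masking example, swapping the JSON encoder for a MsgPack one. A minimal sketch, assuming the script environment exposes a msgpack module with an encode function (see the linked example for the exact module and field names):

```lua
-- Sketch: emit the change record as MsgPack instead of JSON.
-- Assumes the embedded runtime provides a 'msgpack' module with encode().
local msgpack = require "msgpack"

local OPMAP = {
  insert = "c",
  update = "u",
  delete = "d",
}

function onRecord(record)
  local op = OPMAP[record.kind]
  if not op then
    return
  end
  -- msgpack.encode returns a compact binary string for the record table
  return msgpack.encode {
    op = op,
    before = record.old and peerdb.RowTable(record.old) or nil,
    after = record.new and peerdb.RowTable(record.new) or nil,
    lsn = record.checkpoint,
  }
end
```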
See the 2-minute demo below, which shows how this is done with PeerDB.
Generated Additional Columns
For the users table, let's say I want to add a new column, salary_in_cad, as part of the replication, which converts the salary from US dollars to Canadian dollars. You can create a script as shown in this example (see below) and add it as part of the MIRROR. Below is a snippet of the Lua script that does this:
local json = require "json"

local function RowToMap(row)
  if not row then
    return
  end
  local map = peerdb.RowTable(row)
  map.salary_in_cad = map.salary_in_usd * 1.4
  return map
end

local OPMAP = {
  insert = "c",
  update = "u",
  delete = "d",
}

function onRecord(record)
  local op = OPMAP[record.kind]
  if not op then
    return
  end
  return json.encode {
    op = op,
    before = RowToMap(record.old),
    after = RowToMap(record.new),
    commitms = record.commit_time.unix_milli,
    table = record.source,
    lsn = record.checkpoint,
  }
end
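To sanity-check the conversion logic outside of PeerDB, the peerdb.RowTable helper can be replaced with a trivial stub (the stub below is for illustration only; inside PeerDB the real helper is provided by the runtime):

```lua
-- Stand-in for the peerdb helper so RowToMap runs in plain Lua (illustration only).
peerdb = { RowTable = function(row) return row end }

local function RowToMap(row)
  if not row then
    return
  end
  local map = peerdb.RowTable(row)
  map.salary_in_cad = map.salary_in_usd * 1.4
  return map
end

local out = RowToMap({ id = 1, salary_in_usd = 100000 })
-- out.salary_in_cad is roughly 140000 (floating-point multiply);
-- a nil row (e.g. the 'old' side of an insert) yields nil
```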
Unnesting the Payload JSONB column
For the users table, let's say you want to unnest the payload JSONB column into separate fields in Kafka. You can create the script as shown in this example and add it as part of the MIRROR.
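The linked script decodes the payload column and lifts its keys to the top level of the message. A minimal sketch of the flattening step, assuming the payload has already been decoded into a Lua table (the payload_ prefix and field names are illustrative):

```lua
-- Lift one level of a nested payload table into top-level
-- "payload_<key>" fields, then drop the original nested column.
local function unnest(map)
  local payload = map.payload
  if type(payload) == "table" then
    for k, v in pairs(payload) do
      map["payload_" .. k] = v
    end
    map.payload = nil
  end
  return map
end

local row = unnest({ id = 1, payload = { city = "Seattle", zip = "98101" } })
-- row now carries payload_city and payload_zip as flat fields
```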
Distribute Load of the Users Table Across Topics
For the users table, let's say you want to distribute the load across two Kafka topics, with odd IDs going to one topic and even IDs going to the other. You can create a script as shown in this example (see below) and add it as part of the MIRROR.
local bit32 = require "bit32"
local json = require "json"

local OPMAP = {
  insert = "c",
  update = "u",
  delete = "d",
}

function onRecord(record)
  local op = OPMAP[record.kind]
  if not op then
    return
  end
  local topic
  if bit32.btest(record.row.id, 1) then
    topic = "odd"
  else
    topic = "even"
  end
  return {
    topic = topic,
    value = json.encode {
      op = op,
      before = record.old,
      after = record.new,
      commitms = record.commit_time.unix_milli,
      table = record.source,
      lsn = record.checkpoint,
    }
  }
end
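The routing decision itself is a simple parity check. Outside of PeerDB's runtime (which bundles bit32), the same test can be written with the modulo operator and checked standalone:

```lua
-- id % 2 == 1 is equivalent to bit32.btest(id, 1) for non-negative ids.
local function topicFor(id)
  if id % 2 == 1 then
    return "odd"
  end
  return "even"
end

-- Rows route to the "odd" or "even" topic based on their id
local t = topicFor(42)
```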
Why we chose Lua
We had multiple options, including WASM, but we chose Lua as it provides a fine balance with respect to engineering velocity, integration with our Go-based platform, and end-user usability. Here are the key reasons for our decision:
In-process scripting: Lua supports in-process scripting, avoiding the serialization and deserialization steps required by external plugin systems.
Simplicity and flexibility: Lua's straightforward design as a glue language makes it easy to embed in various projects, with multiple robust implementations available.
Compatibility with Go: Lua works well with Go’s garbage collector, simplifying memory management compared to using alternatives like WASM, which would necessitate complex integration with Go's memory management.
Ease of use for end-users: Lua is an embedded language that allows for on-the-fly scripting without the need for compilation or additional setup steps, unlike systems like Debezium that use Java.
Long-standing presence and resources: Although Lua is a niche language, its long-standing presence has resulted in a wealth of resources. This also enables LLM-based coding assistants to be quite accurate, helping users easily script out row-level transformations.
Conclusion
We hope you enjoyed reading the blog. We think custom transformations offer a lot of added flexibility and enable many use cases. If you use Kafka, Pub-Sub, Redpanda, or any other queues and wish to replicate data from Postgres to them using PeerDB, please check out the links below or reach out to us directly!