Phần 4: JOIN hai bảng & ghi kết quả sang MongoDB

Dưới đây là Phần 4 của chuỗi “Chạy Apache Flink embedded trong Spring Boot” (bối cảnh e-commerce). Ở phần này, chúng ta sẽ thực hiện JOIN hai bảng order_events_table
và shipping_events_table
, đồng thời ghi kết quả xuống MongoDB. Chúng ta cũng sẽ thảo luận về cách quản lý thời gian (tận dụng window/interval join, xử lý dữ liệu đến trễ) cũng như thiết lập chế độ upsert để bảo đảm dữ liệu được thêm hoặc cập nhật chính xác.
1. Tóm tắt ngữ cảnh
Trong Phần 3, chúng ta đã:
Tạo KafkaSource cho 2 topic:
order_events
(Topic A) vàshipping_events
(Topic B).Chuyển dữ liệu JSON thành POJO
OrderEvent
/ShippingEvent
.Tạo 2 bảng Flink (
Table
) tương ứng:order_events_table
vàshipping_events_table
.
Hiện tại, chúng ta muốn tham chiếu (JOIN) hai bảng dựa trên order_id
(khóa chung giữa đơn hàng và vận chuyển), rồi kết quả được “upsert” vào MongoDB.
2. Kịch bản JOIN
2.1 Logic business
Mỗi OrderEvent đại diện cho trạng thái đơn hàng (CREATED, PAID, CANCELLED, …), kèm
order_id
,user_id
,order_total
,timestamp
.Mỗi ShippingEvent đại diện cho trạng thái giao hàng (PREPARED, IN_TRANSIT, DELIVERED, …), kèm
shipping_id
,order_id
,carrier
,shipping_status
,timestamp
.Vì
shippingEvent
có thể đến sauorderEvent
(hoặc ngược lại) và không đảm bảo đồng bộ, ta cần một cách JOIN có tính đến thời gian.
2.2 Các cách JOIN trong Flink
Regular join (batch-style): Sẽ quét tất cả record hai bảng, ghép dựa trên cột chung. Không phù hợp khi streaming (vì luồng đến vô hạn).
Interval join (sử dụng event-time hoặc processing-time + một khoảng thời gian). Ví dụ:
shipping.timestamp
cần nằm trong khoảng order.timestamp−X,order.timestamp+Yorder.timestamp - X, order.timestamp + Y.Temporal table join: dùng khi một bảng đóng vai trò “dimension table”.
Window join: gộp record 2 bảng vào cùng một window.
Với mô hình e-commerce đơn giản, ta có thể dùng:
Interval join nếu cần giới hạn: “Xem shipping event chỉ match với order event trong vòng 30 phút”.
Left join (khi shipping đến sau order), v.v.
Ở đây, ta sẽ minh họa “interval join” bằng Flink SQL (hoặc Table API).
3. Tạo logic JOIN bằng Flink SQL
3.1 Khai báo event-time (tùy chọn)
Để xài interval join theo event-time, ta cần cột rowtime
(event-time) thay vì proctime
. Hiện ở Phần 3, ta có:
Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
$("orderId"),
$("userId"),
$("orderTotal"),
$("orderStatus"),
$("timestamp"), // cột raw timestamp (long)
$("proctime").proctime()
);
Chúng ta có thể chuyển cột timestamp
thành event-time. Ví dụ:
Table orderTable = tableEnv.fromDataStream(
parsedOrderStream,
Schema.newBuilder()
.column("orderId", DataTypes.STRING())
.column("userId", DataTypes.STRING())
.column("orderTotal", DataTypes.DOUBLE())
.column("orderStatus", DataTypes.STRING())
.column("ts", DataTypes.BIGINT()) // raw timestamp
.columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(ts, 3)")
.watermark("rowtime", "rowtime - INTERVAL '5' SECONDS")
.build()
);
Giải thích:
columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(ts, 3)")
: Tạo cột event-time từts
(trongOrderEvent
,timestamp
).watermark("rowtime", "rowtime - INTERVAL '5' SECONDS")
: Đặt watermark trễ 5 giây.Tương tự với
shippingTable
.
Nếu bạn chỉ dùng processing-time, vẫn có thể interval join bằng proctime
, nhưng event-time minh họa tính thời gian thực hơn.
3.2 Tạo temporary view & viết câu lệnh JOIN
Giả sử sau khi đã chuẩn bị orderTable
và shippingTable
, ta tạo view:
-- Tạo temporary view (nếu chưa tạo ở phần 3)
CREATE TEMPORARY VIEW order_events_table
AS SELECT orderId, userId, orderTotal, orderStatus, rowtime
FROM <...> ;
CREATE TEMPORARY VIEW shipping_events_table
AS SELECT shippingId, orderId, carrier, shippingStatus, rowtime
FROM <...> ;
Rồi ta có thể JOIN:
SELECT
o.orderId,
o.userId,
o.orderTotal,
o.orderStatus,
s.shippingId,
s.carrier,
s.shippingStatus,
o.rowtime AS orderRowtime,
s.rowtime AS shippingRowtime
FROM order_events_table AS o
JOIN shipping_events_table AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE
AND o.rowtime + INTERVAL '30' MINUTE
Phân tích:
AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE AND o.rowtime + INTERVAL '30' MINUTE
: Đây chính là interval join.Bất cứ shipping event nào xuất hiện trong vòng ±30 phút quanh order event sẽ được JOIN.
Tùy yêu cầu, bạn có thể left join, right join, hay chỉ
BETWEEN o.rowtime AND o.rowtime + INTERVAL '2' HOUR
,…
Kết quả là một dòng (row) có đủ thông tin order + shipping. Ta sẽ dùng nó để ghi xuống MongoDB.
4. Tạo bảng sink MongoDB với chế độ upsert
4.1 Dùng Flink SQL DDL
Flink cung cấp cơ chế tạo Table Sink bằng DDL. Ví dụ:
CREATE TABLE mongo_sink_table (
`orderId` STRING,
`userId` STRING,
`orderTotal` DOUBLE,
`orderStatus` STRING,
`shippingId` STRING,
`carrier` STRING,
`shippingStatus` STRING,
`orderRowtime` TIMESTAMP(3),
`shippingRowtime` TIMESTAMP(3),
PRIMARY KEY (orderId) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'uri' = 'mongodb://localhost:27017',
'database' = 'ecommerce_db',
'collection' = 'orders_shipping_join',
-- Tùy chỉnh thêm nếu connector hỗ trợ
'upsert-mode' = 'true' -- nếu connector có option upsert.
-- Lưu ý: Mỗi connector có config khác nhau.
);
Lưu ý:
PRIMARY KEY (orderId) NOT ENFORCED
cho phép Flink hiểu “chúng ta muốn upsert trên khóa orderId”.Tùy connector MongoDB mà bạn sử dụng, có thể có hoặc không có thuộc tính
'upsert-mode' = 'true'
. Một số connector cũ yêu cầu'change-data-capture'='true'
,…Nếu không có connector chính thức, bạn có thể dùng JDBC connector + driver MongoDB JDBC (hoặc custom sink).
4.2 Thực hiện INSERT INTO
Sau khi tạo mongo_sink_table
, ta chạy lệnh:
INSERT INTO mongo_sink_table
SELECT
o.orderId,
o.userId,
o.orderTotal,
o.orderStatus,
s.shippingId,
s.carrier,
s.shippingStatus,
o.orderRowtime,
s.shippingRowtime
FROM order_events_table AS o
JOIN shipping_events_table AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE
AND o.rowtime + INTERVAL '30' MINUTE
Flink sẽ “chạy” truy vấn này dưới dạng streaming, theo dõi các cập nhật (changelog) của order_events_table
và shipping_events_table
. Mỗi khi có sự kiện mới, nó tính toán kết quả JOIN và ghi (upsert/delete) xuống mongo_sink_table
.
5. Kết hợp lại trong code (Table API thay vì SQL, hoặc StatementSet)
5.1 Sử dụng StatementSet (nếu làm thuần code Java)
Trong Phần 3, ta có tableEnv.createTemporaryView(...)
. Ở phần này, ta tiếp tục dùng tableEnv.executeSql(...)
để:
Tạo table sink MongoDB.
Gửi
INSERT INTO ... SELECT ...
vào StatementSet.Cuối cùng,
statementSet.execute()
.
Ví dụ:
public void setupAndRunPipeline() throws Exception {
// ... (đoạn code đọc Kafka, parse JSON, tạo orderTable, shippingTable)
tableEnv.createTemporaryView("order_events_table", orderTable);
tableEnv.createTemporaryView("shipping_events_table", shippingTable);
// 1. DDL để tạo Mongo Sink
String createMongoTableDDL = "CREATE TABLE mongo_sink_table ("
+ " orderId STRING,"
+ " userId STRING,"
+ " orderTotal DOUBLE,"
+ " orderStatus STRING,"
+ " shippingId STRING,"
+ " carrier STRING,"
+ " shippingStatus STRING,"
+ " orderRowtime TIMESTAMP(3),"
+ " shippingRowtime TIMESTAMP(3),"
+ " PRIMARY KEY (orderId) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mongodb',"
+ " 'uri' = 'mongodb://localhost:27017',"
+ " 'database' = 'ecommerce_db',"
+ " 'collection' = 'orders_shipping_join'"
// Nếu connector cần upsert-mode, add config
// + " 'upsert-mode' = 'true',"
+ ")";
tableEnv.executeSql(createMongoTableDDL);
// 2. Lệnh INSERT SELECT
String insertSQL = "INSERT INTO mongo_sink_table\n" +
"SELECT\n" +
" o.orderId,\n" +
" o.userId,\n" +
" o.orderTotal,\n" +
" o.orderStatus,\n" +
" s.shippingId,\n" +
" s.carrier,\n" +
" s.shippingStatus,\n" +
" o.rowtime AS orderRowtime,\n" +
" s.rowtime AS shippingRowtime\n" +
"FROM order_events_table AS o\n" +
"JOIN shipping_events_table AS s\n" +
" ON o.orderId = s.orderId\n" +
" AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE\n" +
" AND o.rowtime + INTERVAL '30' MINUTE";
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql(insertSQL);
// 3. Chạy job
// Lưu ý: Sau khi gọi execute, luồng sẽ block do stream job
stmtSet.execute();
}
Điểm quan trọng:
stmtSet.execute()
sẽ thay thếenv.execute()
. Khi ta dùng Flink SQL/Table API, lệnh này cũng khởi động pipeline.Nếu job streaming, nó chạy vô hạn, bạn phải dừng Spring Boot mới dừng job.
5.2 Lưu ý về chế độ Upsert (INSERT/UPDATE/DELETE)
Để thực sự “update” (thay vì insert nhiều bản ghi trùng
orderId
), Flink cần biết đâu làUPDATE_BEFORE
,UPDATE_AFTER
.Khi ta JOIN 2 bảng append-only, Flink mặc định phát ra INSERT. Mỗi khi cặp record cũ bị thay thế, ta cần “phát”
UPDATE_BEFORE
,UPDATE_AFTER
.Giải pháp: Sử dụng “mini-batch look up” hoặc “Temporal Table” cho dimension, hoặc
ChangelogMode.upsert()
.Tùy use-case, nếu “INSERT-only” vẫn chấp nhận được (ta chấp nhận nhiều phiên bản), thì bạn không cần upsert.
Nếu bắt buộc upsert chuẩn, bạn cần:
Dữ liệu upstream cần có
RowKind
(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).Hoặc kích hoạt dynamic table mode, cài
primary key
+ChangelogMode.upsert()
=> Flink tự tính delta.
Vì phần này có thể rất phức tạp, tùy vào Mongo connector mà bạn dùng, hãy tham khảo tài liệu chính thức.
6. Kiểm tra kết quả trong MongoDB
Sau khi job chạy, ta mong muốn collection orders_shipping_join
trong ecommerce_db
được cập nhật. Bạn có thể:
Dùng Mongo Shell hoặc Mongo Compass để xem dữ liệu:
use ecommerce_db; db.orders_shipping_join.find().pretty();
Quan sát cột
orderId
đóng vai trò unique key (nếu upsert).Nếu Flink bắn nhiều event INSERT trùng
orderId
mà connector không upsert, bạn sẽ thấy nhiều documents với cùngorderId
.Nếu connector hỗ trợ “_id = orderId” hay upsert, bạn chỉ thấy 1 document cho mỗi
orderId
, cập nhật trườngshippingStatus
nếu có thay đổi.
7. Quản lý dữ liệu đến trễ & tái xử lý
7.1 Late events (dữ liệu đến trễ)
Nếu shipping event đến sau watermark (ví dụ, trễ hơn 1 giờ so với order), Flink có thể “bỏ qua” event.
Bạn cần cấu hình watermark, allowed lateness (nếu window-based).
Interval join trong Flink SQL sẽ không match record đã đóng interval.
7.2 Tái xử lý (reprocessing)
Nếu cần “chạy lại từ đầu” (ví dụ, reprocess lịch sử 1 tháng data), bạn phải xóa offset Kafka hoặc cài
OffsetResetStrategy.EARLIEST
+ càicheckpoint
+ cẩn thận tránh spam Mongo.Ở môi trường production, ta thường quản lý offset cẩn thận, hoặc dùng Kafka replay.
8. Tóm tắt & hướng dẫn tiếp theo
Trong Phần 4, bạn đã được hướng dẫn:
JOIN hai bảng
order_events_table
vàshipping_events_table
dựa trênorder_id
.Cách tạo interval join sử dụng event-time và
rowtime
, đảm bảo shipping event chỉ ghép nếu đến trong khoảng thời gian cho phép.Tạo table sink MongoDB bằng DDL, thiết lập primary key (orderId), kích hoạt chế độ upsert (tùy connector).
INSERT INTO table MongoDB, biến Flink SQL thành pipeline streaming.
Lưu ý về changelog (INSERT/UPDATE/DELETE), late events, và tái xử lý.
Về cơ bản, ta đã hoàn thành pipeline: (1) Đọc 2 topic Kafka, (2) Join real-time, (3) Ghi kết quả xuống MongoDB. Phần 5 sẽ nói nhiều hơn về vận hành (deployment, checkpoint, scaling) và những bài học để ứng dụng trong môi trường thực tế:
Cách chạy ứng dụng Spring Boot + Flink embedded trên Docker/Kubernetes.
Thiết lập checkpoint (S3, HDFS) để bảo toàn state.
Triển khai mô hình multi-instance (nếu muốn scale), và rủi ro stateful streaming.
Điểm mấu chốt: Phần 4 này cho thấy sức mạnh của Flink SQL/Table API khi làm các thao tác JOIN phức tạp, đồng thời vẫn dễ dàng ghi kết quả sang MongoDB (hay bất kỳ sink nào khác). Tuy nhiên, việc upsert hay update đòi hỏi bạn phải hiểu rõ changelog của Flink. Ở mức “demo” hoặc “POC”, logic JOIN + INSERT có thể là đủ. Khi triển khai enterprise scale, bạn cần đầu tư thêm vào quản lý state, chuyển đổi RowKind, và connector MongoDB tương xứng.
Hẹn gặp lại ở Phần 5, nơi chúng ta bàn sâu hơn về quản lý checkpoint, savepoint, cũng như vận hành mô hình embedded Flink - Spring Boot trong môi trường production!
Subscribe to my newsletter
Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
