Phần 4: JOIN hai bảng & ghi kết quả sang MongoDB

Henry CollinsHenry Collins
9 min read

Dưới đây là Phần 4 của chuỗi “Chạy Apache Flink embedded trong Spring Boot” (bối cảnh e-commerce). Ở phần này, chúng ta sẽ thực hiện JOIN hai bảng order_events_tableshipping_events_table, đồng thời ghi kết quả xuống MongoDB. Chúng ta cũng sẽ thảo luận về cách quản lý thời gian (tận dụng window/interval join, xử lý dữ liệu đến trễ) cũng như thiết lập chế độ upsert để bảo đảm dữ liệu được thêm hoặc cập nhật chính xác.


1. Tóm tắt ngữ cảnh

Trong Phần 3, chúng ta đã:

  1. Tạo KafkaSource cho 2 topic: order_events (Topic A) và shipping_events (Topic B).

  2. Chuyển dữ liệu JSON thành POJO OrderEvent / ShippingEvent.

  3. Tạo 2 bảng Flink (Table) tương ứng: order_events_tableshipping_events_table.

Hiện tại, chúng ta muốn tham chiếu (JOIN) hai bảng dựa trên order_id (khóa chung giữa đơn hàng và vận chuyển), rồi kết quả được “upsert” vào MongoDB.


2. Kịch bản JOIN

2.1 Logic business

  • Mỗi OrderEvent đại diện cho trạng thái đơn hàng (CREATED, PAID, CANCELLED, …), kèm order_id, user_id, order_total, timestamp.

  • Mỗi ShippingEvent đại diện cho trạng thái giao hàng (PREPARED, IN_TRANSIT, DELIVERED, …), kèm shipping_id, order_id, carrier, shipping_status, timestamp.

  • shippingEvent có thể đến sau orderEvent (hoặc ngược lại) và không đảm bảo đồng bộ, ta cần một cách JOIN có tính đến thời gian.

  1. Regular join (batch-style): Sẽ quét tất cả record hai bảng, ghép dựa trên cột chung. Không phù hợp khi streaming (vì luồng đến vô hạn).

  2. Interval join (sử dụng event-time hoặc processing-time + một khoảng thời gian). Ví dụ: shipping.timestamp cần nằm trong khoảng order.timestamp−X,order.timestamp+Yorder.timestamp - X, order.timestamp + Y.

  3. Temporal table join: dùng khi một bảng đóng vai trò “dimension table”.

  4. Window join: gộp record 2 bảng vào cùng một window.

Với mô hình e-commerce đơn giản, ta có thể dùng:

  • Interval join nếu cần giới hạn: “Xem shipping event chỉ match với order event trong vòng 30 phút”.

  • Left join (khi shipping đến sau order), v.v.

Ở đây, ta sẽ minh họa “interval join” bằng Flink SQL (hoặc Table API).


3.1 Khai báo event-time (tùy chọn)

Để xài interval join theo event-time, ta cần cột rowtime (event-time) thay vì proctime. Hiện ở Phần 3, ta có:

Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
    $("orderId"),
    $("userId"),
    $("orderTotal"),
    $("orderStatus"),
    $("timestamp"), // cột raw timestamp (long)
    $("proctime").proctime()
);

Chúng ta có thể chuyển cột timestamp thành event-time. Ví dụ:

Table orderTable = tableEnv.fromDataStream(
    parsedOrderStream,
    Schema.newBuilder()
          .column("orderId", DataTypes.STRING())
          .column("userId", DataTypes.STRING())
          .column("orderTotal", DataTypes.DOUBLE())
          .column("orderStatus", DataTypes.STRING())
          .column("ts", DataTypes.BIGINT())            // raw timestamp
          .columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(ts, 3)")
          .watermark("rowtime", "rowtime - INTERVAL '5' SECONDS") 
          .build()
);

Giải thích:

  • columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(ts, 3)"): Tạo cột event-time từ ts (trong OrderEvent, timestamp).

  • watermark("rowtime", "rowtime - INTERVAL '5' SECONDS"): Đặt watermark trễ 5 giây.

  • Tương tự với shippingTable.

Nếu bạn chỉ dùng processing-time, vẫn có thể interval join bằng proctime, nhưng event-time minh họa tính thời gian thực hơn.

3.2 Tạo temporary view & viết câu lệnh JOIN

Giả sử sau khi đã chuẩn bị orderTableshippingTable, ta tạo view:

-- Tạo temporary view (nếu chưa tạo ở phần 3)
CREATE TEMPORARY VIEW order_events_table
AS SELECT orderId, userId, orderTotal, orderStatus, rowtime
   FROM <...> ; 

CREATE TEMPORARY VIEW shipping_events_table
AS SELECT shippingId, orderId, carrier, shippingStatus, rowtime
   FROM <...> ;

Rồi ta có thể JOIN:

SELECT 
    o.orderId,
    o.userId,
    o.orderTotal,
    o.orderStatus,
    s.shippingId,
    s.carrier,
    s.shippingStatus,
    o.rowtime AS orderRowtime,
    s.rowtime AS shippingRowtime
FROM order_events_table AS o
JOIN shipping_events_table AS s 
  ON o.orderId = s.orderId
  AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE
                   AND o.rowtime + INTERVAL '30' MINUTE

Phân tích:

  • AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE AND o.rowtime + INTERVAL '30' MINUTE: Đây chính là interval join.

  • Bất cứ shipping event nào xuất hiện trong vòng ±30 phút quanh order event sẽ được JOIN.

  • Tùy yêu cầu, bạn có thể left join, right join, hay chỉ BETWEEN o.rowtime AND o.rowtime + INTERVAL '2' HOUR,…

Kết quả là một dòng (row) có đủ thông tin order + shipping. Ta sẽ dùng nó để ghi xuống MongoDB.


4. Tạo bảng sink MongoDB với chế độ upsert

Flink cung cấp cơ chế tạo Table Sink bằng DDL. Ví dụ:

CREATE TABLE mongo_sink_table (
    `orderId` STRING,
    `userId` STRING,
    `orderTotal` DOUBLE,
    `orderStatus` STRING,
    `shippingId` STRING,
    `carrier` STRING,
    `shippingStatus` STRING,
    `orderRowtime` TIMESTAMP(3),
    `shippingRowtime` TIMESTAMP(3),
    PRIMARY KEY (orderId) NOT ENFORCED
) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://localhost:27017',
    'database' = 'ecommerce_db',
    'collection' = 'orders_shipping_join',
    -- Tùy chỉnh thêm nếu connector hỗ trợ
    'upsert-mode' = 'true' -- nếu connector có option upsert. 
    -- Lưu ý: Mỗi connector có config khác nhau. 
);

Lưu ý:

  • PRIMARY KEY (orderId) NOT ENFORCED cho phép Flink hiểu “chúng ta muốn upsert trên khóa orderId”.

  • Tùy connector MongoDB mà bạn sử dụng, có thể có hoặc không có thuộc tính 'upsert-mode' = 'true'. Một số connector cũ yêu cầu 'change-data-capture'='true',…

  • Nếu không có connector chính thức, bạn có thể dùng JDBC connector + driver MongoDB JDBC (hoặc custom sink).

4.2 Thực hiện INSERT INTO

Sau khi tạo mongo_sink_table, ta chạy lệnh:

INSERT INTO mongo_sink_table
SELECT
    o.orderId,
    o.userId,
    o.orderTotal,
    o.orderStatus,
    s.shippingId,
    s.carrier,
    s.shippingStatus,
    o.orderRowtime,
    s.shippingRowtime
FROM order_events_table AS o
JOIN shipping_events_table AS s
  ON o.orderId = s.orderId
  AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE 
                   AND o.rowtime + INTERVAL '30' MINUTE

Flink sẽ “chạy” truy vấn này dưới dạng streaming, theo dõi các cập nhật (changelog) của order_events_tableshipping_events_table. Mỗi khi có sự kiện mới, nó tính toán kết quả JOIN và ghi (upsert/delete) xuống mongo_sink_table.


5. Kết hợp lại trong code (Table API thay vì SQL, hoặc StatementSet)

5.1 Sử dụng StatementSet (nếu làm thuần code Java)

Trong Phần 3, ta có tableEnv.createTemporaryView(...). Ở phần này, ta tiếp tục dùng tableEnv.executeSql(...) để:

  1. Tạo table sink MongoDB.

  2. Gửi INSERT INTO ... SELECT ... vào StatementSet.

  3. Cuối cùng, statementSet.execute().

Ví dụ:

public void setupAndRunPipeline() throws Exception {
    // ... (đoạn code đọc Kafka, parse JSON, tạo orderTable, shippingTable)

    tableEnv.createTemporaryView("order_events_table", orderTable);
    tableEnv.createTemporaryView("shipping_events_table", shippingTable);

    // 1. DDL để tạo Mongo Sink
    String createMongoTableDDL = "CREATE TABLE mongo_sink_table ("
            + " orderId STRING,"
            + " userId STRING,"
            + " orderTotal DOUBLE,"
            + " orderStatus STRING,"
            + " shippingId STRING,"
            + " carrier STRING,"
            + " shippingStatus STRING,"
            + " orderRowtime TIMESTAMP(3),"
            + " shippingRowtime TIMESTAMP(3),"
            + " PRIMARY KEY (orderId) NOT ENFORCED"
            + ") WITH ("
            + " 'connector' = 'mongodb',"
            + " 'uri' = 'mongodb://localhost:27017',"
            + " 'database' = 'ecommerce_db',"
            + " 'collection' = 'orders_shipping_join'"
            // Nếu connector cần upsert-mode, add config
            // + " 'upsert-mode' = 'true',"
            + ")";

    tableEnv.executeSql(createMongoTableDDL);

    // 2. Lệnh INSERT SELECT
    String insertSQL = "INSERT INTO mongo_sink_table\n" +
            "SELECT\n" +
            "  o.orderId,\n" +
            "  o.userId,\n" +
            "  o.orderTotal,\n" +
            "  o.orderStatus,\n" +
            "  s.shippingId,\n" +
            "  s.carrier,\n" +
            "  s.shippingStatus,\n" +
            "  o.rowtime AS orderRowtime,\n" +
            "  s.rowtime AS shippingRowtime\n" +
            "FROM order_events_table AS o\n" +
            "JOIN shipping_events_table AS s\n" +
            "  ON o.orderId = s.orderId\n" +
            "  AND s.rowtime BETWEEN o.rowtime - INTERVAL '30' MINUTE\n" +
            "                   AND o.rowtime + INTERVAL '30' MINUTE";

    StatementSet stmtSet = tableEnv.createStatementSet();
    stmtSet.addInsertSql(insertSQL);

    // 3. Chạy job
    // Lưu ý: Sau khi gọi execute, luồng sẽ block do stream job
    stmtSet.execute();
}

Điểm quan trọng:

  • stmtSet.execute() sẽ thay thế env.execute(). Khi ta dùng Flink SQL/Table API, lệnh này cũng khởi động pipeline.

  • Nếu job streaming, nó chạy vô hạn, bạn phải dừng Spring Boot mới dừng job.

5.2 Lưu ý về chế độ Upsert (INSERT/UPDATE/DELETE)

  • Để thực sự “update” (thay vì insert nhiều bản ghi trùng orderId), Flink cần biết đâu là UPDATE_BEFORE, UPDATE_AFTER.

  • Khi ta JOIN 2 bảng append-only, Flink mặc định phát ra INSERT. Mỗi khi cặp record cũ bị thay thế, ta cần “phát” UPDATE_BEFORE, UPDATE_AFTER.

  • Giải pháp: Sử dụng “mini-batch look up” hoặc “Temporal Table” cho dimension, hoặc ChangelogMode.upsert().

  • Tùy use-case, nếu “INSERT-only” vẫn chấp nhận được (ta chấp nhận nhiều phiên bản), thì bạn không cần upsert.

Nếu bắt buộc upsert chuẩn, bạn cần:

  1. Dữ liệu upstream cần RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

  2. Hoặc kích hoạt dynamic table mode, cài primary key + ChangelogMode.upsert() => Flink tự tính delta.

Vì phần này có thể rất phức tạp, tùy vào Mongo connector mà bạn dùng, hãy tham khảo tài liệu chính thức.


6. Kiểm tra kết quả trong MongoDB

Sau khi job chạy, ta mong muốn collection orders_shipping_join trong ecommerce_db được cập nhật. Bạn có thể:

  • Dùng Mongo Shell hoặc Mongo Compass để xem dữ liệu:

      use ecommerce_db;
      db.orders_shipping_join.find().pretty();
    
  • Quan sát cột orderId đóng vai trò unique key (nếu upsert).

  • Nếu Flink bắn nhiều event INSERT trùng orderId mà connector không upsert, bạn sẽ thấy nhiều documents với cùng orderId.

  • Nếu connector hỗ trợ “_id = orderId” hay upsert, bạn chỉ thấy 1 document cho mỗi orderId, cập nhật trường shippingStatus nếu có thay đổi.


7. Quản lý dữ liệu đến trễ & tái xử lý

7.1 Late events (dữ liệu đến trễ)

  • Nếu shipping event đến sau watermark (ví dụ, trễ hơn 1 giờ so với order), Flink có thể “bỏ qua” event.

  • Bạn cần cấu hình watermark, allowed lateness (nếu window-based).

  • Interval join trong Flink SQL sẽ không match record đã đóng interval.

7.2 Tái xử lý (reprocessing)

  • Nếu cần “chạy lại từ đầu” (ví dụ, reprocess lịch sử 1 tháng data), bạn phải xóa offset Kafka hoặc cài OffsetResetStrategy.EARLIEST + cài checkpoint + cẩn thận tránh spam Mongo.

  • Ở môi trường production, ta thường quản lý offset cẩn thận, hoặc dùng Kafka replay.


8. Tóm tắt & hướng dẫn tiếp theo

Trong Phần 4, bạn đã được hướng dẫn:

  1. JOIN hai bảng order_events_tableshipping_events_table dựa trên order_id.

  2. Cách tạo interval join sử dụng event-time và rowtime, đảm bảo shipping event chỉ ghép nếu đến trong khoảng thời gian cho phép.

  3. Tạo table sink MongoDB bằng DDL, thiết lập primary key (orderId), kích hoạt chế độ upsert (tùy connector).

  4. INSERT INTO table MongoDB, biến Flink SQL thành pipeline streaming.

  5. Lưu ý về changelog (INSERT/UPDATE/DELETE), late events, và tái xử lý.

Về cơ bản, ta đã hoàn thành pipeline: (1) Đọc 2 topic Kafka, (2) Join real-time, (3) Ghi kết quả xuống MongoDB. Phần 5 sẽ nói nhiều hơn về vận hành (deployment, checkpoint, scaling) và những bài học để ứng dụng trong môi trường thực tế:

  • Cách chạy ứng dụng Spring Boot + Flink embedded trên Docker/Kubernetes.

  • Thiết lập checkpoint (S3, HDFS) để bảo toàn state.

  • Triển khai mô hình multi-instance (nếu muốn scale), và rủi ro stateful streaming.

Điểm mấu chốt: Phần 4 này cho thấy sức mạnh của Flink SQL/Table API khi làm các thao tác JOIN phức tạp, đồng thời vẫn dễ dàng ghi kết quả sang MongoDB (hay bất kỳ sink nào khác). Tuy nhiên, việc upsert hay update đòi hỏi bạn phải hiểu rõ changelog của Flink. Ở mức “demo” hoặc “POC”, logic JOIN + INSERT có thể là đủ. Khi triển khai enterprise scale, bạn cần đầu tư thêm vào quản lý state, chuyển đổi RowKind, và connector MongoDB tương xứng.

Hẹn gặp lại ở Phần 5, nơi chúng ta bàn sâu hơn về quản lý checkpoint, savepoint, cũng như vận hành mô hình embedded Flink - Spring Boot trong môi trường production!

0
Subscribe to my newsletter

Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Henry Collins
Henry Collins