Xử Lý Dữ Liệu Streaming Với Flink và Spring Boot

Henry CollinsHenry Collins
5 min read

Phần 1: Giới thiệu tổng quan

  1. Bối cảnh & yêu cầu

    • Tóm tắt yêu cầu xử lý dữ liệu streaming từ nhiều nguồn (Kafka A/B).

    • Mô hình Flink pipeline (đọc, xử lý, join, ghi xuống MongoDB).

    • Tại sao chọn “chạy embedded trong Spring Boot” thay vì chạy mô hình Flink cluster?

    • Ưu/nhược điểm của mô hình embedded Flink.

  2. Kiến trúc tổng thể

    • Mô tả sơ đồ “Spring Boot + Flink embedded + Kafka + MongoDB”.

    • Giải thích luồng dữ liệu (Kafka -> Flink -> MongoDB).

    • Vai trò của Spring Boot và những gì Flink “embedded” sẽ đảm nhiệm.

  3. Chuẩn bị môi trường, công cụ

    • Phiên bản Java, Maven hoặc Gradle.

    • Thư viện Spring Boot, Spring dependencies.

    • Thư viện Flink (phiên bản, các module cần thiết: flink-core, flink-streaming, flink-table, …).

    • MongoDB, Kafka (có thể cài local hoặc dùng docker-compose).


  1. Tạo dự án Spring Boot

    • Tạo skeleton project (có thể dùng Spring Initializr).

    • Cấu hình pom.xml (hoặc build.gradle) để thêm dependencies cần thiết:

      • org.apache.flink:flink-core, org.apache.flink:flink-streaming-java, org.apache.flink:flink-table-api-java-bridge, …

      • Kết hợp các thư viện/connector Kafka, MongoDB (Flink Kafka Connector, Flink MongoDB connector - nếu có).

    • Lưu ý về phiên bản Flink tương thích với Spring Boot.

  2. Khởi tạo StreamExecutionEnvironmentStreamTableEnvironment trong Spring Boot

    • Tạo một class cấu hình, ví dụ FlinkConfig.

    • Tạo Bean:

        @Bean
        public StreamExecutionEnvironment streamExecutionEnvironment() {
            return StreamExecutionEnvironment.getExecutionEnvironment();
        }
      
        @Bean
        public StreamTableEnvironment streamTableEnvironment(StreamExecutionEnvironment env) {
            return StreamTableEnvironment.create(env);
        }
      
    • Giải thích các thông số: enableCheckpointing, checkpointStorage, parallelism,…

  3. Tổ chức code:

    • Service / Bean chứa logic Flink (nạp, xử lý, join, ghi xuống MongoDB).

    • Controller (nếu muốn expose REST API quản lý job).

    • Hoặc dùng CommandLineRunner/ApplicationRunner để khởi động job khi ứng dụng Spring lên.

  4. Chú ý về “blocking call” khi gọi env.execute()

    • Giải thích vì sao job streaming sẽ chạy vô hạn.

    • Gợi ý cách chạy job trong một thread riêng / hoặc async.

    • Cách dừng job (nếu cần).


Phần 3: Đọc dữ liệu từ Kafka và chuyển đổi sang Table (như logic yêu cầu)

  1. Khai báo KafkaSource

    • Cách cấu hình bootstrapServers, groupId.

    • Sử dụng KafkaSource.<String>builder() để build source.

    • Nêu ví dụ cụ thể cho topic A và topic B.

  2. Xử lý dữ liệu thô

    • Đoạn code chuyển JSON -> Avro schema (hoặc parse sang Row, List<Object>).

    • Giải thích RowKindMapFunction, pmt_processor_RowDataStream, …

  3. Từ DataStream -> Table

    • Cách dùng tableEnv.fromChangelogStream(...).

    • Thiết lập Schema, ChangelogMode.upsert().

    • Tạo “Temporary View” để thuận tiện viết SQL.

  4. Thực hiện join

    • Giải thích logic join 2 luồng: A (transactionId) và B (transactionuuid),

    • Giải thích window/interval join (vì 2 sự kiện có thể xuất hiện cách nhau vài phút).

    • Tối ưu rowtime/event-time nếu cần.


Phần 4: Ghi kết quả xuống MongoDB

  1. Khai báo sink MongoDB

    • Dùng DDL CREATE TABLE … WITH ('connector'='mongodb', … ) trong Flink SQL.

    • Cách truyền uri, database, collection, v.v.

    • Giải thích cơ chế “Upsert” (Insert/Update/Delete) và tương ứng với ChangelogMode.upsert().

  2. Thực hiện lệnh INSERT SELECT

    • Giải thích câu lệnh INSERT INTO <mongo_table> SELECT ... (join result).

    • Diễn giải logic “trường hợp A = PAYMENT_THAI_QR thì lấy thông tin từ B, ngược lại lấy từ A”.

  3. Kiểm tra kết quả

    • Kiểm tra data ghi xuống MongoDB (bằng Mongo Shell, hoặc Compass).

    • Xác minh Upsert đã hoạt động đúng (Update bản ghi cũ khi transaction trùng ID, …).


Phần 5: Quản lý, vận hành và scale mô hình embedded

  1. Cách chạy Spring Boot + Flink embedded

    • mvn spring-boot:run hoặc java -jar <app>.jar.

    • Kiểm tra log, xác nhận job streaming đang chạy.

  2. Triển khai trên Docker / K8s (nếu muốn)

    • Cho vào Dockerfile, build image.

    • Deploy lên K8s, nêu rõ “một pod chạy Spring Boot + Flink” => scale = chạy nhiều pod.

    • Phân tích rủi ro khi scale “embedded” (mỗi pod chạy một job cục bộ, state cục bộ, …).

  3. Tùy chỉnh checkpoint, metrics

    • Cấu hình checkpoint interval, checkpoint storage (S3, HDFS, …).

    • Setup metrics, logs…

  4. Những hạn chế của mô hình

    • Khó quản lý stateful streaming ở mức độ lớn.

    • Khó đồng bộ state khi chạy multi-instance.

    • Tùy vào use case có thể chấp nhận, hoặc nên chuyển sang Flink cluster.


Phần 6: Tổng kết & mở rộng

  1. Những điểm chính đạt được

    • Đã chạy được Flink streaming trong Spring Boot.

    • Đọc nhiều topic Kafka, join data, ghi xuống MongoDB theo chế độ Upsert.

  2. Mở rộng

    • Triển khai mô hình cluster để scale tốt hơn.

    • Sử dụng Flink SQL + DDL phức tạp, query optimization.

    • Quản lý job (dừng, resume, savepoint) tốt hơn bằng Flink REST API.

  3. Lời khuyên cuối cùng

    • Khi nào thì mô hình embedded phù hợp?

    • Khi nào cần chuyển sang mô hình Flink cluster thực thụ?


Tóm tắt

Dàn ý trên chia thành 6 phần lớn (có thể viết thành 6 bài hoặc rút gọn thành 3-4 bài tùy độ chi tiết). Nội dung chủ yếu:

  1. Giới thiệu & Kiến trúc

  2. Chuẩn bị, cấu hình Spring Boot + Flink

  3. Xử lý dữ liệu từ Kafka, join

  4. Lưu dữ liệu xuống MongoDB

  5. Vận hành, scale, triển khai

  6. Tổng kết & hướng mở rộng

0
Subscribe to my newsletter

Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Henry Collins
Henry Collins