Xử Lý Dữ Liệu Streaming Với Flink và Spring Boot

Phần 1: Giới thiệu tổng quan
Bối cảnh & yêu cầu
Tóm tắt yêu cầu xử lý dữ liệu streaming từ nhiều nguồn (Kafka A/B).
Mô hình Flink pipeline (đọc, xử lý, join, ghi xuống MongoDB).
Tại sao chọn “chạy embedded trong Spring Boot” thay vì chạy mô hình Flink cluster?
Ưu/nhược điểm của mô hình embedded Flink.
Kiến trúc tổng thể
Mô tả sơ đồ “Spring Boot + Flink embedded + Kafka + MongoDB”.
Giải thích luồng dữ liệu (Kafka -> Flink -> MongoDB).
Vai trò của Spring Boot và những gì Flink “embedded” sẽ đảm nhiệm.
Chuẩn bị môi trường, công cụ
Phiên bản Java, Maven hoặc Gradle.
Thư viện Spring Boot, Spring dependencies.
Thư viện Flink (phiên bản, các module cần thiết: flink-core, flink-streaming, flink-table, …).
MongoDB, Kafka (có thể cài local hoặc dùng docker-compose).
Phần 2: Cấu hình dự án Spring Boot và nhúng Flink
Tạo dự án Spring Boot
Tạo skeleton project (có thể dùng Spring Initializr).
Cấu hình
pom.xml
(hoặcbuild.gradle
) để thêm dependencies cần thiết:org.apache.flink:flink-core
,org.apache.flink:flink-streaming-java
,org.apache.flink:flink-table-api-java-bridge
, …Kết hợp các thư viện/connector Kafka, MongoDB (Flink Kafka Connector, Flink MongoDB connector - nếu có).
Lưu ý về phiên bản Flink tương thích với Spring Boot.
Khởi tạo
StreamExecutionEnvironment
vàStreamTableEnvironment
trong Spring BootTạo một class cấu hình, ví dụ
FlinkConfig
.Tạo Bean:
@Bean public StreamExecutionEnvironment streamExecutionEnvironment() { return StreamExecutionEnvironment.getExecutionEnvironment(); } @Bean public StreamTableEnvironment streamTableEnvironment(StreamExecutionEnvironment env) { return StreamTableEnvironment.create(env); }
Giải thích các thông số:
enableCheckpointing
,checkpointStorage
,parallelism
,…
Tổ chức code:
Service / Bean chứa logic Flink (nạp, xử lý, join, ghi xuống MongoDB).
Controller (nếu muốn expose REST API quản lý job).
Hoặc dùng
CommandLineRunner
/ApplicationRunner
để khởi động job khi ứng dụng Spring lên.
Chú ý về “blocking call” khi gọi
env.execute()
Giải thích vì sao job streaming sẽ chạy vô hạn.
Gợi ý cách chạy job trong một thread riêng / hoặc async.
Cách dừng job (nếu cần).
Phần 3: Đọc dữ liệu từ Kafka và chuyển đổi sang Table (như logic yêu cầu)
Khai báo KafkaSource
Cách cấu hình bootstrapServers, groupId.
Sử dụng
KafkaSource.<String>builder()
để build source.Nêu ví dụ cụ thể cho topic A và topic B.
Xử lý dữ liệu thô
Đoạn code chuyển JSON -> Avro schema (hoặc parse sang
Row
,List<Object>
).Giải thích
RowKindMapFunction
,pmt_processor_RowDataStream
, …
Từ DataStream -> Table
Cách dùng
tableEnv.fromChangelogStream(...)
.Thiết lập
Schema
,ChangelogMode.upsert()
.Tạo “Temporary View” để thuận tiện viết SQL.
Thực hiện join
Giải thích logic join 2 luồng: A (
transactionId
) và B (transactionuuid
),Giải thích window/interval join (vì 2 sự kiện có thể xuất hiện cách nhau vài phút).
Tối ưu
rowtime
/event-time nếu cần.
Phần 4: Ghi kết quả xuống MongoDB
Khai báo sink MongoDB
Dùng DDL
CREATE TABLE … WITH ('connector'='mongodb', … )
trong Flink SQL.Cách truyền
uri
,database
,collection
, v.v.Giải thích cơ chế “Upsert” (Insert/Update/Delete) và tương ứng với
ChangelogMode.upsert()
.
Thực hiện lệnh INSERT SELECT
Giải thích câu lệnh
INSERT INTO <mongo_table> SELECT ...
(join result).Diễn giải logic “trường hợp A = PAYMENT_THAI_QR thì lấy thông tin từ B, ngược lại lấy từ A”.
Kiểm tra kết quả
Kiểm tra data ghi xuống MongoDB (bằng Mongo Shell, hoặc Compass).
Xác minh Upsert đã hoạt động đúng (Update bản ghi cũ khi transaction trùng ID, …).
Phần 5: Quản lý, vận hành và scale mô hình embedded
Cách chạy Spring Boot + Flink embedded
mvn spring-boot:run
hoặcjava -jar <app>.jar
.Kiểm tra log, xác nhận job streaming đang chạy.
Triển khai trên Docker / K8s (nếu muốn)
Cho vào Dockerfile, build image.
Deploy lên K8s, nêu rõ “một pod chạy Spring Boot + Flink” => scale = chạy nhiều pod.
Phân tích rủi ro khi scale “embedded” (mỗi pod chạy một job cục bộ, state cục bộ, …).
Tùy chỉnh checkpoint, metrics
Cấu hình checkpoint interval, checkpoint storage (S3, HDFS, …).
Setup metrics, logs…
Những hạn chế của mô hình
Khó quản lý stateful streaming ở mức độ lớn.
Khó đồng bộ state khi chạy multi-instance.
Tùy vào use case có thể chấp nhận, hoặc nên chuyển sang Flink cluster.
Phần 6: Tổng kết & mở rộng
Những điểm chính đạt được
Đã chạy được Flink streaming trong Spring Boot.
Đọc nhiều topic Kafka, join data, ghi xuống MongoDB theo chế độ Upsert.
Mở rộng
Triển khai mô hình cluster để scale tốt hơn.
Sử dụng Flink SQL + DDL phức tạp, query optimization.
Quản lý job (dừng, resume, savepoint) tốt hơn bằng Flink REST API.
Lời khuyên cuối cùng
Khi nào thì mô hình embedded phù hợp?
Khi nào cần chuyển sang mô hình Flink cluster thực thụ?
Tóm tắt
Dàn ý trên chia thành 6 phần lớn (có thể viết thành 6 bài hoặc rút gọn thành 3-4 bài tùy độ chi tiết). Nội dung chủ yếu:
Giới thiệu & Kiến trúc
Chuẩn bị, cấu hình Spring Boot + Flink
Xử lý dữ liệu từ Kafka, join
Lưu dữ liệu xuống MongoDB
Vận hành, scale, triển khai
Tổng kết & hướng mở rộng
Subscribe to my newsletter
Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
