Phần 2: Cấu hình dự án Spring Boot & nhúng Flink

Dưới đây là Phần 2 trong chuỗi bài hướng dẫn “Chạy Apache Flink embedded trong Spring Boot” (bối cảnh e-commerce). Trong bài này, chúng ta sẽ tập trung vào cấu hình dự án cũng như nhúng Flink vào Spring Boot. Từ đó, bạn có thể khởi tạo những Bean cần thiết, sử dụng Dependency Injection và quản lý cấu hình (Kafka, MongoDB, Flink) thông qua Spring Boot.
1. Khởi tạo dự án Spring Boot
1.1 Sử dụng Spring Initializr (hoặc tạo thủ công)
Để khởi tạo dự án, có 2 cách phổ biến:
Spring Initializr: Vào https://start.spring.io/, điền groupId, artifactId, và chọn các dependencies (nếu sẵn có).
Tạo thủ công: Tạo cấu trúc Maven/Gradle rồi thêm các dependency cần thiết.
Trong cả hai trường hợp, bạn cần bảo đảm các thư viện Flink có mặt trong pom.xml
(hoặc build.gradle
).
1.2 Các dependency cốt lõi
Ví dụ file pom.xml
(Maven) rút gọn, tập trung vào các phần chính:
<project ...>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-flink-embedded</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<java.version>11</java.version>
<!-- Đảm bảo version này tương thích với Flink -->
<flink.version>1.15.2</flink.version>
<spring.boot.version>2.7.4</spring.boot.version>
</properties>
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- Kafka (Spring for Apache Kafka nếu cần) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.4</version>
</dependency>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MongoDB driver hoặc Flink MongoDB connector (nếu có) -->
<!-- Hiện tại Flink có một số connector do bên thứ 3,
hoặc bạn có thể dùng Flink JDBC/Custom connector -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.11</version>
</dependency>
<!-- Các thư viện khác nếu cần (JSON, Avro, vv) -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- ... (các dependency hỗ trợ) ... -->
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven Plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</plugin>
</plugins>
</build>
</project>
Lưu ý:
flink-table-api-java-bridge_2.12
vàflink-table-runtime_2.12
giúp ta dùng Table API hoặc SQL trên Flink.Tùy phiên bản Scala, bạn có thể cần
_2.11
hoặc_2.12
.Tương tự với Gradle, bạn sẽ thêm các dependency tương ứng.
2. Cấu hình application.yml
(hoặc application.properties
)
Thông thường, ta sẽ có file src/main/resources/application.yml
để quản lý config. Ví dụ:
spring:
application:
name: SpringFlinkEmbedded
kafka:
bootstrap-servers: "localhost:9092"
spring:
data:
mongodb:
uri: "mongodb://localhost:27017"
database: "ecommerce_db"
# Cấu hình liên quan Flink (tuỳ ý)
flink:
checkpoint-interval: 60000 # checkpoint mỗi 60 giây
checkpoint-storage: "file:///tmp/flink-checkpoints"
parallelism: 2
spring.kafka.bootstrap-servers
: nơi Kafka đang chạy.spring.data.mongodb.uri
: URI kết nối MongoDB.Bất cứ config Flink nào custom bạn có thể đưa vào namespace
flink.*
rồi đọc trong code, hoặc inject vào Bean.
3. Tạo lớp cấu hình Flink (FlinkConfig
)
Khi chạy embedded, ta cần tự khởi tạo StreamExecutionEnvironment
(hoặc StreamTableEnvironment
) trong Spring Boot. Thực hiện điều này bằng cách tạo một @Configuration class:
package com.example.config;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlinkConfig {
@Bean
public StreamExecutionEnvironment streamExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Có thể đọc cấu hình từ application.yml:
// - Thời gian checkpoint
// - Parallelism
// ...
env.setParallelism(2);
env.enableCheckpointing(60_000); // 60 giây
// Nếu cần, cấu hình checkpoint storage
// env.getCheckpointConfig()
// .setCheckpointStorage("file:///tmp/flink-checkpoints");
return env;
}
@Bean
public StreamTableEnvironment streamTableEnvironment(StreamExecutionEnvironment env) {
// Tạo TableEnv dựa trên StreamEnv
return StreamTableEnvironment.create(env);
}
}
Giải thích:
@Configuration: Spring Boot sẽ phát hiện và load class này khi start.
streamExecutionEnvironment()
trả về một bean duy nhất của kiểuStreamExecutionEnvironment
. Khi Spring Boot khởi chạy, nó sẽ gọi hàm này để khởi tạo.streamTableEnvironment(StreamExecutionEnvironment env)
tạo TableEnvironment dựa trên StreamEnv, và cũng trở thành bean.
Kết quả: ở bất cứ đâu trong code, chúng ta có thể @Autowired hoặc @Inject StreamExecutionEnvironment
hoặc StreamTableEnvironment
để sử dụng.
4. Tổ chức code cho “pipeline” Flink
4.1 Tạo service chuyên xử lý pipeline
Để tách biệt logic Flink, ta thường tạo một Service (Spring Bean) riêng, ví dụ FlinkPipelineService
:
package com.example.service;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class FlinkPipelineService {
@Autowired
private StreamExecutionEnvironment env;
@Autowired
private StreamTableEnvironment tableEnv;
public void setupAndRunPipeline() throws Exception {
// 1. Khai báo KafkaSource cho 2 topic (order_events, shipping_events)
// 2. Chuyển đổi JSON -> Row
// 3. Tạo Table từ DataStream
// 4. Join logic
// 5. Ghi kết quả xuống MongoDB
// ...
// Ở bài này, ta mới dừng ở chỗ "khung" setup.
// Chi tiết phần đọc Kafka, join, etc. sẽ được nêu ở phần 3-4.
// IMPORTANT: job streaming sẽ chạy vô hạn => gọi env.execute()
// Khi app Spring Boot start, ta có 2 lựa chọn:
// a) Gọi thẳng env.execute() ở đây => Sẽ block thread
// b) Chạy async => Tạo 1 thread riêng, tránh block
}
}
Lưu ý:
Khi gọi
env.execute()
, luồng sẽ block (trong trường hợp streaming không dừng).Để tránh block toàn bộ Spring Boot, ta có thể chạy job này trong một Thread riêng, hoặc dùng
CompletableFuture.runAsync(...)
.Nếu bạn muốn job chạy ngay khi ứng dụng Spring Boot khởi động (và chấp nhận block), có thể làm ở bước CommandLineRunner.
4.2 Khởi chạy job khi Spring Boot lên (hoặc tuỳ ý qua REST)
Cách 1: Dùng CommandLineRunner
package com.example;
import com.example.service.FlinkPipelineService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringFlinkEmbeddedApplication implements CommandLineRunner {
@Autowired
private FlinkPipelineService flinkPipelineService;
public static void main(String[] args) {
SpringApplication.run(SpringFlinkEmbeddedApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// Khi Spring Boot khởi động xong, sẽ gọi hàm này
flinkPipelineService.setupAndRunPipeline();
}
}
Khi
flinkPipelineService.setupAndRunPipeline()
gọienv.execute()
, luồng sẽ block.Trong môi trường dev/test, có thể chấp nhận.
Ứng dụng sẽ không kết thúc (vì streaming job chạy liên tục) trừ khi ta dừng Spring Boot.
Cách 2: Dùng REST Controller
package com.example.controller;
import com.example.service.FlinkPipelineService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FlinkController {
@Autowired
private FlinkPipelineService flinkPipelineService;
private volatile boolean jobRunning = false;
@GetMapping("/start-job")
public String startJob() {
if (jobRunning) {
return "Job is already running!";
}
try {
new Thread(() -> {
try {
flinkPipelineService.setupAndRunPipeline();
} catch (Exception e) {
// log error
}
}).start();
jobRunning = true;
return "Flink job started (async)!";
} catch (Exception e) {
return "Error starting job: " + e.getMessage();
}
}
}
Ở đây, ta tạo một endpoint
/start-job
để khởi động job trong một thread riêng.Giúp Spring Boot không bị block, app vẫn trả lời API.
Tuy nhiên, cần cơ chế stop job (nếu cần), hoặc shutdown Spring Boot để dừng job.
Tùy vào kiến trúc và nhu cầu, bạn chọn cách nào phù hợp.
5. Các lưu ý khi nhúng Flink
5.1 Cách xử lý Checkpoint & Savepoint
Trong Flink cluster, bạn thường có UI/CLI để thực hiện savepoint, restore, v.v.
Khi embed, bạn phải “thủ công” hơn, có thể lập trình logic:
env.getCheckpointConfig().setCheckpointStorage(...)
chỉ đến S3/HDFS,Tự gọi
env.triggerSavepoint(...)
(Flink >= 1.15 có API).
Nếu job stateful, hãy đặc biệt chú ý khôi phục (restore) state sau khi restart ứng dụng.
5.2 Xung đột phiên bản (Flink vs. Spring Boot)
Bạn cần đảm bảo phiên bản Flink tương thích với version Scala, version Java của project.
Tránh xung đột version Guava, Jackson, Netty, ... (có thể cần exclusion trong Maven).
Trong thực tế, “fat jar” Spring Boot + Flink có thể lên tới hàng trăm MB, bạn nên cân nhắc.
5.3 Logging và Giám sát
Mặc định, Spring Boot dùng Logback. Flink có thể mang theo log4j.
Cần kiểm tra config logging để không bị xung đột.
Giám sát (metrics) Flink embedded: có thể config Prometheus, JMX, … như trong cluster.
6. Tổng kết
Trong Phần 2, chúng ta đã:
Khởi tạo dự án Spring Boot với các dependency Flink, Kafka, MongoDB.
Cấu hình thông qua
application.yml
, bao gồm cài đặt Kafka, MongoDB URI, cũng như thông số Flink.Tạo một class cấu hình (
FlinkConfig
) để khởi tạoStreamExecutionEnvironment
vàStreamTableEnvironment
dưới dạng Spring Bean.Tổ chức code cho “pipeline” bằng cách viết một service (
FlinkPipelineService
), nơi ta set up các nguồn Kafka, join, ghi xuống Mongo.Bàn về cách khởi chạy job: sử dụng
CommandLineRunner
hoặc REST Controller để bật/tắt pipeline.Lưu ý về checkpoint, savepoint, và các xung đột version có thể xảy ra.
Trong Phần 3, chúng ta sẽ đi sâu hơn vào việc đọc dữ liệu từ Kafka, parse JSON thành Row
, tạo bảng Flink (Table) và quản lý “changelog” (upsert). Từ đó, bạn sẽ thấy rõ hơn cách cài đặt KafkaSource, cách join hai topic “order_events” và “shipping_events” ở cấp độ streaming và/hoặc SQL/Table API.
Điểm mấu chốt: Ở mô hình embedded, bạn được “tích hợp” Flink ngay trong Spring Boot, gọn nhẹ cho việc dev/test và POC. Đồng thời, bạn cũng phải chấp nhận những nhược điểm về scale và quản lý state so với Flink cluster.
Hẹn gặp lại ở Phần 3, nơi chúng ta “nhúng tay” vào pipeline thực tế: đọc hai topic Kafka, xử lý streaming, rồi chuyển sang table để chuẩn bị cho bước join và lưu vào MongoDB!
Subscribe to my newsletter
Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
