Phần 3: Đọc dữ liệu từ Kafka, parse JSON, và tạo bảng Flink

Dưới đây là Phần 3 của loạt bài “Chạy Apache Flink embedded trong Spring Boot” (bối cảnh e-commerce). Trong phần này, chúng ta sẽ đi sâu vào việc đọc dữ liệu từ Kafka, chuyển đổi (parse) JSON thành dạng Row
(hoặc một cấu trúc thích hợp), rồi tạo bảng Flink (Table) từ các DataStream tương ứng. Mục đích là chuẩn bị dữ liệu để JOIN và ghi xuống MongoDB ở các phần sau.
1. Tóm tắt yêu cầu
Như đã đề cập ở Phần 1, chúng ta có 2 topic Kafka chính trong bối cảnh e-commerce:
Topic A:
order_events
(chứa thông tin đơn hàng, ví dụorder_id
,user_id
,order_total
,order_status
,timestamp
, …).Topic B:
shipping_events
(chứa thông tin vận chuyển, ví dụshipping_id
,order_id
,carrier
,shipping_status
,timestamp
, …).
Flink sẽ đọc từ hai topic này, chuyển dữ liệu sang định dạng Row hoặc POJO, rồi đưa lên Table (Table API) bằng cách sử dụng tableEnv.fromChangelogStream(...)
(hoặc phương pháp tương đương). Nhờ đó, ta có thể JOIN chúng trong Phần 4 bằng Flink SQL hoặc Table API.
2. Khai báo KafkaSource trong Flink (ở chế độ Embedded)
2.1 Xác định phiên bản Kafka & cách deserialization
Phiên bản Kafka: Chúng ta giả định Kafka 2.x trở lên, đang chạy local hoặc trên server.
Boostrstrap servers: lấy từ
application.yml
(spring.kafka.bootstrap-servers
).GroupId: Thông thường ta tự đặt, ví dụ
flink-ecommerce-group
.Deserialization: Ở ví dụ này, Kafka messages có dữ liệu JSON. Ta sẽ dùng Flink Kafka Connector với một deserializer tùy chỉnh (có thể là
SimpleStringSchema
, sau đó parse JSON thủ công hoặc dùng Avro/Json library).
2.2 Tạo KafkaSource<String>
cho 2 topic
Ví dụ trong service FlinkPipelineService
:
package com.example.service;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class KafkaSourceFactory {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// Tạo KafkaSource cho topic order_events:
public KafkaSource<String> createOrderEventsSource(String groupId) {
return KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics("order_events") // Topic A
.setGroupId(groupId)
// Khởi tạo offsets:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(SimpleStringSchema.UTF8)
.build();
}
// Tạo KafkaSource cho topic shipping_events:
public KafkaSource<String> createShippingEventsSource(String groupId) {
return KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics("shipping_events") // Topic B
.setGroupId(groupId)
// Khởi tạo offsets:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(SimpleStringSchema.UTF8)
.build();
}
}
Giải thích:
KafkaSource.<String>builder()
: Sử dụng builder pattern cho KafkaSource (kể từ Flink 1.14+).setValueOnlyDeserializer(SimpleStringSchema.UTF8)
: Cho biết ta nhận message dưới dạng String.OffsetResetStrategy.EARLIEST
: Nếu chưa có offset commit, bắt đầu đọc từ đầu topic.Trong thực tế, bạn có thể dùng AvroDeserializationSchema, JSONDeserializationSchema, hoặc tự viết custom.
Sau đó, khi build pipeline, ta sẽ gọi createOrderEventsSource()
và createShippingEventsSource()
để lấy KafkaSource
tương ứng.
3. Chuyển JSON sang đối tượng Row hoặc POJO
3.1 Lấy DataStream từ KafkaSource
Từ StreamExecutionEnvironment
, ta tạo DataStream như sau:
DataStreamSource<String> orderEventsStream = env.fromSource(
kafkaSourceFactory.createOrderEventsSource("flink-ecommerce-group"),
WatermarkStrategy.noWatermarks(),
"Kafka Source - Order Events"
);
DataStreamSource<String> shippingEventsStream = env.fromSource(
kafkaSourceFactory.createShippingEventsSource("flink-ecommerce-group"),
WatermarkStrategy.noWatermarks(),
"Kafka Source - Shipping Events"
);
WatermarkStrategy.noWatermarks()
: Tạm thời ta bỏ qua watermark. Nếu muốn xử lý event-time, ta sẽ học cách tạo watermark ở Phần 4.Tên source là
"Kafka Source - ..."
để nhận biết trong Flink UI/logs.
3.2 Parse JSON thủ công hoặc qua library
Giả sử chúng ta chọn cách parse thủ công bằng Jackson. Mỗi record JSON của Order Event:
{
"order_id": "ORD123",
"user_id": "U001",
"order_total": 99.5,
"order_status": "CREATED",
"timestamp": 1679999999999
}
Và Shipping Event:
{
"shipping_id": "SHIP987",
"order_id": "ORD123",
"carrier": "DHL",
"shipping_status": "IN_TRANSIT",
"timestamp": 1680000000999
}
Chúng ta có 2 POJO đại diện:
public class OrderEvent {
public String orderId;
public String userId;
public double orderTotal;
public String orderStatus;
public long timestamp;
// Constructors, getters, setters...
}
public class ShippingEvent {
public String shippingId;
public String orderId;
public String carrier;
public String shippingStatus;
public long timestamp;
// Constructors, getters, setters...
}
Bước parse (transform String
→ OrderEvent
/ ShippingEvent
):
DataStream<OrderEvent> parsedOrderStream = orderEventsStream
.map(json -> {
// Parse JSON -> OrderEvent
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, OrderEvent.class);
})
.name("Parse-Order-JSON");
DataStream<ShippingEvent> parsedShippingStream = shippingEventsStream
.map(json -> {
// Parse JSON -> ShippingEvent
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, ShippingEvent.class);
})
.name("Parse-Shipping-JSON");
Lưu ý:
Tránh tạo
new ObjectMapper()
bên trongmap()
nhiều lần. Trong thực tế, bạn nên tạo mộtstatic final ObjectMapper
, hoặc viết hẳn một MapFunction riêng để tái sử dụng.Có thể quản lý exceptions (ví dụ JSON parse lỗi) tùy chiến lược (bỏ qua, gửi tới dead-letter queue, …).
3.3 Tùy chọn dùng Row thay vì POJO
Bạn cũng có thể trực tiếp chuyển JSON → Row
, nhất là khi sử dụng Table API
/SQL
kèm fromChangelogStream(...)
. Nếu dữ liệu có row schema tĩnh (cột cố định), ta có thể khai báo RowTypeInfo
hoặc Schema
để parse.
Tuy nhiên, ví dụ này minh họa POJO approach, rồi ta sẽ build Table trên POJO.
4. Tạo bảng Flink từ DataStream
4.1 Dùng Table API / Changelog
Có nhiều cách để biến DataStream thành Table:
tableEnv.fromDataStream(DataStream<T>)
: Tạo bảng đơn giản vớiappend-only
mode.tableEnv.fromChangelogStream(DataStream<Row> rowStream, Schema, ChangelogMode)
: Dùng khi ta muốn upsert hay handle thay đổi (RowKind).
Trong bối cảnh e-commerce, mỗi order/ shipping event thường là kiểu “CREATE/UPDATE”. Nếu bạn muốn track thay đổi và upsert sang MongoDB, bạn có thể phải khai báo RowKind
(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
Tuy nhiên, giả sử đơn giản ta chỉ nhận INSERT (mỗi record order/shipping event xem như 1 record “newest” – vì status update có thể đến với cùng order_id
), ta có thể:
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// ...
Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
$("orderId"),
$("userId"),
$("orderTotal"),
$("orderStatus"),
$("timestamp"),
$("proctime").proctime() // cột thời gian xử lý (nếu cần)
);
Table shippingTable = tableEnv.fromDataStream(parsedShippingStream,
$("shippingId"),
$("orderId"),
$("carrier"),
$("shippingStatus"),
$("timestamp"),
$("proctime").proctime()
);
Giải thích:
$("...")
là cú pháp DSL của Table API (Flink 1.13+).$("proctime").proctime()
tạo cột “processing time”.Tên bảng: tạm thời, ta có thể tạo “Temporary View”:
tableEnv.createTemporaryView("order_events_table", orderTable);
tableEnv.createTemporaryView("shipping_events_table", shippingTable);
4.2 Lưu ý về tính “Upsert” và RowKind
Nếu bạn muốn theo dõi các event “UPDATE” (ví dụ, order_status
chuyển từ CREATED
→ PAID
), bạn có thể:
Dùng
tableEnv.fromChangelogStream(...)
với Row cóRowKind
.Khi parse, bạn xác định logic thay đổi (INSERT, UPDATE_BEFORE, UPDATE_AFTER).
Tuy nhiên, phần này nâng cao. Để đơn giản, chúng ta giả sử mỗi record Kafka là một event “INSERT” (hoặc “upsert overwrite”). Mọi update của order_status
coi như event mới. Flink gặp cùng order_id
vẫn coi đó là “insert row” (và khi join, ta sẽ lấy giá trị mới nhất theo timestamp).
Ở Phần 4, chúng ta sẽ “Join” hai bảng này (theo order_id
) và hoặc tạo logic update/upsert sang MongoDB.
5. Tích hợp đoạn code này vào FlinkPipelineService
Để tổng hợp, ta có thể viết trong class FlinkPipelineService
(một ví dụ):
@Service
public class FlinkPipelineService {
@Autowired
private StreamExecutionEnvironment env;
@Autowired
private StreamTableEnvironment tableEnv;
@Autowired
private KafkaSourceFactory kafkaSourceFactory;
public void setupAndRunPipeline() throws Exception {
// 1. Tạo DataStream<String> từ Kafka
DataStreamSource<String> orderEventsStream = env.fromSource(
kafkaSourceFactory.createOrderEventsSource("flink-ecommerce-group"),
WatermarkStrategy.noWatermarks(),
"Kafka Source - Order Events"
);
DataStreamSource<String> shippingEventsStream = env.fromSource(
kafkaSourceFactory.createShippingEventsSource("flink-ecommerce-group"),
WatermarkStrategy.noWatermarks(),
"Kafka Source - Shipping Events"
);
// 2. Parse JSON -> POJO
DataStream<OrderEvent> parsedOrderStream = orderEventsStream
.map(json -> {
// parse JSON -> OrderEvent
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, OrderEvent.class);
})
.name("Parse-Order-JSON");
DataStream<ShippingEvent> parsedShippingStream = shippingEventsStream
.map(json -> {
// parse JSON -> ShippingEvent
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, ShippingEvent.class);
})
.name("Parse-Shipping-JSON");
// 3. Tạo Table từ DataStream
Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
$("orderId"),
$("userId"),
$("orderTotal"),
$("orderStatus"),
$("timestamp"),
$("proctime").proctime()
);
Table shippingTable = tableEnv.fromDataStream(parsedShippingStream,
$("shippingId"),
$("orderId"),
$("carrier"),
$("shippingStatus"),
$("timestamp"),
$("proctime").proctime()
);
// 4. Tạo Temporary View cho việc JOIN (ở phần sau)
tableEnv.createTemporaryView("order_events_table", orderTable);
tableEnv.createTemporaryView("shipping_events_table", shippingTable);
// 5. Bật streaming job
env.execute("E-commerce Flink Pipeline");
}
}
Khi Spring Boot start (hoặc khi bạn gọi /start-job
từ controller), hàm này sẽ chạy job Flink. Streaming job sẽ liên tục:
Lắng nghe hai topic Kafka.
Parse JSON thành
OrderEvent
,ShippingEvent
.Cập nhật vào bảng
order_events_table
vàshipping_events_table
.
Chúng ta chưa JOIN hay ghi xuống MongoDB ở đây. Đó là nội dung Phần 4.
6. Xác minh hoạt động
6.1 Chạy Spring Boot & gửi data vào Kafka
Chạy ứng dụng Spring Boot:
mvn spring-boot:run
(hoặcjava -jar spring-flink-embedded.jar
)Xem logs, xác nhận rằng pipeline đã “lắng nghe” 2 topic.
Gửi một vài thông điệp JSON vào Kafka:
Topic
order_events
, ví dụ:echo '{"order_id":"ORD001","user_id":"U100","order_total":120.0,"order_status":"CREATED","timestamp":1680000000001}' | kafka-console-producer.sh --broker-list localhost:9092 --topic order_events
Topic
shipping_events
, ví dụ:echo '{"shipping_id":"SHIP-456","order_id":"ORD001","carrier":"DHL","shipping_status":"PREPARED","timestamp":1680000000500}' | kafka-console-producer.sh --broker-list localhost:9092 --topic shipping_events
Kiểm tra logs:
- Xem log ứng dụng: Nếu parse JSON thành công, ta thấy pipeline “chạy” mà không báo lỗi.
6.2 Kiểm tra Table trong Flink SQL CLI (tùy chọn)
Vì đang embedded, ta không có Flink CLI chính thức để tương tác.
Tuy nhiên, có thể in DataStream ra console (debug) hoặc tạo temporary sink in ra log.
Ở Phần 4, ta sẽ JOIN và ghi xuống MongoDB, từ đó ta xác minh xem Mongo có data chưa.
7. Tóm tắt & Hướng dẫn tiếp theo
Trong Phần 3, bạn đã học được cách:
Khai báo KafkaSource cho 2 topic (
order_events
,shipping_events
).Tạo DataStream từ Kafka, parse JSON thành POJO
OrderEvent
vàShippingEvent
.Chuyển DataStream thành Table (append-only).
Tạo Temporary View để tiện JOIN trong bước kế.
Đây là nền tảng cốt lõi để chúng ta xây dựng logic JOIN. Ở Phần 4, chúng ta sẽ xử lý:
JOIN hai bảng
order_events_table
vàshipping_events_table
dựa trênorder_id
.Quản lý thời gian (có thể dùng window join hoặc interval join) vì shipping event đến sau order event.
Ghi kết quả cuối cùng sang MongoDB (thông qua Flink Connector hoặc DDL
CREATE TABLE ... WITH('connector'='mongodb')
).Cách sử dụng
ChangelogMode.upsert()
để đảm bảo bản ghi được upsert tương ứng (thêm hoặc cập nhật).
Sau đó, ở Phần 5, ta sẽ bàn sâu hơn về vận hành (checkpoint, scaling, Docker/K8s, …).
Điểm mấu chốt: Phần 3 này giúp bạn nắm rõ cách đọc (consume) Kafka topic trong môi trường Flink embedded, parse dữ liệu JSON, rồi “bắc cầu” sang Table API. Đây chính là khâu tiền xử lý trước khi ta thực hiện những thao tác phân tích, tính toán, hoặc JOIN phức tạp.
Hẹn gặp lại ở Phần 4, nơi chúng ta thực sự “làm giàu” dữ liệu bằng cách JOIN order_events_table
và shipping_events_table
, rồi ghi kết quả sang MongoDB!
Subscribe to my newsletter
Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
