Phần 3: Đọc dữ liệu từ Kafka, parse JSON, và tạo bảng Flink

Henry CollinsHenry Collins
8 min read

Dưới đây là Phần 3 của loạt bài “Chạy Apache Flink embedded trong Spring Boot” (bối cảnh e-commerce). Trong phần này, chúng ta sẽ đi sâu vào việc đọc dữ liệu từ Kafka, chuyển đổi (parse) JSON thành dạng Row (hoặc một cấu trúc thích hợp), rồi tạo bảng Flink (Table) từ các DataStream tương ứng. Mục đích là chuẩn bị dữ liệu để JOIN và ghi xuống MongoDB ở các phần sau.


1. Tóm tắt yêu cầu

Như đã đề cập ở Phần 1, chúng ta có 2 topic Kafka chính trong bối cảnh e-commerce:

  • Topic A: order_events (chứa thông tin đơn hàng, ví dụ order_id, user_id, order_total, order_status, timestamp, …).

  • Topic B: shipping_events (chứa thông tin vận chuyển, ví dụ shipping_id, order_id, carrier, shipping_status, timestamp, …).

Flink sẽ đọc từ hai topic này, chuyển dữ liệu sang định dạng Row hoặc POJO, rồi đưa lên Table (Table API) bằng cách sử dụng tableEnv.fromChangelogStream(...) (hoặc phương pháp tương đương). Nhờ đó, ta có thể JOIN chúng trong Phần 4 bằng Flink SQL hoặc Table API.


2.1 Xác định phiên bản Kafka & cách deserialization

  • Phiên bản Kafka: Chúng ta giả định Kafka 2.x trở lên, đang chạy local hoặc trên server.

  • Boostrstrap servers: lấy từ application.yml (spring.kafka.bootstrap-servers).

  • GroupId: Thông thường ta tự đặt, ví dụ flink-ecommerce-group.

  • Deserialization: Ở ví dụ này, Kafka messages có dữ liệu JSON. Ta sẽ dùng Flink Kafka Connector với một deserializer tùy chỉnh (có thể là SimpleStringSchema, sau đó parse JSON thủ công hoặc dùng Avro/Json library).

2.2 Tạo KafkaSource<String> cho 2 topic

Ví dụ trong service FlinkPipelineService:

package com.example.service;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class KafkaSourceFactory {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Tạo KafkaSource cho topic order_events:
    public KafkaSource<String> createOrderEventsSource(String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("order_events") // Topic A
                .setGroupId(groupId)
                // Khởi tạo offsets:
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(SimpleStringSchema.UTF8)
                .build();
    }

    // Tạo KafkaSource cho topic shipping_events:
    public KafkaSource<String> createShippingEventsSource(String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("shipping_events") // Topic B
                .setGroupId(groupId)
                // Khởi tạo offsets:
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(SimpleStringSchema.UTF8)
                .build();
    }
}

Giải thích:

  • KafkaSource.<String>builder(): Sử dụng builder pattern cho KafkaSource (kể từ Flink 1.14+).

  • setValueOnlyDeserializer(SimpleStringSchema.UTF8): Cho biết ta nhận message dưới dạng String.

  • OffsetResetStrategy.EARLIEST: Nếu chưa có offset commit, bắt đầu đọc từ đầu topic.

  • Trong thực tế, bạn có thể dùng AvroDeserializationSchema, JSONDeserializationSchema, hoặc tự viết custom.

Sau đó, khi build pipeline, ta sẽ gọi createOrderEventsSource()createShippingEventsSource() để lấy KafkaSource tương ứng.


3. Chuyển JSON sang đối tượng Row hoặc POJO

3.1 Lấy DataStream từ KafkaSource

Từ StreamExecutionEnvironment, ta tạo DataStream như sau:

DataStreamSource<String> orderEventsStream = env.fromSource(
    kafkaSourceFactory.createOrderEventsSource("flink-ecommerce-group"),
    WatermarkStrategy.noWatermarks(),
    "Kafka Source - Order Events"
);

DataStreamSource<String> shippingEventsStream = env.fromSource(
    kafkaSourceFactory.createShippingEventsSource("flink-ecommerce-group"),
    WatermarkStrategy.noWatermarks(),
    "Kafka Source - Shipping Events"
);
  • WatermarkStrategy.noWatermarks(): Tạm thời ta bỏ qua watermark. Nếu muốn xử lý event-time, ta sẽ học cách tạo watermark ở Phần 4.

  • Tên source là "Kafka Source - ..." để nhận biết trong Flink UI/logs.

3.2 Parse JSON thủ công hoặc qua library

Giả sử chúng ta chọn cách parse thủ công bằng Jackson. Mỗi record JSON của Order Event:

{
  "order_id": "ORD123",
  "user_id": "U001",
  "order_total": 99.5,
  "order_status": "CREATED",
  "timestamp": 1679999999999
}

Shipping Event:

{
  "shipping_id": "SHIP987",
  "order_id": "ORD123",
  "carrier": "DHL",
  "shipping_status": "IN_TRANSIT",
  "timestamp": 1680000000999
}

Chúng ta có 2 POJO đại diện:

public class OrderEvent {
    public String orderId;
    public String userId;
    public double orderTotal;
    public String orderStatus;
    public long timestamp;
    // Constructors, getters, setters...
}

public class ShippingEvent {
    public String shippingId;
    public String orderId;
    public String carrier;
    public String shippingStatus;
    public long timestamp;
    // Constructors, getters, setters...
}

Bước parse (transform StringOrderEvent / ShippingEvent):

DataStream<OrderEvent> parsedOrderStream = orderEventsStream
    .map(json -> {
        // Parse JSON -> OrderEvent
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, OrderEvent.class);
    })
    .name("Parse-Order-JSON");

DataStream<ShippingEvent> parsedShippingStream = shippingEventsStream
    .map(json -> {
        // Parse JSON -> ShippingEvent
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, ShippingEvent.class);
    })
    .name("Parse-Shipping-JSON");

Lưu ý:

  • Tránh tạo new ObjectMapper() bên trong map() nhiều lần. Trong thực tế, bạn nên tạo một static final ObjectMapper, hoặc viết hẳn một MapFunction riêng để tái sử dụng.

  • Có thể quản lý exceptions (ví dụ JSON parse lỗi) tùy chiến lược (bỏ qua, gửi tới dead-letter queue, …).

3.3 Tùy chọn dùng Row thay vì POJO

Bạn cũng có thể trực tiếp chuyển JSON → Row, nhất là khi sử dụng Table API/SQL kèm fromChangelogStream(...). Nếu dữ liệu có row schema tĩnh (cột cố định), ta có thể khai báo RowTypeInfo hoặc Schema để parse.
Tuy nhiên, ví dụ này minh họa POJO approach, rồi ta sẽ build Table trên POJO.


4.1 Dùng Table API / Changelog

Có nhiều cách để biến DataStream thành Table:

  1. tableEnv.fromDataStream(DataStream<T>): Tạo bảng đơn giản với append-only mode.

  2. tableEnv.fromChangelogStream(DataStream<Row> rowStream, Schema, ChangelogMode): Dùng khi ta muốn upsert hay handle thay đổi (RowKind).

Trong bối cảnh e-commerce, mỗi order/ shipping event thường là kiểu “CREATE/UPDATE”. Nếu bạn muốn track thay đổi và upsert sang MongoDB, bạn có thể phải khai báo RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
Tuy nhiên, giả sử đơn giản ta chỉ nhận INSERT (mỗi record order/shipping event xem như 1 record “newest” – vì status update có thể đến với cùng order_id), ta có thể:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// ...

Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
    $("orderId"),
    $("userId"),
    $("orderTotal"),
    $("orderStatus"),
    $("timestamp"),
    $("proctime").proctime() // cột thời gian xử lý (nếu cần)
);

Table shippingTable = tableEnv.fromDataStream(parsedShippingStream,
    $("shippingId"),
    $("orderId"),
    $("carrier"),
    $("shippingStatus"),
    $("timestamp"),
    $("proctime").proctime()
);

Giải thích:

  • $("...") là cú pháp DSL của Table API (Flink 1.13+).

  • $("proctime").proctime() tạo cột “processing time”.

  • Tên bảng: tạm thời, ta có thể tạo “Temporary View”:

tableEnv.createTemporaryView("order_events_table", orderTable);
tableEnv.createTemporaryView("shipping_events_table", shippingTable);

4.2 Lưu ý về tính “Upsert” và RowKind

Nếu bạn muốn theo dõi các event “UPDATE” (ví dụ, order_status chuyển từ CREATEDPAID), bạn có thể:

  • Dùng tableEnv.fromChangelogStream(...) với RowRowKind.

  • Khi parse, bạn xác định logic thay đổi (INSERT, UPDATE_BEFORE, UPDATE_AFTER).

Tuy nhiên, phần này nâng cao. Để đơn giản, chúng ta giả sử mỗi record Kafka là một event “INSERT” (hoặc “upsert overwrite”). Mọi update của order_status coi như event mới. Flink gặp cùng order_id vẫn coi đó là “insert row” (và khi join, ta sẽ lấy giá trị mới nhất theo timestamp).

Ở Phần 4, chúng ta sẽ “Join” hai bảng này (theo order_id) và hoặc tạo logic update/upsert sang MongoDB.


5. Tích hợp đoạn code này vào FlinkPipelineService

Để tổng hợp, ta có thể viết trong class FlinkPipelineService (một ví dụ):

@Service
public class FlinkPipelineService {

    @Autowired
    private StreamExecutionEnvironment env;

    @Autowired
    private StreamTableEnvironment tableEnv;

    @Autowired
    private KafkaSourceFactory kafkaSourceFactory;

    public void setupAndRunPipeline() throws Exception {

        // 1. Tạo DataStream<String> từ Kafka
        DataStreamSource<String> orderEventsStream = env.fromSource(
                kafkaSourceFactory.createOrderEventsSource("flink-ecommerce-group"),
                WatermarkStrategy.noWatermarks(),
                "Kafka Source - Order Events"
        );

        DataStreamSource<String> shippingEventsStream = env.fromSource(
                kafkaSourceFactory.createShippingEventsSource("flink-ecommerce-group"),
                WatermarkStrategy.noWatermarks(),
                "Kafka Source - Shipping Events"
        );

        // 2. Parse JSON -> POJO
        DataStream<OrderEvent> parsedOrderStream = orderEventsStream
            .map(json -> {
                // parse JSON -> OrderEvent
                ObjectMapper mapper = new ObjectMapper();
                return mapper.readValue(json, OrderEvent.class);
            })
            .name("Parse-Order-JSON");

        DataStream<ShippingEvent> parsedShippingStream = shippingEventsStream
            .map(json -> {
                // parse JSON -> ShippingEvent
                ObjectMapper mapper = new ObjectMapper();
                return mapper.readValue(json, ShippingEvent.class);
            })
            .name("Parse-Shipping-JSON");

        // 3. Tạo Table từ DataStream
        Table orderTable = tableEnv.fromDataStream(parsedOrderStream,
                $("orderId"),
                $("userId"),
                $("orderTotal"),
                $("orderStatus"),
                $("timestamp"),
                $("proctime").proctime()
        );

        Table shippingTable = tableEnv.fromDataStream(parsedShippingStream,
                $("shippingId"),
                $("orderId"),
                $("carrier"),
                $("shippingStatus"),
                $("timestamp"),
                $("proctime").proctime()
        );

        // 4. Tạo Temporary View cho việc JOIN (ở phần sau)
        tableEnv.createTemporaryView("order_events_table", orderTable);
        tableEnv.createTemporaryView("shipping_events_table", shippingTable);

        // 5. Bật streaming job
        env.execute("E-commerce Flink Pipeline");
    }
}

Khi Spring Boot start (hoặc khi bạn gọi /start-job từ controller), hàm này sẽ chạy job Flink. Streaming job sẽ liên tục:

  • Lắng nghe hai topic Kafka.

  • Parse JSON thành OrderEvent, ShippingEvent.

  • Cập nhật vào bảng order_events_tableshipping_events_table.

Chúng ta chưa JOIN hay ghi xuống MongoDB ở đây. Đó là nội dung Phần 4.


6. Xác minh hoạt động

6.1 Chạy Spring Boot & gửi data vào Kafka

  1. Chạy ứng dụng Spring Boot:

    • mvn spring-boot:run (hoặc java -jar spring-flink-embedded.jar)

    • Xem logs, xác nhận rằng pipeline đã “lắng nghe” 2 topic.

  2. Gửi một vài thông điệp JSON vào Kafka:

    • Topic order_events, ví dụ:

        echo '{"order_id":"ORD001","user_id":"U100","order_total":120.0,"order_status":"CREATED","timestamp":1680000000001}' | kafka-console-producer.sh --broker-list localhost:9092 --topic order_events
      
    • Topic shipping_events, ví dụ:

        echo '{"shipping_id":"SHIP-456","order_id":"ORD001","carrier":"DHL","shipping_status":"PREPARED","timestamp":1680000000500}' | kafka-console-producer.sh --broker-list localhost:9092 --topic shipping_events
      
  3. Kiểm tra logs:

    • Xem log ứng dụng: Nếu parse JSON thành công, ta thấy pipeline “chạy” mà không báo lỗi.
  • Vì đang embedded, ta không có Flink CLI chính thức để tương tác.

  • Tuy nhiên, có thể in DataStream ra console (debug) hoặc tạo temporary sink in ra log.

  • Phần 4, ta sẽ JOIN và ghi xuống MongoDB, từ đó ta xác minh xem Mongo có data chưa.


7. Tóm tắt & Hướng dẫn tiếp theo

Trong Phần 3, bạn đã học được cách:

  1. Khai báo KafkaSource cho 2 topic (order_events, shipping_events).

  2. Tạo DataStream từ Kafka, parse JSON thành POJO OrderEventShippingEvent.

  3. Chuyển DataStream thành Table (append-only).

  4. Tạo Temporary View để tiện JOIN trong bước kế.

Đây là nền tảng cốt lõi để chúng ta xây dựng logic JOIN. Ở Phần 4, chúng ta sẽ xử lý:

  • JOIN hai bảng order_events_tableshipping_events_table dựa trên order_id.

  • Quản lý thời gian (có thể dùng window join hoặc interval join) vì shipping event đến sau order event.

  • Ghi kết quả cuối cùng sang MongoDB (thông qua Flink Connector hoặc DDL CREATE TABLE ... WITH('connector'='mongodb')).

  • Cách sử dụng ChangelogMode.upsert() để đảm bảo bản ghi được upsert tương ứng (thêm hoặc cập nhật).

Sau đó, ở Phần 5, ta sẽ bàn sâu hơn về vận hành (checkpoint, scaling, Docker/K8s, …).

Điểm mấu chốt: Phần 3 này giúp bạn nắm rõ cách đọc (consume) Kafka topic trong môi trường Flink embedded, parse dữ liệu JSON, rồi “bắc cầu” sang Table API. Đây chính là khâu tiền xử lý trước khi ta thực hiện những thao tác phân tích, tính toán, hoặc JOIN phức tạp.

Hẹn gặp lại ở Phần 4, nơi chúng ta thực sự “làm giàu” dữ liệu bằng cách JOIN order_events_tableshipping_events_table, rồi ghi kết quả sang MongoDB!

0
Subscribe to my newsletter

Read articles from Henry Collins directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Henry Collins
Henry Collins