Learning Kafka: Advanced Journey With Shubham Gore

Shubham Gore

Week 2: Mastering Kafka

In this post, I’ll take you through the advanced concepts I explored in my Kafka journey, building upon the fundamentals from Week 1. From stream processing to secure messaging, here’s how I tackled Kafka’s more intricate features and challenges.

Topics Covered

  1. Kafka Streams Advanced Features: Stateful processing, windowed operations, and interactive queries.

  2. Kafka Administration: Managing topics, partitions, retention policies, and monitoring clusters.

  3. Kafka Connect Advanced: JDBC and MongoDB connectors for data integration.

  4. Kafka Security: Enabling secure communication with SSL and ACLs.

  5. Event-Driven Architecture: Building a microservices-based system leveraging Kafka.


1. Kafka Streams Advanced Features

Kafka Streams makes it easy to process real-time data streams. This week, I delved into:

  • Stateful Processing: Using state stores to maintain intermediate data.

  • Windowed Operations: Aggregating events in 1-minute windows.

  • Interactive Queries: Accessing the state store directly (a query sketch follows at the end of this section).

Code Example: Windowed Event Count

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, String> windowedEventCount(StreamsBuilder streamsBuilder) {
        // Consume from the input-events topic
        KStream<String, String> events = streamsBuilder.stream("input-events");

        // Windowed aggregation: count events per key in 1-minute windows
        KTable<Windowed<String>, Long> eventCounts = events
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .count();

        // Publish the counts to a different Kafka topic
        eventCounts.toStream()
                .to("output-events", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

        // With @EnableKafkaStreams, Spring builds and starts the KafkaStreams instance for us
        return events;
    }

    // Registered under the default name so @EnableKafkaStreams picks it up
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-event-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

Key insights:

  • Windowing allows grouping events by time intervals.

  • Stateful processing is invaluable for tracking event counts or trends.
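
Since the list above also calls out interactive queries, here is a minimal sketch of querying the windowed state store directly. It assumes the count above is materialized under a name, e.g. .count(Materialized.as("event-counts")), and that Spring’s StreamsBuilderFactoryBean (created by @EnableKafkaStreams) is injected; the class name, the queried key, and the five-minute lookback are illustrative only.

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Service;

@Service
public class EventCountQueryService {

    private final StreamsBuilderFactoryBean factoryBean;

    public EventCountQueryService(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    /** Prints the windowed counts recorded for the given key over the last five minutes. */
    public void printRecentCounts(String key) {
        // The KafkaStreams instance that @EnableKafkaStreams built and started
        KafkaStreams streams = factoryBean.getKafkaStreams();

        // Requires a named store, e.g. .count(Materialized.as("event-counts"))
        ReadOnlyWindowStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("event-counts", QueryableStoreTypes.windowStore()));

        Instant now = Instant.now();
        try (WindowStoreIterator<Long> iterator = store.fetch(key, now.minus(Duration.ofMinutes(5)), now)) {
            iterator.forEachRemaining(entry ->
                    System.out.println("Window starting at " + entry.key + " -> count " + entry.value));
        }
    }
}

Exposing a method like this behind a REST endpoint turns the Streams application itself into a queryable view of recent event counts.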


2. Kafka Administration

Steps

  • Creating and Managing Topics:

      # Create a topic
      kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    
      # Describe a topic
      kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
    
  • Monitoring with Prometheus and Grafana:

    • Set up Prometheus to scrape the brokers’ JMX metrics (e.g. via the Prometheus JMX exporter).

    • Visualize metrics in Grafana dashboards.

Key takeaways:

  • Retention policies (retention.ms, retention.bytes) keep topic storage bounded (see the sketch below).

  • Monitoring ensures high availability and fault tolerance.
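
To make the retention takeaway concrete, here is a minimal sketch that sets a 24-hour retention.ms on my-topic with the Kafka AdminClient; the class name, topic, and retention value are illustrative placeholders, and the same change can also be made with kafka-configs.sh.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionPolicyExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Target the topic whose retention we want to change
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

            // Keep data for 24 hours (retention.ms is expressed in milliseconds)
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);

            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Collections.singletonMap(topic, Collections.singletonList(setRetention));
            admin.incrementalAlterConfigs(updates).all().get();

            System.out.println("Retention updated for my-topic");
        }
    }
}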


3. Kafka Connect Advanced

Kafka Connect simplifies integrating external systems with Kafka. This week, I:

  • Set up the JDBC Source Connector to stream database changes into Kafka (example below).

  • Used MongoDB Source and Sink Connectors for seamless MongoDB integration.
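
Example: JDBC Source Connector

Below is a minimal sketch of a Confluent JDBC source connector configuration; the connection URL, credentials, table, and incrementing column are placeholders, and incrementing mode assumes the table has an auto-incrementing id column.

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/test_db",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "events",
    "topic.prefix": "jdbc-"
  }
}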

Example: MongoDB Source Connector

{
  "name": "mongodb-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test_db",
    "collection": "test_collection",
    "topic.prefix": "mongodb-"
  }
}

Example: MongoDB Sink Connector

{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test_db",
    "collection": "sink_collection",
    "topics": "mongodb-test_collection"
  }
}

4. Kafka Security

Securing Kafka is essential for production environments. I implemented:

  1. SSL Encryption:

    • Enabled encrypted communication between brokers and clients (client-side settings are sketched after these steps).

    • Configured SSL certificates using keytool.

  2. Access Control Lists (ACLs):

    • Restricted topic access to specific users.
    kafka-acls.sh --add --allow-principal User:alice --operation Read --topic my-topic --bootstrap-server localhost:9092
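
To complement the SSL step above, here is a minimal sketch of the client-side settings for connecting to an SSL listener; the class name, port, keystore/truststore paths, and passwords are placeholders, and the keystore entries are only needed if the broker requires mutual TLS.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslClientExample {

    public static KafkaProducer<String, String> createSslProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // SSL listener (placeholder port)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Encrypt traffic between this client and the brokers
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/kafka.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");

        // Only required when the broker is configured with ssl.client.auth=required (mutual TLS)
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/kafka.client.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");

        return new KafkaProducer<>(props);
    }
}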

Key learnings:

  • SSL adds a layer of encryption, while ACLs ensure granular access control.

  • Both are crucial for protecting sensitive data.


5. Event-Driven Architecture

Architecture Overview

  1. Producer Service:

    • Provides REST APIs for sending events to Kafka topics dynamically.

    • Allows specifying the topic and message payload.

  2. Consumer Service:

    • Listens to specific topics and performs business logic based on the event payload.

    • Supports scalable event processing.

  3. Kafka Streams Service:

    • Processes and transforms events in real time.

    • Publishes results back to different topics for further consumption.


Sample Producer API

The following example demonstrates how to dynamically publish messages to a topic using a REST API.

@RestController
@RequestMapping("/api/events")
public class EventController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam String topic,
            @RequestBody String message) {
        kafkaTemplate.send(topic, message);
        return ResponseEntity.ok("Message published to topic: " + topic);
    }
}
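
The controller assumes a KafkaTemplate<String, String> bean exists. Spring Boot auto-configures one from spring.kafka.* properties, but for completeness here is a minimal explicit configuration sketch; the class name and broker address are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // String keys and values to match the controller's KafkaTemplate<String, String>
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}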

Sample Consumer Service

This example demonstrates a Kafka listener that processes messages from a specific topic.

@Service
public class EventConsumer {

    @KafkaListener(topics = "input-events", groupId = "event-consumers")
    public void consume(String message) {
        System.out.println("Received message: " + message);

        // Perform business logic here
    }
}
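
The listener relies on a kafkaListenerContainerFactory being present. Spring Boot provides one automatically; a minimal explicit sketch is shown below, with the group id and broker address mirroring the listener above and the local setup used throughout this post.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-consumers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Backs every @KafkaListener in the application, including EventConsumer above
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}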

Kafka Streams Service

The Kafka Streams application reads messages from one topic, processes them, and writes to another.

@EnableKafkaStreams
@Configuration
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, String> processStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> inputEvents = streamsBuilder.stream("input-events");

        // Example: transforming each message before forwarding it
        inputEvents
                .mapValues(value -> "Processed: " + value)
                .to("output-events");

        return inputEvents;
    }

    // Registered under the default name so @EnableKafkaStreams picks it up
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

Testing the Architecture

  1. Start Kafka Services:

    • Run the Kafka broker and ZooKeeper.

    • Create required topics: input-events and output-events.

  2. Publish Messages: Use the Producer API to send test messages:

     curl -X POST "http://localhost:8080/api/events/publish?topic=input-events" \
         -H "Content-Type: application/json" \
         -d '"Hello, Kafka!"'
    
  3. Verify Consumer Logs: Check the logs of the Consumer Service to confirm message consumption.

  4. Verify Stream Processing: Use a Kafka CLI consumer to check the output-events topic:

     kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-events --from-beginning
    

Insights

  • Kafka Streams excels at real-time processing, with powerful features like state stores and windowing.

  • Kafka Connect bridges external systems effortlessly, supporting various data sources and sinks.

  • Security practices like SSL and ACLs are non-negotiable for production.

  • Event-driven architectures enable decoupled, scalable systems with Kafka at their core.


Conclusion

This week, I pushed the boundaries of Kafka, mastering its advanced features and exploring how it integrates with real-world systems. From stream processing to secure pipelines, Kafka’s versatility continues to impress.


Engage with Me

Let me know your thoughts, questions, or feedback in the comments below. 🚀
