Learning Kafka: Advanced Journey With Shubham Gore


Week 2: Mastering Kafka
In this post, I’ll take you through the advanced concepts I explored in my Kafka journey, building upon the fundamentals from Week 1. From stream processing to secure messaging, here’s how I tackled Kafka’s more intricate features and challenges.
Topics Covered
Kafka Streams Advanced Features: Stateful processing, windowed operations, and interactive queries.
Kafka Administration: Managing topics, partitions, retention policies, and monitoring clusters.
Kafka Connect Advanced: JDBC and MongoDB connectors for data integration.
Kafka Security: Enabling secure communication with SSL and ACLs.
Event-Driven Architecture: Building a microservices-based system leveraging Kafka.
1. Kafka Streams Advanced Features
Kafka Streams makes it easy to process real-time data streams. This week, I delved into:
Stateful Processing: Using state stores to maintain intermediate data.
Windowed Operations: Aggregating events in 1-minute windows.
Interactive Queries: Accessing the state store directly (a sketch follows the code example below).
Code Example: Windowed Event Count
@Configuration
public class KafkaStreamsConfig {

    @Bean(destroyMethod = "close")
    public KafkaStreams kafkaStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Consume from the input-events topic
        KStream<String, String> events = streamsBuilder.stream("input-events");

        // Count events per key in 1-minute windows; naming the state store makes it queryable
        KTable<Windowed<String>, Long> eventCounts = events
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("event-counts"));

        // Publish the windowed counts to a different Kafka topic
        eventCounts.toStream()
                .to("output-events", Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

        // Build and start the Streams application
        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig());
        streams.start();
        return streams;
    }

    private Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-event-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
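With the store named in the topology, the counts can be queried directly from the running application. Below is a minimal sketch of an interactive query against that store; the key, the 5-minute range, and the helper method name are illustrative, and streams is the KafkaStreams bean defined above.

public long recentCount(KafkaStreams streams, String key) {
    // Look up the "event-counts" window store materialized by the topology above
    ReadOnlyWindowStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("event-counts", QueryableStoreTypes.<String, Long>windowStore()));

    // Sum this key's counts across all windows from the last 5 minutes
    Instant now = Instant.now();
    long total = 0;
    try (WindowStoreIterator<Long> windows = store.fetch(key, now.minus(Duration.ofMinutes(5)), now)) {
        while (windows.hasNext()) {
            total += windows.next().value;
        }
    }
    return total;
}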
Key insights:
Windowing allows grouping events by time intervals.
Stateful processing is invaluable for tracking event counts or trends.
2. Kafka Administration
Steps
Creating and Managing Topics:
# Create a topic
kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# Describe a topic
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
Monitoring with Prometheus and Grafana:
Set up Prometheus to scrape Kafka metrics.
Visualize metrics in Grafana dashboards.
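Retention policies are the other piece of topic administration I worked with. Below is a minimal sketch of setting a topic's retention with the Kafka AdminClient; the topic name and the 7-day value are illustrative.

public class RetentionAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Keep messages for 7 days (604800000 ms); the broker then deletes older segments
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "604800000"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention))).all().get();
        }
    }
}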
Key takeaways:
Retention policies prevent storage overflow.
Monitoring ensures high availability and fault tolerance.
3. Kafka Connect Advanced
Kafka Connect simplifies integrating external systems with Kafka. This week, I:
Set up JDBC Source Connector to stream database changes.
Used MongoDB Source and Sink Connectors for seamless MongoDB integration.
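Example: JDBC Source Connector
For the JDBC side, a configuration along these lines is typical. This is a sketch: it assumes the Confluent JDBC connector plugin is installed, and the connection URL, credentials, and incrementing column are illustrative.
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/test_db",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-"
  }
}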
Example: MongoDB Source Connector
{
  "name": "mongodb-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test_db",
    "collection": "test_collection",
    "topic.prefix": "mongodb-"
  }
}
Example: MongoDB Sink Connector
{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test_db",
    "collection": "sink_collection",
    "topics": "mongodb-test_collection"
  }
}
4. Kafka Security
Securing Kafka is essential for production environments. I implemented:
SSL Encryption:
Enabled encrypted communication between brokers and clients.
Configured SSL certificates using keytool.
Access Control Lists (ACLs):
Restricted topic access to specific users.
kafka-acls.sh --add --allow-principal User:alice --operation Read --topic my-topic --bootstrap-server localhost:9092
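On the client side, pointing producers and consumers at the SSL listener comes down to a few properties. Below is a minimal sketch; the port, file paths, and password are illustrative.

public class SslClientConfig {
    public static Properties sslProps() {
        Properties props = new Properties();
        // Broker SSL listener (port 9093 is illustrative)
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        // Encrypt traffic between client and broker
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Truststore holding the broker's CA certificate, created with keytool
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        return props;
    }
}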
Key learnings:
SSL adds a layer of encryption, while ACLs ensure granular access control.
Both are crucial for protecting sensitive data.
5. Event-Driven Architecture
Architecture Overview
Producer Service:
Provides REST APIs for sending events to Kafka topics dynamically.
Allows specifying the topic and message payload.
Consumer Service:
Listens to specific topics and performs business logic based on the event payload.
Supports scalable event processing.
Kafka Streams Service:
Processes and transforms events in real-time.
Publishes results back to different topics for further consumption.
Sample Producer API
The following example demonstrates how to dynamically publish messages to a topic using a REST API.
@RestController
@RequestMapping("/api/events")
public class EventController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(
            @RequestParam String topic,
            @RequestBody String message) {
        // Send the payload to whichever topic the caller specified
        kafkaTemplate.send(topic, message);
        return ResponseEntity.ok("Message published to topic: " + topic);
    }
}
Sample Consumer Service
This example demonstrates a Kafka listener that processes messages from a specific topic.
@Service
public class EventConsumer {

    @KafkaListener(topics = "input-events", groupId = "event-consumers")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        // Perform business logic here
    }
}
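The "scalable event processing" mentioned in the overview mostly comes down to partitions and consumer instances. With Spring Kafka, the concurrency attribute on @KafkaListener starts several consumer threads in the same group; a minimal sketch, assuming the topic has at least as many partitions as threads (the value 3 is illustrative):

@Service
public class ScalableEventConsumer {

    // Three consumer threads in the same group; Kafka spreads the topic's partitions across them
    @KafkaListener(topics = "input-events", groupId = "event-consumers", concurrency = "3")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}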
Kafka Streams Service
The Kafka Streams application reads messages from one topic, processes them, and writes to another.
@EnableKafkaStreams
@Configuration
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, String> processStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> inputEvents = streamsBuilder.stream("input-events");
        // Example: transforming the message before forwarding it
        inputEvents
                .mapValues(value -> "Processed: " + value)
                .to("output-events");
        return inputEvents;
    }

    // @EnableKafkaStreams looks for a KafkaStreamsConfiguration bean under this well-known name
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
Testing the Architecture
Start Kafka Services:
Run the Kafka broker and Zookeeper.
Create the required topics: input-events and output-events.
Publish Messages: Use the Producer API to send test messages:
curl -X POST "http://localhost:8080/api/events/publish?topic=input-events" \
  -H "Content-Type: application/json" \
  -d '"Hello, Kafka!"'
Verify Consumer Logs: Check the logs of the Consumer Service to confirm message consumption.
Verify Stream Processing: Use a Kafka CLI consumer to check the output-events topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-events --from-beginning
Insights
Kafka Streams excels at real-time processing, with powerful features like state stores and windowing.
Kafka Connect bridges external systems effortlessly, supporting various data sources and sinks.
Security practices like SSL and ACLs are non-negotiable for production.
Event-driven architectures enable decoupled, scalable systems with Kafka at their core.
Conclusion
This week, I pushed the boundaries of Kafka, mastering its advanced features and exploring how it integrates with real-world systems. From stream processing to secure pipelines, Kafka’s versatility continues to impress.
Engage with Me
Hashnode: Subscribe for weekly learning journals.
Let me know your thoughts, questions, or feedback in the comments below. 🚀