Essential Apache Kafka Interview Preparation Guide - II

Pushkaraj Ogale
3 min read

Producer Role

When an event occurs, the producer formats a message, also referred to as a record, and sends it to a Kafka topic.

A message consists of one required field, the value, and three optional fields:

  1. Key — determines which partition the message is sent to.

  2. Timestamp — used to order messages within a partition.

  3. Headers — key-value pairs, like HTTP headers, used to store metadata about the message.
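
Putting those fields together, a fully populated record might look like this (a minimal sketch using the plain Kafka client API; the topic, key, and header names are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class RecordAnatomy {

    public static ProducerRecord<String, String> sampleRecord() {
        // Optional headers: key-value metadata, much like HTTP headers
        List<Header> headers = List.of(
                new RecordHeader("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8)));

        return new ProducerRecord<>(
                "orders",                    // topic (illustrative)
                null,                        // partition: null lets the partitioner derive it from the key
                System.currentTimeMillis(),  // optional timestamp; the broker assigns one if omitted
                "order-42",                  // optional key; same key -> same partition
                "{\"status\":\"CREATED\"}",  // required value: the payload
                headers);
    }
}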

💡
Key: While not strictly required, the key determines which partition the message is sent to. If you don't provide a key, the producer spreads messages across partitions (round-robin in older clients, sticky partitioning in newer ones), so ordering across those messages isn't guaranteed. So when designing a large, distributed system like the ones you're likely to be asked about in an interview, use keys to ensure related messages are processed in order, and choose that key carefully.
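
For instance, keying every event for an order by its order ID keeps that order's lifecycle on one partition (a sketch; the OrderEvents class, "orders" topic, and payloads are hypothetical, and kafkaTemplate is the bean defined in the next section):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderEvents {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderEvents(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishLifecycle() {
        // Both sends use the key "order-42", so both records land on the
        // same partition and a consumer sees CREATED before SHIPPED.
        kafkaTemplate.send("orders", "order-42", "CREATED");
        kafkaTemplate.send("orders", "order-42", "SHIPPED");
    }
}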

Producer Implementation in Java

💡
You can put the Kafka configuration either in application.yml / application.properties, or in a Map passed to the ProducerFactory.
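
For the property-file route, a sketch of the equivalent application.yml, assuming Spring Boot's standard spring.kafka.* properties (with these set, Spring Boot auto-configures a ProducerFactory and KafkaTemplate, so the explicit beans below become optional):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      client-id: my-app
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer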

Kafka Producer Configuration Bean

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Broker(s) the producer bootstraps from
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers for the record key and value
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Logical name reported to the broker for logging and metrics
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "my-app");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kafka Producer Service

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessageWithKey(String topic, String key, String message) {
        // send() is asynchronous; the callback fires once the broker acknowledges
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);

        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.printf("Sent message with key '%s' to partition %d with offset %d%n",
                        key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.printf("Failed to send message with key '%s': %s%n", key, ex.getMessage());
            }
        });
    }
}
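
Note that the ListenableFuture callback style above targets Spring Kafka 2.x. From Spring Kafka 3.0 onward, KafkaTemplate.send() returns a CompletableFuture, so the same handlers would be attached with whenComplete(...) instead.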

Controller or Application Class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class KafkaRunner implements CommandLineRunner {

    @Autowired
    private KafkaProducerService producerService;

    @Override
    public void run(String... args) {
        producerService.sendMessageWithKey("my_topic", "key1", "Hello, Kafka with key!");
        producerService.sendMessageWithKey("my_topic", "key2", "Another message with a different key");
    }
}

Producer Internals

When a message is published to a Kafka topic, Kafka first determines the appropriate partition for it. This partition selection is critical because it controls how data is distributed across the cluster, and it happens in two steps:

  1. Partition Determination:

    • Kafka uses a partitioning algorithm that hashes the message key to assign the message to a specific partition.

    • If the message has no key, the producer either round-robins messages across partitions (newer clients use sticky partitioning) or applies custom partitioning logic defined in the producer configuration.

    • This ensures that messages with the same key always go to the same partition, preserving order at the partition level (see the sketch after this list).

  2. Broker Assignment:

    • Once the partition is determined, Kafka then identifies which broker holds that particular partition.

    • The mapping of partitions to specific brokers is managed by the Kafka cluster metadata, which is maintained by the Kafka controller (a role within the broker cluster).

    • The producer uses this metadata to send the message directly to the broker that hosts the target partition. 
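
To make the partition-determination step concrete, here is a minimal sketch of the idea behind Kafka's default partitioner for keyed messages, using the murmur2 hash helpers that ship with the Kafka client library (the real partitioner also handles null keys and, in newer clients, sticky partitioning):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {

    // Hash the serialized key with murmur2, then take it modulo the
    // partition count: the same key always maps to the same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("key1", 6)); // deterministic
        System.out.println(partitionFor("key1", 6)); // same result every time
    }
}

Because the mapping is a pure function of the key and the partition count, per-key ordering survives producer restarts. Note, though, that adding partitions to a topic changes the mapping, which is one reason partition counts are usually chosen up front.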
