Essential Apache Kafka Interview Preparation Guide - II

Producer Role
When an event occurs, the producer formats a message, also referred to as a record, and sends it to a Kafka topic.
A message consists of one required field, the value, and three optional fields (a sketch showing all four appears after this list):
A key — determines which partition the message is sent to.
A timestamp — used to order messages within a partition.
Headers — key-value pairs, similar to HTTP headers, that carry metadata about the message.
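As a rough, illustrative sketch (plain kafka-clients API; the topic, key, and header names here are made-up examples, not part of the original article), a record that sets all four fields explicitly could look like this:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordFieldsExample {

    // Builds a record that sets all four fields explicitly.
    public static ProducerRecord<String, String> buildRecord() {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "orders",                   // topic (example name)
                null,                       // partition: null lets the producer pick one from the key
                System.currentTimeMillis(), // timestamp (optional; can also be broker-assigned)
                "order-42",                 // key (optional) - drives partition selection
                "{\"status\":\"CREATED\"}"  // value (required)
        );
        // Headers are key-value pairs carrying metadata about the message, e.g. a trace id.
        record.headers().add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}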
Producer Implementation in Java
Kafka Producer Configuration Bean
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "my-app");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // KafkaTemplate wraps the producer and exposes convenient send() methods.
        return new KafkaTemplate<>(producerFactory());
    }
}
Kafka producer service
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessageWithKey(String topic, String key, String message) {
        // send() is asynchronous; the callback reports the partition and offset on success.
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.printf("Sent message with key '%s' to partition %d with offset %d%n",
                        key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.printf("Failed to send message with key '%s': %s%n", key, ex.getMessage());
            }
        });
    }
}
Controller or Application Class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class KafkaRunner implements CommandLineRunner {

    @Autowired
    private KafkaProducerService producerService;

    @Override
    public void run(String... args) {
        producerService.sendMessageWithKey("my_topic", "key1", "Hello, Kafka with key!");
        producerService.sendMessageWithKey("my_topic", "key2", "Another message with a different key");
    }
}
Producer Internals
When a message is published to a Kafka topic, Kafka first determines the appropriate partition for the message. This partition selection is critical because it influences how data is distributed across the cluster. It is a two-step process:
Partition Determination:
If the message has a key, Kafka hashes the key with a partitioning algorithm to assign the message to a specific partition (see the sketch after this list).
If the message has no key, the producer spreads messages across partitions (round-robin in older clients, a sticky per-batch strategy in newer ones), or applies custom partitioning logic defined in the producer configuration.
Because the hash of a given key is stable, messages with the same key always go to the same partition, preserving order at the partition level.
Broker Assignment:
Once the partition is determined, Kafka identifies which broker hosts the leader replica of that partition.
The mapping of partitions to brokers is part of the cluster metadata, which is maintained by the Kafka controller (a role held by one of the brokers).
The producer caches this metadata and sends the message directly to the leader broker for the target partition.
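As a rough illustration of the partition-determination step, the sketch below mirrors what Kafka's default partitioner does for keyed messages: murmur2-hash the serialized key and take it modulo the partition count. Utils here is org.apache.kafka.common.utils.Utils from kafka-clients; the keys and partition count are made-up examples.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSelectionExample {

    // Same idea as the default partitioner's keyed path:
    // hash the serialized key, then map the hash onto one of the topic's partitions.
    public static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition (for a fixed partition count),
        // which is what preserves per-key ordering within a partition.
        System.out.println(partitionForKey("key1", 6));
        System.out.println(partitionForKey("key1", 6)); // identical to the line above
        System.out.println(partitionForKey("key2", 6)); // may land on a different partition
    }
}

Note that this mapping depends on the partition count, which is why adding partitions to an existing topic changes the key-to-partition assignment for newly produced messages.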