Supercharge your Kafka producer with a custom serializer
Introduction
In this tutorial, we will explore how to create a custom Kafka serializer in Java. We'll implement a serializer that adds "hello" to the beginning of every message. This can be a handy tool if you want to isolate logic from your app and reuse it across multiple Kafka producer applications.
Prerequisites
Before we begin, ensure you have the following:
Java Development Kit (JDK) installed
Apache Kafka installed and running
Step 1: Create the Custom Serializer
Let's start by creating the custom Kafka serializer. We'll implement the org.apache.kafka.common.serialization.Serializer
interface to modify our messages. Below is the code for the custom serializer:
package com.example.kafka.serialization;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class CustomSerializer implements Serializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// Configuration settings can be handled here if needed.
}
@Override
public byte[] serialize(String topic, String data) {
if (data == null) {
return null;
}
// Add "hello" to the beginning of the message.
String modifiedMessage = "hello " + data;
try {
// Convert the modified message to bytes.
return modifiedMessage.getBytes("UTF-8");
} catch (Exception e) {
throw new SerializationException("Error serializing message: " + e.getMessage(), e);
}
}
@Override
public void close() {
// This method can be used to perform any necessary cleanup.
}
}
In this code:
We implement the
serialize
method to add "hello" to the beginning of each message.The
configure
andclose
methods are implemented for configuration and cleanup, respectively.
Step 2: Set Up the Kafka Producer
Now that we have our custom serializer, let's set up a Kafka producer to test it. Here's the code for the Kafka producer:
package com.example.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "com.example.kafka.serialization.CustomSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
String topic = "test";
String message = "world";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent successfully to partition " + metadata.partition());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
});
producer.close();
}
}
Make sure to replace "com.example.kafka.serialization.CustomSerializer"
with the fully qualified class name of your custom serializer.
Step 3: Run the Producer
To test our custom serializer, compile and run the Kafka producer. It will send a message with "hello" added to the beginning.
Conclusion
In this tutorial, we've created a custom Kafka serializer that adds "hello" to the start of every message. You can modify this serializer to perform other transformations or data processing as needed. Custom serializers provide flexibility when working with Kafka, allowing you to adapt your data to meet specific requirements.
Explore more Kafka features and adapt your serializer to suit your use case. Happy coding!
Subscribe to my newsletter
Read articles from Akhil Kondapaneni directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Akhil Kondapaneni
Akhil Kondapaneni
Fascinated by the things that go beep!