Understanding the basics of the Kafka Binary Protocol


Apache Kafka is a distributed event streaming platform used for high-performance data pipelines. In this article, we will take a look at the underbelly of Kafka and see how communication happens between a Kafka client and the server.
Fundamentals
Let's start with the basics. Kafka uses a custom binary protocol for sending and receiving messages.
The specifications define the request header as follows:
| Field | Data type | Description |
| --- | --- | --- |
| request_api_key | INT16 | The API key for the request |
| request_api_version | INT16 | The version of the API for the request |
| correlation_id | INT32 | A unique identifier for the request |
| client_id | NULLABLE_STRING | The client ID for the request |
| TAG_BUFFER | COMPACT_ARRAY | Optional tagged fields |
The spec defines these data types as follows:
| Type | Description |
| --- | --- |
| INT16 | Represents an integer between -2^15 and 2^15 - 1 inclusive. The values are encoded using two bytes in network byte order (big-endian). |
| INT32 | Represents an integer between -2^31 and 2^31 - 1 inclusive. The values are encoded using four bytes in network byte order (big-endian). |
| COMPACT_ARRAY | Represents a sequence of objects of a given type T. Type T can be either a primitive type (e.g. STRING) or a structure. First, the length N + 1 is given as an UNSIGNED_VARINT. Then N instances of type T follow. A null array is represented with a length of 0. In protocol documentation an array of T instances is referred to as [T]. |
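To make these encodings concrete, here is a minimal sketch (my own illustration, not part of the server we build below) of how an INT16/INT32 and an UNSIGNED_VARINT could be produced in Java, using java.nio.ByteBuffer and java.util.Arrays:
// Illustrative encoders for the primitive types above (assumption: plain helper
// methods, not taken from any Kafka library).
static byte[] encodeInt16(short value) {
    return ByteBuffer.allocate(2).putShort(value).array(); // ByteBuffer is big-endian by default
}

static byte[] encodeInt32(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
}

// UNSIGNED_VARINT: 7 bits per byte, least-significant group first,
// continuation bit (0x80) set on every byte except the last.
static byte[] encodeUnsignedVarint(int value) {
    ByteBuffer buf = ByteBuffer.allocate(5);
    while ((value & 0xFFFFFF80) != 0) {
        buf.put((byte) ((value & 0x7F) | 0x80));
        value >>>= 7;
    }
    buf.put((byte) value);
    return Arrays.copyOf(buf.array(), buf.position());
}
A COMPACT_ARRAY of N elements therefore starts with encodeUnsignedVarint(N + 1), followed by the N encoded elements.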
Here's an example of a request message:
00 00 00 23 // message_size: 35
00 12 // request_api_key: 18
00 04 // request_api_version: 4
6f 7f c6 61 // correlation_id: 1870644833
...
Every Kafka request is an API call. The Kafka protocol defines over 70 different APIs, all of which do different things. Here are some examples:
- Produce writes events to partitions.
- CreateTopics creates new topics.
- ApiVersions returns the broker's supported API versions.
A Kafka request specifies the API it is calling via the request_api_key header field.
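For reference, a few well-known API key values (as listed in the public protocol documentation) could be kept as constants; the values below are the standard ones, the constant names are just my own shorthand:
// A handful of well-known API keys (not exhaustive).
static final short API_KEY_PRODUCE = 0;
static final short API_KEY_FETCH = 1;
static final short API_KEY_API_VERSIONS = 18;
static final short API_KEY_CREATE_TOPICS = 19;
static final short API_KEY_DESCRIBE_TOPIC_PARTITIONS = 75;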
Message body
The schemas for the request and response bodies are determined by the API being called.
For example, here are some of the fields that the Produce request body contains:
- The name of the topic to write to.
- The key of the partition to write to.
- The event data to write.
On the other hand, the Produce response body contains a response code for each event. These response codes indicate whether the writes succeeded.
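As a heavily simplified, hypothetical illustration of that shape (ignoring the real wire layout, versioning, and many optional fields), the request and response could be modelled like this:
// Hypothetical, simplified model of the Produce exchange; the actual schemas
// have many more fields and are nested per topic and per partition.
record SimpleProduceRequest(String topic, int partition, byte[] eventData) {}
record SimpleProduceResponse(String topic, int partition, short errorCode) {}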
As a reminder, requests and responses both have the following format (a minimal frame-reading sketch follows the list):
- message_size
- Header
- Body
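A minimal sketch of reading one such frame from a socket — read the 4-byte message_size, then exactly that many bytes of header plus body (this is my own helper, not part of the server code below):
// Every Kafka message on the wire is: message_size (INT32) + that many bytes of header and body.
static byte[] readFrame(DataInputStream in) throws IOException {
    int messageSize = in.readInt();       // big-endian 4-byte size prefix
    byte[] frame = new byte[messageSize]; // request header + body
    in.readFully(frame);
    return frame;
}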
API versioning
Each API supports multiple versions, to allow for different schemas. Here's how API versioning works:
- Requests use the header field request_api_version to specify the API version being requested.
- Responses always use the same API version as the request. For example, a Produce Request (Version: 3) will always get a Produce Response (Version: 3) back.
- Each API's version history is independent. So, different APIs with the same version are unrelated. For example, Produce Request (Version: 10) is not related to Fetch Request (Version: 10).
The ApiVersions API
The ApiVersions API returns the broker's supported API versions. For example, ApiVersions may say that the broker supports Produce versions 5 to 11, Fetch versions 0 to 3, etc.
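A client would typically call ApiVersions first and then pick the highest version it shares with the broker for each API. A hedged sketch of that selection logic (variable names are hypothetical):
// Pick the highest API version supported by both sides, or fail if the ranges don't overlap.
static short negotiateVersion(short clientMin, short clientMax, short brokerMin, short brokerMax) {
    short candidate = (short) Math.min(clientMax, brokerMax);
    if (candidate < Math.max(clientMin, brokerMin)) {
        throw new IllegalStateException("No mutually supported API version");
    }
    return candidate;
}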
Visualizing the Binary Protocol
Here is a great link that can help you visualize the binary protocol
Hands-on
So let’s build a POC of a Kafka server.
Let’s start our server:
public static void main(String[] args) {
    int port = 9092;
    try (ServerSocket server = new ServerSocket()) {
        server.setReuseAddress(true);              // must be set before binding
        server.bind(new InetSocketAddress(port));
        log.info("Server started on port {}", port);
        while (true) {
            Socket client = server.accept();
            log.info("New client connected");
            handleClientAsync(client);
        }
    } catch (IOException e) {
        log.error("IOException: ", e);
    }
}
The server listens on port 9092 (the default Kafka port) and enters an infinite loop that waits for client connections. When a client connects, it passes the client socket to handleClientAsync(). setReuseAddress(true) is applied before binding so that restarting the server does not fail with "address already in use" errors.
Handle Multiple Clients Asynchronously
private static void handleClientAsync(Socket client) throws IOException {
    new Thread(() -> {
        try {
            processMessage(client);
        } catch (IOException e) {
            log.error("Error while handling client: ", e);
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                log.error("Error closing client socket: ", e);
            }
        }
    }).start();
}
This method creates a new thread for each client connection, allowing the server to handle multiple clients simultaneously. It delegates the message handling to processMessage(). This is a trivial implementation and will not scale well because it lacks a thread pool; every connection costs a dedicated platform thread.
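One way to address that (an alternative sketch, not part of the original implementation) is to hand the sockets to a bounded ExecutorService instead of spawning a new thread per connection:
// Alternative: reuse a fixed pool of worker threads instead of one thread per client.
private static final ExecutorService CLIENT_POOL = Executors.newFixedThreadPool(16);

private static void handleClientWithPool(Socket client) {
    CLIENT_POOL.submit(() -> {
        try {
            processMessage(client);
        } catch (IOException e) {
            log.error("Error while handling client: ", e);
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                log.error("Error closing client socket: ", e);
            }
        }
    });
}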
Processing Client Messages
private static void processMessage(Socket client) throws IOException {
    DataInputStream in = new DataInputStream(client.getInputStream());
    DataOutputStream out = new DataOutputStream(client.getOutputStream());
    while (!client.isClosed()) {
        // Read the 4-byte message_size prefix
        byte[] messageSize = new byte[4];
        int read = in.read(messageSize);
        if (read < 4) {
            log.info("Read fewer bytes than expected: {}", read);
            break;
        }
        // Read protocol metadata
        byte[] apiKey = new byte[2];
        in.readFully(apiKey);
        byte[] apiVersion = new byte[2];
        in.readFully(apiVersion);
        byte[] correlationId = new byte[4];
        in.readFully(correlationId);
        // Read client identification
        byte[] clientIdLength = new byte[2];
        in.readFully(clientIdLength);
        byte[] clientId = new byte[ByteBuffer.wrap(clientIdLength).getShort()];
        in.readFully(clientId);
        log.info("Request from clientID {} for apiKey {} apiVersion {} correlationId {}",
                new String(clientId), ByteBuffer.wrap(apiKey).getShort(),
                ByteBuffer.wrap(apiVersion).getShort(), ByteBuffer.wrap(correlationId).getInt());
This is where we start seeing the protocol details. The method reads a message header containing the size information, API key and version, correlation ID, and client ID. DataInputStream makes it easy to read the binary protocol's fixed-size fields precisely: readFully() blocks until exactly the requested number of bytes has been read.
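Since all of the protocol's integer fields are big-endian, DataInputStream's readShort()/readInt() could also read them directly, without the intermediate byte arrays and ByteBuffer.wrap() calls. A small alternative sketch of the same header parse (variable names are my own):
// Alternative header parse relying on DataInputStream's big-endian readers.
// Requires java.nio.charset.StandardCharsets.
short apiKeyValue = in.readShort();      // request_api_key (INT16)
short apiVersionValue = in.readShort();  // request_api_version (INT16)
int correlationIdValue = in.readInt();   // correlation_id (INT32)
short clientIdLen = in.readShort();      // NULLABLE_STRING length (-1 means null)
byte[] clientIdBytes = new byte[Math.max(clientIdLen, 0)];
in.readFully(clientIdBytes);
String clientIdValue = new String(clientIdBytes, StandardCharsets.UTF_8);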
Handle DescribeTopicPartitions Request
        byte[] topicsArrayLength = null;
        byte[] topicNameLength = null;
        byte[] topicName = null;
        if (ByteBuffer.wrap(apiKey).getShort() == 75) {
            log.info("Received DescribeTopicPartitions request");
            byte[] tagBufferLength = new byte[1];
            in.readFully(tagBufferLength);
            log.info("Tag Buffer Length: {} ", ByteBuffer.wrap(tagBufferLength).get());
            topicsArrayLength = new byte[1];
            in.readFully(topicsArrayLength);
            log.info("Topics Array Length: {}", ByteBuffer.wrap(topicsArrayLength).get());
            topicNameLength = new byte[1];
            in.readFully(topicNameLength);
            log.info("Topic Name Length: {}", ByteBuffer.wrap(topicNameLength).get());
            topicName = new byte[ByteBuffer.wrap(topicNameLength).get() - 1];
            in.readFully(topicName);
            log.info("Topic Name: {}", new String(topicName));
            // Read additional fields
            byte[] tagBufferLength2 = new byte[1];
            in.readFully(tagBufferLength2);
            log.info("Tag Buffer2 Length: {}", ByteBuffer.wrap(tagBufferLength2).get());
            byte[] responsePartitionLimit = new byte[4];
            in.readFully(responsePartitionLimit);
            log.info("Response Partition Limit: {}", ByteBuffer.wrap(responsePartitionLimit).getInt());
            byte[] cursor = new byte[1];
            in.readFully(cursor);
            log.info("Cursor: {}", ByteBuffer.wrap(cursor).get());
        }
This block handles a specific request type: API key 75, which is the DescribeTopicPartitions API. It reads several fields, including the topic name, buffer lengths, and cursor information.
Sending a Response
log.info("Remaining bytes in input stream: {}", in.available());
in.skip(in.available());
// Handle the request and send a response
ByteBuffer responseBuffer = createResponseBuffer(apiKey, apiVersion, correlationId,
topicsArrayLength, topicNameLength, topicName);
out.write(responseBuffer.array(), 0, responseBuffer.position());
out.flush();
log.info("Response sent to client");
}
}
After parsing the request, the code skips any remaining bytes (a defensive practice) and constructs a response using the createResponseBuffer()
method. The response is then sent back to the client. This completes one request-response cycle in the continuous communication loop.
Crafting the Response using ByteBuffer
If you are interested in the JSON reference, the official Kafka client has one.
private static ByteBuffer createResponseBuffer(byte[] apiKey, byte[] apiVersion, byte[] correlationID,
        byte[] topicsArrayLength, byte[] topicNameLength, byte[] topicName) {
    log.info("Creating response buffer");
    ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
    responseBuffer.putInt(0); // Placeholder for message length
    responseBuffer.put(correlationID);
    // If API Key == 0x4b (75) (DescribeTopicPartitions)
    if (ByteBuffer.wrap(apiKey).getShort() == 75) {
        // Create DescribeTopicPartitions response
        responseBuffer.put((byte) 0); // Tag Buffer
        responseBuffer.putInt(0); // Throttle time
        responseBuffer.put(topicsArrayLength); // Topic array length
        responseBuffer.putShort((short) 3); // Error code
        responseBuffer.put(topicNameLength); // Topic name length
        responseBuffer.put(topicName); // Topic name
        responseBuffer.put(new byte[16]); // 16-byte null ID
        responseBuffer.put((byte) 0); // IsInternal == 0
        responseBuffer.put((byte) 1); // partition count + 1 (empty partitions array)
        responseBuffer.putInt(0x00000DF8); // TopicAuthorizedOperations
        responseBuffer.put((byte) 0); // compact-encoded empty TAG_BUFFER
        responseBuffer.put((byte) 0xff); // Cursor
        responseBuffer.put((byte) 0); // Tag Buffer
    } else {
        // Create API versions response
        short apiVersionValue = ByteBuffer.wrap(apiVersion).getShort();
        short errorCode = (apiVersionValue < 0 || apiVersionValue > 4) ? (short) 35 : (short) 0;
        responseBuffer.putShort(errorCode);
        responseBuffer.put((byte) 3); // Compact array length: 2 API entries + 1
        // First API
        responseBuffer.putShort((short) 18); // API Versions 18
        responseBuffer.putShort((short) 0); // Min version
        responseBuffer.putShort((short) 4); // Max version
        responseBuffer.put((byte) 0); // Tagged fields for this API (compact encoded 0)
        // Second API
        responseBuffer.putShort((short) 75); // DescribeTopicPartitions 75
        responseBuffer.putShort((short) 0); // Min version
        responseBuffer.putShort((short) 0); // Max version
        responseBuffer.put((byte) 0); // Tagged fields for this API (compact encoded 0)
        responseBuffer.putInt(0); // Throttle time
        responseBuffer.put((byte) 0); // No tagged fields
    }
    // Update the message length at the beginning of the buffer
    int messageLength = responseBuffer.position() - 4;
    log.info("Message length: {}", messageLength);
    responseBuffer.putInt(0, messageLength);
    return responseBuffer;
}
This final method builds the response message. It branches on the API key to create either a DescribeTopicPartitions response or an ApiVersions response. The message length is calculated and inserted at the beginning of the buffer once the body is complete, a common pattern in length-prefixed binary protocols.
Verification
echo -n "00000031004b0000589eecfb000c6b61666b612d746573746572000212756e6b6e6f776e2d746f7069632d73617a0000000001ff00" \
| xxd -r -p | nc 192.168.1.6 9092 | hexdump -C
00000000 00 00 00 37 58 9e ec fb 00 00 00 00 00 02 00 03 |...7X...........|
00000010 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 |.unknown-topic-s|
00000020 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |az..............|
00000030 00 00 00 01 00 00 0d f8 00 ff 00 |...........|
0000003b
Hexdump of sent "DescribeTopicPartitions" request:
Idx | Hex | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 31 00 4b 00 00 58 9e ec fb 00 0c 6b 61 | ...1.K..X.....ka
0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk
0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 61 7a 00 00 | nown-topic-saz..
0030 | 00 00 01 ff 00 | .....
Hexdump of received "DescribeTopicPartitions" response:
Idx | Hex | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 37 58 9e ec fb 00 00 00 00 00 02 00 03 | ...7X...........
0010 | 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 | .unknown-topic-s
0020 | 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | az..............
0030 | 00 00 00 01 00 00 0d f8 00 ff 00 | ...........
.ResponseHeader
- .correlation_id (1486810363)
- .TAG_BUFFER
.ResponseBody
- .throttle_time_ms (0)
- .topic.length (1)
- .Topics[0]
- .error_code (3)
- .name (unknown-topic-saz)
- .topic_id (00000000-0000-0000-0000-000000000000)
- .is_internal (false)
- .num_partitions (0)
- .topic_authorized_operations (3576)
- .TAG_BUFFER
- .next_cursor (null)
- .TAG_BUFFER
Thank you for reading. Hope you learnt something new.