Understanding the Basics of the Kafka Binary Protocol

Snehasish Roy
8 min read

Apache Kafka is a distributed event streaming platform used for high-performance data pipelines. In this article, we will take a look at the underbelly of Kafka and see how communication happens between the Kafka client and the server.

Fundamentals

Let's start with the basics. Kafka uses a custom binary protocol for sending and receiving messages.

The specifications define the request header as follows:

Field               | Data type       | Description
--------------------+-----------------+----------------------------------------
request_api_key     | INT16           | The API key for the request
request_api_version | INT16           | The version of the API for the request
correlation_id      | INT32           | A unique identifier for the request
client_id           | NULLABLE_STRING | The client ID for the request
TAG_BUFFER          | COMPACT_ARRAY   | Optional tagged fields

The spec defines these data types as follows:

Type          | Description
--------------+----------------------------------------------------------------
INT16         | Represents an integer between -2^15 and 2^15 - 1 inclusive. The values are encoded using two bytes in network byte order (big-endian).
INT32         | Represents an integer between -2^31 and 2^31 - 1 inclusive. The values are encoded using four bytes in network byte order (big-endian).
COMPACT_ARRAY | Represents a sequence of objects of a given type T. Type T can be either a primitive type (e.g. STRING) or a structure. First, the length N + 1 is given as an UNSIGNED_VARINT. Then N instances of type T follow. A null array is represented with a length of 0. In protocol documentation an array of T instances is referred to as [T].
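Since every multi-byte integer is big-endian, Java's ByteBuffer, which defaults to network byte order, maps directly onto these types. Here is a minimal, runnable sketch (the class name is mine) that encodes the INT16 API key and the INT32 correlation ID that appear in the example request below:

import java.nio.ByteBuffer;

public class EncodingDemo {
    public static void main(String[] args) {
        // ByteBuffer writes in network byte order (big-endian) by default,
        // which matches the Kafka wire format.
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort((short) 18);   // INT16: request_api_key (ApiVersions)
        buf.putInt(1870644833);     // INT32: correlation_id
        for (byte b : buf.array()) {
            System.out.printf("%02x ", b); // prints: 00 12 6f 7f c6 61
        }
    }
}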

Here's an example of a request message:

00 00 00 23  // message_size:        35
00 12        // request_api_key:     18
00 04        // request_api_version: 4
6f 7f c6 61  // correlation_id:      1870644833
...

Every Kafka request is an API call. The Kafka protocol defines over 70 different APIs, all of which do different things. Here are some examples:

  • Produce writes events to partitions.

  • CreateTopics creates new topics.

  • ApiVersions returns the broker's supported API versions.

A Kafka request specifies the API it's calling via the request_api_key header field.

Message body

The schemas for the request and response bodies are determined by the API being called.

For example, here are some of the fields that the Produce request body contains:

  • The name of the topic to write to.

  • The key of the partition to write to.

  • The event data to write.

On the other hand, the Produce response body contains a response code for each event. These response codes indicate whether each write succeeded.
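Because each API has its own body schema, a server typically reads the common header first and then dispatches on request_api_key. Here is a minimal sketch of that dispatch (the case bodies are placeholders, not real parsing code; assumes java.io.DataInputStream and java.io.IOException imports):

// Route the request body to an API-specific parser based on request_api_key.
static void dispatchBody(short apiKey, DataInputStream in) throws IOException {
    switch (apiKey) {
        case 0  -> { /* parse a Produce body here */ }
        case 18 -> { /* parse an ApiVersions body here */ }
        case 75 -> { /* parse a DescribeTopicPartitions body here */ }
        default -> in.skip(in.available()); // unknown API: drain and ignore
    }
}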

As a reminder, requests and responses both have the following format:

  1. message_size

  2. Header

  3. Body
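In code, this framing means reading the 4-byte message_size first and then exactly that many bytes. A minimal sketch, assuming the stream is positioned at a message boundary (and the usual java.io imports):

// Read one size-prefixed Kafka message: a 4-byte big-endian length,
// followed by exactly that many bytes of header + body.
static byte[] readFrame(DataInputStream in) throws IOException {
    int messageSize = in.readInt();       // message_size (INT32, big-endian)
    byte[] frame = new byte[messageSize]; // header + body
    in.readFully(frame);                  // blocks until the whole frame arrives
    return frame;
}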

API versioning

Each API supports multiple versions, to allow for different schemas. Here's how API versioning works:

  • Requests use the header field request_api_version to specify the API version being requested.

  • Responses always use the same API version as the request. For example, a Produce Request (Version: 3) will always get a Produce Response (Version: 3) back.

  • Each API's version history is independent. So, different APIs with the same version are unrelated. For example, Produce Request (Version: 10) is not related to Fetch Request (Version: 10).
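On the broker side, version negotiation boils down to a range check per API. Here is a sketch of that check for a single API, using the UNSUPPORTED_VERSION error code (35) that also shows up in the hands-on code later; the MIN_VERSION/MAX_VERSION values are assumptions for illustration:

// Supported version range for one API (here assumed: ApiVersions, key 18).
static final short MIN_VERSION = 0;
static final short MAX_VERSION = 4;

static short errorCodeFor(short requestedVersion) {
    // 35 == UNSUPPORTED_VERSION, 0 == no error
    return (requestedVersion < MIN_VERSION || requestedVersion > MAX_VERSION)
            ? (short) 35
            : (short) 0;
}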

The ApiVersions API

The ApiVersions API returns the broker's supported API versions. For example, ApiVersions may say that the broker supports Produce versions 5 to 11, Fetch versions 0 to 3, etc.

Visualizing the Binary Protocol

Here is a great link that can help you visualize the binary protocol.

Hands-on

So let's build a POC (proof of concept) of a Kafka server.

Let’s start our Server

public static void main(String[] args) {
    int port = 9092;
    try (ServerSocket server = new ServerSocket()) {
        // setReuseAddress only takes effect if set before binding
        server.setReuseAddress(true);
        server.bind(new InetSocketAddress(port));
        log.info("Server started on port {}", port);
        while (true) {
            Socket client = server.accept();
            log.info("New client connected");
            handleClientAsync(client);
        }
    } catch (IOException e) {
        log.error("IOException: ", e);
    }
}

The server starts on port 9092 (the default Kafka port) and enters an infinite loop that waits for client connections. When a client connects, it passes the client socket to handleClientAsync(). The setReuseAddress(true) call prevents "address already in use" errors when restarting the server; note that it only takes effect if set before the socket is bound, which is why the socket is bound explicitly.

Handle Multiple Clients Asynchronously

private static void handleClientAsync(Socket client) {
    new Thread(() -> {
        try {
            processMessage(client);
        } catch (IOException e) {
            log.error("Error while handling client: ", e);
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                log.error("Error closing client socket: ", e);
            }
        }
    }).start();
}

This method creates a new thread for each client connection, allowing the server to handle multiple clients simultaneously, and delegates the message handling to processMessage(). This is a trivial thread-per-connection implementation and will not scale well because it lacks a thread pool; a pooled variant is sketched below.
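A slightly more scalable variant (a sketch, not part of the original POC) replaces the thread-per-connection model with a fixed-size pool via ExecutorService (assumes java.io, java.net, and java.util.concurrent imports):

// Bounded concurrency: at most 100 clients are served at once;
// further connections queue inside the executor.
private static final ExecutorService POOL = Executors.newFixedThreadPool(100);

private static void handleClientPooled(Socket client) {
    POOL.submit(() -> {
        try (client) { // try-with-resources closes the socket when done
            processMessage(client);
        } catch (IOException e) {
            log.error("Error while handling client: ", e);
        }
    });
}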

Processing Client Messages

private static void processMessage(Socket client) throws IOException {
    DataInputStream in = new DataInputStream(client.getInputStream());
    DataOutputStream out = new DataOutputStream(client.getOutputStream());
    while (!client.isClosed()) {
        // Read the 4-byte message size; exit cleanly if the client disconnected
        byte[] messageSize = new byte[4];
        try {
            in.readFully(messageSize);
        } catch (EOFException e) {
            log.info("Client closed the connection");
            break;
        }

        // Read protocol metadata
        byte[] apiKey = new byte[2];
        in.readFully(apiKey);
        byte[] apiVersion = new byte[2];
        in.readFully(apiVersion);
        byte[] correlationId = new byte[4];
        in.readFully(correlationId);

        // Read client identification (NULLABLE_STRING: INT16 length, then bytes;
        // a length of -1 would mean null, which this POC does not handle)
        byte[] clientIdLength = new byte[2];
        in.readFully(clientIdLength);
        byte[] clientId = new byte[ByteBuffer.wrap(clientIdLength).getShort()];
        in.readFully(clientId);

        log.info("Request from clientID {} for apiKey {} apiVersion {} correlationId {}", 
                new String(clientId), new String(apiKey), new String(apiVersion), new String(correlationId));

This is where we start seeing the protocol details. The method reads a message header containing the size information, API key and version, correlation ID, and client ID. DataInputStream's readFully() makes it easy to read the fixed-size fields of the binary protocol precisely.

Handle DescribeTopicPartitions Request

        byte[] topicsArrayLength = null;
        byte[] topicNameLength = null;
        byte[] topicName = null;
        if (ByteBuffer.wrap(apiKey).getShort() == 75) {
            log.info("Received DescribeTopicPartitions request");
            byte[] tagBufferLength = new byte[1];
            in.readFully(tagBufferLength);
            log.info("Tag Buffer Length: {} ", ByteBuffer.wrap(tagBufferLength).get());

            // COMPACT_ARRAY length: encoded as N + 1 (assumed to fit in one byte here)
            topicsArrayLength = new byte[1];
            in.readFully(topicsArrayLength);
            log.info("Topics Array Length: {}", ByteBuffer.wrap(topicsArrayLength).get());

            // COMPACT_STRING length: also encoded as N + 1
            topicNameLength = new byte[1];
            in.readFully(topicNameLength);
            log.info("Topic Name Length: {}", ByteBuffer.wrap(topicNameLength).get());

            // The actual name is one byte shorter than the encoded length
            topicName = new byte[ByteBuffer.wrap(topicNameLength).get() - 1];
            in.readFully(topicName);
            log.info("Topic Name: {}", new String(topicName));

            // Read additional fields
            byte[] tagBufferLength2 = new byte[1];
            in.readFully(tagBufferLength2);
            log.info("Tag Buffer2 Length: {}", ByteBuffer.wrap(tagBufferLength2).get());

            byte[] responsePartitionLimit = new byte[4];
            in.readFully(responsePartitionLimit);
            log.info("Response Partition Limit: {}", ByteBuffer.wrap(responsePartitionLimit).getInt());

            // Cursor: 0xff (-1) denotes a null cursor
            byte[] cursor = new byte[1];
            in.readFully(cursor);
            log.info("Cursor: {}", ByteBuffer.wrap(cursor).get());
        }

This block handles a specific request type: API key 75, the DescribeTopicPartitions API. It reads several fields, including the tag buffers, compact lengths, topic name, and cursor.
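One caveat: these single-byte lengths are really UNSIGNED_VARINTs in the protocol, so reading exactly one byte only works while the encoded values stay below 128. A more faithful decoder would look like this sketch (assumes the usual java.io imports):

// Decode an UNSIGNED_VARINT: 7 bits of payload per byte, least significant
// group first; the high bit signals that another byte follows.
static int readUnsignedVarint(DataInputStream in) throws IOException {
    int value = 0;
    int shift = 0;
    byte b;
    do {
        b = in.readByte();
        value |= (b & 0x7f) << shift;
        shift += 7;
    } while ((b & 0x80) != 0);
    return value;
}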

Sending a Response

        log.info("Remaining bytes in input stream: {}", in.available());
        in.skip(in.available());

        // Handle the request and send a response
        ByteBuffer responseBuffer = createResponseBuffer(apiKey, apiVersion, correlationId, 
                                                        topicsArrayLength, topicNameLength, topicName);

        out.write(responseBuffer.array(), 0, responseBuffer.position());
        out.flush();
        log.info("Response sent to client");
    }
}

After parsing the request, the code skips any bytes still buffered in the input stream (a defensive practice; note that available() only reports bytes already received, not the full remainder of the message) and constructs a response using the createResponseBuffer() method. The response is then sent back to the client, completing one request-response cycle in the continuous communication loop.

Crafting the Response using ByteBuffer

If you are interested in the JSON message definitions, the official Kafka client source has them.

private static ByteBuffer createResponseBuffer(byte[] apiKey, byte[] apiVersion, byte[] correlationID, 
                                               byte[] topicsArrayLength, byte[] topicNameLength, byte[] topicName) {
    log.info("Creating response buffer");
    ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
    responseBuffer.putInt(0); // Placeholder for message length
    responseBuffer.put(correlationID);

    // If API Key == 0x4b (75) (DescribeTopicPartitions)
    if (ByteBuffer.wrap(apiKey).getShort() == 75) {
        // Create DescribeTopicPartitions response
        responseBuffer.put((byte) 0); // Response header TAG_BUFFER
        responseBuffer.putInt(0); // Throttle time
        responseBuffer.put(topicsArrayLength); // Topic array length (compact: N + 1)
        responseBuffer.putShort((short) 3); // Error code 3 == UNKNOWN_TOPIC_OR_PARTITION
        responseBuffer.put(topicNameLength); // Topic name length (compact: N + 1)
        responseBuffer.put(topicName); // Topic name
        responseBuffer.put(new byte[16]); // topic_id: all-zero (null) UUID
        responseBuffer.put((byte) 0); // IsInternal == false
        responseBuffer.put((byte) 1); // Empty partitions array (compact length 0 + 1)
        responseBuffer.putInt(0x00000DF8); // TopicAuthorizedOperations bitmask (3576)
        responseBuffer.put((byte) 0); // Compact-encoded empty TAG_BUFFER

        responseBuffer.put((byte) 0xff); // Cursor: 0xff == null cursor
        responseBuffer.put((byte) 0); // Response TAG_BUFFER
    } else {
        // Create API versions response
        short apiVersionValue = ByteBuffer.wrap(apiVersion).getShort();
        // Error code 35 == UNSUPPORTED_VERSION
        short errorCode = (apiVersionValue < 0 || apiVersionValue > 4) ? (short) 35 : (short) 0;
        responseBuffer.putShort(errorCode);

        responseBuffer.put((byte) 3); // Compact array length: 2 API entries + 1
        // First API
        responseBuffer.putShort((short) 18); // API Versions 18
        responseBuffer.putShort((short) 0); // Min version
        responseBuffer.putShort((short) 4); // Max version
        responseBuffer.put((byte) 0); // Tagged fields for this API (compact encoded 0)

        // Second API
        responseBuffer.putShort((short) 75); // DescribeTopicPartitions 75
        responseBuffer.putShort((short) 0); // Min version
        responseBuffer.putShort((short) 0); // Max version
        responseBuffer.put((byte) 0); // Tagged fields for this API (compact encoded 0)

        responseBuffer.putInt(0); // Throttle time
        responseBuffer.put((byte) 0); // No tagged fields
    }

    // Update the message length at the beginning of the buffer
    int messageLength = responseBuffer.position() - 4;
    log.info("Message length: {}", messageLength);
    responseBuffer.putInt(0, messageLength);
    return responseBuffer;
}

This final method builds the response message. It branches on the API key to create either a DescribeTopicPartitions response or an ApiVersions response. The message length is calculated and inserted at the beginning of the buffer, a common pattern in binary protocols.
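An alternative to back-patching, shown here as a sketch, is to build the body in its own buffer first and only then write the size prefix, which avoids the placeholder entirely (assumes java.io and java.nio imports):

// Build the payload first, then prepend its size.
static void writeFramed(DataOutputStream out, ByteBuffer body) throws IOException {
    out.writeInt(body.position());               // message_size
    out.write(body.array(), 0, body.position()); // header + body
    out.flush();
}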

Verification

echo -n "00000031004b0000589eecfb000c6b61666b612d746573746572000212756e6b6e6f776e2d746f7069632d73617a0000000001ff00" \
| xxd -r -p | nc 192.168.1.6 9092 | hexdump -C

00000000  00 00 00 37 58 9e ec fb  00 00 00 00 00 02 00 03  |...7X...........|
00000010  12 75 6e 6b 6e 6f 77 6e  2d 74 6f 70 69 63 2d 73  |.unknown-topic-s|
00000020  61 7a 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |az..............|
00000030  00 00 00 01 00 00 0d f8  00 ff 00                 |...........|
0000003b

Hexdump of sent "DescribeTopicPartitions" request:
Idx  | Hex                                             | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 31 00 4b 00 00 58 9e ec fb 00 0c 6b 61 | ...1.K..X.....ka
0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk
0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 61 7a 00 00 | nown-topic-saz..
0030 | 00 00 01 ff 00                                  | .....

Hexdump of received "DescribeTopicPartitions" response:
Idx  | Hex                                             | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 37 58 9e ec fb 00 00 00 00 00 02 00 03 | ...7X...........
0010 | 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 | .unknown-topic-s
0020 | 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | az..............
0030 | 00 00 00 01 00 00 0d f8 00 ff 00                | ...........

.ResponseHeader
- .correlation_id (1486810363)
- .TAG_BUFFER
.ResponseBody
- .throttle_time_ms (0)
- .topic.length (1)
- .Topics[0]
  - .error_code (3)
  - .name (unknown-topic-saz)
  - .topic_id (00000000-0000-0000-0000-000000000000)
  - .is_internal (false)
  - .num_partitions (0)
  - .topic_authorized_operations (3576)
  - .TAG_BUFFER
- .next_cursor (null)
- .TAG_BUFFER

Thank you for reading. Hope you learnt something new.
