Apache Kafka: Setting up Producer and Consumer in Spring Boot

Juhilee Nazare

Apache Kafka is an open-source event streaming platform used for building real-time data pipelines and streaming applications. Enterprise systems often use Confluent Kafka, which is built on the same core Kafka engine but adds features such as Confluent Control Center (UI-based monitoring) and commercial, SLA-backed support.

In simple words, take a real-life example: a postman tries to deliver your parcel while you are away from home, and in your absence he returns it to the delivery centre. To overcome this, you might install a postbox at your gate so that no matter when you get back home, you can still retrieve the parcel. Similarly, when service A (a producer) and service B (a consumer) communicate synchronously in a microservices-based architecture, data can be lost whenever service B is unavailable.

Apache Kafka resolves this issue by acting as a messaging system or postal service for data. It helps different parts of an application (or different applications) send and receive data in real time.

Kafka Terminologies

  1. Producer: Sends/Publishes messages to Kafka topics.

  2. Consumer: Reads/subscribes to messages from Kafka topics.

  3. Topic: A named category or feed to which producers send data.

  4. Partition: A topic can be split into partitions to achieve parallelism and scalability.

  5. Broker: A Kafka server that stores data and serves clients.

  6. Cluster: A group of Kafka brokers working together to ensure fault tolerance.

  7. ZooKeeper: A coordinator that manages multiple Kafka brokers in a cluster, performs configuration management, and stores metadata about topics, partitions, offsets, etc. As of Kafka 2.8.0 (with KRaft mode), ZooKeeper is no longer mandatory for Apache Kafka.

  8. Consumer Group: A group of consumers that share the work of consuming messages from one or more (partitioned) topics.
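To make the partition idea concrete, here is a minimal sketch of how a keyed message could be mapped to a partition. Note that Kafka's default partitioner actually uses a murmur2 hash of the serialized key bytes; the `hashCode`-based version below is a simplification for illustration only.

```java
public class PartitionDemo {
    // Simplified stand-in for Kafka's default partitioner. The real
    // partitioner hashes the serialized key bytes with murmur2; here we
    // use String.hashCode() purely to illustrate the idea.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"order123", "order456", "order123"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The key property this illustrates: the same key always maps to the same partition, which is how Kafka preserves ordering per key while still spreading load across partitions.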

Steps to set up Kafka Producer and Consumer

Pre-requisites

  1. Docker Desktop, which ships with Docker Compose built in. Docker Compose is used to define and run multi-container Docker applications from a single YAML file. Check your Docker version using the command:

     docker --version
    
  2. An IDE of your choice. In this article we'll use IntelliJ IDEA with the Docker and Spring Boot Assistant plugins installed.

  3. A Spring Boot 3 project created using Spring Initializr, with Maven for dependency management and Java 17 or above. Add the Spring Web and Spring for Apache Kafka dependencies before generating the zip file.

Docker Compose for ZooKeeper and Kafka

  1. Create a file named docker-compose.yml in the root directory of your project.

     version: '3'
     services:
       zookeeper:
         image: confluentinc/cp-zookeeper:7.0.1 #image name in docker hub
         environment:
           ZOOKEEPER_CLIENT_PORT: 2181
           ZOOKEEPER_TICK_TIME: 2000
    
       kafka:
         image: confluentinc/cp-kafka:7.0.1
         ports:
           - "9092:9092"
         depends_on:
           - zookeeper
         environment:
           KAFKA_BROKER_ID: 1
           KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 #address clients on the host use to reach the broker
           KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
  2. Run the command below to start all the services defined in your docker-compose.yml file:

     docker-compose up
    
  3. Verify that the containers are running using the command below:

     docker ps
    
  4. Both the zookeeper and kafka containers should appear in the output with status Up, confirming the images were pulled and the services started successfully.
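You can also sanity-check the broker using the Kafka CLI tools bundled in the cp-kafka image. The container name below (kafka-kafka-1) is an assumption based on Docker Compose's default naming; use whatever name `docker ps` actually shows.

```shell
# List topics on the broker (container name "kafka-kafka-1" is assumed;
# check `docker ps` for the real name).
docker exec -it kafka-kafka-1 kafka-topics --bootstrap-server localhost:9092 --list

# Tail messages on the "orders" topic from the beginning.
docker exec -it kafka-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:9092 --topic orders --from-beginning
```

These commands require the Compose stack from the previous step to be running.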


Spring Boot Configuration

In Spring Boot, application.yml is a configuration file used to define settings like:

  • Server ports

  • Database connections

  • Kafka properties

  • Custom app settings

It’s an alternative to application.properties and uses YAML syntax (indentation is important!).

spring:
  kafka:
    bootstrap-servers: localhost:9092 #kafka broker running on port 9092
    consumer:
      group-id: sample-group #consumer group 
      auto-offset-reset: earliest #start from the earliest offset when no committed offset exists
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
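The cp-kafka image auto-creates topics on first use by default, but in real projects it is common to declare topics explicitly so you control partition and replica counts. With Spring for Apache Kafka this can be done with a NewTopic bean. The class name below is illustrative; the topic name mirrors this article's orders topic, and the single replica matches the one-broker Compose setup.

```java
package com.example.jne.kafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "orders" topic so Spring's KafkaAdmin creates it at
    // startup if it doesn't exist. One partition and one replica match
    // the single-broker docker-compose setup used in this article.
    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```

This step is optional for the demo, since the broker will auto-create the topic on the first publish.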

Kafka Producer

package com.example.jne.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
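Note that KafkaTemplate.send() is asynchronous: it returns immediately, and the broker acknowledgement arrives later. With Spring Boot 3 / Spring for Apache Kafka 3.x, send() returns a CompletableFuture<SendResult<...>>, so the service above could be extended to log delivery results. This is a sketch of that pattern, not a required change:

```java
// Sketch: logging the outcome of an asynchronous send. Assumes Spring
// for Apache Kafka 3.x, where send() returns a CompletableFuture.
public void sendMessageWithCallback(String topic, String message) {
    kafkaTemplate.send(topic, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // RecordMetadata tells us where the message landed.
                    System.out.println("Sent to partition "
                            + result.getRecordMetadata().partition()
                            + " at offset "
                            + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Send failed: " + ex.getMessage());
                }
            });
}
```

Without a callback, send failures are silent from the caller's point of view, so this pattern is worth adopting once delivery guarantees matter.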

Kafka Consumer

package com.example.jne.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    //groupId same as specified in application.yml
    @KafkaListener(topics = "orders", groupId = "sample-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

Controller

package com.example.jne.kafka.controller;

import com.example.jne.kafka.producer.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
    @Autowired
    private KafkaProducerService producer;

    @PostMapping("/publish")
    public String sendMessage(@RequestParam String message) {
        producer.sendMessage("orders", message);
        return "Message sent!";
    }
}

Main

package com.example.jne.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Run Application

mvn spring-boot:run

Use curl or Postman to hit the endpoint and send a message:

curl -X POST "http://localhost:8080/api/kafka/publish?message=order123"

Output

The endpoint returns:

Message sent!

and the consumer prints the following in the application console:

Received message: order123

Summary

Apache Kafka is an open-source event streaming platform that enables real-time data pipelines and streaming applications. It is often used in enterprise environments with Confluent Kafka for enhanced features and support. This article explains Kafka's core concepts such as producers, consumers, topics, and partitions. It includes a practical example of setting up a Kafka producer and consumer using Spring Boot and Docker. Additionally, it provides configurations for setting up a Kafka broker with ZooKeeper, and demonstrates how to build a simple application that publishes and receives messages.


Written by

Juhilee Nazare

I have 3.5+ years of experience in software development and am happy to share my knowledge while working on real-time IT solutions.