Testing a Kafka microservice with Karate

Gaston

Introduction

Apache Kafka is an event streaming platform used to transport and transform data. It is one of the main message brokers used in event-driven microservice architectures, popular because it provides very high availability and scalability.

Karate is a popular test framework for API testing and is often used to test microservices. It uses a Gherkin-based BDD syntax for specifying tests (early versions were built on top of Cucumber), which makes tests very easy to read and write, and lets them be correlated directly with user stories or requirements.

While Karate uses a BDD syntax, its focus is API testing: tests can be defined for REST, GraphQL and even SOAP, and support for WebSockets is included. The question, then, is how to test microservices that communicate their data via Kafka. This article answers that question.

The wordcount microservice

A simple Spring Boot microservice is provided to demonstrate testing Kafka with Karate. The service should sound familiar: it is the classic wordcount streaming application. It reads text from an input topic, counts the words, and materializes the count per word in a Kafka state store. The store can then be queried to get the count for each word.

The streaming application is defined by a Topology:

@Autowired
public Topology createWordCountTopology(final StreamsBuilder streamsBuilder) {
    final Serde<String> stringSerde = Serdes.String();

    // Read the raw text lines from the input topic
    final KStream<String, String> textStream = streamsBuilder.stream(
        TEXT_TOPIC, Consumed.with(stringSerde, stringSerde));

    // Split each line into lowercase words, group by word and count,
    // materializing the counts in a queryable state store
    textStream
        .mapValues((ValueMapper<String, String>) String::toLowerCase)
        .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
        .groupBy((key, word) -> word,
            Grouped.with(stringSerde, stringSerde))
        .count(Materialized.as(WORDCOUNT_STORENAME));

    return streamsBuilder.build();
}

To test the wordcount streaming application, we need to send a text to the input topic and check the result by querying the state store. For sending, a bean is created that publishes data to the input topic; for reading, a bean is created that connects to the store and returns the count for a given word:

public Long getWordCount(final String word) {
    final var kafkaStreams = factoryBean.getKafkaStreams();
    if (kafkaStreams == null) {
        return 0L;
    }
    // The state store can only be queried while the streams app is running
    if (State.RUNNING.equals(kafkaStreams.state())) {
        final ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                WORDCOUNT_STORENAME,
                QueryableStoreTypes.keyValueStore()));
        // A word that has not been seen yet has no entry in the store
        return Optional.ofNullable(counts.get(word)).orElse(0L);
    }
    return 0L;
}
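
The producer bean is not listed in the article; a minimal sketch, assuming Spring's auto-configured KafkaTemplate with String serializers, could look like this (TEXT_TOPIC is the same constant used in the topology):

// Hypothetical sketch of the producer bean; details may differ in the sample
@Component
public class TextProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TextProducer(final KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes a text line to the input topic of the streaming app
    public void send(final String text) {
        kafkaTemplate.send(TEXT_TOPIC, text);
    }
}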

Kafka cluster setup for test

In most setups, Kafka is deployed as a cluster: multiple brokers cooperate to provide high availability and scalability. For this test, a Kafka cluster is configured using Testcontainers. A configurable number of brokers can be started, with coordination handled by ZooKeeper. Testcontainers provides a good example of how to set up such a cluster. Note that in more recent versions of Kafka, ZooKeeper is no longer part of the cluster setup; it has been replaced by KRaft.
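
The cluster utility itself is not listed in the article. A minimal sketch, modeled on the Testcontainers KafkaContainerCluster example, could look as follows; the class name KafkaContainerClusterUtil matches the reference in the next section, but the broker count, image tags and startup handling are assumptions:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

// Hypothetical sketch of the cluster utility; details are illustrative
public class KafkaContainerClusterUtil {

    private static final int BROKER_COUNT = 3;
    private static final Network NETWORK = Network.newNetwork();

    // A single ZooKeeper instance coordinates all brokers
    private static final GenericContainer<?> ZOOKEEPER =
        new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper:7.4.0"))
            .withNetwork(NETWORK)
            .withNetworkAliases("zookeeper")
            .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");

    private static final List<KafkaContainer> BROKERS =
        IntStream.range(0, BROKER_COUNT)
            .mapToObj(i -> new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
                .withNetwork(NETWORK)
                .withExternalZookeeper("zookeeper:2181")
                .withEnv("KAFKA_BROKER_ID", String.valueOf(i)))
            .collect(Collectors.toList());

    public static void start() {
        ZOOKEEPER.start();
        BROKERS.forEach(KafkaContainer::start);
    }

    public static int getBrokerCount() {
        return BROKER_COUNT;
    }

    // Comma-separated host:port pairs, resolved after startup because
    // Testcontainers maps the broker listeners to random free host ports
    public static String getBootstrapServers() {
        return BROKERS.stream()
            .map(KafkaContainer::getBootstrapServers)
            .collect(Collectors.joining(","));
    }
}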

Testcontainers uses port mapping to ensure that an available host port is used for clients. This means that the properties addressing the Kafka cluster need to be overwritten once the containers have started. This can be done by providing a static method annotated with @DynamicPropertySource, which sets the needed properties dynamically:

@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers",
        KafkaContainerClusterUtil::getBootstrapServers);
    registry.add("spring.kafka.streams.replication-factor",
        WordCountKarateTest::getReplicationFactor);
}
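
The getReplicationFactor helper is not shown either; a plausible (assumed) implementation simply returns the broker count, since the replication factor of the Streams-internal topics must not exceed the number of available brokers:

// Hypothetical helper; the replication factor may not exceed the broker count
static String getReplicationFactor() {
    return String.valueOf(KafkaContainerClusterUtil.getBrokerCount());
}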

Also note that Testcontainers requires a running Docker installation.

Setup of Karate

The Karate test framework can be configured using a JavaScript file, karate-config.js, which must be on the classpath. The function in it returns a JSON object whose keys are available in feature files as variables. A Java class can be accessed in karate-config.js using the syntax Java.type('fully qualified class name'). This mechanism is used to get hold of the beans that send data to the input topic and read the wordcount store.

To get to the beans – which are instantiated by Spring – a separate class is needed that provides static methods returning the bean instances. This class needs the ApplicationContext to look up the beans. By implementing the ApplicationContextAware interface in a Spring bean, Spring passes in the ApplicationContext after creating it, so it is available to query specific beans. Here's the code:

@Component
public class SpringContext implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(final ApplicationContext applicationContext) {
        // Spring calls this after creating the bean; keep a static reference
        // so the static accessors below can be called from karate-config.js
        SpringContext.applicationContext = applicationContext;
    }

    public static WordCountConsumer getWordCountConsumer() {
        return applicationContext.getBean(WordCountConsumer.class);
    }

    public static TextProducer getTextProducer() {
        return applicationContext.getBean(TextProducer.class);
    }
}

The karate-config.js can now access the beans. Here’s the content:

function fn() {
  var SpringContext = Java.type('nl.devgs.karatekafka.SpringContext');
  var wordCountConsumer = SpringContext.getWordCountConsumer();
  var textProducer = SpringContext.getTextProducer();
  return {
    wordCountConsumer: wordCountConsumer,
    textProducer: textProducer,
  };
}

Now the variables wordCountConsumer and textProducer are available in feature files. The feature file that tests the wordcount looks like this:

Feature: Test wordcount streaming app
  Background:
    * configure pauseIfNotPerf = true
    * def textProducer = karate.get('textProducer')
    * def wordCountConsumer = karate.get('wordCountConsumer')

  Scenario: Get wordcount
    When textProducer.send('A word on karate, test word')
    And karate.pause(1000)
    Then match wordCountConsumer.getWordCount('word') == 2

One thing to notice is the pausing. Since Kafka needs a bit of time to update the state store – it is eventually consistent – the test is paused. Karate will only pause the test if pauseIfNotPerf is set to true; this feature was intended to support performance testing, but with that setting it also works in 'normal' mode. See Karate Gatling.
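
A fixed pause can make a test slow or flaky. As an alternative sketch (an assumption, not part of the sample), the consumer bean could poll the state store until the expected count appears or a timeout elapses:

// Hypothetical polling helper, an alternative to the fixed pause
public Long awaitWordCount(final String word, final long expected,
        final long timeoutMs) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    Long count = getWordCount(word);
    while (count != expected && System.currentTimeMillis() < deadline) {
        Thread.sleep(100L); // small back-off between store lookups
        count = getWordCount(word);
    }
    return count;
}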

The test can now be executed using JUnit:

@Karate.Test
Karate getWordCountTest() {
    return Karate.run("classpath:wordcount/wordcount.feature");
}
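
For completeness, the Karate runner, the dynamic properties and the container startup all live in the same test class. A rough outline, assuming @SpringBootTest and JUnit 5 (the article does not show the full class):

// Hypothetical outline; the sample repository may organize this differently
@SpringBootTest
class WordCountKarateTest {

    static {
        // The containers must be running before the dynamic properties
        // are resolved during Spring context startup
        KafkaContainerClusterUtil.start();
    }

    // registerKafkaProperties(...) and getReplicationFactor() as shown earlier

    @Karate.Test
    Karate getWordCountTest() {
        return Karate.run("classpath:wordcount/wordcount.feature");
    }
}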

Results

Running the test generates an HTML test summary page, which gives a nice overview of the test results.

Alternatives

To test a Kafka streaming application, other alternatives exist as well:

  • The message producer and the state store can be wrapped in REST controllers. Karate can then be used as if testing normal REST interfaces. The pause would still be needed, of course.

  • Apache Kafka comes with a REST proxy that can be used to send and receive messages. This REST proxy can also be tested with Karate as a normal REST interface. A Docker image is available for the REST proxy, so it could be deployed with Testcontainers too – as a generic container, as sketched below.
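
A minimal sketch of running the REST proxy with Testcontainers, assuming the Confluent cp-kafka-rest image and the shared Docker network from the cluster sketch earlier; the bootstrap address, alias and image tag are illustrative:

// Hypothetical sketch; env names follow the Confluent REST proxy image docs
GenericContainer<?> restProxy =
    new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-rest:7.4.0"))
        .withNetwork(NETWORK) // the network shared with the brokers
        .withExposedPorts(8082)
        .withEnv("KAFKA_REST_HOST_NAME", "rest-proxy")
        .withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka-0:9092") // illustrative broker alias
        .withEnv("KAFKA_REST_LISTENERS", "http://0.0.0.0:8082");
restProxy.start();

// Karate's baseUrl can then point at the mapped host port
String baseUrl = "http://" + restProxy.getHost() + ":" + restProxy.getMappedPort(8082);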

Conclusion

In this article, we have seen how to test a Kafka streaming application with the Karate test framework, using Testcontainers. With minimal code, we could connect Karate to the Kafka-related Spring beans and test them. Besides connecting to Spring beans directly, some REST-based alternatives were given as well.

The code for the sample can be found on GitHub.
