Introducing @platformatic/kafka-hooks: Bridging Kafka and HTTP with Ease

Matteo CollinaMatteo Collina
7 min read

Last month, we launched @platformatic/kafka to improve how to produce and consume messages via Apache Kafka in Node.js.

Today, we're excited to announce the release of @platformatic/kafka-hooks, a powerful, lightweight library that connects your Apache Kafka infrastructure with HTTP services.

If you've found yourself writing boilerplate code to:

  • Forward Kafka messages to HTTP endpoints

  • Enable web services to publish to Kafka topics

  • Implement request/response pattern

  • Handle retries and error flows

...then kafka-hooks is built for you.

With kafka-hooks, you can now seamlessly integrate event streams with your web applications, microservices, and APIs. Watch the full announcement here:

The Problem We're Solving

Event-driven architectures powered by Apache Kafka are fantastic for high-throughput, scalable systems. However, connecting Kafka with the HTTP-centric world of web services often requires custom code, complex setup, and careful error handling.

Note that consuming Kafka topics via HTTP webhooks is a simple and performant way to integrate with all kinds of systems without having to worry about Kafka versions, drivers, access control.

Key Features

📥 Consume from Kafka, Forward to HTTP

Configure kafka-hooks to consume messages from topics and forward them to HTTP endpoints. Each message is delivered as received with additional HTTP headers for Kafka metadata..

{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com"
      }
    ]
  }
}

📤 Publish to Kafka via HTTP

Use a simple HTTP API to publish messages to Kafka topics:

curl --request POST \
  --url http://127.0.0.1:3042/topics/topic \
  --header 'Content-Type: application/json' \
  --header 'x-plt-kafka-hooks-key: my-key' \
  --data '{ "name": "my test" }'

🔁 Configurable Retries and Concurrency

Fine-tune your message delivery with retry policies and concurrency settings:

{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com",
        "retries": 5,
        "retryDelay": 1000
      }
    ],
    "concurrency": 20
  }
}

💀 Dead Letter Queue (DLQ)

Failed messages automatically go to a Dead Letter Queue for later analysis or reprocessing:

{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com",
        "retries": 5,
        "retryDelay": 1000,
        "dql": "my-dead-letter-queue"
      }
    ],
    "concurrency": 20
  }
}

↩️Request/Response pattern

Exposing a topic as an HTTP API is as simple as configuring it:

{
  "kafka": {
    "requestResponse": [
      {
        "path": "/api/process",
        "requestTopic": "request-topic",
        "responseTopic": "response-topic",
        "timeout": 5000
      }
    ]
  }
}

Getting Started in Minutes

The first thing you need is a Kafka cluster. You can use our docker-compose.yml file: https://github.com/platformatic/kafka-hooks/blob/main/docker-compose.yml.

# To listen on broker, run ./kafka-console-consumer.sh --property print.key=true --property print.value=true --property print.headers=true --property print.offset=true --bootstrap-server localhost:9092 ...
---
services:
  kafka:
    image: apache/kafka:3.9.0
    ports:
      - '9092:9092'
    environment:
      _JAVA_OPTIONS: '-XX:UseSVE=0'
      KAFKA_NODE_ID: 1
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT://:19092,MAIN://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092,MAIN://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,MAIN:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

Note that the _JAVA_OPTIONS env variable is only needed on Apple Silicon.

Standalone App

Running kafka-hooks standalone is useful if you want to have your internal APIs scaling independently, or you want to integrate an existing HTTP API with Kafka, like so:

Setting it up is easy with our generator:

npx --package @platformatic/kafka-hooks -c create-platformatic-kafka-hooks
cd kafka-hooks-app
npm i
npm start

Edit your .env file to configure the PLT_KAFKA_BROKER variable, and you're ready to go! To publish to any Kafka topic, you can now send a POST request to /topics/:topic.

Configuration

To consume topics, you need to configure your Kafka webhooks in the platformatic.json file:

{
  "kafka": {
    "brokers": ["localhost:9092"],
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com"
      }
    ],
    "consumer": {
      "groupId": "plt-kafka-hooks",
      "maxWaitTime": 500
    }
  }
}

Running kafka-hooks inside Watt

@platformatic/kafka-hooks can run smoothly inside of Watt, the Node.js application server we launched last year. It allows multiple Node.js services to run as different threads inside the same process. Here’s a reference architecture using Watt and kafka-hooks as an example:

In this case, there is a public API that does some processing before pushing a message in Kafka. Then, kafka-hooks consumes the message and sends it down to your internal API. Let’s try building this system:

➜  kafka-hooks-demo git:(main) wattpm create --module=@platformatic/kafka-hooks
Hello Matteo Collina, welcome to Watt 2.63.4!
? Where would you like to create your project? .
? Which kind of project do you want to create? @platformatic/service
? What is the name of the service? public
? Do you want to create another service? yes
? Which kind of project do you want to create? @platformatic/kafka-hooks
✔ Installing @platformatic/kafka-hooks...
? What is the name of the service? kafka
? Do you want to create another service? yes
? Which kind of project do you want to create? @platformatic/service
? What is the name of the service? private
? Do you want to create another service? no
? Which service should be exposed? public
? Do you want to use TypeScript? no
? What port do you want to use? 3042
[18:04:56.302] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/package.json written!
[18:04:56.304] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/watt.json written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.env written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.env.sample written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.gitignore written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/README.md written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/package.json written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/platformatic.json written!
[18:04:56.307] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/plugins/example.js written!
[18:04:56.307] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/routes/root.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/helper.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/plugins/example.test.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/routes/root.test.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/.gitignore written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/global.d.ts written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/README.md written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/package.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/platformatic.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/.gitignore written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/package.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/platformatic.json written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/plugins/example.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/routes/root.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/helper.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/plugins/example.test.js written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/routes/root.test.js written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/.gitignore written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/global.d.ts written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/README.md written!
? Do you want to init the git repository? no
[18:05:01.291] INFO (55000): Installing dependencies for the application using npm ...
...skipped for brevity...
[18:05:09.408] INFO (55000): You are all set! Run `npm start` to start your project.

Then, we need to add the routes for the public services in web/public/routes/root.js:

/// <reference path="../global.d.ts" />
'use strict'
/** @param {import('fastify').FastifyInstance} fastify */
module.exports = async function (fastify, opts) {
  fastify.get('/example', async (request, reply) => {
    return { hello: fastify.example }
  })

  fastify.get('/events', async (request, reply) => {
    const res = await fetch('http://private.plt.local/events')
    if (res.ok) {
      const data = await res.json()
      return data
    } else {
      req.log.error({ response: await res.json(), status: res.status }, 'Error fetching events')
      reply.code(500)
      return { status: 'error' }
    }
  })

  fastify.post('/events', {
    schema: {
      body: {
        type: 'object',
        properties: {
          name: { type: 'string' },
          description: { type: 'string' }
        },
        required: ['name']
      }
    },
    handler: async (request, reply) => {
      const res = await fetch('http://kafka.plt.local/topics/events', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(request.body)
      })

      if (res.ok) {
        return { status: 'enqueued' }
      } else {
        reply.code(500)
        return { status: 'error' }
      }
    }
  })
}

Then, we add the routes in our private microservices for consuming the messages:

/// <reference path="../global.d.ts" />
'use strict'
/** @param {import('fastify').FastifyInstance} fastify */
module.exports = async function (fastify, opts) {
  const events = [] // Faking a database

  fastify.get('/events', async (request, reply) => {
    return events
  })

  fastify.post('/events', {
    schema: {
      body: {
        type: 'object',
        properties: {
          name: { type: 'string' },
          description: { type: 'string' }
        },
        required: ['name', 'description']
      }
    },
    handler: async (request, reply) => {
      const event = request.body
      events.push(event)
      console.log('Received event:', event)
      return { status: 'ok' }
    }
  })
}

Last but not least, we need to integrate them with the kafka-hooks configuration in web/kafka/platformatic.json:

{
  "$schema": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.2.0.json",
  "module": "@platformatic/kafka-hooks",
  "kafka": {
    "brokers": [
      "{PLT_KAFKA_BROKER}"
    ],
    "topics": [
      {
        "topic": "events",
        "url": "http://private.plt.local/events"
      }
    ],
    "consumer": {
      "groupId": "{PLT_KAFKA_CONSUMER_GROUP}",
      "mode": "latest",
      "maxWaitTime": 500,
      "sessionTimeout": 10000,
      "rebalanceTimeout": 15000,
      "heartbeatInterval": 500
    }
  }
}

Then start the application with npm start and then via Scalar at http://localhost:3042/documentation or with curl:

$ curl http://127.0.0.1:3042/events \
  --request POST \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "aa",
  "description": "vvv"
}'
{"status":"enqueued"}
$ curl http://127.0.0.1:3042/events
[{"name":"aa","description":"vvv"}]%

Note the messages being processed in the logs.

Technical Foundation

Built on top of the robust Platformatic ecosystem, kafka-hooks leverages:

What's Next

We're committed to enhancing kafka-hooks with:

  • Support for additional authentication methods

  • Enhanced monitoring and observability

  • Schema registry integration

Get Involved

We welcome contributions, feedback, and feature requests! Check out the project on GitHub and join our community.

Conclusion

Kafka-hooks simplifies the integration between Kafka and HTTP, reducing boilerplate code and developer toil, providing robust error handling, and enabling clean architectural patterns. Whether you're building microservices, integrating with third-party systems, or just need a reliable bridge between protocols, @platformatic/kafka-hooks has you covered.

Try it today and let us know what you think!

And if you have any questions or feedback, join us on Discord or send us an email at (hello@platformatic.dev).

Excited to see what you make of this!


Apache Kafka is a registered trademark of The Apache Software Foundation.

0
Subscribe to my newsletter

Read articles from Matteo Collina directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Matteo Collina
Matteo Collina