Introducing @platformatic/kafka-hooks: Bridging Kafka and HTTP with Ease


Last month, we launched @platformatic/kafka to improve how Node.js applications produce and consume messages via Apache Kafka.
Today, we're excited to announce the release of @platformatic/kafka-hooks, a powerful, lightweight library that connects your Apache Kafka infrastructure with HTTP services.
If you've found yourself writing boilerplate code to:
Forward Kafka messages to HTTP endpoints
Enable web services to publish to Kafka topics
Implement request/response patterns
Handle retries and error flows
...then kafka-hooks is built for you.
With kafka-hooks, you can now seamlessly integrate event streams with your web applications, microservices, and APIs.
The Problem We're Solving
Event-driven architectures powered by Apache Kafka are fantastic for high-throughput, scalable systems. However, connecting Kafka with the HTTP-centric world of web services often requires custom code, complex setup, and careful error handling.
Note that consuming Kafka topics via HTTP webhooks is a simple and performant way to integrate with all kinds of systems, without having to worry about Kafka versions, drivers, or access control.
Key Features
📥 Consume from Kafka, Forward to HTTP
Configure kafka-hooks to consume messages from topics and forward them to HTTP endpoints. Each message is delivered as received, with additional HTTP headers carrying Kafka metadata.
{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com"
      }
    ]
  }
}
📤 Publish to Kafka via HTTP
Use a simple HTTP API to publish messages to Kafka topics:
curl --request POST \
--url http://127.0.0.1:3042/topics/topic \
--header 'Content-Type: application/json' \
--header 'x-plt-kafka-hooks-key: my-key' \
--data '{ "name": "my test" }'
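The same call from Node.js, mirroring the curl example above (the `x-plt-kafka-hooks-key` header presumably carries the Kafka message key, as its value in the example suggests):

```javascript
'use strict'

// Publish a message to a Kafka topic through the kafka-hooks HTTP API.
// Same request as the curl example above, using the global fetch.
async function publish (baseUrl, topic, key, payload) {
  const res = await fetch(`${baseUrl}/topics/${topic}`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-plt-kafka-hooks-key': key
    },
    body: JSON.stringify(payload)
  })

  if (!res.ok) {
    throw new Error(`publish failed with status ${res.status}`)
  }
}

module.exports = { publish }
```

Usage: `await publish('http://127.0.0.1:3042', 'topic', 'my-key', { name: 'my test' })`.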
🔁 Configurable Retries and Concurrency
Fine-tune your message delivery with retry policies and concurrency settings:
{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com",
        "retries": 5,
        "retryDelay": 1000
      }
    ],
    "concurrency": 20
  }
}
💀 Dead Letter Queue (DLQ)
Failed messages automatically go to a Dead Letter Queue for later analysis or reprocessing:
{
  "kafka": {
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com",
        "retries": 5,
        "retryDelay": 1000,
        "dlq": "my-dead-letter-queue"
      }
    ],
    "concurrency": 20
  }
}
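Since the Dead Letter Queue is itself just a Kafka topic, one way to inspect failed messages is to forward it to a diagnostic endpoint using the very same mechanism (the inspector URL below is a placeholder for your own service):

```json
{
  "kafka": {
    "topics": [
      {
        "topic": "my-dead-letter-queue",
        "url": "https://inspector.example.com/failed-messages"
      }
    ]
  }
}
```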
↩️ Request/Response pattern
Exposing a topic as an HTTP API is as simple as configuring it:
{
  "kafka": {
    "requestResponse": [
      {
        "path": "/api/process",
        "requestTopic": "request-topic",
        "responseTopic": "response-topic",
        "timeout": 5000
      }
    ]
  }
}
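From the client's perspective, the configured path behaves like an ordinary HTTP endpoint: the request body is published to the request topic, and the HTTP response carries the correlated message that eventually appears on the response topic (or the call fails once the timeout expires). A sketch of a caller, with a client-side AbortSignal mirroring the configured 5-second ceiling:

```javascript
'use strict'

// Call the request/response endpoint configured above. The body goes to
// the request topic; the HTTP response is the correlated message from
// the response topic. The timeout mirrors the configured 5000 ms.
async function processJob (baseUrl, payload) {
  const res = await fetch(`${baseUrl}/api/process`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(5000)
  })

  if (!res.ok) {
    throw new Error(`processing failed with status ${res.status}`)
  }

  return res.json()
}

module.exports = { processJob }
```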
Getting Started in Minutes
The first thing you need is a Kafka cluster. You can use our docker-compose.yml file: https://github.com/platformatic/kafka-hooks/blob/main/docker-compose.yml.
# To listen on broker, run ./kafka-console-consumer.sh --property print.key=true --property print.value=true --property print.headers=true --property print.offset=true --bootstrap-server localhost:9092 ...
---
services:
  kafka:
    image: apache/kafka:3.9.0
    ports:
      - '9092:9092'
    environment:
      _JAVA_OPTIONS: '-XX:UseSVE=0'
      KAFKA_NODE_ID: 1
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT://:19092,MAIN://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092,MAIN://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,MAIN:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
Note that the _JAVA_OPTIONS env variable is only needed on Apple Silicon.
Standalone App
Running kafka-hooks standalone is useful if you want your internal APIs to scale independently, or if you want to integrate an existing HTTP API with Kafka.
Setting it up is easy with our generator:
npx --package @platformatic/kafka-hooks -c create-platformatic-kafka-hooks
cd kafka-hooks-app
npm i
npm start
Edit your .env file to configure the PLT_KAFKA_BROKER variable, and you're ready to go! To publish to any Kafka topic, you can now send a POST request to /topics/:topic.
Configuration
To consume topics, you need to configure your Kafka webhooks in the platformatic.json file:
{
  "kafka": {
    "brokers": ["localhost:9092"],
    "topics": [
      {
        "topic": "events",
        "url": "https://service.example.com"
      }
    ],
    "consumer": {
      "groupId": "plt-kafka-hooks",
      "maxWaitTime": 500
    }
  }
}
Running kafka-hooks inside Watt
@platformatic/kafka-hooks can run smoothly inside of Watt, the Node.js application server we launched last year, which allows multiple Node.js services to run as different threads inside the same process. Here’s a reference architecture using Watt and kafka-hooks as an example:
In this case, there is a public API that does some processing before pushing a message into Kafka. Then, kafka-hooks consumes the message and sends it down to your internal API. Let’s try building this system:
➜ kafka-hooks-demo git:(main) wattpm create --module=@platformatic/kafka-hooks
Hello Matteo Collina, welcome to Watt 2.63.4!
? Where would you like to create your project? .
? Which kind of project do you want to create? @platformatic/service
? What is the name of the service? public
? Do you want to create another service? yes
? Which kind of project do you want to create? @platformatic/kafka-hooks
✔ Installing @platformatic/kafka-hooks...
? What is the name of the service? kafka
? Do you want to create another service? yes
? Which kind of project do you want to create? @platformatic/service
? What is the name of the service? private
? Do you want to create another service? no
? Which service should be exposed? public
? Do you want to use TypeScript? no
? What port do you want to use? 3042
[18:04:56.302] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/package.json written!
[18:04:56.304] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/watt.json written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.env written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.env.sample written!
[18:04:56.305] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/.gitignore written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/README.md written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/package.json written!
[18:04:56.306] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/platformatic.json written!
[18:04:56.307] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/plugins/example.js written!
[18:04:56.307] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/routes/root.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/helper.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/plugins/example.test.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/test/routes/root.test.js written!
[18:04:56.308] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/.gitignore written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/global.d.ts written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/public/README.md written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/package.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/platformatic.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/kafka/.gitignore written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/package.json written!
[18:04:56.309] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/platformatic.json written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/plugins/example.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/routes/root.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/helper.js written!
[18:04:56.310] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/plugins/example.test.js written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/test/routes/root.test.js written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/.gitignore written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/global.d.ts written!
[18:04:56.311] INFO (55000): /Users/matteo/repos/kafka-hooks-demo/web/private/README.md written!
? Do you want to init the git repository? no
[18:05:01.291] INFO (55000): Installing dependencies for the application using npm ...
...skipped for brevity...
[18:05:09.408] INFO (55000): You are all set! Run `npm start` to start your project.
Then, we need to add the routes for the public services in web/public/routes/root.js:
/// <reference path="../global.d.ts" />
'use strict'

/** @param {import('fastify').FastifyInstance} fastify */
module.exports = async function (fastify, opts) {
  fastify.get('/example', async (request, reply) => {
    return { hello: fastify.example }
  })

  fastify.get('/events', async (request, reply) => {
    const res = await fetch('http://private.plt.local/events')

    if (res.ok) {
      const data = await res.json()
      return data
    } else {
      request.log.error({ response: await res.json(), status: res.status }, 'Error fetching events')
      reply.code(500)
      return { status: 'error' }
    }
  })

  fastify.post('/events', {
    schema: {
      body: {
        type: 'object',
        properties: {
          name: { type: 'string' },
          description: { type: 'string' }
        },
        required: ['name']
      }
    },
    handler: async (request, reply) => {
      const res = await fetch('http://kafka.plt.local/topics/events', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(request.body)
      })

      if (res.ok) {
        return { status: 'enqueued' }
      } else {
        reply.code(500)
        return { status: 'error' }
      }
    }
  })
}
Then, we add the routes in our private microservices for consuming the messages:
/// <reference path="../global.d.ts" />
'use strict'

/** @param {import('fastify').FastifyInstance} fastify */
module.exports = async function (fastify, opts) {
  const events = [] // Faking a database

  fastify.get('/events', async (request, reply) => {
    return events
  })

  fastify.post('/events', {
    schema: {
      body: {
        type: 'object',
        properties: {
          name: { type: 'string' },
          description: { type: 'string' }
        },
        required: ['name', 'description']
      }
    },
    handler: async (request, reply) => {
      const event = request.body
      events.push(event)
      console.log('Received event:', event)
      return { status: 'ok' }
    }
  })
}
Last but not least, we need to integrate them with the kafka-hooks configuration in web/kafka/platformatic.json:
{
  "$schema": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.2.0.json",
  "module": "@platformatic/kafka-hooks",
  "kafka": {
    "brokers": ["{PLT_KAFKA_BROKER}"],
    "topics": [
      {
        "topic": "events",
        "url": "http://private.plt.local/events"
      }
    ],
    "consumer": {
      "groupId": "{PLT_KAFKA_CONSUMER_GROUP}",
      "mode": "latest",
      "maxWaitTime": 500,
      "sessionTimeout": 10000,
      "rebalanceTimeout": 15000,
      "heartbeatInterval": 500
    }
  }
}
Then start the application with npm start and exercise it via Scalar at http://localhost:3042/documentation or with curl:
$ curl http://127.0.0.1:3042/events \
--request POST \
--header 'Content-Type: application/json' \
--data '{
"name": "aa",
"description": "vvv"
}'
{"status":"enqueued"}
$ curl http://127.0.0.1:3042/events
[{"name":"aa","description":"vvv"}]
Note the messages being processed in the logs.
Technical Foundation
Built on top of the robust Platformatic ecosystem, kafka-hooks leverages:
@platformatic/kafka: Platformatic's high-performance, developer-friendly Kafka client
@platformatic/service: The building block for great APIs, built on top of Fastify
Undici: the next-gen HTTP client for Node.js, by Node.js maintainers
What's Next
We're committed to enhancing kafka-hooks with:
Support for additional authentication methods
Enhanced monitoring and observability
Schema registry integration
Get Involved
We welcome contributions, feedback, and feature requests! Check out the project on GitHub and join our community.
Conclusion
Kafka-hooks simplifies the integration between Kafka and HTTP, reducing boilerplate code and developer toil, providing robust error handling, and enabling clean architectural patterns. Whether you're building microservices, integrating with third-party systems, or just need a reliable bridge between protocols, @platformatic/kafka-hooks has you covered.
Try it today and let us know what you think!
And if you have any questions or feedback, join us on Discord or send us an email at hello@platformatic.dev.
Excited to see what you make of this!
Apache Kafka is a registered trademark of The Apache Software Foundation.