Efficient Event Streaming: Mastering Pub/Sub with Fastify and DragonFly
By Luca Del Puppo
Building resilient and scalable applications is one of the most difficult challenges in our job. You must follow good practices and use the right tools in the right situation to achieve these objectives. This article illustrates a possible solution to scale applications using our delicious Fastify server and DragonFly. If you don’t know DragonFly, no worries; there is a small paragraph to introduce it, but if you are familiar with Redis, there is nothing to fear.
Scenario
One of the common scenarios we face in a project is to react when something happens.
Imagine a sign-up form; behind a simple form, there are many linked processes. For instance, saving the user data in the database, sending an email to confirm the identity, sending an SMS, or anything else that should happen as a consequence of the sign-up.
In this post, we will use DragonFly as a Pub/Sub service to deliver an event when something happens in a Fastify application.
In this example, we will create a flow to handle the state history of users in a system, as the picture below shows.
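To make the idea of reacting to an event concrete before DragonFly enters the picture, here is a minimal in-memory stand-in for a Pub/Sub broker. It is only an illustration: the InMemoryPubSub class, the channel name, and the sample data are assumptions made for this sketch and are not part of the repository.

```javascript
// In-memory stand-in for a Pub/Sub broker (illustration only, not DragonFly).
class InMemoryPubSub {
  constructor() {
    this.channels = new Map() // channel name -> Set of handlers
  }

  subscribe(channel, handler) {
    if (!this.channels.has(channel)) this.channels.set(channel, new Set())
    this.channels.get(channel).add(handler)
    // Return a function that removes this subscription.
    return () => this.channels.get(channel).delete(handler)
  }

  // Delivers to the current subscribers only and returns how many received
  // the message, mirroring the integer reply of the Redis PUBLISH command.
  publish(channel, message) {
    const handlers = this.channels.get(channel) ?? new Set()
    for (const handler of handlers) handler(message)
    return handlers.size
  }
}

// The sign-up code publishes once; each follow-up process subscribes on its own.
const bus = new InMemoryPubSub()
const sent = []
bus.subscribe('user-signed-up', (msg) => sent.push(`email to ${msg.email}`))
bus.subscribe('user-signed-up', (msg) => sent.push(`sms to ${msg.phone}`))

const receivers = bus.publish('user-signed-up', {
  email: 'ada@example.com',
  phone: '+100000000'
})
```

The publisher does not know who is listening, which is what lets new consumers be added without touching the sign-up code. A message published while nobody is subscribed is simply lost, and Redis-style Pub/Sub behaves the same way.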
Setup
We already prepared a project to get started. You will find it here.
The repository contains the whole application, and together we will see all the crucial parts of using DragonFly with Fastify.
To set up your environment, run the following commands, and you will be ready to start:
npm run infra:up
npm run db:migrate
npm run dev
The main folders relevant to this post are src/graphql and src/services/message-consumer. The first one contains the GraphQL server, and the second one contains the Fastify plugin used as a consumer.
DragonFly, the quick introduction
DragonFly is a service very similar to Redis. It complies with the Redis API; if you want, you can switch between them, changing only the connection string.
Performance is one of DragonFly’s main objectives, and to achieve it DragonFly is implemented differently from Redis: the algorithms and data structures under the hood are different. At the same time, to make adoption easy, DragonFly is compliant with both the Redis API and the Memcached API.
If you are looking for benchmarks, you can find them here.
Publish the first messages to DragonFly
There isn’t an official plugin to handle DragonFly with Fastify. Still, because DragonFly is compliant with the Redis API, you can use the @fastify/redis plugin directly. For the sake of clarity, we’re going to wrap it in a small custom plugin nonetheless.
You can find the plugin in src/plugins/dragonFly.js:
import fp from 'fastify-plugin'
async function dragonFlyPlugin(app, opts) {
  await app.register(import('@fastify/redis'), {
    host: opts.config.DRAGONFLY_HOST,
    port: opts.config.DRAGONFLY_PORT
  })
  app.decorate('dragonFly', app.redis)
}
export default fp(dragonFlyPlugin, {
  fastify: '4.x',
  name: 'dragon-fly'
})
As you can see, after the @fastify/redis plugin is registered, the Fastify server is decorated with a dragonFly client. This way, you can use app.dragonFly to access the client.
Publishing messages to DragonFly is the first of the post’s goals that we will tackle. To do this, we create a new Fastify plugin whose only job is to publish messages to DragonFly. You can find the code in src/plugins/eventEmitter.js.
import { randomUUID } from 'crypto'
import fp from 'fastify-plugin'
async function eventEmitterPlugin(app) {
  function buildEvent(payload) {
    return {
      eventId: randomUUID(),
      payload,
      eventAt: Date.now()
    }
  }
  async function publishEvent(eventName, payload) {
    const event = buildEvent(payload)
    return await app.dragonFly.publish(eventName, JSON.stringify(event))
  }
  const emitter = {
    publishEvent
  }
  app.decorate('eventEmitter', emitter)
}
export default fp(eventEmitterPlugin, {
  fastify: '4.x',
  dependencies: ['dragon-fly'],
  name: 'event-emitter'
})
Looking at the code, you can see it is straightforward. This plugin depends on the dragonFly plugin and decorates the server with the eventEmitter decorator. This object contains only the publishEvent method, which accepts the eventName and the payload of the message, wraps the payload in an event with a unique eventId and an eventAt timestamp, and publishes it to DragonFly as a JSON string.
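Because the event travels as a JSON string, whoever receives it has to parse it back into an object. The sketch below shows one possible way to do that defensively, assuming the envelope produced by buildEvent (eventId, payload, eventAt). The parseEvent helper and the sample values are hypothetical and do not come from the repository.

```javascript
// Hypothetical helper: parses a raw Pub/Sub message and checks that it has
// the envelope shape produced by buildEvent. Returns null when it does not.
function parseEvent(raw) {
  let event
  try {
    event = JSON.parse(raw)
  } catch {
    return null // not JSON: ignore it rather than crash the consumer
  }
  const valid =
    event !== null &&
    typeof event === 'object' &&
    typeof event.eventId === 'string' &&
    typeof event.eventAt === 'number' &&
    'payload' in event
  return valid ? event : null
}

// Sample message with made-up values, shaped like the output of publishEvent.
const raw = JSON.stringify({
  eventId: 'a1',
  payload: { id: 1, status: 'approved' },
  eventAt: 1700000000000
})
const event = parseEvent(raw)
```

Returning null instead of throwing keeps a malformed message from taking down the listener; whether to log or drop such messages is a choice left to the consumer.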
The event emitter is used when a user is created, accepted, or rejected. In this example, these actions are simulated using three simple GraphQL mutations. The GraphQL server’s setup is at src/graphql/index.js, and it’s pretty simple.
import mercurius from 'mercurius'
import mercuriusCache from 'mercurius-cache'
import { loaders, resolvers, schema } from './graphql.js'
export default async function graphqlService(app, opts) {
  await app.register(import('../plugins/dragonFly.js'), opts)
  await app.register(import('../plugins/eventEmitter.js'), opts)
  const { dragonFly } = app
  await app.register(mercurius, {
    schema,
    resolvers,
    loaders,
    graphiql: true
  })
}
It registers the DragonFly and the EventEmitter plugins, and then, using Mercurius, it sets up the GraphQL server.
The GraphQL mutations are in the src/graphql/graphql.js file. As you can see, after the data is updated in the database, every mutation publishes the USER_STATUS_UPDATED_EVENT event to DragonFly using the ctx.app.eventEmitter.publishEvent method.
...
export const resolvers = {
  Query: {
    getUsers: async (_, __, ctx) => {
      return getUsers(ctx.app.pg)
    }
  },
  Mutation: {
    createUser: async (_, { user }, ctx) => {
      const newUser = await insertUser(ctx.app.pg, user)
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        newUser
      )
      return newUser
    },
    approveUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'approved')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    },
    rejectUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'rejected')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    }
  }
}
...
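The three mutations repeat the same pattern: write to the database, publish the event, return the result. One possible way to factor that out is a small wrapper around the resolver. The publishAfter helper below is hypothetical and not part of the repository; it only assumes the ctx.app.eventEmitter decorator described earlier, and the channel name is a placeholder for the real USER_STATUS_UPDATED_EVENT value.

```javascript
// Hypothetical helper (not in the repository): wraps a resolver so that its
// result is published as an event before being returned to the client.
function publishAfter(eventName, resolver) {
  return async (parent, args, ctx, info) => {
    const result = await resolver(parent, args, ctx, info)
    // Same call the mutations make by hand; a rejected publish fails the mutation.
    await ctx.app.eventEmitter.publishEvent(eventName, result)
    return result
  }
}

// Usage sketch with a stubbed context instead of a running server.
const published = []
const ctx = {
  app: {
    eventEmitter: {
      publishEvent: async (name, payload) => {
        published.push({ name, payload })
      }
    }
  }
}

// 'user-status-updated' is an assumed channel name for illustration.
const approveUser = publishAfter('user-status-updated', async (_, { userId }) => {
  return { id: userId, status: 'approved' } // stands in for updateUserStatus
})
```

With such a wrapper, each mutation would only describe its database operation, and the publishing step would live in one place. The explicit version in the article has the advantage of showing exactly when the event is sent.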
With these simple steps, we have covered all the requirements to publish a message from Fastify to DragonFly, and as you can see, it is pretty simple.
To recap:
1. Register the DragonFly plugin
2. Create the EventEmitter plugin
3. Register the GraphQL server
4. Use the eventEmitter decorator in the mutations
Channel subscription
To consume the messages published to a DragonFly channel, we’ll have to create a consumer that subscribes to the channel and handles every message it receives.
The src/services/message-consumer/index.js file contains all the code required to subscribe to DragonFly and handle the messages.
First, we must register the DragonFly plugin and then call the subscribe method to start the subscription to the channel.
Then, we’ll need to listen to the message event to handle the messages. This listener will be called on each new message in DragonFly.
In this example, the code calls the insertRegistrationHistoryRow method, which saves the event in the user history to track the action. Finally, it’s also important to register a Fastify onClose hook to unsubscribe the client when the server shuts down.
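The steps above can be sketched as a small Fastify plugin. This is not the code from the repository, only a minimal illustration under some assumptions: the channel name is a placeholder for the real USER_STATUS_UPDATED_EVENT value, insertRegistrationHistoryRow is injected through the plugin options because its real signature is not shown in this post, and app.dragonFly is the ioredis-compatible client decorated by the dragonFly plugin.

```javascript
// Assumed channel name; the real constant lives in the repository.
const USER_STATUS_UPDATED_EVENT = 'user-status-updated'

// Sketch of a consumer plugin. `opts.insertRegistrationHistoryRow` is an
// assumed injection point that keeps this example self-contained.
async function messageConsumer(app, opts) {
  const { insertRegistrationHistoryRow } = opts

  // ioredis emits 'message' with the channel name and the raw string payload.
  const onMessage = async (channel, message) => {
    if (channel !== USER_STATUS_UPDATED_EVENT) return
    try {
      const event = JSON.parse(message)
      await insertRegistrationHistoryRow(event)
    } catch (err) {
      // Log and continue so that one bad message does not stop the consumer.
      app.log.error({ err, channel }, 'failed to handle event')
    }
  }

  app.dragonFly.on('message', onMessage)
  await app.dragonFly.subscribe(USER_STATUS_UPDATED_EVENT)

  // Release the subscription when the server shuts down.
  app.addHook('onClose', async () => {
    app.dragonFly.off('message', onMessage)
    await app.dragonFly.unsubscribe(USER_STATUS_UPDATED_EVENT)
  })
}
```

A plugin like this would be registered with app.register(messageConsumer, { insertRegistrationHistoryRow }) after the dragonFly plugin. One design point worth keeping in mind: ioredis documents that a connection in subscriber mode can only issue subscription commands, so the sketch assumes the consumer has its own client and does not share it with code that publishes.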
Now it’s time to check the result of running our application:
1. Run npm run dev
2. Navigate to http://localhost:3000/graphiql
3. Call the createUser mutation, and then approveUser with the ID of the last user created
4. In the user’s history, you should see the history of such actions
As we have seen, creating a Pub/Sub service with DragonFly is a piece of cake, and the combination of DragonFly and Fastify is a good fit when you need a high-performance solution.
Wrapping up
That’s a wrap on our journey to scaling an application with Fastify and DragonFly. In this article, we discovered what DragonFly is and how it works. You learned how to use DragonFly as an event bus to scale and decouple systems, and how easy it is to integrate it into a Fastify application with Mercurius.