Kaapi: A flexible, extensible backend framework for modern APIs with messaging, documentation, and type safety built right in.
This series is written for backend developers who love TypeScript and can appreciate Hapi's design philosophy.
Messaging?
What does "messaging" actually mean in this context?
Messaging is an asynchronous communication pattern where applications exchange data by sending messages through a broker, rather than calling each other directly. This decouples systems, improves scalability and reliability, and allows components to process messages independently and at their own pace.
— ChatGPT
That definition is correct... but also not very helpful if you've never felt the problem it solves.
So let me give you a real-world example from my own experience at work about six years ago (yes, I'm feeling old).
It started with a simple requirement: "When a user signs up, send them a welcome email."
Easy enough. I put it directly in the signup handler: call the email service, wait for the response, return success.
Then someone asked:
"Can we also send a Slack notification to the sales team?"
Then another request followed:
"Can we log this to our analytics pipeline?"
Before long, the signup handler was doing five different things, and if any one of them failed, the entire signup failed.
That's when it became clear: some things don't need to happen right now. They just need to happen.
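To make that idea concrete before we bring in Kafka, here is a toy in-memory bus (an illustration of the pattern only, not Kaapi's API): the signup handler publishes one event, and email, Slack, and analytics each subscribe independently, so one failing subscriber no longer breaks signup.

```typescript
type Handler<T> = (message: T) => Promise<void> | void;

// Toy in-memory message bus. Real systems use a broker (Kafka, RabbitMQ, ...),
// but the decoupling idea is the same.
class TinyBus {
    private handlers = new Map<string, Handler<any>[]>();

    subscribe<T>(topic: string, handler: Handler<T>): void {
        const list = this.handlers.get(topic) ?? [];
        list.push(handler);
        this.handlers.set(topic, list);
    }

    async publish<T>(topic: string, message: T): Promise<void> {
        for (const handler of this.handlers.get(topic) ?? []) {
            try {
                await handler(message); // one failing subscriber doesn't affect the others
            } catch (err) {
                console.error(`handler for "${topic}" failed:`, err);
            }
        }
    }
}
```

With this shape, adding the Slack notification or the analytics log is a new `subscribe` call, and the signup handler never changes.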
Kafka: the elephant in the room
There are many ways to do async messaging. Redis Pub/Sub. RabbitMQ. AWS SQS.
But if you're building something that needs to scale, handle millions of events, and never lose a message... you're probably looking at Kafka.
The problem? KafkaJS is powerful, but it's verbose. Producers, consumers, admin clients, group IDs, partitions, offsets... it's a lot of ceremony just to send a JSON object from A to B.
Kaapi's @kaapi/kafka-messaging package wraps all of that into something you can actually reason about.
Setting up Kafka locally
Before we write any code, let's get Kafka running.
One command:
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
That's it. You now have a Kafka broker at localhost:9092.
The scenario
We're building two services:
- User API: handles signup, publishes a user.created event
- Notification Service: listens for user.created, sends a welcome email
Two apps. One Kafka topic. Fully decoupled.
The User API
Let's start with the API that handles signups.
npm install @kaapi/kaapi @kaapi/kafka-messaging
import { Kaapi } from '@kaapi/kaapi';
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
    clientId: 'user-api',
    brokers: ['localhost:9092'],
    name: 'user-api',
});

const app = new Kaapi({
    port: 3000,
    host: 'localhost',
});

app.route<{ Payload: { email: string; name: string } }>({
    method: 'POST',
    path: '/signup',
    handler: async ({ payload }) => {
        const user = {
            id: crypto.randomUUID(),
            email: payload.email,
            name: payload.name,
            createdAt: Date.now(),
        };

        // Save to database (pretend this happened)

        // Publish the event
        await messaging.publish('user.created', user);

        return { success: true, userId: user.id };
    },
});

const start = async () => {
    await app.listen();
    app.log('User API running at', app.base().info.uri);
};

start();
That's your entire producer.
When a user signs up, we publish to user.created. The API doesn't know or care who's listening. It just announces what happened.
Plugging into Kaapi's lifecycle
You can use KafkaMessaging standalone (like we just did). But if you register it with your Kaapi app, you get lifecycle management for free:
const messaging = new KafkaMessaging({
    clientId: 'user-api',
    brokers: ['localhost:9092'],
    name: 'user-api',
});

const app = new Kaapi({
    port: 3000,
    host: 'localhost',
    messaging, // register it here
});
Now when you call await app.stop(), Kaapi automatically calls await messaging.shutdown().
No manual cleanup. No forgotten connections. The messaging service follows the app's lifecycle.
You also get shortcuts:
// These are equivalent:
await messaging.publish('user.created', user);
await app.publish('user.created', user);

await messaging.subscribe('user.created', handler);
await app.subscribe('user.created', handler);

// signup route
app.route({
    method: 'POST',
    path: '/signup',
    handler: async (req) => {
        const user = {
            //...
        };

        // shortcut to publish messages
        await req.publish('user.created', user);

        return { success: true };
    },
});
Use whichever feels right. The behavior is identical.
The Notification Service
Now let's build the service that reacts to those events.
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
    clientId: 'notification-service',
    brokers: ['localhost:9092'],
    name: 'notification-service',
});

interface UserCreatedEvent {
    id: string;
    email: string;
    name: string;
    createdAt: number;
}

const start = async () => {
    await messaging.subscribe<UserCreatedEvent>('user.created', async (user, ctx) => {
        console.log(`Sending welcome email to ${user.email}...`);

        // await emailService.sendWelcome(user.email, user.name);

        console.log(`Email sent (offset: ${ctx.offset})`);
    });

    console.log('Notification service listening...');
};

start();
Run both services. Hit /signup. Watch the notification service pick up the event.
No polling. No webhooks. No coupling.
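To trigger the flow from a terminal, a request like this should do it (the payload fields match the route's Payload type above; the email and name values are just examples):

```shell
curl -X POST http://localhost:3000/signup \
  -H 'Content-Type: application/json' \
  -d '{"email":"ada@example.com","name":"Ada Lovelace"}'
```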
What's happening under the hood
When you call subscribe(), Kaapi:
- Creates a consumer with an auto-generated group ID (notification-service.user.created)
- Connects to Kafka
- Starts consuming messages
- Parses JSON automatically
- Passes typed messages to your handler
When you call publish(), Kaapi:
- Gets (or creates) a shared producer
- Serializes your message to JSON
- Adds metadata headers (service name, address)
- Sends it to the topic
You don't manage connections. You don't serialize anything. You just publish and subscribe.
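Conceptually, the publish() steps above boil down to something like this sketch. It is a simplification for illustration only: the header key and record shape here are assumptions, not Kaapi's actual metadata format.

```typescript
// Hypothetical sketch of publish(): serialize to JSON, attach metadata
// headers, and hand a KafkaJS-style record to a shared producer.
function toKafkaRecord<T>(topic: string, message: T, serviceName: string) {
    return {
        topic,
        messages: [
            {
                value: JSON.stringify(message),           // automatic JSON serialization
                headers: { 'service-name': serviceName }, // metadata header (assumed key)
            },
        ],
    };
}
```

The subscribe side does the mirror image: parse the JSON value back into an object before calling your handler.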
When things go wrong
What happens if your email service is down?
By default, Kaapi logs the error and moves on. But sometimes you want more control.
await messaging.subscribe<UserCreatedEvent>(
    'user.created',
    async (user, ctx) => {
        await emailService.sendWelcome(user.email, user.name);
    },
    {
        onError: async (error, message, ctx) => {
            console.error(`Failed to process message at offset ${ctx.offset}:`, error);

            // Send to dead-letter queue, alert ops, etc.
            await alerting.notify('notification-service', error);
        },
    }
);
The onError callback gives you the error, the original message, and the context.
Do whatever you need: retry, log, alert, or forward to a dead-letter topic.
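For the retry case, a small helper like this can be called before you give up and hit onError. This is plain application code, not part of @kaapi/kafka-messaging:

```typescript
// Retry an async operation with exponential backoff before rethrowing.
async function withRetries<T>(
    fn: () => Promise<T>,
    attempts = 3,
    delayMs = 100
): Promise<T> {
    let lastErr: unknown;
    for (let i = 0; i < attempts; i++) {
        try {
            return await fn();
        } catch (err) {
            lastErr = err;
            // wait delayMs, 2*delayMs, 4*delayMs, ... between attempts
            await new Promise((resolve) => setTimeout(resolve, delayMs * 2 ** i));
        }
    }
    throw lastErr;
}
```

Inside the handler you would then write something like `await withRetries(() => emailService.sendWelcome(user.email, user.name))`, letting onError handle only the messages that fail every attempt.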
Consumer groups: who gets what
By default, Kaapi generates a group ID based on your service name and the topic:
notification-service.user.created
This means if you spin up 3 instances of the notification service, Kafka will distribute messages across them. Each message is processed by one instance, not all of them.
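The naming convention is simple enough to write out. This is illustrative only; the real logic lives inside the package:

```typescript
// Default consumer group ID described above: "<service name>.<topic>".
// With groupIdPrefix, the prefix takes the place of the service name.
const defaultGroupId = (serviceNameOrPrefix: string, topic: string): string =>
    `${serviceNameOrPrefix}.${topic}`;
```

So defaultGroupId('notification-service', 'user.created') yields the group ID that all instances of the notification service share, which is what lets Kafka balance partitions across them.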
Need a custom group ID? Easy:
await messaging.subscribe('user.created', handler, {
    groupId: 'custom-notification-group',
});
Or just customize the prefix:
await messaging.subscribe('user.created', handler, {
    groupIdPrefix: 'emails', // → emails.user.created
});
Shutting down gracefully
When your service stops, you want to disconnect cleanly.
If you registered messaging with your Kaapi app, this happens automatically when you call app.stop().
For standalone usage, handle it yourself:
process.on('SIGTERM', async () => {
    const result = await messaging.shutdown();
    console.log(`Disconnected ${result.successConsumers} consumers, ${result.successProducers} producers`);
    process.exit(0);
});
Kaapi tracks all producers, consumers, and admin clients you've created. shutdown() disconnects them all and tells you what happened.
What we didn't cover (on purpose)
This article focused on the essentials:
- publish(): send a message
- subscribe(): receive messages
- onError: handle failures
- shutdown(): clean disconnect
But @kaapi/kafka-messaging can do more:
| Feature | What it does |
|---|---|
| publishBatch() | Send multiple messages in one call |
| createTopic() | Programmatically create topics |
| fetchTopicOffsets() | Check partition offsets |
| createProducer() / createConsumer() | Manual control when you need it |
See the README: https://www.npmjs.com/package/@kaapi/kafka-messaging
Wrapping up
You started with a monolith. You ended with two decoupled services talking over Kafka.
No boilerplate. No manual serialization. No connection juggling.
Just publish() and subscribe().
That's what messaging should feel like.
Source Code
Want to run the full example?
github.com/shygyver/kaapi-monorepo-playground
The example lives under user-api and notification-app.
You can run the applications and the Kafka broker in Docker:
docker compose up --build -d user-api notification-app kafka
More Kaapi articles are coming! It has plenty more tricks up its sleeve.
Get started now
npm install @kaapi/kafka-messaging kafkajs
Learn more: https://github.com/demingongo/kaapi/wiki/KafkaMessaging