Building Modern Backends with Kaapi: Messaging with Kafka

Kaapi: A flexible, extensible backend framework for modern APIs with messaging, documentation, and type safety built right in.

This series is written for backend developers who love TypeScript and can appreciate Hapi's design philosophy.


Messaging?

What does β€œmessaging” actually mean in this context?

Messaging is an asynchronous communication pattern where applications exchange data by sending messages through a broker, rather than calling each other directly. This decouples systems, improves scalability and reliability, and allows components to process messages independently and at their own pace.

β€” ChatGPT

That definition is correct… but also not very helpful if you’ve never felt the problem it solves.

So let me give you a real-world example from my own experience at work about six years ago (yes, I’m feeling old).

It started with a simple requirement: β€œWhen a user signs up, send them a welcome email.”

Easy enough. I put it directly in the signup handler: call the email service, wait for the response, return success.

Then someone asked:

β€œCan we also send a Slack notification to the sales team?”

Then another request followed:

β€œCan we log this to our analytics pipeline?”

Before long, the signup handler was doing five different things, and if any one of them failed, the entire signup failed.

That’s when it became clear: some things don’t need to happen right now. They just need to happen.


Kafka: the elephant in the room

There are many ways to do async messaging. Redis Pub/Sub. RabbitMQ. AWS SQS.

But if you're building something that needs to scale, handle millions of events, and never lose a message... you're probably looking at Kafka.

The problem? KafkaJS is powerful, but it's verbose. Producers, consumers, admin clients, group IDs, partitions, offsets... it's a lot of ceremony just to send a JSON object from A to B.
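
For contrast, here's roughly what sending a single JSON event looks like with kafkajs on its own. This is a minimal sketch (no connection reuse, retries, or error handling), just to show the ceremony involved:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'user-api', brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function publishUserCreated(user: { id: string; email: string; name: string }) {
  // Connect, serialize, send, disconnect: all by hand
  await producer.connect();
  await producer.send({
    topic: 'user.created',
    messages: [{ key: user.id, value: JSON.stringify(user) }],
  });
  await producer.disconnect();
}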

Kaapi's @kaapi/kafka-messaging package wraps all of that into something you can actually reason about.


Setting up Kafka locally

Before we write any code, let's get Kafka running.

One command:

docker run -d --name kafka -p 9092:9092 apache/kafka:latest

That's it. You now have a Kafka broker at localhost:9092.
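
If you want to double-check that the broker is reachable before going further, a tiny kafkajs admin script does the trick (entirely optional; it assumes kafkajs is installed, which you'll need as a peer dependency anyway, and 'healthcheck' is just an arbitrary client ID):

import { Kafka } from 'kafkajs';

const admin = new Kafka({ clientId: 'healthcheck', brokers: ['localhost:9092'] }).admin();

const check = async () => {
  await admin.connect();
  // A fresh broker simply returns an empty list of topics
  console.log('Topics:', await admin.listTopics());
  await admin.disconnect();
};

check();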


The scenario

We're building two services:

  1. User API: handles signup, publishes a user.created event
  2. Notification Service: listens for user.created, sends a welcome email

Two apps. One Kafka topic. Fully decoupled.


The User API

Let's start with the API that handles signups.

npm install @kaapi/kaapi @kaapi/kafka-messaging

Then create the app, wire up the Kafka messaging client, and publish an event from the signup handler:

import { Kaapi } from '@kaapi/kaapi';
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'user-api',
  brokers: ['localhost:9092'],
  name: 'user-api',
});

const app = new Kaapi({
  port: 3000,
  host: 'localhost',
});

app.route<{ Payload: { email: string; name: string } }>({
  method: 'POST',
  path: '/signup',
  handler: async ({ payload }) => {
    const user = {
      id: crypto.randomUUID(),
      email: payload.email,
      name: payload.name,
      createdAt: Date.now(),
    };

    // Save to database (pretend this happened)

    // Publish the event
    await messaging.publish('user.created', user);

    return { success: true, userId: user.id };
  },
});

const start = async () => {
  await app.listen();
  app.log('πŸš€ User API running at', app.base().info.uri);
};

start();

That's your entire producer.

When a user signs up, we publish to user.created. The API doesn't know or care who's listening. It just announces what happened.


Plugging into Kaapi's lifecycle

You can use KafkaMessaging standalone (like we just did). But if you register it with your Kaapi app, you get lifecycle management for free:

const messaging = new KafkaMessaging({
  clientId: 'user-api',
  brokers: ['localhost:9092'],
  name: 'user-api',
});

const app = new Kaapi({
  port: 3000,
  host: 'localhost',
  messaging, // πŸ‘ˆ register it here
});

Now when you call await app.stop(), Kaapi automatically calls await messaging.shutdown().

No manual cleanup. No forgotten connections. The messaging service follows the app's lifecycle.
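
In practice, that usually means wiring a signal handler to a single stop call. A quick sketch, assuming the app and messaging instances from the snippet above:

process.on('SIGTERM', async () => {
  // Stops the HTTP server and, because messaging is registered on the app,
  // also disconnects every Kafka producer and consumer it created
  await app.stop();
  process.exit(0);
});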

You also get shortcuts:

// These are equivalent:
await messaging.publish('user.created', user);
await app.publish('user.created', user);

await messaging.subscribe('user.created', handler);
await app.subscribe('user.created', handler);

Inside a route handler, there's also a request-level shortcut:

// signup route
app.route({
    method: 'POST',
    path: '/signup',
    handler: async (req) => {
        const user = {
            //... 
        };
        // shortcut to publish messages
        await req.publish('user.created', user);
        return { success: true };
    },
});

Use whichever feels right. The behavior is identical.


The Notification Service

Now let's build the service that reacts to those events.

import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'notification-service',
  brokers: ['localhost:9092'],
  name: 'notification-service',
});

interface UserCreatedEvent {
  id: string;
  email: string;
  name: string;
  createdAt: number;
}

const start = async () => {
  await messaging.subscribe<UserCreatedEvent>('user.created', async (user, ctx) => {
    console.log(`πŸ“§ Sending welcome email to ${user.email}...`);

    // await emailService.sendWelcome(user.email, user.name);

    console.log(`βœ… Email sent (offset: ${ctx.offset})`);
  });

  console.log('πŸ‘‚ Notification service listening...');
};

start();

Run both services. Hit /signup. Watch the notification service pick up the event.

No polling. No webhooks. No coupling.
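
Don't have a frontend handy? A quick smoke test from any Node 18+ script works (a sketch; the email and name are just sample values):

const smokeTest = async () => {
  const res = await fetch('http://localhost:3000/signup', {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ email: 'ada@example.com', name: 'Ada' }),
  });
  // Expected shape: { success: true, userId: '...' }
  console.log(await res.json());
};

smokeTest();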


What's happening under the hood

When you call subscribe(), Kaapi:

  1. Creates a consumer with an auto-generated group ID (notification-service.user.created)
  2. Connects to Kafka
  3. Starts consuming messages
  4. Parses JSON automatically
  5. Passes typed messages to your handler

When you call publish(), Kaapi:

  1. Gets (or creates) a shared producer
  2. Serializes your message to JSON
  3. Adds metadata headers (service name, address)
  4. Sends it to the topic

You don't manage connections. You don't serialize anything. You just publish and subscribe.
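
To make that concrete, here's roughly the kafkajs boilerplate that one subscribe() call replaces. This is a simplified sketch; Kaapi's actual group ID generation, header metadata, and error handling are more involved:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'notification-service', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'notification-service.user.created' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'user.created' });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Manual JSON parsing: Kaapi does this (plus typing) for you
      const user = JSON.parse(message.value?.toString() ?? '{}');
      console.log('received', user.email);
    },
  });
};

run();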


When things go wrong

What happens if your email service is down?

By default, Kaapi logs the error and moves on. But sometimes you want more control.

await messaging.subscribe<UserCreatedEvent>('user.created', 
  async (user, ctx) => {
    await emailService.sendWelcome(user.email, user.name);
  },
  {
    onError: async (error, message, ctx) => {
      console.error(`❌ Failed to process message at offset ${ctx.offset}:`, error);

      // Send to dead-letter queue, alert ops, etc.
      await alerting.notify('notification-service', error);
    },
  }
);

The onError callback gives you the error, the original message, and the context.
Do whatever you need: retry, log, alert, or forward to a dead-letter topic.
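
One common pattern is to park failed events on a dead-letter topic using the same publish() API. A sketch ('user.created.dlq' is a hypothetical topic name, and emailService stands in for your own email client):

await messaging.subscribe<UserCreatedEvent>(
  'user.created',
  async (user) => {
    await emailService.sendWelcome(user.email, user.name);
  },
  {
    onError: async (error, message, ctx) => {
      // Forward the failed message to a dead-letter topic for later inspection
      await messaging.publish('user.created.dlq', {
        original: message,
        error: String(error),
        offset: ctx.offset,
      });
    },
  }
);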


Consumer groups: who gets what

By default, Kaapi generates a group ID based on your service name and the topic:

notification-service.user.created

This means that if you spin up three instances of the notification service, Kafka distributes the topic's partitions across them: each message is processed by exactly one instance, not all of them.

Need a custom group ID? Easy:

await messaging.subscribe('user.created', handler, {
  groupId: 'custom-notification-group',
});

Or just customize the prefix:

await messaging.subscribe('user.created', handler, {
  groupIdPrefix: 'emails',  // β†’ emails.user.created
});
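
The flip side: if you want fan-out instead of load balancing, give each subscriber its own group. A sketch (sendWelcomeEmail and trackSignup are hypothetical handlers):

// Same topic, two groups: every user.created event is delivered to BOTH handlers
await messaging.subscribe('user.created', sendWelcomeEmail, { groupIdPrefix: 'emails' });   // emails.user.created
await messaging.subscribe('user.created', trackSignup, { groupIdPrefix: 'analytics' });     // analytics.user.created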

Shutting down gracefully

When your service stops, you want to disconnect cleanly.

If you registered messaging with your Kaapi app, this happens automatically when you call app.stop().

For standalone usage, handle it yourself:

process.on('SIGTERM', async () => {
  const result = await messaging.shutdown();
  console.log(`Disconnected ${result.successConsumers} consumers, ${result.successProducers} producers`);
  process.exit(0);
});

Kaapi tracks all producers, consumers, and admin clients you've created. shutdown() disconnects them all and tells you what happened.


What we didn't cover (on purpose)

This article focused on the essentials:

  • publish(): send a message
  • subscribe(): receive messages
  • onError: handle failures
  • shutdown(): clean disconnect

But @kaapi/kafka-messaging can do more:

  • publishBatch(): send multiple messages in one call
  • createTopic(): programmatically create topics
  • fetchTopicOffsets(): check partition offsets
  • createProducer() / createConsumer(): manual control when you need it

See the README: https://www.npmjs.com/package/@kaapi/kafka-messaging


Wrapping up

You started with a monolith. You ended with two decoupled services talking over Kafka.

No boilerplate. No manual serialization. No connection juggling.

Just publish() and subscribe().

That's what messaging should feel like.


Source Code

Want to run the full example?

πŸ‘‰ github.com/shygyver/kaapi-monorepo-playground
The example lives under user-api and notification-app.
You can run both applications and the Kafka broker with Docker:

docker compose up --build -d user-api notification-app kafka

More Kaapi articles are coming! Kaapi has plenty more tricks up its sleeve.


πŸ“¦ Get started now

npm install @kaapi/kafka-messaging kafkajs

πŸ”— Learn more: https://github.com/demingongo/kaapi/wiki/KafkaMessaging

