DEV Community

Benoit Travers
Benoit Travers

Posted on

Building Type-Safe AMQP Messaging with amqp-contract

If you've worked with RabbitMQ or AMQP messaging in TypeScript, you've probably experienced the frustration of dealing with untyped messages, scattered validation logic, and the constant fear of runtime errors from mismatched data structures. What if there was a better way?

Today, I'm excited to introduce amqp-contract — a TypeScript library that brings the power of contract-first development, end-to-end type safety, and automatic validation to AMQP messaging.

The Problem with Traditional AMQP Development

Let's start with a typical scenario. You're building a microservices architecture using RabbitMQ for inter-service communication. Your publisher looks something like this:

// ❌ Traditional approach - no type safety
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

await channel.assertExchange('orders', 'topic', { durable: true });

// What fields should this have? What types?
channel.publish(
  'orders',
  'order.created',
  Buffer.from(JSON.stringify({
    orderId: 'ORD-123',
    amount: 99.99,
    // Did I forget any required fields?
  }))
);
Enter fullscreen mode Exit fullscreen mode

And your consumer:

// ❌ No type information
channel.consume('order-processing', (msg) => {
  const data = JSON.parse(msg.content.toString()); // unknown type
  console.log(data.orderId); // No autocomplete, no validation
  // Is this the right field name? Who knows!
});
Enter fullscreen mode Exit fullscreen mode

This approach has several critical issues:

  1. No Type Safety: You lose all TypeScript benefits at the messaging boundary
  2. Manual Validation: You need to manually validate every message, or risk runtime errors
  3. Scattered Definitions: Message structures are defined implicitly or scattered across your codebase
  4. Refactoring Nightmares: Change a field name? Good luck finding all the places it's used
  5. Documentation Drift: Your code and documentation quickly get out of sync

Enter amqp-contract

amqp-contract solves these problems by bringing a contract-first approach to AMQP messaging. Inspired by the excellent tRPC, oRPC, and ts-rest libraries, it adapts their philosophy of end-to-end type safety to the world of message queues.

Here's what the same code looks like with amqp-contract:

1. Define Your Contract

First, define your contract with full type safety using schema validation libraries like Zod, Valibot, or ArkType:

import {
  defineContract,
  defineExchange,
  defineQueue,
  definePublisher,
  defineConsumer,
  defineMessage,
  defineQueueBinding,
} from '@amqp-contract/contract';
import { z } from 'zod';

// Define your AMQP resources
const ordersExchange = defineExchange('orders', 'topic', { durable: true });
const orderProcessingQueue = defineQueue('order-processing', { durable: true });

// Define your message schema
const orderMessage = defineMessage(
  z.object({
    orderId: z.string(),
    customerId: z.string(),
    items: z.array(
      z.object({
        productId: z.string(),
        quantity: z.number().int().positive(),
        price: z.number().positive(),
      })
    ),
    totalAmount: z.number().positive(),
    status: z.enum(['pending', 'processing', 'completed']),
  })
);

// Compose your contract
export const contract = defineContract({
  exchanges: { orders: ordersExchange },
  queues: { orderProcessing: orderProcessingQueue },
  bindings: {
    orderBinding: defineQueueBinding(
      orderProcessingQueue,
      ordersExchange,
      { routingKey: 'order.created' }
    ),
  },
  publishers: {
    orderCreated: definePublisher(ordersExchange, orderMessage, {
      routingKey: 'order.created',
    }),
  },
  consumers: {
    processOrder: defineConsumer(orderProcessingQueue, orderMessage),
  },
});
Enter fullscreen mode Exit fullscreen mode

2. Type-Safe Publishing

Now use the contract to create a type-safe client:

import { TypedAmqpClient } from '@amqp-contract/client';
import { contract } from './contract';

const clientResult = await TypedAmqpClient.create({
  contract,
  urls: ['amqp://localhost'],
});

if (clientResult.isError()) {
  throw clientResult.error;
}

const client = clientResult.value;

// ✅ Fully typed! TypeScript knows exactly what fields are required
const result = await client.publish('orderCreated', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  items: [
    { productId: 'PROD-789', quantity: 2, price: 49.99 }
  ],
  totalAmount: 99.98,
  status: 'pending',
});

result.match({
  Ok: () => console.log('✅ Published'),
  Error: (error) => console.error('❌ Failed:', error),
});
Enter fullscreen mode Exit fullscreen mode

3. Type-Safe Consuming

And create a type-safe worker for consuming messages:

import { TypedAmqpWorker } from '@amqp-contract/worker';
import { contract } from './contract';

const workerResult = await TypedAmqpWorker.create({
  contract,
  handlers: {
    // ✅ message is fully typed based on your schema
    processOrder: async (message) => {
      console.log(`Processing order: ${message.orderId}`);
      console.log(`Customer: ${message.customerId}`);
      console.log(`Total: $${message.totalAmount}`);

      // ✅ Full autocomplete for all fields
      message.items.forEach((item) => {
        console.log(`- ${item.quantity}x Product ${item.productId}`);
      });
    },
  },
  urls: ['amqp://localhost'],
});

workerResult.match({
  Ok: (worker) => console.log('✅ Worker ready'),
  Error: (error) => { throw error; },
});
Enter fullscreen mode Exit fullscreen mode

Key Features That Make amqp-contract Special

🔒 End-to-End Type Safety

TypeScript types flow automatically from your contract to publishers and consumers. No manual type annotations needed. If you refactor your schema, TypeScript immediately shows you every place that needs updating.

✅ Automatic Validation

Messages are automatically validated at network boundaries using Standard Schema v1. This works with Zod, Valibot, and ArkType, giving you the flexibility to choose your preferred validation library.

🛠️ Compile-Time Checks

TypeScript catches errors before runtime:

// ❌ TypeScript error - "orderDeleted" doesn't exist
await client.publish('orderDeleted', { orderId: '123' });

// ❌ TypeScript error - missing handler
await TypedAmqpWorker.create({
  contract,
  handlers: {}, // forgot processOrder!
  urls: ['amqp://localhost'],
});
Enter fullscreen mode Exit fullscreen mode

📄 AsyncAPI 3.0 Generation

Automatically generate AsyncAPI specifications from your contracts:

import { AsyncAPIGenerator } from '@amqp-contract/asyncapi';
import { ZodToJsonSchemaConverter } from '@orpc/zod/zod4';

const generator = new AsyncAPIGenerator({
  schemaConverters: [new ZodToJsonSchemaConverter()],
});

const spec = await generator.generate(contract, {
  info: {
    title: 'Order Processing API',
    version: '1.0.0',
  },
  servers: {
    production: {
      host: 'rabbitmq.example.com:5672',
      protocol: 'amqp',
    },
  },
});
Enter fullscreen mode Exit fullscreen mode

🎯 First-Class NestJS Support

If you're using NestJS, amqp-contract provides dedicated integration packages:

import { Module } from '@nestjs/common';
import { AmqpWorkerModule } from '@amqp-contract/worker-nestjs';
import { AmqpClientModule } from '@amqp-contract/client-nestjs';

@Module({
  imports: [
    AmqpWorkerModule.forRoot({
      contract,
      handlers: {
        processOrder: async (message) => {
          console.log('Processing:', message.orderId);
        },
      },
      connection: process.env.RABBITMQ_URL,
    }),
    AmqpClientModule.forRoot({
      contract,
      connection: process.env.RABBITMQ_URL,
    }),
  ],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

Why Choose amqp-contract?

Compared to Raw amqplib

  • ✅ Type safety vs ❌ No types
  • ✅ Automatic validation vs ❌ Manual validation
  • ✅ Compile-time checks vs ❌ Runtime errors
  • ✅ Refactoring support vs ❌ Find/replace
  • ✅ Documentation from code vs ❌ Manual docs

Compared to Other Solutions

Unlike other AMQP libraries, amqp-contract:

  • Focuses on type safety first — types are derived from your contract
  • Uses Standard Schema v1 — compatible with multiple validation libraries
  • Generates AsyncAPI specs — automatic documentation
  • Provides explicit error handling — uses Result types
  • Is framework agnostic — works standalone or with NestJS

Getting Started

Installation

# Core packages
pnpm add @amqp-contract/contract @amqp-contract/client @amqp-contract/worker

# Choose your schema library
pnpm add zod  # or valibot, or arktype

# AMQP client
pnpm add amqplib @types/amqplib
Enter fullscreen mode Exit fullscreen mode

Quick Start

  1. Define your contract with schemas
  2. Create a client to publish messages
  3. Create a worker to consume messages
  4. Enjoy type safety end-to-end!

Try It Today!

amqp-contract is open source (MIT license) and available on npm:

Check out the full documentation for detailed guides, API reference, and examples.

Conclusion

Type safety shouldn't stop at your application boundaries. With amqp-contract, you can bring the same level of type safety and developer experience you enjoy with TypeScript to your AMQP messaging layer.

Stop fighting runtime errors. Stop manually validating messages. Stop worrying about refactoring. Start building type-safe, validated, and maintainable messaging systems today.


What do you think? Have you tried amqp-contract? What are your experiences with type-safe messaging? Let me know in the comments!

Top comments (0)