In Part 1 of this series, we introduced RabbitMQ and built our proof-of-concept: the EventBusService, RabbitMQProvider, a DLX retry pattern, and our first emitter (CommentService.createComment()). By the end of Phase 0, we had one event running reliably in production behind a feature flag.
For Phase 1, we applied this pattern to all Tier 1 services. 14 Zod schemas, 25 queue handlers across 3 consumer classes were built, and emit sites were added to CommentService, RecordService, and OccurrenceCrudHelper. We also added 79 new tests. Here is how we executed Phase 1, the design choices we made, and a tricky bug that changed how we guard event bus access.
The Dual-Write Strategy
Our top priority for Phase 1 was safety. Since IHA has active users, we couldn't risk breaking the app. Switching entirely from direct service calls to event emissions wasn't safe yet.
Instead, we used a dual-write strategy. Services now emit events and run their existing inline side effects. If the event broker fails, the inline call still handles the work, and the user notices nothing.
void this.eventBus
.emit('record.created', { recordId: record.id, userId, title: "record.title, ... })"
.catch((err) => {
logger.error({ err, recordId: record.id }, '[RecordService] Failed to emit record.created event');
});
The inline side effects below it still run. The event goes to consumers who perform the same side effects redundantly. In Phase 2, once consumers are proven reliable under production load, the inline calls are removed.
The .catch() is crucial. Without it, a rejected emit() promise crashes the Node.js process or clutters the logs. Using void tells TypeScript we're intentionally not awaiting the promise—this is a fire-and-forget action. Once consumers prove reliable under production load in Phase 2, we will remove the inline calls.
Designing the Event Schemas
We added ten new schemas to the four we built in Phase 0. We made a few specific design choices to keep the system robust:
- Keep payloads minimal: We only send IDs, not full objects. Consumers fetch the extra data they need. This keeps payloads light and ensures consumers use current data, avoiding stale data from the exact moment of emission.
-
Include optional fields for update diffs: For events like
record.updated, we include the previous values of changed fields.
export const RecordUpdatedPayloadSchema = z.object({
recordId: z.string().uuid(),
userId: z.string(),
title: z.string().optional(),
// ...other current fields
previousTitle: z.string().optional(),
previousVisibility: z.string().optional()
});
This lets the consumer easily check if something important changed (like visibility) without fetching the old state from the database, which prevents race conditions.
-
Treat visibility changes as distinct events: Instead of bundling visibility changes into regular updates, we created a specific
visibility_changedevent. These are moderated actions with different rules and consumers. -
Only validate UUIDs when guaranteed: We didn't use
.uuid()validation foruserIdbecause we use Lucia session IDs, not UUIDs. We only strictly validated actual database UUIDs to avoid rejecting valid events.
The Emit Pattern
Every emit site uses a fire-and-forget pattern with .catch() logging. Because our ServiceRegistry doesn't support circular constructor injection, we use a lazy getter for the eventBus. It resolves the service on its first access:
private get eventBus(): EventBusService {
if (!this._eventBus) {
this._eventBus = getService<EventBusService>(SERVICE_NAMES.EVENT_BUS_SERVICE);
}
return this._eventBus;
}
For static helper classes like record-moderation.helper.ts, we can't use this. Instead, we use a dynamic import:
const { getService: _getService, SERVICE_NAMES: _SERVICE_NAMES } = await import('../../ServiceRegistry');
const eventBus = _getService<import('../../EventBusService').EventBusService>(_SERVICE_NAMES.EVENT_BUS_SERVICE);
void eventBus
.emit('record.visibility_changed', { recordId, userId: moderatorUserId, visible: !hide, moderatorNote: note })
.catch((err: unknown) => {
logger.warn({ recordId, err }, '[RecordModerationHelper] Failed to emit record.visibility_changed event');
});
Three Services, Three Integration Patterns
-
CommentService: Added three new emit sites for updating, deleting, and toggling visibility. -
RecordService: Added the lazy getter and emit sites for creating, updating, and deleting records. The update payload cleverly includes only the previous values for fields that actually changed. -
OccurrenceCrudHelper: This helper touches multiple services and isn't registered in theServiceRegistry. We made the getter null-safe with a try/catch, which led to an interesting bug.
The EventBus Emit Guard Problem
During testing, we hit a tricky bug with the null-safe getter in OccurrenceCrudHelper. We initially used optional chaining (_eventBus?.emit) to guard the call.
In tests, the mocked ServiceRegistry returns a truthy mock object, but it lacks the .emit method. Because the object isn't null, optional chaining proceeds, tries to call .emit, and throws a TypeError: _eventBus.emit is not a function.
The Fix: We replaced optional chaining with an explicit type guard.
const _eventBusForCreate = this.eventBus;
if (_eventBusForCreate && typeof _eventBusForCreate.emit === 'function') {
void _eventBusForCreate
.emit('occurrence.created', payload)
.catch((err: unknown) => {
logger.error({ err, occurrenceId: occurrence.id }, '[OccurrenceCrudHelper] Failed to emit occurrence.created event');
});
}
Checking typeof _eventBusForCreate.emit === 'function' safely verifies the method exists before calling it.
Bounded-Context Consumers
Instead of creating a consumer for every single event, we grouped them by entity context: CommentEventConsumers, RecordEventConsumers, and OccurrenceEventConsumers.
Each class registers its queue bindings in the constructor. A single event can fan out to multiple queues simultaneously. If a cache invalidation handler fails, it doesn't break the search indexer handler.
// record.updated -> re-index + notifications + cache
await eventBus.on<RecordUpdatedPayload>(
EVENT_NAMES.RECORD_UPDATED,
QUEUES.SEARCH_INDEXER_RECORD,
async (payload) => { await this.handleUpdatedIndexing(payload); }
);
await eventBus.on<RecordUpdatedPayload>(
EVENT_NAMES.RECORD_UPDATED,
QUEUES.CACHE_INVALIDATION_RECORD,
async (payload) => { await this.handleCacheInvalidation(payload.recordId); }
);
Testing Strategy
We wrote 79 new tests broken into two main groups:
- Schema Contract Tests (38 tests): These catch schema drift. We test valid payloads, missing fields, and invalid UUIDs. Note: Zod v4 strictly checks for UUID v4 formats, so your test fixtures must use valid v4 strings (with a '4' in the 13th position).
- Consumer Unit Tests (41 tests): We mocked the dependencies and tested the handler methods directly to ensure they call the right downstream services. No RabbitMQ broker is needed here.
What We Learned
- Dual-write works: Running both paths simultaneously was safe because our redundant side effects were idempotent.
- Explicit emit guards are necessary: Optional chaining isn't enough when dealing with test mocks. Always check if the method is a function.
- Contract tests save time: They caught schema drift three times during development, preventing silent runtime failures.
- Group consumers by context: Routing logic is much easier to manage in 3 entity-based files rather than 14 event-based files.
- Dynamic imports shine in static helpers: They solve circular dependency risks cleanly without cluttering constructors.
What Comes Next
In Phase 2, we will remove the inline side effects. The consumer will become the only path. We will also extract our tools into a Turborepo monorepo, spinning out the notification and search services into their own isolated processes. Our event boundary work makes this extraction safe and reliable.
Repository Updates (Phase 1):
- New Files: 10 Zod schemas (records/occurrences), 3 consumer classes, 69 unit/contract tests.
-
Modified Files: Constants, index exports,
CommentService,RecordService, helper classes, and RabbitMQ initialization.

Top comments (0)