Maksim Matlakhov

Posted on • Originally published at blog.vibetdd.dev

Event Handling: Inbox Pattern for Complex Scenarios

In my previous post, I covered direct event handling for simple, stateless operations. Today, I'm diving into the Inbox Pattern – a robust approach for scenarios where immediate processing isn't safe or when you need more sophisticated handling strategies.

Why Inbox Pattern?

Not all events can be processed immediately upon arrival. Some scenarios require careful, controlled processing:

  • Complex calculations – Long-running computations that might time out
  • Document conversions – Converting HTML to PDF or processing large files can take an unpredictable amount of time
  • External service calls – Sending emails via providers like SendGrid or Mailgun
  • Multiple event aggregation – Combining data from several events before processing
  • Resource-intensive operations – Tasks that require significant CPU, memory, or I/O

The Problem with Immediate Processing

Processing events directly in the consumer can lead to serious issues:

Timeout Issues – Complex operations may exceed the message broker's in-flight time limit, causing the broker to assume failure and redeliver the message, potentially creating duplicate processing.

Retry Control Loss – When execution fails, you're at the mercy of the broker's retry policy. Many brokers offer only basic retry settings, with little fine-grained control over retry attempts, timing, or failure tracking.

Resource Exhaustion – High event volumes can overwhelm your service if each event triggers expensive operations synchronously.

The Inbox Pattern solves this by decoupling event reception from processing: we persist the event first, then process it asynchronously with logic we control.

Processing Types

Based on my experience, I've identified three distinct processing patterns:

1. Single Event Processing

Each event is processed independently without considering other events.

Example: A user's status changes to "SUSPICIOUS". The fraud detection system needs to review all active orders for that user, check payment methods, and potentially flag transactions, all independently of other user status changes. Each update event carries both the previous and the current state, so a single event is enough to identify the change.
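
As a minimal sketch of why one event is enough, the check below decides from the previous and current status alone whether a transition involves "SUSPICIOUS". The types and the ACTIVE and BLOCKED status names are assumptions made for illustration, not the classes from the demo repository.

```kotlin
// Hypothetical, simplified types; the real event classes live in the demo repository.
enum class UserStatus { ACTIVE, SUSPICIOUS, BLOCKED }

data class StatusChange(val previous: UserStatus, val current: UserStatus)

// A transition is relevant for fraud checks when it enters or leaves SUSPICIOUS.
fun StatusChange.involvesSuspicious(): Boolean =
    previous != current &&
        (previous == UserStatus.SUSPICIOUS || current == UserStatus.SUSPICIOUS)

fun main() {
    println(StatusChange(UserStatus.ACTIVE, UserStatus.SUSPICIOUS).involvesSuspicious()) // true
    println(StatusChange(UserStatus.ACTIVE, UserStatus.BLOCKED).involvesSuspicious())    // false
}
```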

2. Sequential Processing

Events for the same entity must be processed in order to maintain consistency.

Example: Calculating user reward points based on purchase history. Events like PurchaseCompleted → RefundIssued → BonusApplied must be processed in sequence per user. The final points balance depends on processing these events in the exact order they occurred.
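
To illustrate why order matters, here is a small sketch that applies reward events per user in the order they occurred. The event fields and the point rules (refunds subtract, a bonus multiplies the balance) are assumptions chosen only to make the order dependence visible.

```kotlin
import java.time.Instant

// Hypothetical event model: field names and point rules are assumptions for illustration.
sealed interface RewardEvent {
    val userId: String
    val occurredAt: Instant
}
data class PurchaseCompleted(override val userId: String, override val occurredAt: Instant, val points: Int) : RewardEvent
data class RefundIssued(override val userId: String, override val occurredAt: Instant, val points: Int) : RewardEvent
data class BonusApplied(override val userId: String, override val occurredAt: Instant, val multiplier: Int) : RewardEvent

// Applies events per user in the order they occurred; a different order can give a different balance.
fun balances(events: List<RewardEvent>): Map<String, Int> =
    events.groupBy { it.userId }.mapValues { (_, userEvents) ->
        userEvents.sortedBy { it.occurredAt }.fold(0) { balance, event ->
            when (event) {
                is PurchaseCompleted -> balance + event.points
                is RefundIssued -> maxOf(0, balance - event.points)
                is BonusApplied -> balance * event.multiplier
            }
        }
    }

fun main() {
    val t0 = Instant.parse("2024-01-01T00:00:00Z")
    val events = listOf(
        BonusApplied("u1", t0.plusSeconds(20), multiplier = 2), // arrives first, occurred last
        PurchaseCompleted("u1", t0, points = 100),
        RefundIssued("u1", t0.plusSeconds(10), points = 40),
    )
    println(balances(events)) // {u1=120}
}
```

Applying the bonus before the refund would give 160 instead of 120, which is why events for the same user cannot be processed in arrival order here.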

3. Batch Processing

Multiple events are processed together for efficiency.

Example: Generating daily financial reports by collecting all transaction events throughout the day and processing them together at midnight. This avoids recalculating totals, averages, and summaries after every single transaction.
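
A rough sketch of the batch idea, assuming a hypothetical TransactionRecorded event with a day and an amount: all collected events are grouped and summed in one pass, rather than updating the totals on every transaction.

```kotlin
import java.math.BigDecimal
import java.time.LocalDate

// Hypothetical transaction event; field names are assumptions for illustration.
data class TransactionRecorded(val day: LocalDate, val amount: BigDecimal)

data class DailyReport(val day: LocalDate, val count: Int, val total: BigDecimal)

// Builds one report per day in a single pass over the collected events,
// instead of recalculating the totals after every transaction.
fun dailyReports(events: List<TransactionRecorded>): List<DailyReport> =
    events.groupBy { it.day }
        .map { (day, dayEvents) ->
            DailyReport(day, dayEvents.size, dayEvents.fold(BigDecimal.ZERO) { sum, e -> sum + e.amount })
        }
        .sortedBy { it.day }

fun main() {
    val day = LocalDate.of(2024, 1, 1)
    val events = listOf(
        TransactionRecorded(day, BigDecimal("10.50")),
        TransactionRecorded(day, BigDecimal("4.50")),
        TransactionRecorded(day.plusDays(1), BigDecimal("7.00")),
    )
    dailyReports(events).forEach(::println)
}
```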

For this post, we'll focus on Single Event Processing.

Demo Architecture

To demonstrate the Inbox Pattern, I've created a modular monolith with three services:

  • User Service – Manages user accounts and status changes
  • Product Service – Handles product catalog
  • Order Service – Processes orders and subscribes to user and product events

Scenario: When a user's status changes, the User Service emits an event. The Order Service subscribes to these events, stores them in an inbox, and processes fraud checks when status transitions involve "SUSPICIOUS".

You can explore the complete implementation here: GitLab Repository

How It All Works Together

Let's walk through the complete flow before diving into the code:

  1. User Service changes a user's status to SUSPICIOUS
  2. Event is published via a message broker (Kafka, RabbitMQ, etc.). In this demo, I use an internal Spring-based implementation
  3. Order Service consumer receives the event and stores it immediately in the inbox
  4. Scheduler triggers every 10 seconds
  5. Runner fetches pending events from the inbox database
  6. Handler processes each event, performing fraud checks on the user's orders
  7. On success: Event marked as SENT, processing complete
  8. On failure: Event scheduled for retry with an increasing backoff delay
  9. Retry continues until either success or max retry limit reached

Key insight: The event consumer's only job is to store the event quickly and acknowledge it to the broker. All complex processing happens asynchronously, with full control over retries and failure handling.
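
The split between "store fast" and "process later" can be sketched with an in-memory stand-in. InMemoryInbox and StoredEvent are hypothetical names used only for this illustration; the demo persists events in a database and adds retry handling, which this sketch leaves out.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical in-memory stand-in for the inbox; the demo persists events in a database instead.
data class StoredEvent(val id: UUID, val payload: String)

class InMemoryInbox {
    private val pending = ConcurrentLinkedQueue<StoredEvent>()

    // Consumer side: store and return immediately so the broker can be acknowledged.
    fun store(payload: String): UUID {
        val event = StoredEvent(UUID.randomUUID(), payload)
        pending.add(event)
        return event.id
    }

    // Worker side: drain whatever is pending and hand it to the (possibly slow) handler.
    // No retry handling here: an event whose handler throws is simply dropped in this sketch.
    fun processPending(handler: (StoredEvent) -> Unit): Int {
        var processed = 0
        while (true) {
            val event = pending.poll() ?: break
            handler(event)
            processed++
        }
        return processed
    }
}

fun main() {
    val inbox = InMemoryInbox()
    inbox.store("user-1 status: ACTIVE -> SUSPICIOUS") // fast path, runs in the consumer
    val processed = inbox.processPending { println("fraud check for: ${it.payload}") } // slow path, runs later
    println("processed $processed event(s)")
}
```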

Implementation: Fraud Detection Example

Now let's see how this is implemented in code.

Step 1: Storing Events

When an event arrives, we store it immediately with context defining how it should be processed:

@EventConsumer
class UserInboxEventsConsumerV1(
    private val storage: OrderInboxEventStorageAdapter,
) {

    fun onStatusUpdated(event: EventV1<UserStatusUpdatedV1>) {
        storage.create(
            event,
            InboxEventContext.single(InboxTopic.FRAUD)
        )
    }
}

Key points:

  • InboxEventContext.single() marks this for single event processing
  • InboxTopic.FRAUD groups related events together
  • The event is stored as-is – no processing happens here

Step 2: Defining Event Context

The context structure tells our system how to process events:

data class InboxEventContext(
    val topic: String,
    val processingType: ProcessingType
) {
    companion object {
        fun single(topic: String) = InboxEventContext(
            topic = topic,
            processingType = ProcessingType.SINGLE,
        )
    }
}

enum class ProcessingType {
    SINGLE,
    SEQUENTIAL,
    BATCH
}

Each stored event contains:

data class InboxEventData(
    val id: UUID,
    val context: InboxEventContext,
    val event: EventV1<out EventDtoBody>,
    val notification: Notification = Notification(),
)

Step 3: Tracking Processing State

The Notification object tracks processing attempts and status:

data class Notification(
    val status: Status = Status.PENDING,
    val attempts: Int = 0,
    val executeAt: Instant? = Instant.now(),
    val failedReasons: List<FailedReason> = listOf(),
) {
    enum class Status {
        PENDING, SENT, FAILED
    }
}

Built-in retry logic with a backoff delay that grows with each attempt (baseRetryIn multiplied by the attempt number):

fun Notification.toFailure(
    exception: Exception,
    maxRetries: Int,
    baseRetryIn: Int,
): Notification {
    val limitExceeded = attempts >= maxRetries
    val newAttempts = attempts + 1
    val nextExecuteAt =
        if (limitExceeded) null
        else executeAt?.plusSeconds(baseRetryIn * newAttempts.toLong())

    return copy(
        status = if (limitExceeded) Status.FAILED else Status.PENDING,
        attempts = newAttempts,
        failedReasons = failedReasons + FailedReason(
            occurredAt = Instant.now(),
            message = exception.message ?: "Unknown error"
        ),
        executeAt = nextExecuteAt,
    )
}
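
To make the retry timing concrete, the sketch below computes the delay that the `baseRetryIn * newAttempts` formula adds after each failure. Note that in toFailure the delay is added to the previous executeAt rather than to the current time. The doubling variant is a hypothetical alternative shown only for comparison; it is not part of the demo.

```kotlin
// Delay added after the n-th failed attempt, mirroring the formula in toFailure:
// baseRetryIn * newAttempts, which grows linearly with the attempt number.
fun linearDelaySeconds(baseRetryIn: Int, attempt: Int): Long = baseRetryIn * attempt.toLong()

// Hypothetical alternative (not part of the demo): a doubling delay,
// capped to avoid overflow and very long waits.
fun exponentialDelaySeconds(baseRetryIn: Int, attempt: Int, maxSeconds: Long = 3600): Long =
    minOf(maxSeconds, baseRetryIn.toLong() shl ((attempt - 1).coerceIn(0, 30)))

fun main() {
    println((1..5).map { linearDelaySeconds(2, it) })      // [2, 4, 6, 8, 10]
    println((1..5).map { exponentialDelaySeconds(2, it) }) // [2, 4, 8, 16, 32]
}
```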

Step 4: Configuring the Scheduler

Define when and how often to process events:

@ConfigurationProperties(prefix = "order.inbox.single")
data class OrderSingleInboxEventProps(
    override val pageSize: Int = 20,
    override val maxRetries: Int = 15,
    override val baseRetryIn: Int = 2,
    override val executeEvery: Duration = Duration.ofSeconds(10),
    override val topic: String? = null,
) : VTInboxEventProps

Configuration breakdown:

  • pageSize: 20 – Fetch and process up to 20 events per page
  • maxRetries: 15 – Retry failed events up to 15 times
  • baseRetryIn: 2 – Base delay of 2 seconds, multiplied by the attempt number (2s, 4s, 6s, and so on)
  • executeEvery: 10s – Check for pending events again 10 seconds after the previous run finishes
  • topic: null – Process all SINGLE type events (or specify a topic for granular control)

Step 5: Setting Up Processing Infrastructure

Wire everything together with Spring configuration:

@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
@EnableConfigurationProperties(OrderSingleInboxEventProps::class)
class OrderInboxEventConfig {

    @Bean
    fun orderSingleEventRunner(
        orderInboxEventStorageAdapter: OrderInboxEventStorageAdapter,
        orderInboxEventHandlers: List<VTInboxEventHandler<*>>,
        props: OrderSingleInboxEventProps,
    ) = VTSingleEventRunner(
        repository = orderInboxEventStorageAdapter,
        handlers = orderInboxEventHandlers,
        props = props,
    )

    @Bean
    fun orderSingleEventScheduler(
        orderMongoTemplate: MongoTemplate,
        orderSingleEventRunner: VTSingleEventRunner,
        taskScheduler: TaskScheduler,
        props: OrderSingleInboxEventProps,
    ) = VTEventScheduler(
        lockProvider = MongoLockProvider(orderMongoTemplate.db),
        eventRunner = orderSingleEventRunner,
        taskScheduler = taskScheduler,
        props = props
    )
}

Important: ShedLock (configured here via @EnableSchedulerLock and MongoLockProvider) lets only one instance run the inbox job at a time, which prevents duplicate processing in distributed environments.

Step 6: The Event Runner

The runner fetches pending events and processes them in batches:

class VTSingleEventRunner(
    private val repository: VTInboxEventStoragePort,
    private val handlers: List<VTInboxEventHandler<*>>,
    private val props: VTInboxEventProps,
) : VTEventRunner {

    private val log = KotlinLogging.logger {}

    override fun run() {
        var events: List<InboxEventData>
        do {
            events = props.topic?.let { repository.findPendingByTopic(it, props.pageSize) }
                ?: repository.findPendingByType(ProcessingType.SINGLE, props.pageSize)
            events.forEach { processOne(it) }
        } while (events.isNotEmpty())
    }

    private fun processOne(event: InboxEventData) {
        var notification: Notification = event.notification
        try {
            log.info { "Processing inbox event: ${event.id}, topic: ${event.context.topic}" }
            handlers
                .find { it.topic == event.context.topic }
                ?.let {
                    @Suppress("UNCHECKED_CAST")
                    (it as VTInboxEventHandler<EventDtoBody>).handle(event.event as EventV1<EventDtoBody>)
                }

            notification = event.notification.toSuccess()
        } catch (e: Exception) {
            log.error(e) { "Failed to process event: ${event.id}" }
            notification = event.notification.toFailure(e, props.maxRetries, props.baseRetryIn)
        } finally {
            updateEvent(event, notification)
        }
    }

    private fun updateEvent(event: InboxEventData, notification: Notification) {
        try {
            repository.update(event.copy(notification = notification))
        } catch (e: Exception) {
            log.error(e) { "Failed to update event: ${event.id}" }
        }
    }
}

Processing flow:

  1. Fetch pending events (by topic or processing type)
  2. Find the appropriate handler for each event's topic
  3. Execute the handler
  4. Mark as success or schedule retry on failure
  5. Continue until no pending events remain
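
One detail of the lookup step is worth noting: in the runner shown earlier, an event whose topic has no matching handler falls through the `?.let` and is still marked as successful. If that is not the behavior you want, one possible variant is to fail loudly so the event goes through the retry and failure path. The TopicHandler interface and dispatch function below are hypothetical simplifications, not the demo's VTInboxEventHandler.

```kotlin
// Hypothetical, simplified handler contract for illustration.
interface TopicHandler {
    val topic: String
    fun handle(payload: String)
}

// Variant of the lookup step that throws when no handler is registered,
// so the caller's failure path (retry, then FAILED) applies instead of a silent success.
fun dispatch(handlers: List<TopicHandler>, topic: String, payload: String) {
    val handler = handlers.find { it.topic == topic }
        ?: throw IllegalStateException("No inbox handler registered for topic '$topic'")
    handler.handle(payload)
}

fun main() {
    val fraud = object : TopicHandler {
        override val topic = "FRAUD"
        override fun handle(payload: String) = println("fraud check: $payload")
    }
    dispatch(listOf(fraud), "FRAUD", "user-1")
    println(runCatching { dispatch(listOf(fraud), "UNKNOWN", "user-2") }.isFailure) // true
}
```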

Step 7: The Scheduler

The scheduler ensures regular processing with distributed locking:

class VTEventScheduler(
    private val eventRunner: VTEventRunner,
    private val taskScheduler: TaskScheduler,
    private val props: VTInboxEventProps,
    lockProvider: ExtensibleLockProvider
) : ApplicationListener<ContextRefreshedEvent> {

    private val schedulingLockProvider = SchedulingLockProvider(lockProvider)
    private val lockName = "inbox-${props.topic ?: eventRunner::class.simpleName}"

    override fun onApplicationEvent(event: ContextRefreshedEvent) {
        taskScheduler.scheduleWithFixedDelay(
            { processEvents() },
            props.executeEvery
        )
    }

    private fun processEvents() {
        DefaultLockingTaskExecutor(schedulingLockProvider)
            .executeWithLock(
                Runnable { eventRunner.run() },
                LockConfiguration(
                    Instant.now(),
                    lockName,
                    schedulingLockProvider.getLockAtMostFor(),
                    Duration.ZERO
                )
            )
    }
}
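
The same idea can be sketched without Spring or ShedLock: a fixed-delay schedule plus a guard that lets only one run proceed at a time. SingleRunGuard is a hypothetical in-process stand-in; it only protects a single JVM, whereas the lock provider in the demo coordinates across service instances through a shared store.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// In-process stand-in for a distributed lock: only one caller at a time gets to run the task.
class SingleRunGuard {
    private val running = AtomicBoolean(false)

    // Runs the task if nobody else holds the guard; returns false when the run was skipped.
    fun runExclusively(task: () -> Unit): Boolean {
        if (!running.compareAndSet(false, true)) return false
        try {
            task()
        } finally {
            running.set(false)
        }
        return true
    }
}

fun main() {
    val guard = SingleRunGuard()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Fixed delay: the next run starts 1 second after the previous one finishes.
    scheduler.scheduleWithFixedDelay(
        { guard.runExclusively { println("processing inbox") } },
        0, 1, TimeUnit.SECONDS
    )
    Thread.sleep(2500)
    scheduler.shutdown()
}
```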

Step 8: Creating the Handler

Finally, implement the business logic:

@Component
class FraudTopicHandler(
    override val topic: String = InboxTopic.FRAUD,
    private val coreFactory: OrderCoreFactory
) : VTInboxEventHandler<UserStatusUpdatedV1> {

    override fun handle(event: EventV1<UserStatusUpdatedV1>) = runBlocking {
        coreFactory.fraudUseCase.execute(event.toCommand())
    }
}

fun EventV1<UserStatusUpdatedV1>.toCommand() = ManageFraudCommand(
    userId = modelId,
    previousStatus = body.previous.name,
    currentStatus = body.current.name,
)

Handler responsibilities:

  • Listen for events with topic FRAUD
  • Transform the event into a domain command
  • Execute the business logic (fraud check in this case)

Key Benefits

Reliability – Events aren't lost if processing fails

Observability – Track processing attempts and failures

Flexibility – Configure retry behavior per use case

Scalability – Distributed locks let you run multiple service instances without duplicate processing

Separation of Concerns – Event reception and processing are independent

Timeout Protection – Long-running operations don't block message consumers

Coming Next

In the next post, we'll tackle a critical challenge: bootstrapping event-driven services.

When you create a new service (like our Order Service), the upstream data already exists in other services. Users and products are already in the system, but your new Order Service has none of this data. How do you populate it?

We'll explore strategies for:

  • Initial data synchronization when deploying new services
  • Backfilling events from existing data sources
  • Handling the gap between service creation and going live
  • Ensuring consistency during the bootstrap phase

This is essential for building truly independent services that don't rely on direct database access to other services' data.


The Inbox Pattern adds complexity, but for critical business operations requiring guaranteed processing, it's indispensable. Check out the complete example on GitLab to see it in action!
