DEV Community

Gabriel Anhaia

Domain Events in PHP Without a Kafka Cluster


The order saved. The customer saw the green checkmark. The transaction committed. And then the confirmation email never went out, because the SMTP call was queued after COMMIT and the PHP-FPM worker got reaped before it ran.

You have seen this bug. It is older than most of the codebases you have worked on. Every shop ships its own variant: a Stripe webhook that fired but never reached the inventory service, an analytics event that came through twice, a "user signed up" message that got published for a user whose row got rolled back.

The honest fix is a broker with durable delivery and consumer dedup. The dishonest sales pitch is that you need Kafka for it. You do not. An outbox_events table in the Postgres or MySQL you already run, plus 150 lines of PHP, gets you atomic publishing and at-least-once delivery without standing up a Kafka cluster, a Schema Registry, or a Debezium pipeline.

This is what that looks like.

The bug nobody owns

Every team writes the same naive code at some point:

public function placeOrder(PlaceOrderInput $input): OrderId
{
    $this->db->beginTransaction();

    $order = Order::place($input);
    $this->orders->save($order);

    $this->db->commit();

    $this->mailer->sendConfirmation($order);
    $this->inventory->reserve($order);
    $this->analytics->track('order.placed', $order);

    return $order->id();
}

Three failure modes hide here. The mailer call can throw after commit, leaving an order with no email sent. The worker can be killed between commit and the inventory call. The analytics service can be flaky and retry, double-counting the order.

The first instinct is to move the side effects inside the transaction. Now the inventory HTTP call holds a row lock open for 400ms. Throughput drops. Worse: if the inventory call succeeds but the local commit fails, you reserved stock for an order that does not exist. Phantom side effect.

The second instinct is the opposite: push everything through Laravel jobs or Symfony Messenger, dispatched after commit. That works until the broker is unreachable for 90 seconds during a deploy. The order is in the database. The job never reached the broker. Nobody knows.

You need the side effect to be atomic with the aggregate write and isolated from the request lifecycle. Those two requirements are what the outbox pattern solves.

Outbox pattern at a glance: aggregate row and event row commit together, relay reads the table and publishes to the broker out of band.

The outbox pattern, in one paragraph

Write the event to a table in your own database, inside the same transaction as the aggregate. A separate worker reads that table and publishes to the broker. If the transaction rolls back, the event row rolls back too: no phantom message. If the publish fails, the row stays in the table: no lost message. The whole guarantee comes from one transactional boundary plus a polling loop.
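To see that guarantee in isolation, here is a minimal sketch using plain PDO against an in-memory SQLite database. It assumes the pdo_sqlite extension, and SQLite stands in for Postgres or MySQL only because it needs no server. The `orders` table and the `placeOrderWithOutbox()` helper are hypothetical; the point is that the order row and the event row share one transaction, so a failure between the two inserts leaves neither behind.

```php
<?php declare(strict_types=1);

// Sketch: aggregate row and outbox row written in ONE transaction.
// Assumes the pdo_sqlite extension; SQLite stands in for Postgres/MySQL here.

function createOutboxSchema(PDO $pdo): void
{
    $pdo->exec('CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT NOT NULL)');
    $pdo->exec('CREATE TABLE outbox_events (
        id            INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id      TEXT NOT NULL UNIQUE,
        event_name    TEXT NOT NULL,
        payload       TEXT NOT NULL,
        dispatched_at TEXT NULL
    )');
}

// Hypothetical use case. $failBeforeEvent simulates a crash between the inserts.
function placeOrderWithOutbox(PDO $pdo, string $orderId, string $customerId, bool $failBeforeEvent = false): void
{
    $pdo->beginTransaction();
    try {
        $pdo->prepare('INSERT INTO orders (id, customer_id) VALUES (?, ?)')
            ->execute([$orderId, $customerId]);

        if ($failBeforeEvent) {
            throw new RuntimeException('simulated failure before the event insert');
        }

        $pdo->prepare('INSERT INTO outbox_events (event_id, event_name, payload) VALUES (?, ?, ?)')
            ->execute([
                bin2hex(random_bytes(16)),
                'order.placed',
                json_encode(['order_id' => $orderId, 'customer_id' => $customerId], JSON_THROW_ON_ERROR),
            ]);

        $pdo->commit(); // both rows become visible together
    } catch (Throwable $e) {
        $pdo->rollBack(); // both rows disappear together
        throw $e;
    }
}

function countRows(PDO $pdo, string $table): int
{
    return (int) $pdo->query("SELECT COUNT(*) FROM {$table}")->fetchColumn();
}

$pdo = new PDO('sqlite::memory:', null, null, [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
createOutboxSchema($pdo);

placeOrderWithOutbox($pdo, 'o-1', 'c-1');

try {
    placeOrderWithOutbox($pdo, 'o-2', 'c-2', failBeforeEvent: true);
} catch (RuntimeException) {
    // Rolled back: no order o-2 and no event for it.
}

echo countRows($pdo, 'orders'), ' order(s), ', countRows($pdo, 'outbox_events'), " event(s)\n";
// prints "1 order(s), 1 event(s)"
```

Nothing in the helper is specific to SQLite; the same shape holds on any connection that supports transactions.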

That is it. The rest is plumbing.

The domain event

The aggregate is the one that knows when something interesting happened. Let it record the event in memory; the use case will hand the list off after the aggregate is in its final shape.

<?php declare(strict_types=1);

namespace App\Domain;

interface DomainEvent
{
    public function eventName(): string;
    public function eventId(): string;
    public function occurredAt(): \DateTimeImmutable;
    public function toPayload(): array;
}

final readonly class OrderPlaced implements DomainEvent
{
    public function __construct(
        public string $eventId,
        public string $orderId,
        public string $customerId,
        public int $amountCents,
        public string $currency,
        public \DateTimeImmutable $occurredAt,
    ) {}

    public function eventName(): string { return 'order.placed'; }
    public function eventId(): string { return $this->eventId; }
    public function occurredAt(): \DateTimeImmutable { return $this->occurredAt; }

    public function toPayload(): array
    {
        return [
            'event_id'    => $this->eventId,
            'order_id'    => $this->orderId,
            'customer_id' => $this->customerId,
            'amount'      => $this->amountCents,
            'currency'    => $this->currency,
            'occurred_at' => $this->occurredAt->format(\DATE_RFC3339_EXTENDED),
        ];
    }
}

Two fields earn their keep on every event. The eventId is what consumers will deduplicate on later. The occurredAt is the time the fact became true inside the domain, which matters for ordering and audit; do not let an adapter stamp it on the way out.
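Any string that is unique per event works as the eventId; the code in this post uses 32 hex characters from `random_bytes()`. If you would rather emit standard RFC 4122 UUIDs, the 36-character shape that the outbox table's `VARCHAR(36)` event_id column is sized for, a version-4 UUID takes a few lines of plain PHP. A sketch, with `uuidV4()` as a hypothetical helper name; a library such as ramsey/uuid covers the same ground.

```php
<?php declare(strict_types=1);

// Hypothetical helper: an RFC 4122 version-4 UUID built from 16 random bytes.
function uuidV4(): string
{
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // version nibble = 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // variant bits = 10xx

    // 32 hex chars split into 8 groups of 4, re-joined as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$eventId = uuidV4();
echo $eventId, ' (', strlen($eventId), " chars)\n";
```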

The aggregate records the event and the use case publishes it:

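final class Order
{
    /** @var list<DomainEvent> */
    private array $pendingEvents = [];

    /**
     * @param list<array{unitCents: int, quantity: int}> $items
     *        (line shape assumed for this example)
     */
    private function __construct(
        private readonly string $id,
        private readonly string $customerId,
        private readonly array $items,
        private readonly \DateTimeImmutable $placedAt,
    ) {}

    public static function place(
        string $id,
        string $customerId,
        array $items,
        \DateTimeImmutable $now,
    ): self {
        if (empty($items)) {
            throw new \DomainException('Order needs at least one line.');
        }

        $order = new self($id, $customerId, $items, $now);
        // Record, do not publish: the use case releases these after saving.
        $order->pendingEvents[] = new OrderPlaced(
            eventId:     bin2hex(random_bytes(16)),
            orderId:     $id,
            customerId:  $customerId,
            amountCents: $order->totalCents(),
            currency:    'EUR',
            occurredAt:  $now,
        );
        return $order;
    }

    public function id(): string
    {
        return $this->id;
    }

    public function totalCents(): int
    {
        return array_sum(array_map(
            static fn (array $line): int => $line['unitCents'] * $line['quantity'],
            $this->items,
        ));
    }

    /** @return list<DomainEvent> */
    public function releaseEvents(): array
    {
        $events = $this->pendingEvents;
        $this->pendingEvents = []; // hand each event out exactly once
        return $events;
    }
}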

The entity never publishes. It records. The application coordinates.
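When more than one aggregate records events, the `pendingEvents`/`releaseEvents()` pair can move into a trait so each entity only calls `record()`. A sketch under that assumption: `RecordsEvents`, `record()`, `Customer`, and `CustomerRegistered` are hypothetical names, and the `DomainEvent` interface is trimmed to the one method the example needs.

```php
<?php declare(strict_types=1);

// Trimmed stand-in for the post's DomainEvent interface.
interface DomainEvent
{
    public function eventName(): string;
}

// Hypothetical trait: the record/release pair from Order, reusable by any aggregate.
trait RecordsEvents
{
    /** @var list<DomainEvent> */
    private array $pendingEvents = [];

    protected function record(DomainEvent $event): void
    {
        $this->pendingEvents[] = $event;
    }

    /** @return list<DomainEvent> */
    public function releaseEvents(): array
    {
        $events = $this->pendingEvents;
        $this->pendingEvents = []; // a second release must not hand out duplicates
        return $events;
    }
}

final class CustomerRegistered implements DomainEvent
{
    public function __construct(public readonly string $customerId) {}

    public function eventName(): string { return 'customer.registered'; }
}

final class Customer
{
    use RecordsEvents;

    private function __construct(private readonly string $id) {}

    public static function register(string $id): self
    {
        $customer = new self($id);
        $customer->record(new CustomerRegistered($id)); // records; never publishes
        return $customer;
    }
}

$customer = Customer::register('c-42');
$first  = $customer->releaseEvents();
$second = $customer->releaseEvents();
echo count($first), ' then ', count($second), "\n"; // prints "1 then 0"
```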

The outbox table

One table, in the same database as your aggregates. Postgres syntax is shown; on MySQL, use BIGINT UNSIGNED AUTO_INCREMENT for id, JSON (or TEXT) for payload, and DATETIME(6) in place of TIMESTAMPTZ.

CREATE TABLE outbox_events (
    id            BIGSERIAL    PRIMARY KEY,
    event_id      VARCHAR(36)  NOT NULL UNIQUE,
    event_name    VARCHAR(100) NOT NULL,
    payload       JSONB        NOT NULL,
    occurred_at   TIMESTAMPTZ  NOT NULL,
    dispatched_at TIMESTAMPTZ  NULL,
    retry_count   INT          NOT NULL DEFAULT 0,
    last_error    TEXT         NULL
);

CREATE INDEX idx_outbox_pending
    ON outbox_events (id)
    WHERE dispatched_at IS NULL;

The partial index on dispatched_at IS NULL keeps the relay's pending-row scan fast even when the table holds millions of dispatched rows. MySQL does not support partial or filtered indexes; on MySQL, use a plain index on dispatched_at and rely on the nightly cold-storage job (below) to keep the dispatched-row count bounded so the index stays effective.

One thing this table does not do is shrink itself. Every dispatched row stays forever unless you delete it. Add a nightly job that moves rows dispatched more than 14 days ago into a cold table or to S3. Otherwise the table grows without bound, dragging storage, vacuum work, and backups along with it, and on MySQL the plain dispatched_at index loses its edge.
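The nightly job can be a single transaction that copies old dispatched rows into an archive table and then deletes them from the outbox. A minimal sketch with PDO, assuming a hypothetical `outbox_events_archive` table with the same columns in the same order, timestamps stored in UTC (on Postgres, a session time zone of UTC), and a cold table rather than S3 as the destination. Pending rows are never touched because their `dispatched_at` is NULL. The demo runs against in-memory SQLite with a trimmed schema.

```php
<?php declare(strict_types=1);

// Hypothetical nightly job: copy outbox rows dispatched before $cutoff into
// outbox_events_archive (same columns, same order), then delete them, all in
// one transaction. Timestamps are assumed to be stored in UTC.
function archiveDispatchedOutbox(PDO $pdo, DateTimeImmutable $cutoff): int
{
    $params = ['cutoff' => $cutoff->setTimezone(new DateTimeZone('UTC'))->format('Y-m-d H:i:s')];

    $pdo->beginTransaction();
    try {
        // Pending rows have dispatched_at = NULL and never match.
        $pdo->prepare(
            'INSERT INTO outbox_events_archive
             SELECT * FROM outbox_events
              WHERE dispatched_at IS NOT NULL AND dispatched_at < :cutoff'
        )->execute($params);

        $delete = $pdo->prepare(
            'DELETE FROM outbox_events
              WHERE dispatched_at IS NOT NULL AND dispatched_at < :cutoff'
        );
        $delete->execute($params);

        $pdo->commit();
        return $delete->rowCount(); // rows moved out of the hot table
    } catch (Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
}

// Demo against in-memory SQLite (pdo_sqlite assumed), with a trimmed schema.
$pdo = new PDO('sqlite::memory:', null, null, [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
foreach (['outbox_events', 'outbox_events_archive'] as $table) {
    $pdo->exec("CREATE TABLE {$table} (id INTEGER PRIMARY KEY, event_id TEXT NOT NULL, dispatched_at TEXT NULL)");
}
$pdo->exec("INSERT INTO outbox_events (id, event_id, dispatched_at) VALUES
    (1, 'old',     '2024-01-01 09:00:00'),
    (2, 'recent',  '2024-03-30 09:00:00'),
    (3, 'pending', NULL)");

$moved = archiveDispatchedOutbox(
    $pdo,
    new DateTimeImmutable('2024-03-18 00:00:00', new DateTimeZone('UTC')),
);
echo "archived {$moved} row(s)\n"; // prints "archived 1 row(s)"
```

In production the cutoff would be something like `new DateTimeImmutable('-14 days')`. On a large backlog, running the copy and delete in id-range batches keeps each transaction short.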

The dispatcher: an EventBus that writes to a table

The bus adapter writes to the same connection the use case is transacting on. No beginTransaction, no commit. It rides whatever the caller started.

<?php declare(strict_types=1);

namespace App\Adapter\Outbox;

use App\Domain\DomainEvent;
use App\Port\EventBus;
use Doctrine\DBAL\Connection;

final class OutboxEventBus implements EventBus
{
    public function __construct(
        private readonly Connection $connection,
        private readonly string $table = 'outbox_events',
    ) {}

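    /** @param iterable<DomainEvent> $events */
    public function publishAll(iterable $events): void
    {
        // Outside a transaction each INSERT would autocommit on its own and the
        // atomicity with the aggregate write would be silently lost, so refuse.
        if (!$this->connection->isTransactionActive()) {
            throw new \LogicException('OutboxEventBus::publishAll() must run inside the caller\'s transaction.');
        }

        foreach ($events as $event) {
            // Same connection, same open transaction: this row commits or
            // rolls back together with the aggregate.
            $this->connection->executeStatement(
                "INSERT INTO {$this->table}
                   (event_id, event_name, payload, occurred_at)
                 VALUES (?, ?, ?, ?)",
                [
                    $event->eventId(),
                    $event->eventName(),
                    json_encode($event->toPayload(), \JSON_THROW_ON_ERROR),
                    $event->occurredAt()->format('Y-m-d H:i:s.uP'),
                ],
            );
        }
    }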
}

The use case wraps the aggregate write and the publish in one transaction:

final readonly class PlaceOrder
{
    public function __construct(
        private OrderRepository $orders,
        private EventBus $events,
        private Connection $connection,
        private \Closure $clock,
    ) {}

    public function execute(PlaceOrderInput $input): string
    {
        return $this->connection->transactional(function () use ($input) {
            $order = Order::place(
                id:         bin2hex(random_bytes(16)),
                customerId: $input->customerId,
                items:      $input->items,
                now:        ($this->clock)(),
            );

            $this->orders->save($order);
            $this->events->publishAll($order->releaseEvents());

            return $order->id();
        });
    }
}

If save succeeds and publishAll throws, the transaction rolls back and takes the order with it. If both succeed and the COMMIT itself fails, both rows vanish. If the COMMIT succeeds and PHP-FPM kills the worker a millisecond later, the order row and the event row are both durable, and the relay will pick the event up. No code path produces a published event for an unsaved order, or a saved order whose event is lost.

That is the whole guarantee. The local transaction does the work that distributed-transaction people sell you two-phase commit (2PC) for.

The relay worker

A separate process polls the table, publishes to the broker, marks the row dispatched. Run it as a systemd unit, a Kubernetes deployment, or a supervisord child — whatever your platform speaks.

<?php declare(strict_types=1);

namespace App\Adapter\Outbox;

use Doctrine\DBAL\Connection;
use Psr\Log\LoggerInterface;

final class OutboxRelay
{
    public function __construct(
        private readonly Connection $connection,
        private readonly EventPublisher $publisher,
        private readonly LoggerInterface $log,
        private readonly int $batchSize = 100,
    ) {}

    public function tick(): int
    {
        // The row locks taken by FOR UPDATE SKIP LOCKED last only until the
        // surrounding transaction ends. Under autocommit they are released as
        // soon as the SELECT returns, so the whole batch runs in one transaction.
        return $this->connection->transactional(function (): int {
            $rows = $this->connection->fetchAllAssociative(
                "SELECT id, event_id, event_name, payload, occurred_at
                   FROM outbox_events
                  WHERE dispatched_at IS NULL
                  ORDER BY id ASC
                  LIMIT {$this->batchSize}
                  FOR UPDATE SKIP LOCKED",
            );

            $dispatched = 0;
            foreach ($rows as $row) {
                try {
                    $this->publisher->publish(
                        eventName: $row['event_name'],
                        payload:   json_decode($row['payload'], true, flags: \JSON_THROW_ON_ERROR),
                        headers:   [
                            'event_id'    => $row['event_id'],
                            'occurred_at' => $row['occurred_at'],
                        ],
                    );
                    $this->markDispatched((int) $row['id']);
                    $dispatched++;
                } catch (\Throwable $e) {
                    // Leave the row pending; a later tick retries it.
                    $this->log->warning('outbox publish failed', [
                        'event_id' => $row['event_id'],
                        'error'    => $e->getMessage(),
                    ]);
                    $this->markFailure((int) $row['id'], $e);
                }
            }
            return $dispatched;
        });
    }

    private function markDispatched(int $id): void
    {
        $this->connection->executeStatement(
            'UPDATE outbox_events SET dispatched_at = NOW() WHERE id = ?',
            [$id],
        );
    }

    private function markFailure(int $id, \Throwable $e): void
    {
        $this->connection->executeStatement(
            'UPDATE outbox_events
                SET retry_count = retry_count + 1, last_error = ?
              WHERE id = ?',
            [substr($e->getMessage(), 0, 1000), $id],
        );
    }
}

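The FOR UPDATE SKIP LOCKED clause is what lets you run two or three replicas of the relay without them stepping on each other. Postgres 9.5+ and MySQL 8.0+ support it. Each replica takes its own batch and neither blocks the other, but only while the SELECT and the UPDATEs share one transaction: row locks are released at commit, and under autocommit that means the moment the SELECT returns. The flip side is that the transaction stays open across the broker calls, so keep batches small enough that a slow broker does not pin locks for long. SQLite does not support row locking, which is one of several reasons production is not SQLite.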

A thin script wraps the loop:

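// $container is your application's PSR-11 container (any DI setup works).
$relay = $container->get(OutboxRelay::class);

while (true) {
    $dispatched = $relay->tick();
    if ($dispatched === 0) {
        usleep(250_000); // outbox empty: wait 250ms before polling again
    }
}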

250ms idle sleep is a reasonable default. Tune it to your tolerance for "real-time": a hot path that needs sub-100ms event propagation is past the point where polling fits, and Postgres LISTEN/NOTIFY or Debezium-style CDC is the escape hatch.
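A fixed 250ms trades latency against idle queries. One alternative, sketched here with assumed values (a 50ms floor, a 1s ceiling) and a hypothetical `idleSleepMicros()` helper, is to poll again immediately after a tick that dispatched something and double the sleep for each consecutive empty tick. The cost is that the first event after a long quiet spell can wait up to the ceiling.

```php
<?php declare(strict_types=1);

// Hypothetical helper: how long the relay should sleep after N consecutive
// empty ticks. 0 means "poll again right away" (the last tick did work).
function idleSleepMicros(
    int $consecutiveEmptyTicks,
    int $floorMicros = 50_000,      // first idle sleep: 50ms
    int $ceilingMicros = 1_000_000, // never sleep longer than 1s
): int {
    if ($consecutiveEmptyTicks <= 0) {
        return 0;
    }
    $shift = min($consecutiveEmptyTicks - 1, 20); // cap the exponent to avoid overflow
    return min($ceilingMicros, $floorMicros << $shift);
}

// In the relay script, this would replace the fixed usleep(250_000):
//   $empty = 0;
//   while (true) {
//       $empty = $relay->tick() === 0 ? $empty + 1 : 0;
//       usleep(idleSleepMicros($empty));
//   }

$schedule = array_map(static fn (int $n): int => intdiv(idleSleepMicros($n), 1000), range(0, 6));
echo implode(' ', $schedule), " ms\n"; // prints "0 50 100 200 400 800 1000 ms"
```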

The relay's atomicity story: aggregate and outbox row commit together; the publish and dispatched-flag UPDATE are separate, so consumers must deduplicate on event_id.

Consumer dedup is non-negotiable

The relay can publish a row, get SIGKILLed before the UPDATE that sets dispatched_at commits, and on the next tick publish the same event again. That is at-least-once delivery, and it is the trade you make for not running 2PC.

Consumers handle it. A processed_events table with a unique constraint on event_id is enough:

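public function handle(IncomingMessage $msg): void
{
    $eventId = $msg->header('event_id');

    $this->connection->transactional(function () use ($eventId, $msg) {
        // Claim the event id. A duplicate delivery inserts nothing.
        $inserted = $this->connection->executeStatement(
            'INSERT INTO processed_events (event_id, processed_at)
             VALUES (?, NOW())
             ON CONFLICT (event_id) DO NOTHING',
            [$eventId],
        );

        if ($inserted === 0) {
            return; // already processed: skip the side effect
        }

        // Same transaction as the claim: both commit or neither does.
        $this->readModel->upsert($msg->payload());
    });

    // Ack only after commit. A crash before this line means redelivery,
    // and the claim above turns that redelivery into a no-op.
    $msg->ack();
}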

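ON CONFLICT DO NOTHING is Postgres. On MySQL, INSERT IGNORE is the closest match: it reports 0 affected rows for a duplicate, though it also turns some other errors into warnings. INSERT ... ON DUPLICATE KEY UPDATE works too, but with the default client flags it reports 1 for a fresh insert and 0 or 2 for a duplicate, so test for 1 rather than 0. The contract is the same: claim the event id; if you did not get it, someone else already processed it, so drop it.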
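The claim-then-apply contract can be exercised without a broker. This sketch uses PDO with in-memory SQLite, which accepts the same `ON CONFLICT (event_id) DO NOTHING` clause as Postgres (SQLite 3.24 and later); `handleOnce()` and the `credits` table are hypothetical stand-ins for a real consumer and read model. A redelivered event claims nothing, so its side effect is skipped.

```php
<?php declare(strict_types=1);

// Hypothetical consumer: claim the event id, apply the effect only if the claim
// inserted a row. Claim and effect commit in the same transaction.
function handleOnce(PDO $pdo, string $eventId, int $amount): bool
{
    $pdo->beginTransaction();
    try {
        $claim = $pdo->prepare(
            "INSERT INTO processed_events (event_id, processed_at)
             VALUES (?, datetime('now'))
             ON CONFLICT (event_id) DO NOTHING"
        );
        $claim->execute([$eventId]);

        if ($claim->rowCount() === 0) {
            $pdo->commit(); // duplicate delivery: nothing to apply
            return false;
        }

        $pdo->prepare('UPDATE credits SET total = total + ? WHERE id = 1')->execute([$amount]);
        $pdo->commit();
        return true;
    } catch (Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
}

$pdo = new PDO('sqlite::memory:', null, null, [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
$pdo->exec('CREATE TABLE processed_events (event_id TEXT PRIMARY KEY, processed_at TEXT NOT NULL)');
$pdo->exec('CREATE TABLE credits (id INTEGER PRIMARY KEY, total INTEGER NOT NULL)');
$pdo->exec('INSERT INTO credits (id, total) VALUES (1, 0)');

// At-least-once delivery: evt-1 arrives twice, evt-2 once.
$deliveries = [['evt-1', 500], ['evt-1', 500], ['evt-2', 250]];
$applied = array_map(static fn (array $d): bool => handleOnce($pdo, $d[0], $d[1]), $deliveries);

$total = (int) $pdo->query('SELECT total FROM credits WHERE id = 1')->fetchColumn();
echo "total = {$total}\n"; // prints "total = 750"
```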

For external side effects (sending an email, charging a card) the same idea applies, but the external call has to be idempotent on the event id. Stripe gives you an idempotency key parameter for this. SMTP does not, so the dedup table is the line of defense.

When the outbox is enough, and when it is not

The outbox covers a wide range of real PHP systems. It is enough when:

  • Throughput stays below a few thousand events per second. A Postgres INSERT plus a partial index handles that comfortably.
  • Consumers run inside your own infrastructure. The broker (RabbitMQ, SQS, Redis Streams) is a delivery hop, not a long-term event log.
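  • Strict ordering only matters per aggregate, not globally. Publishing in id order keeps events for the same order_id in sequence when a single relay replica runs and no row fails; several SKIP LOCKED replicas, or a failed row that is retried after newer ones went out, can reorder them. Cross-aggregate ordering needs more.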

It stops being enough when:

  • You need event sourcing. The outbox publishes events; it does not let you rebuild aggregate state from them.
  • Multiple consumers want to replay months of history. Kafka with retention is the right tool for that.
  • Sustained throughput exceeds what one database can ingest. At that point you are running Kafka anyway, and the outbox becomes a CDC source that Debezium streams into the topic.

For most PHP services — the order checkouts, the SaaS sign-up flows, the marketplace listings — none of those limits bite. The 150 lines above are the whole infrastructure investment.

What you keep, what you drop

Keep:

  • One transactional boundary: aggregate write and event insert commit together.
  • One unique event_id per event, present in the payload and in the broker message header.
  • A processed_events table on every consumer.
  • A relay that uses FOR UPDATE SKIP LOCKED so it can run as multiple replicas without coordination.

Drop, until you actually need them:

  • Kafka, Schema Registry, and the Debezium pipeline.
  • Distributed transactions and two-phase commit.
  • Bespoke retry frameworks. The retry_count column and a small backoff added to the relay are plenty for years.
  • "Exactly-once delivery." It does not exist. Idempotency does.
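The outbox table as defined records how many times a row failed, but not when it may be tried again, so adding backoff takes one more column. Assume a hypothetical `next_attempt_at TIMESTAMPTZ NULL` column that `markFailure()` sets, and a relay query that adds `AND (next_attempt_at IS NULL OR next_attempt_at <= NOW())`. The helper below, with the hypothetical name `nextAttemptAt()`, computes that timestamp with exponential backoff on `retry_count`, capped at five minutes, plus "full jitter" so several relay replicas do not hammer a recovering broker in lockstep.

```php
<?php declare(strict_types=1);

// Hypothetical helper: when may a failed outbox row be retried?
// The backoff ceiling doubles with each failure (1s, 2s, 4s, ...) up to
// $capSeconds; full jitter then picks a delay uniformly in [0, ceiling].
function nextAttemptAt(
    int $retryCount,
    DateTimeImmutable $now,
    int $baseSeconds = 1,
    int $capSeconds = 300,
): DateTimeImmutable {
    $exponent = min(max($retryCount, 0), 30); // keep 2 ** n well inside int range
    $ceiling  = min($capSeconds, $baseSeconds * (2 ** $exponent));
    $delay    = random_int(0, $ceiling);

    return $now->add(new DateInterval("PT{$delay}S"));
}

// markFailure() would store this in the assumed next_attempt_at column:
$now = new DateTimeImmutable('2024-05-01 12:00:00', new DateTimeZone('UTC'));
foreach ([0, 1, 4, 12] as $retryCount) {
    $next = nextAttemptAt($retryCount, $now);
    printf("retry_count=%d -> next attempt in %ds\n", $retryCount, $next->getTimestamp() - $now->getTimestamp());
}
```

A row that is backing off waits while newer rows go out, so backoff gives up per-aggregate ordering for that row; consumers that care about order have to check it themselves.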

You get an order checkout that publishes a confirmation event, an inventory reservation, and an analytics ping. All atomic with the order row, durable across broker outages, with no Kafka cluster sitting between your app and your data.

The day you outgrow the outbox, you will know. Until then, the cheapest event-driven PHP service you can run is the one whose broker is the database you already have.


If this was useful

The outbox is one chapter of Decoupled PHP. The rest of the book walks the same shape across every adapter — HTTP, DB, queues, observability — so the application keeps working when the framework, the broker, or the database under it changes. If event-driven architecture itself is what you want to go deeper on, the Event-Driven Architecture Pocket Guide covers saga, CQRS, and the failure modes the slides do not mention.

Decoupled PHP — Clean and Hexagonal Architecture for Applications That Outlive the Framework

Available on Kindle, Paperback, and Hardcover. English, German, and Japanese editions out now — Portuguese and Spanish coming soon.
