DEV Community

Sepehr Mohseni

Distributed Laravel: High-Performance Event-Driven Architectures

In distributed environments, "Fire and Forget" is a recipe for data corruption. Advanced Laravel architecture requires solving the Atomic Commitment Problem: ensuring your database and your message broker agree on the state of the world without compromising performance or consistency.

1. Defining the Contract: The Envelope Pattern

A "raw array" payload is an architectural liability. Sophisticated systems utilize a Message Envelope that encapsulates metadata for distributed tracing (OpenTelemetry), schema versioning, and idempotency.

<?php

namespace App\Core\Messaging;

use JsonSerializable;
use Illuminate\Support\Str;

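readonly class MessageEnvelope implements JsonSerializable
{
    // Declared separately instead of promoted: a promoted readonly property is
    // already initialized when the constructor body runs and cannot be reassigned.
    public float $occurredAt;

    public function __construct(
        public string $id,
        public string $topic,
        public array $payload,
        public string $version = '1.0',
        public array $metadata = [],
        ?float $occurredAt = null,
    ) {
        // Default to the current time when no timestamp is supplied.
        $this->occurredAt = $occurredAt ?? microtime(true);
    }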

    public static function create(string $topic, array $payload): self
    {
        return new self(
            id: Str::uuid()->toString(),
            topic: $topic,
            payload: $payload,
            metadata: [
                'correlation_id' => request()->header('X-Correlation-ID') ?? Str::uuid()->toString(),
            ]
        );
    }

    public function jsonSerialize(): array
    {
        return get_object_vars($this);
    }
}
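
On the consuming side, the envelope has to be rebuilt from JSON and checked before the payload is trusted. The following is a minimal, framework-free sketch of that step. The `decodeEnvelope` function, its list of required keys, and the rule of accepting only one major schema version are assumptions made for illustration; they are not part of the class above.

```php
<?php

/**
 * Decode a JSON-encoded envelope and validate the fields a consumer relies on.
 * Hypothetical helper: the required keys mirror MessageEnvelope's properties.
 *
 * @return array<string, mixed>
 */
function decodeEnvelope(string $json, int $supportedMajor = 1): array
{
    // JSON_THROW_ON_ERROR turns malformed input into a JsonException.
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        throw new InvalidArgumentException('Envelope must be a JSON object');
    }

    foreach (['id', 'topic', 'payload', 'version', 'metadata', 'occurredAt'] as $key) {
        if (!array_key_exists($key, $data)) {
            throw new InvalidArgumentException("Envelope is missing '{$key}'");
        }
    }

    // Assumed versioning rule: reject any major version this consumer does not know.
    $major = (int) explode('.', (string) $data['version'])[0];
    if ($major !== $supportedMajor) {
        throw new InvalidArgumentException("Unsupported schema version {$data['version']}");
    }

    return $data;
}
```

Rejecting unknown major versions up front keeps a consumer from silently misreading a payload whose shape has changed.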

2. Low-Level Driver Implementation

To get delivery guarantees that Laravel's default Queue abstraction does not expose, we work directly with the protocol-level features of each broker.

Redis Streams (The High-Throughput Path)

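Instead of standard Redis lists, we use Redis Streams (XADD). Streams keep entries after they are read and support consumer groups with explicit acknowledgment, so messages held by a crashed consumer can be delivered again. This lets Laravel absorb large event spikes, but durability still depends on the server's persistence settings (AOF/RDB) and on how aggressively the stream is trimmed.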

namespace App\Infrastructure\Messaging\Drivers;

use App\Core\Messaging\MessageEnvelope;
use Illuminate\Support\Facades\Redis;

class RedisStreamDriver
{
    public function publish(MessageEnvelope $envelope): void
    {
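        // Assumes the phpredis client (Laravel's default), whose signature is
        // xAdd(key, id, fields, maxLen, approximate). '*' lets Redis assign the
        // entry ID. MAXLEN ~ 10000 trims the stream approximately, which can
        // drop entries that no consumer has read yet.
        Redis::connection('messaging')->xadd(
            $envelope->topic,
            '*',
            ['data' => json_encode($envelope, JSON_THROW_ON_ERROR)],
            10000,
            true
        );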
    }
}
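
Publishing is only half of the stream contract; consumer groups are what make delivery recoverable. The sketch below reads a batch for one consumer and acknowledges each entry only after its handler returns. It assumes the phpredis method signatures for `xReadGroup` and `xAck` and a group that was already created with XGROUP CREATE. The names `consumeBatch`, the group, the consumer, and the handler are hypothetical, and the client is passed in untyped so the function can be exercised with a stub.

```php
<?php

/**
 * Read up to $count new entries for this consumer and XACK each one after
 * its handler succeeds. Entries whose handler throws stay in the group's
 * pending list and can be claimed again later.
 *
 * $redis is expected to expose phpredis-style xReadGroup() and xAck().
 */
function consumeBatch(
    object $redis,
    string $stream,
    string $group,
    string $consumer,
    callable $handler,
    int $count = 100,
    int $blockMs = 2000
): int {
    // '>' asks only for entries never delivered to this group before.
    $result = $redis->xReadGroup($group, $consumer, [$stream => '>'], $count, $blockMs);

    // Expected shape: [stream => [entryId => [field => value]]].
    // Anything else (for example a timeout) is treated as an empty batch.
    $entries = is_array($result) ? ($result[$stream] ?? []) : [];

    $acked = 0;
    foreach ($entries as $entryId => $fields) {
        try {
            $handler(json_decode($fields['data'], true, 512, JSON_THROW_ON_ERROR));
        } catch (Throwable $e) {
            continue; // left unacknowledged on purpose
        }

        $redis->xAck($stream, $group, [$entryId]);
        $acked++;
    }

    return $acked;
}
```

Acknowledging after the handler rather than before gives at-least-once delivery, so the handler itself has to tolerate duplicates.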

RabbitMQ (The Reliable Path)

For RabbitMQ, we implement Publisher Confirms. The PHP process blocks until the broker acknowledges or rejects the message, and only an acknowledgment counts as a successful publish.

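namespace App\Infrastructure\Messaging\Drivers;

use App\Core\Messaging\MessageEnvelope;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use RuntimeException;

class RabbitMQDriver
{
    public function publish(MessageEnvelope $envelope): void
    {
        // 'amqp.connection' is a container binding the application must register.
        $channel = app('amqp.connection')->channel();

        try {
            // Durable, non-auto-delete topic exchange named after the topic.
            $channel->exchange_declare($envelope->topic, AMQPExchangeType::TOPIC, false, true, false);

            $msg = new AMQPMessage(json_encode($envelope, JSON_THROW_ON_ERROR), [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type' => 'application/json',
            ]);

            // Enable publisher confirms and record a broker NACK; without a
            // handler, a rejected message would look like a success.
            $nacked = false;
            $channel->confirm_select();
            $channel->set_nack_handler(function () use (&$nacked) {
                $nacked = true;
            });

            $channel->basic_publish($msg, $envelope->topic);

            // Block until the broker responds; a 5s timeout surfaces as an exception.
            $channel->wait_for_pending_acks(5.0);

            if ($nacked) {
                throw new RuntimeException("Broker rejected message {$envelope->id}");
            }
        } finally {
            // Release the channel on success and on failure.
            $channel->close();
        }
    }
}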

3. The Transactional Outbox: Zero-Leak Dispatching

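The most common failure point is dispatching an event inside a database transaction. If the transaction rolls back but the event was already sent, consumers act on data that never existed; if the transaction commits but the publish fails, the event is lost. We solve both cases by writing the event to an Outbox table within the same ACID transaction as the business data and letting a separate relay publish it afterwards.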
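
As a framework-free illustration of the write side, the sketch below stores an order and its outbox row in one transaction using plain PDO. The table names, columns, and the functions `atomically`, `recordInOutbox`, and `placeOrder` are assumptions made for this example; in a Laravel application the same shape would sit inside a `DB::transaction()` closure.

```php
<?php

/**
 * Run $work inside a transaction: commit if it returns, roll back if it throws.
 */
function atomically(PDO $pdo, callable $work): void
{
    $pdo->beginTransaction();

    try {
        $work($pdo);
        $pdo->commit();
    } catch (Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
}

/**
 * Queue an event for the relay by inserting it into the (assumed) outbox table.
 * Nothing is sent to a broker here.
 */
function recordInOutbox(PDO $pdo, string $topic, array $payload): void
{
    $pdo->prepare(
        'INSERT INTO outbox_messages (id, topic, payload, created_at) VALUES (?, ?, ?, ?)'
    )->execute([
        bin2hex(random_bytes(16)),
        $topic,
        json_encode($payload, JSON_THROW_ON_ERROR),
        gmdate('Y-m-d H:i:s'),
    ]);
}

/**
 * Business write and outbox write share one transaction, so they commit
 * or roll back together.
 */
function placeOrder(PDO $pdo, string $orderId, int $totalCents): void
{
    atomically($pdo, function (PDO $pdo) use ($orderId, $totalCents) {
        $pdo->prepare('INSERT INTO orders (id, total_cents) VALUES (?, ?)')
            ->execute([$orderId, $totalCents]);

        recordInOutbox($pdo, 'orders.created', [
            'order_id' => $orderId,
            'total_cents' => $totalCents,
        ]);
    });
}
```

Because the broker is never contacted inside the transaction, a rollback leaves no trace of the event, and a commit guarantees the relay will eventually find it.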

High-Performance Relay Worker

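A naive relay polls in a tight loop and spikes CPU, and running several copies of it publishes the same rows more than once. We use Row-Level Locking (SELECT ... FOR UPDATE) so that multiple relay instances can run side by side without picking up the same rows. The lock only holds for the duration of the enclosing transaction, and delivery remains at-least-once: a crash after publishing but before the commit sends the batch again, which is why consumers must be idempotent.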

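namespace App\Console\Commands;

use App\Infrastructure\Messaging\MessagingMapper;
use App\Models\OutboxMessage;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

class OutboxRelay extends Command
{
    protected $signature = 'messaging:relay {--driver=redis}';

    public function handle(): void
    {
        // Resolve the driver once instead of once per message.
        $driver = MessagingMapper::get($this->option('driver'));

        while (true) {
            // Row locks are released when the transaction ends, so the select
            // and the updates must share one transaction.
            $processed = DB::transaction(function () use ($driver) {
                return OutboxMessage::whereNull('dispatched_at')
                    ->where('retry_count', '<', 5)
                    ->orderBy('created_at', 'asc')
                    ->limit(100)
                    // SKIP LOCKED (MySQL 8+, PostgreSQL) lets other relays take
                    // the next unlocked rows instead of waiting on this batch.
                    ->lock('FOR UPDATE SKIP LOCKED')
                    ->get()
                    ->each(function ($msg) use ($driver) {
                        try {
                            $driver->publish($msg->toEnvelope());

                            $msg->update(['dispatched_at' => now()]);
                        } catch (\Throwable $e) {
                            // Count the failure; rows with 5 failures are skipped.
                            $msg->increment('retry_count');
                            report($e);
                        }
                    });
            });

            // Back off for 1s when idle, 100ms between busy batches.
            if ($processed->isEmpty()) {
                sleep(1);
            } else {
                usleep(100000);
            }
        }
    }
}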

4. Consumer Idempotency: The "Exactly Once" Illusion

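In distributed systems, you cannot guarantee that a message is delivered only once, but you can make sure its effects are applied only once. The consumer records each message ID under a unique key and skips IDs it has already seen. This holds for side effects that commit in the same database transaction as that record.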

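namespace App\Core\Messaging;

use Illuminate\Support\Facades\DB;

trait InteractsWithIdempotency
{
    public function executeOnce(string $messageId, callable $action): void
    {
        // Marker and action share one transaction: if the action throws, the
        // marker rolls back and a redelivery can retry the message.
        DB::transaction(function () use ($messageId, $action) {
            // insertOrIgnore returns the number of rows written; 0 means the
            // ID already exists, i.e. this message was handled before. This
            // avoids matching driver-specific SQLSTATE codes.
            $inserted = DB::table('processed_events')->insertOrIgnore([
                'id' => $messageId,
                'processed_at' => now(),
            ]);

            if ($inserted === 0) {
                return;
            }

            $action();
        });
    }
}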

Final Comparison

| Feature | Redis Streams | RabbitMQ (AMQP) |
|---|---|---|
| Throughput | ~150k msg/s | ~40k msg/s |
| Acknowledgment | Consumer level | Broker and consumer level |
| Persistence | In-memory / AOF | Disk-backed |
| Best for | Real-time feeds, metrics | Financial transactions, orders |
