Igor Nosatov

Apache Kafka with Laravel: 5 Battle-Tested Integration Strategies for High-Performance Event-Driven Architecture

Why Kafka + Laravel? The Power Combination Explained

Apache Kafka has become the de facto standard for building real-time data pipelines and streaming applications, while Laravel remains one of the most widely used PHP frameworks. Combining them lets you build event-driven systems that scale by adding partitions and consumers rather than by rewriting application code.

In this comprehensive guide, we'll explore five proven strategies for integrating Apache Kafka with Laravel, complete with real-world use cases and implementation patterns.


Strategy 1: Queue-Based Integration (Beginner-Friendly)

Overview

This approach treats Kafka as a sophisticated queue system, leveraging Laravel's built-in queue infrastructure with a Kafka driver.

When to Use

  • Migrating from Redis/SQS to Kafka
  • Simple pub-sub patterns
  • Background job processing
  • Email notifications and async tasks

Implementation

Step 1: Install the Kafka Queue Driver (requires the rdkafka PHP extension)

composer require rapideinternet/laravel-queue-kafka

Step 2: Configure Queue Connection

// config/queue.php
'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'queue' => env('KAFKA_QUEUE', 'default'),
        'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
        'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'laravel'),
        'sasl_username' => env('KAFKA_SASL_USERNAME'),
        'sasl_password' => env('KAFKA_SASL_PASSWORD'),
    ],
],

Step 3: Create a Job

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ProcessOrderEvent implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(
        public array $orderData
    ) {}

    public function handle()
    {
        // Process order logic
        logger()->info('Processing order', $this->orderData);
    }
}

Step 4: Dispatch Jobs

// Dispatch to Kafka
ProcessOrderEvent::dispatch($orderData)
    ->onConnection('kafka')
    ->onQueue('orders');

Pros & Cons

✅ Familiar Laravel syntax
✅ Easy to implement
✅ Automatic retry and failure handling

❌ Limited Kafka-specific features
❌ Not ideal for complex streaming scenarios
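The retry and failure handling mentioned above can be pictured with a small plain-PHP sketch. It is not Laravel's worker; `runWithRetries` is a hypothetical helper that only shows the control flow a queue worker follows: attempt the job, retry on failure, and give up after a fixed number of tries.

```php
<?php
declare(strict_types=1);

/**
 * Run a job callable, retrying on failure up to $maxTries times.
 * Returns ['status' => 'done'|'failed', 'attempts' => int].
 * Hypothetical helper for illustration; not part of Laravel.
 */
function runWithRetries(callable $job, int $maxTries = 3): array
{
    $attempts = 0;
    while ($attempts < $maxTries) {
        $attempts++;
        try {
            $job($attempts);
            return ['status' => 'done', 'attempts' => $attempts];
        } catch (Throwable $e) {
            // A real worker would release the message back to the queue,
            // usually after a backoff delay, before the next attempt.
        }
    }
    // Out of attempts: a real worker would record the job as failed.
    return ['status' => 'failed', 'attempts' => $attempts];
}

// A job that fails twice, then succeeds on the third attempt.
$flaky = function (int $attempt): void {
    if ($attempt < 3) {
        throw new RuntimeException("transient failure on attempt {$attempt}");
    }
};

$result = runWithRetries($flaky, 3);
```

In Laravel the equivalent knobs are the job's `$tries` property and the worker's `--tries` option.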


Strategy 2: Event Sourcing Pattern (Advanced)

Overview

Implement true event sourcing where every state change is captured as an immutable event in Kafka, providing a complete audit trail and the ability to rebuild state.
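As a minimal sketch of what "rebuild state" means, the plain-PHP function below folds a list of events into the current state of one order. The event shapes and the `replayOrder` name are assumptions made for illustration; they are not taken from spatie/laravel-event-sourcing.

```php
<?php
declare(strict_types=1);

/**
 * Fold a list of events into the current state of one order.
 * Event shapes here are assumptions made for the sketch.
 */
function replayOrder(array $events): array
{
    $state = [];
    foreach ($events as $event) {
        switch ($event['type']) {
            case 'OrderCreated':
                $state = [
                    'uuid'   => $event['order_uuid'],
                    'status' => 'pending',
                    'total'  => $event['total'],
                ];
                break;
            case 'OrderPaid':
                $state['status'] = 'paid';
                break;
            // Unknown event types are ignored so old logs stay replayable.
        }
    }
    return $state;
}

$log = [
    ['type' => 'OrderCreated', 'order_uuid' => 'o-1', 'total' => 4200],
    ['type' => 'OrderPaid', 'order_uuid' => 'o-1'],
];

$current = replayOrder($log);                          // state after all events
$asOfCreation = replayOrder(array_slice($log, 0, 1));  // state at an earlier point
```

Replaying only a prefix of the log is what makes temporal queries possible: the same function answers "what did this order look like before payment?".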

When to Use

  • Financial systems requiring audit trails
  • E-commerce platforms tracking order lifecycle
  • Systems requiring temporal queries
  • Microservices with eventual consistency

Implementation

Step 1: Install Event Sourcing Package

composer require spatie/laravel-event-sourcing
composer require junges/laravel-kafka

Step 2: Create Event Projector

namespace App\Projectors;

use Spatie\EventSourcing\EventHandlers\Projectors\Projector;
use App\Events\OrderCreated;
use App\Events\OrderPaid;
use App\Models\Order;

class OrderProjector extends Projector
{
    public function onOrderCreated(OrderCreated $event)
    {
        Order::create([
            'uuid' => $event->orderUuid,
            'user_id' => $event->userId,
            'status' => 'pending',
            'total' => $event->total,
        ]);
    }

    public function onOrderPaid(OrderPaid $event)
    {
        Order::where('uuid', $event->orderUuid)
            ->update(['status' => 'paid', 'paid_at' => now()]);
    }
}

Step 3: Publish Events to Kafka

namespace App\Services;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

class KafkaEventPublisher
{
    public function publish(string $topic, array $eventData)
    {
        $message = new Message(
            headers: ['event-type' => $eventData['type']],
            body: $eventData,
            key: $eventData['aggregate_uuid'] ?? null
        );

        Kafka::publishOn($topic)
            ->withMessage($message)
            ->send();
    }
}

Step 4: Consume Events

// app/Console/Commands/ConsumeKafkaEvents.php
namespace App\Console\Commands;

use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;

class ConsumeKafkaEvents extends Command
{
    protected $signature = 'kafka:consume-events';

    public function handle()
    {
        $consumer = Kafka::createConsumer(['order-events'])
            ->withBrokers('localhost:9092')
            ->withConsumerGroupId('laravel-projectors')
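            ->withHandler(function ($message) {
                // getBody() returns the payload already decoded by the
                // package's default JSON deserializer, so no json_decode here
                $eventData = $message->getBody();

                // Dispatch a Laravel event named after the event type so
                // registered listeners can apply it
                event($eventData['type'], [$eventData]);
            })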
            ->build();

        $consumer->consume();
    }
}

Pros & Cons

✅ Complete audit trail
✅ Time-travel debugging
✅ Easy to rebuild state

❌ Complex to implement
❌ Storage overhead
❌ Eventual consistency challenges


Strategy 3: CQRS (Command Query Responsibility Segregation)

Overview

Separate read and write models, using Kafka to synchronize between them. Write operations go to the command model, while reads are served from optimized query models.
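The split can be sketched in plain PHP with an in-memory stand-in for Kafka. The class names `OrderWriteModel` and `OrderReadView` are hypothetical; the point is only that the read side is built exclusively from events and lags behind the write side until those events are delivered.

```php
<?php
declare(strict_types=1);

// Write side: validates the command and records an event. Hypothetical names.
final class OrderWriteModel
{
    /** @var array<int, array> events waiting to be published */
    public array $outbox = [];

    public function createOrder(string $uuid, int $userId, int $total): void
    {
        if ($total <= 0) {
            throw new InvalidArgumentException('Order total must be positive');
        }
        $this->outbox[] = [
            'type' => 'OrderCreated',
            'aggregate_uuid' => $uuid,
            'data' => ['uuid' => $uuid, 'user_id' => $userId, 'total' => $total],
        ];
    }
}

// Read side: a denormalized view built only from events.
final class OrderReadView
{
    private array $byUuid = [];
    private array $uuidsByUser = [];

    public function apply(array $event): void
    {
        if ($event['type'] !== 'OrderCreated') {
            return;
        }
        $data = $event['data'];
        // Guard against redelivery so the per-user index has no duplicates.
        if (!isset($this->byUuid[$data['uuid']])) {
            $this->uuidsByUser[$data['user_id']][] = $data['uuid'];
        }
        $this->byUuid[$data['uuid']] = $data;
    }

    public function getOrder(string $uuid): ?array
    {
        return $this->byUuid[$uuid] ?? null;
    }

    public function getOrdersByUser(int $userId): array
    {
        return array_map(
            fn (string $uuid) => $this->byUuid[$uuid],
            $this->uuidsByUser[$userId] ?? []
        );
    }
}

$write = new OrderWriteModel();
$read = new OrderReadView();

$write->createOrder('o-1', 7, 1500);
$write->createOrder('o-2', 7, 900);

// Until the events are delivered, the read side knows nothing.
$before = $read->getOrder('o-1');

foreach ($write->outbox as $event) {
    $read->apply($event); // stands in for the Kafka consumer
}
```

The gap between `createOrder()` and `apply()` is the eventual consistency listed under the cons of this strategy.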

When to Use

  • Applications with a high read-to-write ratio
  • Complex reporting requirements
  • Microservices architectures
  • Systems requiring different scaling for reads vs writes

Implementation

Step 1: Command Handler

namespace App\Commands;

use App\Models\Order;
use App\Services\KafkaEventPublisher;

class CreateOrderHandler
{
    public function __construct(
        private KafkaEventPublisher $publisher
    ) {}

    public function handle(CreateOrderCommand $command)
    {
        // Validate and create order
        $order = Order::create($command->toArray());

        // Publish event to Kafka
        $this->publisher->publish('order-commands', [
            'type' => 'OrderCreated',
            'aggregate_uuid' => $order->uuid,
            'data' => $order->toArray(),
            'timestamp' => now()->toISOString(),
        ]);

        return $order;
    }
}

Step 2: Read Model Builder

namespace App\ReadModels;

use Illuminate\Support\Facades\Cache;
use Junges\Kafka\Facades\Kafka;

class OrderReadModel
{
    public function rebuildFromKafka()
    {
        $consumer = Kafka::createConsumer(['order-commands'])
            ->withAutoCommit()
            ->withHandler(function ($message) {
                // Body arrives already decoded as an array
                $event = $message->getBody();

                // Build optimized read model
                $this->updateReadModel($event);
            })
            ->build();

        $consumer->consume();
    }

    private function updateReadModel(array $event)
    {
        // Update Redis/Elasticsearch for fast queries
        Cache::put(
            "order:{$event['aggregate_uuid']}", 
            $event['data'], 
            now()->addDay()
        );
    }
}

Step 3: Query Service

namespace App\Queries;

use Illuminate\Support\Facades\Cache;

class OrderQueryService
{
    public function getOrder(string $uuid)
    {
        // Query from read model (Redis/ES)
        return Cache::get("order:{$uuid}");
    }

    public function getOrdersByUser(int $userId)
    {
        // Assumes the read-model builder also writes a per-user list under
        // these tags; cache tags need a taggable store such as Redis or Memcached
        return Cache::tags(['orders', "user:{$userId}"])->get('orders');
    }
}

Pros & Cons

✅ Reads and writes scale independently
✅ Optimized query models
✅ Lower read latency from precomputed views

❌ Eventual consistency
❌ Increased complexity
❌ Data synchronization overhead


Strategy 4: Change Data Capture (CDC) with Kafka Connect

Overview

Use Kafka Connect and Debezium to stream database changes to Kafka automatically, enabling real-time data synchronization across services.
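Each message Debezium emits is a change envelope with an `op` code and `before`/`after` row images. As a rough sketch of how a consumer might normalize it, the plain-PHP function below handles both the bare envelope and the `payload` wrapper produced when the JSON converter has schemas enabled. `parseChangeEvent` is a hypothetical helper, and the sample rows are made up.

```php
<?php
declare(strict_types=1);

/**
 * Normalize one Debezium change event into ['op' => ..., 'row' => ...].
 * Handles both the bare envelope and the {"schema":..., "payload":...}
 * wrapper that the JSON converter emits when schemas are enabled.
 * Returns null for tombstones (empty message value).
 */
function parseChangeEvent(?string $json): ?array
{
    if ($json === null || $json === '') {
        return null; // tombstone that follows a delete
    }
    $decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $envelope = $decoded['payload'] ?? $decoded;

    $operations = ['c' => 'create', 'u' => 'update', 'd' => 'delete', 'r' => 'snapshot'];
    $op = $operations[$envelope['op'] ?? ''] ?? 'unknown';

    // Deletes carry the row in "before"; everything else in "after".
    $row = $op === 'delete'
        ? ($envelope['before'] ?? null)
        : ($envelope['after'] ?? null);

    return ['op' => $op, 'row' => $row];
}

$insert = parseChangeEvent('{"op":"c","before":null,"after":{"id":1,"status":"pending"}}');
$delete = parseChangeEvent('{"payload":{"op":"d","before":{"id":1,"status":"paid"},"after":null}}');
$tombstone = parseChangeEvent(null);
```

Normalizing at the edge keeps the downstream sync code free of connector-specific details.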

When to Use

  • Database replication across services
  • Real-time analytics
  • Data warehousing
  • Legacy system integration

Implementation

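Step 1: Set Up the Debezium Connector

The property names below follow Debezium 1.x. Debezium 2.x renamed database.server.name to topic.prefix and the database.history.* properties to schema.history.internal.*.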

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "secret",
    "database.server.id": "184054",
    "database.server.name": "laravel",
    "database.include.list": "laravel_db",
    "table.include.list": "laravel_db.orders,laravel_db.users",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes"
  }
}

Step 2: Laravel CDC Consumer

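namespace App\Services;

use Junges\Kafka\Facades\Kafka;

class CDCConsumer
{
    public function consumeDatabaseChanges()
    {
        Kafka::createConsumer(['laravel.laravel_db.orders'])
            ->withBrokers('localhost:9092')
            ->withConsumerGroupId('cdc-processor')
            ->withHandler(function ($message) {
                // Body arrives already decoded as an array
                $body = $message->getBody();

                // Deletes are followed by a tombstone with an empty value
                if (empty($body)) {
                    return;
                }

                // With schemas enabled in the JSON converter, the change
                // envelope is nested under "payload"
                $change = $body['payload'] ?? $body;

                match ($change['op']) {
                    'c', 'r' => $this->handleCreate($change['after']), // 'r' = snapshot read
                    'u' => $this->handleUpdate($change['before'], $change['after']),
                    'd' => $this->handleDelete($change['before']),
                    default => null
                };
            })
            ->build()
            ->consume();
    }

    private function handleCreate(array $data)
    {
        // Sync to analytics DB, Elasticsearch, etc.
        logger()->info('Order created in CDC', $data);
    }

    private function handleUpdate(?array $before, array $after)
    {
        // "before" can be null depending on connector and database settings
        logger()->info('Order updated in CDC', $after);
    }

    private function handleDelete(array $before)
    {
        logger()->info('Order deleted in CDC', $before);
    }
}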

Step 3: Real-time Sync Service

namespace App\Services;

use Illuminate\Support\Facades\DB;

class DataSyncService
{
    public function syncToElasticsearch(array $orderData)
    {
        // Index in Elasticsearch for fast search.
        // Elasticsearch is assumed to be a facade supplied by an
        // Elasticsearch client package for Laravel
        Elasticsearch::index([
            'index' => 'orders',
            'id' => $orderData['id'],
            'body' => $orderData
        ]);
    }

    public function syncToDataWarehouse(array $orderData)
    {
        // Upsert so that replayed or repeated change events do not fail
        // on a duplicate primary key
        DB::connection('warehouse')
            ->table('orders')
            ->updateOrInsert(['id' => $orderData['id']], $orderData);
    }
}

Pros & Cons

✅ No code changes in the application that owns the database
✅ Captures every committed change from the database log
✅ Works with existing databases

❌ Requires Kafka Connect infrastructure
❌ Learning curve for Debezium
❌ Additional operational complexity


Strategy 5: Saga Pattern for Distributed Transactions

Overview

Implement the Saga pattern using Kafka to coordinate long-running transactions across multiple microservices with compensation logic.
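The core of the pattern, forward steps plus compensation in reverse order, fits in a few lines of plain PHP. This is a sketch with no Kafka involved; `runSaga` and the step names are hypothetical, and each step is given as a `[name, action, compensation]` triple.

```php
<?php
declare(strict_types=1);

/**
 * Run steps in order; if one throws, run the compensations of the steps
 * that already completed, newest first.
 * Each step is [name, action, compensation].
 */
function runSaga(array $steps, array &$log): bool
{
    $completed = [];
    foreach ($steps as [$name, $action, $compensation]) {
        try {
            $action();
            $log[] = "done:{$name}";
            $completed[] = [$name, $compensation];
        } catch (Throwable $e) {
            $log[] = "failed:{$name}";
            foreach (array_reverse($completed) as [$doneName, $undo]) {
                $undo();
                $log[] = "undone:{$doneName}";
            }
            return false;
        }
    }
    return true;
}

$log = [];
$noop = function (): void {};

$ok = runSaga([
    ['inventory', $noop, $noop],
    ['payment', $noop, $noop],
    ['shipment', function (): void {
        throw new RuntimeException('carrier unavailable');
    }, $noop],
], $log);
```

Here the shipment step fails, so payment is undone first and inventory second. In a Kafka-based saga each action and compensation would publish a command and wait for a reply instead of calling a closure.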

When to Use

  • Multi-service transactions (order → payment → shipping)
  • Microservices requiring coordination
  • Systems without distributed transaction support
  • Complex business workflows

Implementation

Step 1: Saga Orchestrator

namespace App\Sagas;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

class OrderSaga
{
    private array $completedSteps = [];

    public function execute(array $orderData)
    {
        try {
            // Step 1: Reserve inventory
            $this->reserveInventory($orderData);
            $this->completedSteps[] = 'inventory';

            // Step 2: Process payment
            $this->processPayment($orderData);
            $this->completedSteps[] = 'payment';

            // Step 3: Create shipment
            $this->createShipment($orderData);
            $this->completedSteps[] = 'shipment';

            $this->publishSuccess($orderData);
        } catch (\Exception $e) {
            $this->compensate($orderData);
            throw $e;
        }
    }

    private function reserveInventory(array $orderData)
    {
        Kafka::publishOn('inventory-commands')
            ->withMessage(new Message(body: [
                'command' => 'ReserveInventory',
                'order_id' => $orderData['id'],
                'items' => $orderData['items']
            ]))
            ->send();

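        // Wait for confirmation (with timeout). waitForConfirmation() and the
        // other step and compensation helpers are left to the reader.
        // Throwing on a negative or missing reply is what triggers compensate()
        if (! $this->waitForConfirmation('inventory-replies', 30)) {
            throw new \RuntimeException('Inventory reservation was not confirmed');
        }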
    }

    private function compensate(array $orderData)
    {
        // Rollback in reverse order
        if (in_array('shipment', $this->completedSteps)) {
            $this->cancelShipment($orderData);
        }

        if (in_array('payment', $this->completedSteps)) {
            $this->refundPayment($orderData);
        }

        if (in_array('inventory', $this->completedSteps)) {
            $this->releaseInventory($orderData);
        }
    }
}

Step 2: Saga Participants

namespace App\Services;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

class InventoryService
{
    public function handleCommands()
    {
        Kafka::createConsumer(['inventory-commands'])
            ->withHandler(function ($message) {
                // Body arrives already decoded as an array
                $command = $message->getBody();

                match($command['command']) {
                    'ReserveInventory' => $this->reserve($command),
                    'ReleaseInventory' => $this->release($command),
                    default => null
                };
            })
            ->build()
            ->consume();
    }

    private function reserve(array $command)
    {
        // Reserve inventory logic
        $success = Inventory::reserve($command['items']);

        // Reply to saga orchestrator
        Kafka::publishOn('inventory-replies')
            ->withMessage(new Message(body: [
                'order_id' => $command['order_id'],
                'success' => $success,
                'step' => 'inventory'
            ]))
            ->send();
    }
}

Pros & Cons

✅ Handles distributed transactions
✅ Explicit compensation path when a step fails
✅ Loosely coupled services

❌ Complex to implement
❌ Debugging can be challenging
❌ Requires careful design


Performance Optimization Tips

1. Batch Processing

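// Instead of one message at a time, collect messages into a batch.
// MessageBatch and sendBatch() are the batch API in junges/laravel-kafka 1.x;
// check the release you have installed
use Junges\Kafka\Producers\MessageBatch;

$batch = new MessageBatch();
$batch->push(new Message(body: $order1));
$batch->push(new Message(body: $order2));
$batch->push(new Message(body: $order3));

Kafka::publishOn('orders')->sendBatch($batch);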

2. Partitioning Strategy

// Use order_id as partition key for ordered processing
$message = new Message(
    body: $orderData,
    key: (string) $orderData['id'] // Same key → same partition
);
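To see why the same key always lands on the same partition, here is a minimal plain-PHP sketch. It uses `crc32` modulo the partition count purely as an illustration; the actual hash depends on the client library and its partitioner setting, and `partitionFor` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Map a message key to a partition number.
 * crc32 modulo partition count illustrates the idea; the exact hash
 * depends on the client library and its partitioner setting.
 */
function partitionFor(string $key, int $partitionCount): int
{
    return crc32($key) % $partitionCount;
}

$partitions = 6;
$first = partitionFor('order-1001', $partitions);
$second = partitionFor('order-1001', $partitions);
```

Because the mapping depends on the partition count, adding partitions to a topic can move a key to a different partition, so per-key ordering holds only while the count stays fixed.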

3. Consumer Groups for Scaling

// Multiple consumers in the same group process partitions in parallel;
// consumers beyond the topic's partition count sit idle
Kafka::createConsumer(['orders'])
    ->withConsumerGroupId('order-processors')
    ->withHandler($handler)
    ->build()
    ->consume();
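The limit on parallelism is easier to see with a simplified assignment function. The plain-PHP sketch below spreads partitions round-robin across consumers; it is a stand-in for the group coordinator, not its real algorithm, and `assignPartitions` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Spread partition numbers 0..N-1 across consumers round-robin.
 * A simplified stand-in for the group coordinator's assignment;
 * real assignors differ in detail but keep one owner per partition.
 */
function assignPartitions(int $partitionCount, array $consumers): array
{
    $assignment = array_fill_keys($consumers, []);
    for ($partition = 0; $partition < $partitionCount; $partition++) {
        $owner = $consumers[$partition % count($consumers)];
        $assignment[$owner][] = $partition;
    }
    return $assignment;
}

$balanced = assignPartitions(6, ['worker-a', 'worker-b', 'worker-c']);
$tooMany = assignPartitions(2, ['w1', 'w2', 'w3']);
```

With six partitions and three workers, each worker owns two partitions. With two partitions and three workers, `w3` receives nothing, which is why scaling a group past the partition count adds no throughput.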

4. Connection Pooling

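// Cache one configured producer builder per topic within a single PHP process.
// This only helps long-running processes (consumers, queue workers, Octane);
// under PHP-FPM every request starts fresh. Whether the underlying broker
// connection is reused depends on the package version
class KafkaConnectionPool
{
    private static array $producers = [];

    public static function getProducer(string $topic)
    {
        // In the 1.x API the broker list is the second argument of publishOn()
        return self::$producers[$topic] ??= Kafka::publishOn(
            $topic,
            config('kafka.brokers')
        );
    }
}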

Monitoring and Debugging

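Laravel Horizon for Queue Monitoring

Horizon supervises Redis-backed queues only, so it cannot manage workers for a Kafka queue connection. A configuration like the one below covers the Redis queues that run alongside Kafka; for the Kafka connection, run php artisan queue:work kafka under a process manager such as Supervisor and watch consumer lag with Kafka's own tooling.

// config/horizon.php
'environments' => [
    'production' => [
        'supervisor-1' => [
            'connection' => 'redis',
            'queue' => ['default', 'orders', 'notifications'],
            'balance' => 'auto',
            'processes' => 10,
            'tries' => 3,
        ],
    ],
],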

Kafka Metrics Integration

namespace App\Services;

use Prometheus\CollectorRegistry;

class KafkaMetrics
{
    public function recordMessageProcessed(string $topic, float $duration)
    {
        $registry = app(CollectorRegistry::class);

        $counter = $registry->getOrRegisterCounter(
            'kafka',
            'messages_processed_total',
            'Total messages processed',
            ['topic']
        );
        $counter->inc([$topic]);

        $histogram = $registry->getOrRegisterHistogram(
            'kafka',
            'message_processing_duration_seconds',
            'Message processing duration',
            ['topic']
        );
        $histogram->observe($duration, [$topic]);
    }
}

Common Pitfalls and Solutions

1. Message Ordering Issues

Problem: Messages processed out of order
Solution: Give related messages the same partition key. Kafka guarantees order only within a partition, and each partition is read by one consumer per group

2. Duplicate Processing

Problem: Same message processed twice
Solution: Implement idempotency keys

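public function handle($message)
{
    $idempotencyKey = $message->getHeaders()['idempotency-key'] ?? null;

    // Without a key there is nothing to deduplicate on; skipping the check
    // avoids treating every keyless message as a duplicate of the first one
    if ($idempotencyKey !== null && Cache::has("processed:{$idempotencyKey}")) {
        return; // Already processed
    }

    // Process message

    if ($idempotencyKey !== null) {
        Cache::put("processed:{$idempotencyKey}", true, now()->addHours(24));
    }
}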
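The effect of an idempotency key can be checked without Kafka or a cache server. The plain-PHP sketch below uses an in-memory store as a stand-in for Redis and claims the key before processing; `ProcessedKeys` and `handleOnce` are hypothetical names, and the message shape is an assumption.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for a shared store such as Redis (assumption for the sketch).
final class ProcessedKeys
{
    private array $seen = [];

    /** Returns true only for the first caller to claim the key. */
    public function claim(string $key): bool
    {
        if (isset($this->seen[$key])) {
            return false;
        }
        $this->seen[$key] = true;
        return true;
    }
}

/** Process the message unless its idempotency key was already claimed. */
function handleOnce(ProcessedKeys $store, array $message, callable $process): bool
{
    $key = $message['headers']['idempotency-key'] ?? null;
    if ($key !== null && !$store->claim($key)) {
        return false; // duplicate delivery, skipped
    }
    $process($message['body']);
    return true;
}

$store = new ProcessedKeys();
$charged = 0;
$charge = function (array $body) use (&$charged): void {
    $charged += $body['amount'];
};

$msg = ['headers' => ['idempotency-key' => 'pay-77'], 'body' => ['amount' => 50]];
$firstRun = handleOnce($store, $msg, $charge);
$secondRun = handleOnce($store, $msg, $charge); // redelivery of the same message
```

Claiming before processing closes the window in which two workers both pass the check, at the cost that a crash mid-processing leaves the key claimed; marking after processing makes the opposite trade.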

3. Consumer Lag

Problem: Consumers falling behind producers
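Solution: Add consumers to the group (up to the partition count), add partitions if the group is already at that limit, and reduce per-message processing time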
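Lag is simply the distance between the newest offset in each partition and the offset the group has committed. A minimal plain-PHP sketch, with made-up offsets and a hypothetical `consumerLag` helper:

```php
<?php
declare(strict_types=1);

/**
 * Lag per partition = latest offset in the log minus the group's committed offset.
 * Both inputs are maps of partition number => offset.
 */
function consumerLag(array $logEndOffsets, array $committedOffsets): array
{
    $lag = [];
    foreach ($logEndOffsets as $partition => $endOffset) {
        // A partition with no commit yet is treated as unread (assumption).
        $committed = $committedOffsets[$partition] ?? 0;
        $lag[$partition] = max(0, $endOffset - $committed);
    }
    return $lag;
}

$lag = consumerLag(
    [0 => 1500, 1 => 1480, 2 => 2000],
    [0 => 1500, 1 => 1400, 2 => 1200]
);
$total = array_sum($lag);
```

Here partition 2 accounts for most of the backlog, which points at a hot key or a slow consumer rather than a group that is too small overall.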


Conclusion

Integrating Apache Kafka with Laravel opens up powerful possibilities for building scalable, event-driven systems. Choose the strategy that best fits your use case:

  • Queue-Based: Start here for simple async processing
  • Event Sourcing: When you need complete audit trails
  • CQRS: For read-heavy applications
  • CDC: For real-time data synchronization
  • Saga: For distributed transactions

Remember: start simple, measure performance, and scale complexity only when needed.


Tags: #ApacheKafka #Laravel #PHP #EventDrivenArchitecture #Microservices #EventSourcing #CQRS #DistributedSystems #MessageQueue #RealTimeData #SoftwareArchitecture #BackendDevelopment
