Why Kafka + Laravel? The Power Combination Explained
Apache Kafka has become the de facto standard for building real-time data pipelines and streaming applications, while Laravel remains one of the most popular PHP frameworks. Together they let you build event-driven systems that scale with message volume: Kafka spreads each topic across partitions and brokers, and consumer groups let you add workers as traffic grows.
In this comprehensive guide, we'll explore five proven strategies for integrating Apache Kafka with Laravel, complete with real-world use cases and implementation patterns.
Strategy 1: Queue-Based Integration (Beginner-Friendly)
Overview
This approach treats Kafka as a sophisticated queue system, leveraging Laravel's built-in queue infrastructure with a Kafka driver.
When to Use
- Migrating from Redis/SQS to Kafka
- Simple pub-sub patterns
- Background job processing
- Email notifications and async tasks
Implementation
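Step 1: Install a Kafka Queue Driver
composer require rapideinternet/laravel-queue-kafka
The driver builds on the php-rdkafka extension, so install that extension (and the librdkafka library it wraps) first.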
Step 2: Configure Queue Connection
// config/queue.php
'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'queue' => env('KAFKA_QUEUE', 'default'),
        'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
        'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'laravel'),
        'sasl_username' => env('KAFKA_SASL_USERNAME'),
        'sasl_password' => env('KAFKA_SASL_PASSWORD'),
    ],
],
Step 3: Create a Job
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
class ProcessOrderEvent implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;
    public function __construct(
        public array $orderData
    ) {}
    public function handle(): void
    {
        // Process order logic
        logger()->info('Processing order', $this->orderData);
    }
}
Step 4: Dispatch Jobs
// Dispatch to Kafka
ProcessOrderEvent::dispatch($orderData)
    ->onConnection('kafka')
    ->onQueue('orders');
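The "automatic retries" in the list below come from the queue worker: it runs handle() and, if the job throws, releases it for another attempt until the job's $tries limit (or the worker's --tries option) is reached, then records it as failed. The plain-PHP sketch below models that loop so it runs without Laravel or Kafka; runWithRetries is a hypothetical helper, not part of the driver.

```php
<?php
/**
 * Hypothetical helper that mimics a queue worker's retry loop.
 * Runs $job up to $maxTries times and reports the outcome.
 */
function runWithRetries(callable $job, int $maxTries): array
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxTries; $attempt++) {
        try {
            $job($attempt);
            return ['status' => 'done', 'attempts' => $attempt, 'error' => null];
        } catch (Throwable $e) {
            // A real worker releases the job back to the queue here,
            // optionally after a backoff delay.
            $lastError = $e->getMessage();
        }
    }
    // Attempts exhausted: a real worker records the job as failed now.
    return ['status' => 'failed', 'attempts' => $maxTries, 'error' => $lastError];
}

// A job that fails twice with a transient error, then succeeds.
$flaky = function (int $attempt): void {
    if ($attempt < 3) {
        throw new RuntimeException("transient error on attempt {$attempt}");
    }
};

$result = runWithRetries($flaky, 3);
```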
Pros & Cons
✅ Familiar Laravel syntax
✅ Easy to implement
✅ Automatic retries and failure handling
❌ Limited access to Kafka-specific features such as message keys, headers, and offset control
❌ Not ideal for complex streaming scenarios
Strategy 2: Event Sourcing Pattern (Advanced)
Overview
Implement true event sourcing where every state change is captured as an immutable event in Kafka, providing a complete audit trail and the ability to rebuild state.
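Rebuilding state means replaying an aggregate's events from the start and folding each one into the current state. The sketch below shows that fold in plain PHP for the OrderCreated and OrderPaid events used later in this section; the event array shape is an assumption for illustration. Replaying only a prefix of the history answers "what did this order look like at that point?", which is what makes temporal queries possible.

```php
<?php
/**
 * Rebuild an order's current state by replaying its events in order.
 * Assumed event shape: ['type' => string, 'data' => array].
 */
function rebuildOrder(array $events): array
{
    $state = [];
    foreach ($events as $event) {
        $state = match ($event['type']) {
            'OrderCreated' => [
                'uuid'   => $event['data']['uuid'],
                'status' => 'pending',
                'total'  => $event['data']['total'],
            ],
            'OrderPaid' => array_merge($state, ['status' => 'paid']),
            // Unknown event types leave the state unchanged.
            default => $state,
        };
    }
    return $state;
}

$events = [
    ['type' => 'OrderCreated', 'data' => ['uuid' => 'o-1', 'total' => 4200]],
    ['type' => 'OrderPaid', 'data' => ['uuid' => 'o-1']],
];

$order = rebuildOrder($events);
// Replaying only the first event gives the state as of order creation.
$asOfCreation = rebuildOrder(array_slice($events, 0, 1));
```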
When to Use
- Financial systems requiring audit trails
- E-commerce platforms tracking order lifecycle
- Systems requiring temporal queries
- Microservices with eventual consistency
Implementation
Step 1: Install Event Sourcing Package
composer require spatie/laravel-event-sourcing
composer require junges/laravel-kafka
Step 2: Create Event Projector
namespace App\Projectors;
use Spatie\EventSourcing\EventHandlers\Projectors\Projector;
use App\Events\OrderCreated;
use App\Events\OrderPaid;
use App\Models\Order;
class OrderProjector extends Projector
{
    public function onOrderCreated(OrderCreated $event): void
    {
        Order::create([
            'uuid' => $event->orderUuid,
            'user_id' => $event->userId,
            'status' => 'pending',
            'total' => $event->total,
        ]);
    }
    public function onOrderPaid(OrderPaid $event): void
    {
        Order::where('uuid', $event->orderUuid)
            ->update(['status' => 'paid', 'paid_at' => now()]);
    }
}
Step 3: Publish Events to Kafka
namespace App\Services;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class KafkaEventPublisher
{
    public function publish(string $topic, array $eventData): void
    {
        $message = new Message(
            headers: ['event-type' => $eventData['type']],
            body: $eventData,
            key: $eventData['aggregate_uuid'] ?? null
        );
        Kafka::publishOn($topic)
            ->withMessage($message)
            ->send();
    }
}
Step 4: Consume Events
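// app/Console/Commands/ConsumeKafkaEvents.php
namespace App\Console\Commands;
use App\Events\OrderCreated;
use App\Events\OrderPaid;
use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;
class ConsumeKafkaEvents extends Command
{
    protected $signature = 'kafka:consume-events';
    // Only event types listed here are rebuilt from Kafka payloads
    private const EVENT_MAP = [
        'OrderCreated' => OrderCreated::class,
        'OrderPaid' => OrderPaid::class,
    ];
    public function handle(): void
    {
        $consumer = Kafka::createConsumer(['order-events'])
            ->withBrokers('localhost:9092')
            ->withConsumerGroupId('laravel-projectors')
            ->withHandler(function ($message) {
                // The default JSON deserializer already decodes the body to an array
                $eventData = $message->getBody();
                $eventClass = self::EVENT_MAP[$eventData['type'] ?? ''] ?? null;
                if ($eventClass === null) {
                    return; // Unknown event type: skip it
                }
                // Re-dispatch as a stored event so the projectors run;
                // assumes the payload's 'data' keys match the event constructor's parameters
                event(new $eventClass(...$eventData['data']));
            })
            ->build();
        $consumer->consume();
    }
}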
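Kafka records a consumer group's progress as a committed offset per partition. If the offset is committed only after the handler succeeds, a crash replays the unfinished message on restart, which is at-least-once delivery. The sketch below simulates that with a plain array as the partition log; consumeFrom is a hypothetical helper, and with auto-commit enabled the client decides when offsets are committed instead.

```php
<?php
/**
 * Simulate one consumer pass over a partition log starting at $committed.
 * The offset is committed only after the handler succeeds, so a crash
 * leaves the failed message to be redelivered on the next pass.
 * Returns [newCommittedOffset, processedOffsets].
 */
function consumeFrom(array $log, int $committed, callable $handler): array
{
    $processed = [];
    for ($offset = $committed; $offset < count($log); $offset++) {
        try {
            $handler($log[$offset]);
        } catch (Throwable) {
            // Simulated crash: stop without committing this offset.
            return [$committed, $processed];
        }
        $processed[] = $offset;
        $committed = $offset + 1; // the committed offset is the next one to read
    }
    return [$committed, $processed];
}

$log = ['e0', 'e1', 'e2', 'e3'];
$crashOnce = true;
$handler = function (string $msg) use (&$crashOnce): void {
    if ($msg === 'e2' && $crashOnce) {
        $crashOnce = false;
        throw new RuntimeException('worker died');
    }
};

[$committed, $firstPass] = consumeFrom($log, 0, $handler);           // stops before e2
[$committed, $secondPass] = consumeFrom($log, $committed, $handler); // resumes at e2
```

Because the restart resumes from the last committed offset, a handler that had partly run before the crash sees that message again, which is why the duplicate-processing pitfall later in this guide matters.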
Pros & Cons
✅ Complete audit trail
✅ Time-travel debugging
✅ Easy to rebuild state
❌ Complex to implement
❌ Storage overhead
❌ Eventual consistency challenges
Strategy 3: CQRS (Command Query Responsibility Segregation)
Overview
Separate read and write models, using Kafka to synchronize between them. Write operations go to the command model, while reads are served from optimized query models.
When to Use
- High read/write ratio applications
- Complex reporting requirements
- Microservices architectures
- Systems requiring different scaling for reads vs writes
Implementation
Step 1: Command Handler
namespace App\Commands;
use App\Models\Order;
use App\Services\KafkaEventPublisher;
class CreateOrderHandler
{
    public function __construct(
        private KafkaEventPublisher $publisher
    ) {}
    public function handle(CreateOrderCommand $command): Order
    {
        // Validate and create order in the write model
        $order = Order::create($command->toArray());
        // Publish event to Kafka. If this call fails after the insert, the read
        // model never sees the order; a transactional outbox or CDC (Strategy 4)
        // closes that gap.
        $this->publisher->publish('order-commands', [
            'type' => 'OrderCreated',
            'aggregate_uuid' => $order->uuid,
            'data' => $order->toArray(),
            'timestamp' => now()->toISOString(),
        ]);
        return $order;
    }
}
Step 2: Read Model Builder
namespace App\ReadModels;
use Illuminate\Support\Facades\Cache;
use Junges\Kafka\Facades\Kafka;
class OrderReadModel
{
    public function rebuildFromKafka(): void
    {
        $consumer = Kafka::createConsumer(['order-commands'])
            ->withAutoCommit()
            ->withHandler(function ($message) {
                // The default JSON deserializer already decodes the body to an array
                $event = $message->getBody();
                // Build optimized read model
                $this->updateReadModel($event);
            })
            ->build();
        $consumer->consume();
    }
    private function updateReadModel(array $event): void
    {
        // Update Redis/Elasticsearch for fast queries
        Cache::put(
            "order:{$event['aggregate_uuid']}",
            $event['data'],
            now()->addDay()
        );
    }
}
Step 3: Query Service
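namespace App\Queries;
use Illuminate\Support\Facades\Cache;
class OrderQueryService
{
    public function getOrder(string $uuid)
    {
        // Query from read model (Redis/ES)
        return Cache::get("order:{$uuid}");
    }
    public function getOrdersByUser(int $userId)
    {
        // Requires a cache store that supports tags (e.g. Redis or Memcached) and a
        // projection that also writes this per-user entry; OrderReadModel above
        // only stores individual orders
        return Cache::tags(['orders', "user:{$userId}"])->get('orders');
    }
}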
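getOrdersByUser() can only return data if the projection also writes a per-user view. A minimal sketch of a projection that maintains both the per-order entry and a per-user index, assuming the event shape published by CreateOrderHandler (with user_id inside 'data'); a plain array stands in for Redis or Elasticsearch, and OrderProjection is a hypothetical name.

```php
<?php
/**
 * In-memory stand-in for the read store. Maintains two views:
 *   "order:{uuid}"     -> the order document
 *   "user:{id}:orders" -> list of that user's order uuids
 */
final class OrderProjection
{
    /** @var array<string, mixed> */
    public array $store = [];

    public function apply(array $event): void
    {
        $uuid = $event['aggregate_uuid'];
        $data = $event['data'];
        $this->store["order:{$uuid}"] = $data;

        $userKey = "user:{$data['user_id']}:orders";
        $orders = $this->store[$userKey] ?? [];
        if (!in_array($uuid, $orders, true)) {
            // The guard keeps the index correct if an event is delivered twice.
            $orders[] = $uuid;
        }
        $this->store[$userKey] = $orders;
    }

    public function ordersByUser(int $userId): array
    {
        $uuids = $this->store["user:{$userId}:orders"] ?? [];
        return array_map(fn (string $u) => $this->store["order:{$u}"], $uuids);
    }
}

$projection = new OrderProjection();
$event = [
    'type' => 'OrderCreated',
    'aggregate_uuid' => 'o-1',
    'data' => ['user_id' => 7, 'total' => 4200],
];
$projection->apply($event);
$projection->apply($event); // duplicate delivery
$mine = $projection->ordersByUser(7);
```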
Pros & Cons
✅ Reads and writes scale independently
✅ Query models tailored to each read pattern
✅ Faster reads from denormalized stores
❌ Eventual consistency
❌ Increased complexity
❌ Data synchronization overhead
Strategy 4: Change Data Capture (CDC) with Kafka Connect
Overview
Use Kafka Connect and Debezium to stream database changes to Kafka automatically, enabling real-time data synchronization across services.
When to Use
- Database replication across services
- Real-time analytics
- Data warehousing
- Legacy system integration
Implementation
Step 1: Set Up the Debezium Connector
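{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "secret",
        "database.server.id": "184054",
        "database.server.name": "laravel",
        "database.include.list": "laravel_db",
        "table.include.list": "laravel_db.orders,laravel_db.users",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes",
        "tombstones.on.delete": "false"
    }
}
These property names follow Debezium 1.x. In Debezium 2.x, database.server.name was replaced by topic.prefix, and the database.history.kafka.* properties were renamed to schema.history.internal.kafka.*. Setting tombstones.on.delete to false stops Debezium from emitting a null-valued tombstone record after each delete event.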
Step 2: Laravel CDC Consumer
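namespace App\Services;
use Junges\Kafka\Facades\Kafka;
class CDCConsumer
{
    public function consumeDatabaseChanges(): void
    {
        Kafka::createConsumer(['laravel.laravel_db.orders'])
            ->withBrokers('localhost:9092')
            ->withConsumerGroupId('cdc-processor')
            ->withHandler(function ($message) {
                // The default JSON deserializer already decodes the body
                $change = $message->getBody();
                if (!is_array($change)) {
                    return; // e.g. a tombstone record
                }
                // With the JSON converter's schemas enabled, the envelope sits under 'payload'
                $change = $change['payload'] ?? $change;
                match ($change['op']) {
                    'c', 'r' => $this->handleCreate($change['after']), // 'r' = snapshot read
                    'u' => $this->handleUpdate($change['before'], $change['after']),
                    'd' => $this->handleDelete($change['before']),
                    default => null,
                };
            })
            ->build()
            ->consume();
    }
    private function handleCreate(array $data): void
    {
        // Sync to analytics DB, Elasticsearch, etc.
        logger()->info('Order created in CDC', $data);
    }
    private function handleUpdate(?array $before, array $after): void
    {
        logger()->info('Order updated in CDC', $after);
    }
    private function handleDelete(array $before): void
    {
        logger()->info('Order deleted in CDC', $before);
    }
}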
Step 3: Real-time Sync Service
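namespace App\Services;
use Elasticsearch; // assumes a global Elasticsearch facade alias from a Laravel Elasticsearch package
use Illuminate\Support\Facades\DB;
class DataSyncService
{
    public function syncToElasticsearch(array $orderData): void
    {
        // Index in Elasticsearch for fast search; reusing the row id means a
        // redelivered change overwrites the document instead of duplicating it
        Elasticsearch::index([
            'index' => 'orders',
            'id' => $orderData['id'],
            'body' => $orderData,
        ]);
    }
    public function syncToDataWarehouse(array $orderData): void
    {
        // Upsert by primary key so a redelivered change does not insert a duplicate row
        DB::connection('warehouse')
            ->table('orders')
            ->updateOrInsert(['id' => $orderData['id']], $orderData);
    }
}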
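Because Debezium and Kafka deliver at least once, sync targets should tolerate the same change arriving twice. A minimal sketch of applying change events idempotently: creates, snapshot reads, and updates become upserts keyed by primary key, and deletes remove the key. A plain array stands in for the warehouse table or Elasticsearch index, and the id primary key column is an assumption.

```php
<?php
/**
 * Apply one Debezium-style change event to a target keyed by primary key.
 * Upserts and deletes make replaying the same change harmless.
 */
function applyChange(array $target, array $change): array
{
    switch ($change['op']) {
        case 'c':
        case 'r': // snapshot read
        case 'u':
            $row = $change['after'];
            $target[$row['id']] = $row;
            break;
        case 'd':
            unset($target[$change['before']['id']]);
            break;
    }
    return $target;
}

$changes = [
    ['op' => 'c', 'before' => null, 'after' => ['id' => 1, 'status' => 'pending']],
    ['op' => 'u', 'before' => ['id' => 1, 'status' => 'pending'], 'after' => ['id' => 1, 'status' => 'paid']],
];

$target = [];
foreach ($changes as $change) {
    $target = applyChange($target, $change);
}

// Redelivering the whole batch does not create duplicates.
$replayed = $target;
foreach ($changes as $change) {
    $replayed = applyChange($replayed, $change);
}
```

Replaying in the original order converges on the same final state; this relies on Kafka preserving order within the partition that holds a given row's changes.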
Pros & Cons
✅ No changes to the code that writes to the database
✅ Captures every committed change from the database's transaction log (the binlog, for MySQL)
✅ Works with existing databases
❌ Requires Kafka Connect infrastructure
❌ Learning curve for Debezium
❌ Additional operational complexity
Strategy 5: Saga Pattern for Distributed Transactions
Overview
Implement the Saga pattern using Kafka to coordinate long-running transactions across multiple microservices with compensation logic.
When to Use
- Multi-service transactions (order → payment → shipping)
- Microservices requiring coordination
- Systems without distributed transaction support
- Complex business workflows
Implementation
Step 1: Saga Orchestrator
namespace App\Sagas;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class OrderSaga
{
    private array $completedSteps = [];
    public function execute(array $orderData): void
    {
        $this->completedSteps = []; // reset state for this run
        try {
            // Step 1: Reserve inventory
            $this->reserveInventory($orderData);
            $this->completedSteps[] = 'inventory';
            // Step 2: Process payment
            $this->processPayment($orderData);
            $this->completedSteps[] = 'payment';
            // Step 3: Create shipment
            $this->createShipment($orderData);
            $this->completedSteps[] = 'shipment';
            $this->publishSuccess($orderData);
        } catch (\Exception $e) {
            $this->compensate($orderData);
            throw $e;
        }
    }
    private function reserveInventory(array $orderData)
    {
        Kafka::publishOn('inventory-commands')
            ->withMessage(new Message(body: [
                'command' => 'ReserveInventory',
                'order_id' => $orderData['id'],
                'items' => $orderData['items'],
            ]))
            ->send();
        // Wait for the reply matching this order (with timeout); it must throw on
        // a failed reply or a timeout so that execute() runs compensation
        return $this->waitForConfirmation('inventory-replies', $orderData['id'], 30);
    }
    // processPayment(), createShipment(), publishSuccess(), waitForConfirmation(),
    // cancelShipment(), refundPayment() and releaseInventory() are omitted here;
    // they follow the same publish-and-wait pattern
    private function compensate(array $orderData): void
    {
        // Rollback in reverse order
        if (in_array('shipment', $this->completedSteps)) {
            $this->cancelShipment($orderData);
        }
        if (in_array('payment', $this->completedSteps)) {
            $this->refundPayment($orderData);
        }
        if (in_array('inventory', $this->completedSteps)) {
            $this->releaseInventory($orderData);
        }
    }
}
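The compensation logic above generalises to any list of steps. A minimal sketch of an orchestration-style saga in plain PHP, where each step pairs an action with its compensation; local closures stand in for the publish-and-wait Kafka calls, and runSaga is a hypothetical helper.

```php
<?php
/**
 * Run saga steps in order. Each step is [name, action, compensation].
 * On failure, compensations for the steps that already completed run
 * in reverse order, then the original exception is rethrown.
 */
function runSaga(array $steps, array &$log): void
{
    $completed = [];
    try {
        foreach ($steps as [$name, $action, $compensate]) {
            $action();
            $log[] = "done:{$name}";
            $completed[] = [$name, $compensate];
        }
    } catch (Throwable $e) {
        foreach (array_reverse($completed) as [$name, $compensate]) {
            $compensate();
            $log[] = "compensated:{$name}";
        }
        throw $e;
    }
}

$log = [];
$steps = [
    ['inventory', fn () => null, fn () => null],
    ['payment',   fn () => null, fn () => null],
    ['shipment',  fn () => throw new RuntimeException('carrier unavailable'), fn () => null],
];

try {
    runSaga($steps, $log);
} catch (RuntimeException $e) {
    $log[] = 'failed:' . $e->getMessage();
}
```

Only completed steps are compensated; the failing step is assumed to have changed nothing, so each participant should apply its own step atomically.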
Step 2: Saga Participants
namespace App\Services;
use App\Models\Inventory;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class InventoryService
{
    public function handleCommands(): void
    {
        Kafka::createConsumer(['inventory-commands'])
            ->withHandler(function ($message) {
                // The default JSON deserializer already decodes the body to an array
                $command = $message->getBody();
                match ($command['command']) {
                    'ReserveInventory' => $this->reserve($command),
                    'ReleaseInventory' => $this->release($command),
                    default => null,
                };
            })
            ->build()
            ->consume();
    }
    private function reserve(array $command): void
    {
        // Reserve inventory logic (Inventory::reserve() is app-specific)
        $success = Inventory::reserve($command['items']);
        // Reply to saga orchestrator
        Kafka::publishOn('inventory-replies')
            ->withMessage(new Message(body: [
                'order_id' => $command['order_id'],
                'success' => $success,
                'step' => 'inventory',
            ]))
            ->send();
    }
    // release() undoes a reservation and replies the same way; omitted here
}
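The orchestrator's waitForConfirmation() is not shown. One possible shape, sketched in plain PHP: poll the reply topic, skip replies that belong to other orders (many saga runs share one reply topic), throw on a failed reply, and give up after a deadline so the saga can compensate. The $poll and $now callables are hypothetical stand-ins for a Kafka consumer and a clock, which keeps the example runnable without a broker.

```php
<?php
/**
 * Wait for the reply correlated with $orderId.
 * $poll returns the next decoded reply, or null if nothing has arrived.
 * $now returns the current time in seconds.
 * Throws on a failed reply or when the timeout expires.
 */
function waitForReply(callable $poll, string|int $orderId, float $timeoutSeconds, callable $now): array
{
    $deadline = $now() + $timeoutSeconds;
    while ($now() < $deadline) {
        $reply = $poll();
        if ($reply === null || $reply['order_id'] !== $orderId) {
            continue; // nothing yet, or a reply for a different order
        }
        if (!$reply['success']) {
            throw new RuntimeException("Step {$reply['step']} failed for order {$orderId}");
        }
        return $reply;
    }
    throw new RuntimeException("Timed out waiting for reply for order {$orderId}");
}

// Usage with an in-memory reply queue and a fake clock that ticks one second per call.
$replies = [
    null,
    ['order_id' => 7, 'success' => true, 'step' => 'inventory'],
    ['order_id' => 42, 'success' => true, 'step' => 'inventory'],
];
$poll = function () use (&$replies): ?array { return array_shift($replies); };
$t = 0.0;
$clock = function () use (&$t): float { return $t += 1.0; };

$reply = waitForReply($poll, 42, 30.0, $clock);
```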
Pros & Cons
✅ Handles distributed transactions
✅ Failures trigger explicit compensating actions instead of distributed locks
✅ Loosely coupled services
❌ Complex to implement
❌ Debugging can be challenging
❌ Requires careful design
Performance Optimization Tips
1. Batch Processing
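// Instead of one message at a time, push messages into a batch and send it once
// (MessageBatch and sendBatch() as provided by junges/laravel-kafka 1.x)
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Producers\MessageBatch;
$batch = new MessageBatch();
foreach ([$order1, $order2, $order3] as $order) {
    $batch->push(new Message(body: $order));
}
Kafka::publishOn('orders')->sendBatch($batch);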
2. Partitioning Strategy
// Use order_id as partition key for ordered processing
$message = new Message(
    body: $orderData,
    key: (string) $orderData['id'] // Same key → same partition
);
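Key-based partitioning works by hashing the message key and mapping the hash onto the topic's partitions. librdkafka's default partitioner (consistent_random) uses a CRC32 hash of the key; the sketch below uses PHP's crc32() modulo the partition count to illustrate the idea and is not guaranteed to reproduce the client's exact assignment. partitionFor is a hypothetical helper.

```php
<?php
/**
 * Simplified model of key-based partitioning:
 * hash the key, then take it modulo the partition count.
 */
function partitionFor(string $key, int $partitionCount): int
{
    // crc32() returns a non-negative integer on 64-bit PHP.
    return crc32($key) % $partitionCount;
}

$p1 = partitionFor('1001', 6);
$p2 = partitionFor('1001', 6); // same key, same partition count: same partition
```

Because the result depends on the partition count, adding partitions to an existing topic can move a key to a different partition, so per-key ordering holds only while the partition count stays fixed.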
3. Consumer Groups for Scaling
// Consumers sharing a group ID split the topic's partitions between them,
// so parallelism is capped at the partition count
Kafka::createConsumer(['orders'])
    ->withConsumerGroupId('order-processors')
    ->withHandler($handler)
    ->build()
    ->consume();
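Within a consumer group, Kafka assigns each partition to exactly one consumer, so the partition count caps useful parallelism. The sketch below is a simplified round-robin model of that assignment; Kafka's real assignors (such as range or cooperative-sticky) distribute partitions differently but keep the same one-consumer-per-partition rule. assignPartitions is a hypothetical helper.

```php
<?php
/**
 * Round-robin model of assigning a topic's partitions to the consumers
 * in one group. Each partition goes to exactly one consumer.
 */
function assignPartitions(int $partitionCount, array $consumers): array
{
    $assignment = array_fill_keys($consumers, []);
    for ($p = 0; $p < $partitionCount; $p++) {
        $assignment[$consumers[$p % count($consumers)]][] = $p;
    }
    return $assignment;
}

$fewerConsumers = assignPartitions(6, ['c1', 'c2', 'c3']);
$moreConsumers = assignPartitions(2, ['c1', 'c2', 'c3']);
```

With two partitions and three consumers, c3 receives nothing, so adding consumers beyond the partition count adds no throughput.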
4. Connection Pooling
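// Reuse one producer builder per process (bound to a single topic here).
// Static state only persists inside long-running processes (queue workers,
// Kafka consumers, Octane), not across PHP-FPM requests.
class KafkaConnectionPool
{
    private static $producer;
    public static function getProducer()
    {
        if (!self::$producer) {
            self::$producer = Kafka::publishOn('topic')
                ->withBrokers(config('kafka.brokers'));
        }
        return self::$producer;
    }
}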
Monitoring and Debugging
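Supervising Kafka Queue Workers
Laravel Horizon only supervises Redis queues, so it cannot manage workers on the kafka connection configured in Strategy 1. Run those workers, along with long-running consumer commands such as kafka:consume-events, under a process manager like Supervisor instead:
php artisan queue:work kafka --queue=default,orders,notifications --tries=3
php artisan kafka:consume-events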
Kafka Metrics Integration
namespace App\Services;
use Prometheus\CollectorRegistry;
class KafkaMetrics
{
    public function recordMessageProcessed(string $topic, float $duration): void
    {
        $registry = app(CollectorRegistry::class);
        $counter = $registry->getOrRegisterCounter(
            'kafka',
            'messages_processed_total',
            'Total messages processed',
            ['topic']
        );
        $counter->inc([$topic]);
        $histogram = $registry->getOrRegisterHistogram(
            'kafka',
            'message_processing_duration_seconds',
            'Message processing duration',
            ['topic']
        );
        $histogram->observe($duration, [$topic]);
    }
}
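recordMessageProcessed() expects a duration in seconds. A minimal sketch of measuring it around a handler with PHP's monotonic hrtime() clock; the $record callable stands in for KafkaMetrics::recordMessageProcessed(), and timed is a hypothetical helper.

```php
<?php
/**
 * Run a message handler and report its duration in seconds to $record.
 * The duration is reported even if the handler throws.
 */
function timed(string $topic, callable $handler, callable $record): mixed
{
    $start = hrtime(true); // monotonic clock, nanoseconds
    try {
        return $handler();
    } finally {
        $record($topic, (hrtime(true) - $start) / 1e9);
    }
}

$samples = [];
$recorder = function (string $topic, float $seconds) use (&$samples): void {
    $samples[] = [$topic, $seconds];
};

$result = timed('orders', fn () => 'handled', $recorder);
```

Because the finally block always runs, failed messages are timed and counted too; add a status label if you need to tell successes and failures apart.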
Common Pitfalls and Solutions
1. Message Ordering Issues
Problem: Messages processed out of order
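Solution: Kafka preserves order only within a partition, so give related messages the same partition key. Within a consumer group each partition is read by a single consumer, so order holds as long as that consumer processes its messages sequentially.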
2. Duplicate Processing
Problem: Same message processed twice
Solution: Implement idempotency keys
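// In a class that imports Illuminate\Support\Facades\Cache
// and Junges\Kafka\Contracts\KafkaConsumerMessage
public function handle(KafkaConsumerMessage $message): void
{
    // Prefer a producer-supplied key; fall back to the message's position in the log
    $idempotencyKey = $message->getHeaders()['idempotency-key']
        ?? "{$message->getTopicName()}:{$message->getPartition()}:{$message->getOffset()}";
    if (Cache::has("processed:{$idempotencyKey}")) {
        return; // Already processed
    }
    // Process message
    // Mark as processed only after processing succeeds, so failures are retried
    Cache::put("processed:{$idempotencyKey}", true, now()->addHours(24));
}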
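A deduplication store only catches duplicates while it still remembers the key, so the 24-hour TTL must outlast the longest redelivery window (consumer restarts, lag, or deliberate replays). The sketch below models that with an in-memory store and an explicit clock in seconds; DedupeStore and handleOnce are hypothetical stand-ins for the Cache calls.

```php
<?php
/** In-memory dedupe store with a TTL, standing in for Cache::has()/Cache::put(). */
final class DedupeStore
{
    /** @var array<string, int> key => expiry timestamp */
    private array $expiry = [];

    public function seen(string $key, int $now): bool
    {
        return isset($this->expiry[$key]) && $this->expiry[$key] > $now;
    }

    public function remember(string $key, int $now, int $ttl): void
    {
        $this->expiry[$key] = $now + $ttl;
    }
}

/** Process a message unless its key was seen within the TTL window. */
function handleOnce(DedupeStore $store, string $key, int $now, int $ttl, callable $process): bool
{
    if ($store->seen($key, $now)) {
        return false; // duplicate within the TTL window: skip
    }
    $process();
    $store->remember($key, $now, $ttl); // mark only after success
    return true;
}

$store = new DedupeStore();
$count = 0;
$process = function () use (&$count): void { $count++; };
$ttl = 86400; // 24 hours

$first = handleOnce($store, 'k1', 0, $ttl, $process);      // processed
$second = handleOnce($store, 'k1', 3600, $ttl, $process);  // duplicate an hour later: skipped
$third = handleOnce($store, 'k1', 90000, $ttl, $process);  // TTL expired: processed again
```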
3. Consumer Lag
Problem: Consumers falling behind producers
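Solution: Add consumers to the group (useful only up to the topic's partition count), add partitions when that ceiling is reached, and keep per-message work short, for example by handing slow tasks off to a queue.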
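Lag is measured per partition as the log end offset minus the group's committed offset; kafka-consumer-groups.sh --describe --group <group> reports both, along with a LAG column. A minimal sketch of the calculation in plain PHP with hypothetical offset values; treating a partition with no commit as fully lagging assumes its log starts at offset 0.

```php
<?php
/**
 * Consumer lag per partition = log end offset - committed offset.
 * Inputs are maps of partition => offset.
 */
function consumerLag(array $logEndOffsets, array $committedOffsets): array
{
    $lag = [];
    foreach ($logEndOffsets as $partition => $end) {
        $committed = $committedOffsets[$partition] ?? 0; // no commit yet
        $lag[$partition] = max(0, $end - $committed);
    }
    return $lag;
}

$lag = consumerLag([0 => 1500, 1 => 1200, 2 => 900], [0 => 1500, 1 => 700]);
$totalLag = array_sum($lag);
```

Watching the per-partition values, not just the total, shows whether one hot partition (often a skewed key) is the real bottleneck.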
Conclusion
Integrating Apache Kafka with Laravel opens up powerful possibilities for building scalable, event-driven systems. Choose the strategy that best fits your use case:
- Queue-Based: Start here for simple async processing
- Event Sourcing: When you need complete audit trails
- CQRS: For read-heavy applications
- CDC: For real-time data synchronization
- Saga: For distributed transactions
Remember: start simple, measure performance, and scale complexity only when needed.