DEV Community

Igor Nosatov

Circuit Breaking: A Love Story Between Laravel and RabbitMQ

Implementing the Circuit Breaker Pattern in Laravel with RabbitMQ

Introduction

The Circuit Breaker pattern is a core design pattern for building resilient distributed systems. In Laravel applications that rely on a message queue such as RabbitMQ, it helps prevent cascading failures and keeps the system stable. This guide walks through a Circuit Breaker implementation in Laravel that monitors RabbitMQ connections and operations and manages failure and recovery automatically.

In modern microservices architectures, RabbitMQ serves as a crucial message broker that enables asynchronous communication between services. However, network issues, broker overload, or service failures can cause RabbitMQ operations to fail. Without proper protection mechanisms, these failures can cascade through your system, exhausting resources and causing widespread outages. The Circuit Breaker pattern addresses this challenge by providing a safety mechanism that detects failures and prevents the system from making repeated unsuccessful attempts.

Understanding the Circuit Breaker Pattern

Before diving into implementation, it's important to understand the three states of a circuit breaker:

Closed State

In the Closed state, the circuit breaker lets all requests pass through normally while tracking failures. If the number of failures reaches a predefined threshold within a specific time window, the circuit breaker trips and transitions to the Open state.

Open State

When the circuit breaker enters the Open state, it immediately rejects all requests without attempting to execute them. This prevents the system from wasting resources on operations that are likely to fail. The circuit breaker remains in this state for a timeout period, giving the failing service time to recover.
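The service built later in this guide uses a fixed timeout, while the feature list also mentions exponential backoff for repeated failures. Here is a minimal sketch of one way to compute such a timeout. The function name, the doubling factor, and the 900-second cap are assumptions for illustration and are not taken from the article's code:

```php
<?php

/**
 * Hypothetical helper: Open-state timeout that doubles with each consecutive trip.
 *
 * @param int $baseTimeout      Timeout in seconds for the first trip
 * @param int $consecutiveTrips How many times in a row the breaker has opened
 * @param int $maxTimeout       Upper bound in seconds (assumed value)
 */
function backoffTimeout(int $baseTimeout, int $consecutiveTrips, int $maxTimeout = 900): int
{
    // The first trip uses the base timeout; clamp the exponent so the result cannot overflow.
    $exponent = min(max($consecutiveTrips - 1, 0), 20);

    return min($baseTimeout * (2 ** $exponent), $maxTimeout);
}
```

With a base of 60 seconds, the first trip waits 60 seconds, the third waits 240, and later trips are capped at 900.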

Half-Open State

After the timeout expires, the circuit breaker transitions to the Half-Open state. In this state, it allows a limited number of test requests to pass through. If these requests succeed, the circuit breaker assumes the service has recovered and returns to the Closed state. If any request fails, it immediately returns to the Open state and restarts the timeout timer.
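To make the three states concrete before touching Redis, here is a minimal in-memory sketch of the state machine. `InMemoryBreaker` and its members are hypothetical names, and the clock is passed in as `$now` so the transitions are easy to follow. It is a simplified illustration, not the implementation used in the rest of this guide:

```php
<?php

// Minimal in-memory circuit breaker, for illustration only.
final class InMemoryBreaker
{
    public string $state = 'closed';
    private int $failures = 0;
    private int $successes = 0;
    private int $openedAt = 0;

    public function __construct(
        private int $failureThreshold,
        private int $successThreshold,
        private int $timeout
    ) {}

    /** Run $operation at time $now (in seconds); throws if the breaker is open. */
    public function call(callable $operation, int $now): mixed
    {
        if ($this->state === 'open') {
            if ($now - $this->openedAt < $this->timeout) {
                throw new RuntimeException('circuit open');
            }
            // Timeout elapsed: let trial requests through.
            $this->state = 'half_open';
            $this->successes = 0;
        }

        try {
            $result = $operation();
        } catch (Throwable $e) {
            $this->onFailure($now);
            throw $e;
        }

        $this->onSuccess();

        return $result;
    }

    private function onFailure(int $now): void
    {
        $this->failures++;

        // Any failure while half-open, or too many while closed, opens the circuit.
        if ($this->state === 'half_open' || $this->failures >= $this->failureThreshold) {
            $this->state = 'open';
            $this->openedAt = $now;
            $this->failures = 0;
        }
    }

    private function onSuccess(): void
    {
        if ($this->state === 'half_open') {
            // Enough successful trial requests close the circuit again.
            if (++$this->successes >= $this->successThreshold) {
                $this->state = 'closed';
            }
            return;
        }

        $this->failures = 0; // Closed: a success clears the failure streak.
    }
}
```

The Redis-backed service in Step 1 follows the same transitions, but keeps its counters in Redis so that every application instance sees the same state.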

Architecture Overview

Our implementation will consist of several key components:

  1. CircuitBreakerService: The core service that manages circuit breaker state
  2. CircuitBreakerMiddleware: Middleware for HTTP requests to protected services
  3. RabbitMQCircuitBreaker: Specialized implementation for RabbitMQ operations
  4. State Storage: Redis-based storage for sharing state across application instances
  5. Monitoring and Alerts: Integration with Laravel's logging and notification systems

Key Features Covered:

  - Complete Circuit Breaker Implementation: Full state machine with Closed, Open, and Half-Open states
  - RabbitMQ Integration: Protected message publishing and consuming operations
  - Redis-based State Management: Shared state across multiple application instances
  - Fallback Strategies: Database queueing when RabbitMQ is unavailable
  - Exponential Backoff: Advanced timeout calculation for repeated failures
  - Monitoring & Alerting: Email and Slack notifications for state changes
  - Health Check Endpoints: API endpoints for system monitoring
  - Console Commands: Management commands for status checks and manual resets
  - Comprehensive Testing: Unit and integration tests for all components
  - Scheduled Tasks: Automated retry and monitoring jobs

The implementation is designed for production use and includes:

  - Error handling and logging
  - Metrics collection
  - Manual override capabilities
  - Graceful degradation
  - Message persistence during failures
  - Automatic recovery mechanisms

Prerequisites

Before implementing the Circuit Breaker pattern, ensure you have the following dependencies installed in your Laravel application:

composer require php-amqplib/php-amqplib
composer require predis/predis

Update your .env file with RabbitMQ and Redis configuration:

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379

CIRCUIT_BREAKER_FAILURE_THRESHOLD=5
CIRCUIT_BREAKER_SUCCESS_THRESHOLD=2
CIRCUIT_BREAKER_TIMEOUT=60
CIRCUIT_BREAKER_HALF_OPEN_REQUESTS=3
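The RabbitMQ service in Step 3 reads its connection settings from `config('queue.connections.rabbitmq.*')`, so these environment variables need to be mapped into config somewhere. A minimal sketch of such an entry, assuming it lives in config/queue.php and is only read through `config()` (the exact placement and the default values are assumptions):

```php
<?php

// config/queue.php (excerpt). Hypothetical entry that exposes the RABBITMQ_* variables
// under the keys the service in Step 3 reads.
return [
    'connections' => [
        // ... your existing connections ...

        'rabbitmq' => [
            'host' => env('RABBITMQ_HOST', 'localhost'),
            'port' => (int) env('RABBITMQ_PORT', 5672),
            'user' => env('RABBITMQ_USER', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),
            'vhost' => env('RABBITMQ_VHOST', '/'),
        ],
    ],
];
```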

Step 1: Creating the Circuit Breaker Service

Let's start by creating the core Circuit Breaker service. This service will manage the state machine and provide methods for executing protected operations.

Create a new file app/Services/CircuitBreaker/CircuitBreakerService.php:

<?php

namespace App\Services\CircuitBreaker;

use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Log;
use App\Exceptions\CircuitBreakerOpenException;

class CircuitBreakerService
{
    protected string $serviceName;
    protected int $failureThreshold;
    protected int $successThreshold;
    protected int $timeout;
    protected int $halfOpenMaxRequests;

    const STATE_CLOSED = 'closed';
    const STATE_OPEN = 'open';
    const STATE_HALF_OPEN = 'half_open';

    public function __construct(
        string $serviceName,
        int $failureThreshold = 5,
        int $successThreshold = 2,
        int $timeout = 60,
        int $halfOpenMaxRequests = 3
    ) {
        $this->serviceName = $serviceName;
        $this->failureThreshold = $failureThreshold;
        $this->successThreshold = $successThreshold;
        $this->timeout = $timeout;
        $this->halfOpenMaxRequests = $halfOpenMaxRequests;
    }

    /**
     * Execute a callable with circuit breaker protection
     */
    public function call(callable $callback)
    {
        $state = $this->getState();

        switch ($state) {
            case self::STATE_OPEN:
                if ($this->shouldAttemptReset()) {
                    $this->transitionToHalfOpen();
                    return $this->executeHalfOpen($callback);
                }
                throw new CircuitBreakerOpenException(
                    "Circuit breaker is OPEN for service: {$this->serviceName}"
                );

            case self::STATE_HALF_OPEN:
                return $this->executeHalfOpen($callback);

            case self::STATE_CLOSED:
            default:
                return $this->executeClosed($callback);
        }
    }

    /**
     * Execute callback in Closed state
     */
    protected function executeClosed(callable $callback)
    {
        try {
            $result = $callback();
            $this->recordSuccess();
            return $result;
        } catch (\Exception $e) {
            $this->recordFailure();

            if ($this->hasExceededFailureThreshold()) {
                $this->tripCircuitBreaker();
            }

            throw $e;
        }
    }

    /**
     * Execute callback in Half-Open state
     */
    protected function executeHalfOpen(callable $callback)
    {
        if (!$this->canAttemptRequestInHalfOpen()) {
            throw new CircuitBreakerOpenException(
                "Circuit breaker is in HALF_OPEN state with max requests reached"
            );
        }

        $this->incrementHalfOpenAttempts();

        try {
            $result = $callback();
            $this->recordHalfOpenSuccess();

            if ($this->hasReachedSuccessThreshold()) {
                $this->closeCircuitBreaker();
            }

            return $result;
        } catch (\Exception $e) {
            $this->tripCircuitBreaker();
            throw $e;
        }
    }

    /**
     * Get current circuit breaker state
     */
    public function getState(): string
    {
        // A missing key comes back as null with predis and false with phpredis; ?: covers both.
        return Redis::get($this->getStateKey()) ?: self::STATE_CLOSED;
    }

    /**
     * Set circuit breaker state
     */
    protected function setState(string $state): void
    {
        Redis::set($this->getStateKey(), $state);

        Log::info("Circuit Breaker state changed", [
            'service' => $this->serviceName,
            'new_state' => $state,
            'timestamp' => now()->toIso8601String()
        ]);
    }

    /**
     * Record a successful operation
     */
    protected function recordSuccess(): void
    {
        $this->resetFailureCount();
    }

    /**
     * Record a failed operation
     */
    protected function recordFailure(): void
    {
        $key = $this->getFailureCountKey();
        Redis::incr($key);
        Redis::expire($key, 60); // Failure window of 60 seconds
    }

    /**
     * Record success in Half-Open state
     */
    protected function recordHalfOpenSuccess(): void
    {
        $key = $this->getSuccessCountKey();
        Redis::incr($key);
    }

    /**
     * Check if failure threshold exceeded
     */
    protected function hasExceededFailureThreshold(): bool
    {
        $failures = (int) Redis::get($this->getFailureCountKey());
        return $failures >= $this->failureThreshold;
    }

    /**
     * Check if success threshold reached in Half-Open state
     */
    protected function hasReachedSuccessThreshold(): bool
    {
        $successes = (int) Redis::get($this->getSuccessCountKey());
        return $successes >= $this->successThreshold;
    }

    /**
     * Trip the circuit breaker (transition to Open state)
     */
    protected function tripCircuitBreaker(): void
    {
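        $this->setState(self::STATE_OPEN);
        // Reset counters first: resetCounters() also deletes the opened_at key,
        // so the timestamp must be written afterwards or the timeout never applies.
        $this->resetCounters();
        Redis::set($this->getOpenedAtKey(), now()->timestamp);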

        Log::warning("Circuit Breaker OPENED", [
            'service' => $this->serviceName,
            'failure_count' => $this->failureThreshold
        ]);
    }

    /**
     * Close the circuit breaker (transition to Closed state)
     */
    protected function closeCircuitBreaker(): void
    {
        $this->setState(self::STATE_CLOSED);
        $this->resetCounters();

        Log::info("Circuit Breaker CLOSED", [
            'service' => $this->serviceName
        ]);
    }

    /**
     * Transition to Half-Open state
     */
    protected function transitionToHalfOpen(): void
    {
        $this->setState(self::STATE_HALF_OPEN);
        $this->resetCounters();

        Log::info("Circuit Breaker transitioned to HALF_OPEN", [
            'service' => $this->serviceName
        ]);
    }

    /**
     * Check if should attempt reset from Open to Half-Open
     */
    protected function shouldAttemptReset(): bool
    {
        $openedAt = Redis::get($this->getOpenedAtKey());

        if (!$openedAt) {
            return true;
        }

        return (now()->timestamp - $openedAt) >= $this->timeout;
    }

    /**
     * Check if can attempt request in Half-Open state
     */
    protected function canAttemptRequestInHalfOpen(): bool
    {
        $attempts = (int) Redis::get($this->getHalfOpenAttemptsKey());
        return $attempts < $this->halfOpenMaxRequests;
    }

    /**
     * Increment Half-Open attempts counter
     */
    protected function incrementHalfOpenAttempts(): void
    {
        Redis::incr($this->getHalfOpenAttemptsKey());
    }

    /**
     * Reset failure counter
     */
    protected function resetFailureCount(): void
    {
        Redis::del($this->getFailureCountKey());
    }

    /**
     * Reset all counters
     */
    protected function resetCounters(): void
    {
        Redis::del([
            $this->getFailureCountKey(),
            $this->getSuccessCountKey(),
            $this->getHalfOpenAttemptsKey(),
            $this->getOpenedAtKey()
        ]);
    }

    /**
     * Get Redis key for state
     */
    protected function getStateKey(): string
    {
        return "circuit_breaker:{$this->serviceName}:state";
    }

    /**
     * Get Redis key for failure count
     */
    protected function getFailureCountKey(): string
    {
        return "circuit_breaker:{$this->serviceName}:failures";
    }

    /**
     * Get Redis key for success count
     */
    protected function getSuccessCountKey(): string
    {
        return "circuit_breaker:{$this->serviceName}:successes";
    }

    /**
     * Get Redis key for Half-Open attempts
     */
    protected function getHalfOpenAttemptsKey(): string
    {
        return "circuit_breaker:{$this->serviceName}:half_open_attempts";
    }

    /**
     * Get Redis key for opened timestamp
     */
    protected function getOpenedAtKey(): string
    {
        return "circuit_breaker:{$this->serviceName}:opened_at";
    }

    /**
     * Manually reset the circuit breaker
     */
    public function reset(): void
    {
        $this->setState(self::STATE_CLOSED);
        $this->resetCounters();

        Log::info("Circuit Breaker manually reset", [
            'service' => $this->serviceName
        ]);
    }

    /**
     * Get circuit breaker metrics
     */
    public function getMetrics(): array
    {
        return [
            'service' => $this->serviceName,
            'state' => $this->getState(),
            'failure_count' => (int) Redis::get($this->getFailureCountKey()),
            'success_count' => (int) Redis::get($this->getSuccessCountKey()),
            'half_open_attempts' => (int) Redis::get($this->getHalfOpenAttemptsKey()),
            'opened_at' => Redis::get($this->getOpenedAtKey()),
        ];
    }
}
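One detail worth a closer look: in the Half-Open state the service first reads the attempts counter and then increments it in a separate call, so two workers can both see "one slot left" and both proceed. Redis `INCR` is atomic and returns the new value, so incrementing first and comparing the result avoids that race. Here is a minimal sketch of the idea; the `AtomicCounter` interface, `ArrayCounter`, and `reserveHalfOpenSlot()` are hypothetical names, and the in-memory counter only stands in for Redis so the example runs on its own:

```php
<?php

// Hypothetical abstraction over an atomic counter such as Redis INCR.
interface AtomicCounter
{
    /** Increment the counter stored at $key and return the new value. */
    public function incr(string $key): int;
}

// In-memory stand-in so the sketch runs without Redis.
final class ArrayCounter implements AtomicCounter
{
    private array $values = [];

    public function incr(string $key): int
    {
        return $this->values[$key] = ($this->values[$key] ?? 0) + 1;
    }
}

/**
 * Reserve one trial slot: increment first, then compare the returned value
 * against the limit, instead of reading and incrementing in two steps.
 */
function reserveHalfOpenSlot(AtomicCounter $counter, string $key, int $maxRequests): bool
{
    return $counter->incr($key) <= $maxRequests;
}
```

With a limit of 3, the first three calls for the same key return true and every later call returns false until the counter is reset.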

Step 2: Creating Custom Exception

Create a custom exception at app/Exceptions/CircuitBreakerOpenException.php, thrown when the circuit breaker is open:

<?php

namespace App\Exceptions;

use Exception;

class CircuitBreakerOpenException extends Exception
{
    protected $message = 'Circuit breaker is open';
    protected $code = 503;
}

Step 3: RabbitMQ Service with Circuit Breaker

Now let's create a RabbitMQ service at app/Services/Queue/RabbitMQService.php that integrates the Circuit Breaker pattern:

<?php

namespace App\Services\Queue;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use App\Services\CircuitBreaker\CircuitBreakerService;
use App\Exceptions\CircuitBreakerOpenException;
use Illuminate\Support\Facades\Log;

class RabbitMQService
{
    protected ?AMQPStreamConnection $connection = null;
    protected ?object $channel = null;
    protected CircuitBreakerService $circuitBreaker;

    public function __construct()
    {
        $this->circuitBreaker = new CircuitBreakerService(
            'rabbitmq',
            config('circuit_breaker.failure_threshold', 5),
            config('circuit_breaker.success_threshold', 2),
            config('circuit_breaker.timeout', 60),
            config('circuit_breaker.half_open_requests', 3)
        );
    }

    /**
     * Establish connection to RabbitMQ with circuit breaker protection
     */
    public function connect(): void
    {
        if ($this->connection && $this->connection->isConnected()) {
            return;
        }

        try {
            $this->circuitBreaker->call(function () {
                $this->connection = new AMQPStreamConnection(
                    config('queue.connections.rabbitmq.host'),
                    config('queue.connections.rabbitmq.port'),
                    config('queue.connections.rabbitmq.user'),
                    config('queue.connections.rabbitmq.password'),
                    config('queue.connections.rabbitmq.vhost'),
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    3.0,  // Connection timeout
                    60.0, // Read/write timeout (some php-amqplib versions reject values below 2x the heartbeat)
                    null,
                    true, // Keepalive
                    30    // Heartbeat
                );

                $this->channel = $this->connection->channel();

                Log::info('RabbitMQ connection established successfully');
            });
        } catch (CircuitBreakerOpenException $e) {
            Log::warning('Cannot connect to RabbitMQ: Circuit breaker is OPEN', [
                'metrics' => $this->circuitBreaker->getMetrics()
            ]);
            throw $e;
        } catch (\Exception $e) {
            Log::error('RabbitMQ connection failed', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);
            throw $e;
        }
    }

    /**
     * Publish message to queue with circuit breaker protection
     */
    public function publish(
        string $queue,
        array $data,
        array $properties = []
    ): bool {
        try {
            return $this->circuitBreaker->call(function () use ($queue, $data, $properties) {
                $this->connect();
                $this->declareQueue($queue);

                $messageBody = json_encode($data);
                $message = new AMQPMessage($messageBody, array_merge([
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'content_type' => 'application/json',
                    'timestamp' => time(),
                ], $properties));

                $this->channel->basic_publish($message, '', $queue);

                Log::info('Message published to RabbitMQ', [
                    'queue' => $queue,
                    'data_size' => strlen($messageBody)
                ]);

                return true;
            });
        } catch (CircuitBreakerOpenException $e) {
            Log::warning('Cannot publish to RabbitMQ: Circuit breaker is OPEN', [
                'queue' => $queue,
                'metrics' => $this->circuitBreaker->getMetrics()
            ]);

            // Store message for later retry
            $this->storeForRetry($queue, $data, $properties);

            return false;
        } catch (\Exception $e) {
            Log::error('Failed to publish message to RabbitMQ', [
                'queue' => $queue,
                'error' => $e->getMessage()
            ]);

            throw $e;
        }
    }

    /**
     * Consume messages from queue with circuit breaker protection
     */
    public function consume(
        string $queue,
        callable $callback,
        array $options = []
    ): void {
        try {
            $this->circuitBreaker->call(function () use ($queue, $callback, $options) {
                $this->connect();
                $this->declareQueue($queue);

                $consumerTag = $options['consumer_tag'] ?? 'consumer_' . getmypid();
                $noLocal = $options['no_local'] ?? false;
                $noAck = $options['no_ack'] ?? false;
                $exclusive = $options['exclusive'] ?? false;
                $noWait = $options['no_wait'] ?? false;
                $prefetchCount = $options['prefetch_count'] ?? 1;

                // Set QoS
                $this->channel->basic_qos(null, $prefetchCount, null);

                // Wrap callback with error handling
                $wrappedCallback = function ($msg) use ($callback) {
                    try {
                        $data = json_decode($msg->body, true);
                        $callback($data, $msg);
                        $msg->ack();

                        Log::info('Message consumed successfully', [
                            'delivery_tag' => $msg->getDeliveryTag()
                        ]);
                    } catch (\Exception $e) {
                        Log::error('Error processing message', [
                            'error' => $e->getMessage(),
                            'delivery_tag' => $msg->getDeliveryTag()
                        ]);

                        // Reject and requeue the message
                        $msg->nack(true);
                    }
                };

                $this->channel->basic_consume(
                    $queue,
                    $consumerTag,
                    $noLocal,
                    $noAck,
                    $exclusive,
                    $noWait,
                    $wrappedCallback
                );

                Log::info('Started consuming messages from RabbitMQ', [
                    'queue' => $queue,
                    'consumer_tag' => $consumerTag
                ]);

                // Listen for messages
                while ($this->channel->is_consuming()) {
                    try {
                        $this->channel->wait(null, false, $options['timeout'] ?? 0);
                    } catch (AMQPTimeoutException $e) {
                        // Timeout is normal, continue consuming
                        continue;
                    }
                }
            });
        } catch (CircuitBreakerOpenException $e) {
            Log::warning('Cannot consume from RabbitMQ: Circuit breaker is OPEN', [
                'queue' => $queue,
                'metrics' => $this->circuitBreaker->getMetrics()
            ]);

            // Wait before retrying. getMetrics() has no 'timeout' key, so read the configured value.
            sleep((int) config('circuit_breaker.timeout', 60));
        }
    }

    /**
     * Declare queue with standard configuration
     */
    protected function declareQueue(string $queue): void
    {
        $this->channel->queue_declare(
            $queue,
            false,  // passive
            true,   // durable
            false,  // exclusive
            false,  // auto_delete
            false,  // nowait
            [
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-max-length' => ['I', 10000],
            ]
        );
    }

    /**
     * Store failed message for later retry
     */
    protected function storeForRetry(string $queue, array $data, array $properties): void
    {
        try {
            \DB::table('failed_queue_messages')->insert([
                'queue' => $queue,
                'payload' => json_encode($data),
                'properties' => json_encode($properties),
                'attempts' => 0,
                'created_at' => now(),
                'updated_at' => now(),
            ]);

            Log::info('Message stored for retry', [
                'queue' => $queue
            ]);
        } catch (\Exception $e) {
            Log::error('Failed to store message for retry', [
                'error' => $e->getMessage()
            ]);
        }
    }

    /**
     * Retry failed messages
     */
    public function retryFailedMessages(int $limit = 100): int
    {
        $messages = \DB::table('failed_queue_messages')
            ->where('attempts', '<', 5)
            ->orderBy('created_at')
            ->limit($limit)
            ->get();

        $retried = 0;

        foreach ($messages as $message) {
            try {
                $data = json_decode($message->payload, true);
                $properties = json_decode($message->properties, true);

                if ($this->publish($message->queue, $data, $properties)) {
                    \DB::table('failed_queue_messages')
                        ->where('id', $message->id)
                        ->delete();

                    $retried++;
                }
            } catch (\Exception $e) {
                \DB::table('failed_queue_messages')
                    ->where('id', $message->id)
                    ->update([
                        'attempts' => $message->attempts + 1,
                        'last_error' => $e->getMessage(),
                        'updated_at' => now(),
                    ]);
            }
        }

        return $retried;
    }

    /**
     * Get circuit breaker status
     */
    public function getCircuitBreakerStatus(): array
    {
        return $this->circuitBreaker->getMetrics();
    }

    /**
     * Manually reset circuit breaker
     */
    public function resetCircuitBreaker(): void
    {
        $this->circuitBreaker->reset();
    }

    /**
     * Close connections gracefully
     */
    public function disconnect(): void
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }

            if ($this->connection) {
                $this->connection->close();
            }

            Log::info('RabbitMQ connection closed gracefully');
        } catch (\Exception $e) {
            Log::error('Error closing RabbitMQ connection', [
                'error' => $e->getMessage()
            ]);
        }
    }

    public function __destruct()
    {
        $this->disconnect();
    }
}
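The consumer above passes the result of `json_decode()` straight to your callback. For a malformed body that result is `null`, and if the callback then throws, the message is requeued and fails again on every delivery. Here is a minimal sketch of a stricter decoder that separates "bad payload" from "processing failed". `decodePayload()` is a hypothetical helper, not part of the service above:

```php
<?php

/**
 * Hypothetical helper: decode a message body and reject anything
 * that is not a JSON object or array.
 *
 * @throws InvalidArgumentException when the body is malformed
 */
function decodePayload(string $body): array
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        throw new InvalidArgumentException('Malformed JSON payload: ' . $e->getMessage(), 0, $e);
    }

    if (!is_array($data)) {
        throw new InvalidArgumentException('Payload must decode to an array');
    }

    return $data;
}
```

A consumer could catch `InvalidArgumentException` separately and reject such messages without requeueing, since retrying a malformed payload cannot succeed.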

Step 4: Configuration File

Create a configuration file for circuit breaker settings at config/circuit_breaker.php:

<?php

return [
    /*
    |--------------------------------------------------------------------------
    | Circuit Breaker Configuration
    |--------------------------------------------------------------------------
    |
    | Configure the circuit breaker behavior for different services
    |
    */

    'failure_threshold' => env('CIRCUIT_BREAKER_FAILURE_THRESHOLD', 5),
    'success_threshold' => env('CIRCUIT_BREAKER_SUCCESS_THRESHOLD', 2),
    'timeout' => env('CIRCUIT_BREAKER_TIMEOUT', 60),
    'half_open_requests' => env('CIRCUIT_BREAKER_HALF_OPEN_REQUESTS', 3),

    'services' => [
        'rabbitmq' => [
            'failure_threshold' => env('CB_RABBITMQ_FAILURE_THRESHOLD', 5),
            'success_threshold' => env('CB_RABBITMQ_SUCCESS_THRESHOLD', 2),
            'timeout' => env('CB_RABBITMQ_TIMEOUT', 60),
        ],

        'external_api' => [
            'failure_threshold' => env('CB_API_FAILURE_THRESHOLD', 3),
            'success_threshold' => env('CB_API_SUCCESS_THRESHOLD', 2),
            'timeout' => env('CB_API_TIMEOUT', 30),
        ],
    ],

    'notifications' => [
        'enabled' => env('CB_NOTIFICATIONS_ENABLED', true),
        'channels' => ['slack', 'mail'],
        'recipients' => env('CB_NOTIFICATION_RECIPIENTS', 'ops@example.com'),
    ],
];
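Note that the service in Step 3 only reads the top-level keys, so the per-service overrides under `services` are not applied yet. Here is a minimal sketch of one way to merge them, assuming you pass in the array returned by `config('circuit_breaker')`. `resolveBreakerSettings()` is a hypothetical helper, and its built-in defaults simply mirror the defaults used in this guide:

```php
<?php

/**
 * Hypothetical helper: merge a service's overrides over the global settings.
 *
 * @param array  $config  The circuit_breaker config array
 * @param string $service Service name, e.g. 'rabbitmq'
 */
function resolveBreakerSettings(array $config, string $service): array
{
    $defaults = [
        'failure_threshold' => 5,
        'success_threshold' => 2,
        'timeout' => 60,
        'half_open_requests' => 3,
    ];
    $overrides = $config['services'][$service] ?? [];

    $settings = [];
    foreach ($defaults as $key => $default) {
        // Numeric env() values arrive as strings, so cast before passing them to int-typed arguments.
        $settings[$key] = (int) ($overrides[$key] ?? $config[$key] ?? $default);
    }

    return $settings;
}
```

The result can then be spread into the `CircuitBreakerService` constructor in place of the four separate `config()` calls.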

Step 5: Database Migration for Failed Messages

Create a migration to store failed messages:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        Schema::create('failed_queue_messages', function (Blueprint $table) {
            $table->id();
            $table->string('queue');
            $table->longText('payload');
            $table->text('properties')->nullable();
            $table->integer('attempts')->default(0);
            $table->text('last_error')->nullable();
            $table->timestamps();

            $table->index(['queue', 'created_at']);
            $table->index('attempts');
        });
    }

    public function down(): void
    {
        Schema::dropIfExists('failed_queue_messages');
    }
};

Step 6: Creating Console Commands

Create commands for managing the circuit breaker and retrying failed messages:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Services\Queue\RabbitMQService;

class CircuitBreakerStatus extends Command
{
    protected $signature = 'circuit-breaker:status {service?}';
    protected $description = 'Display circuit breaker status';

    public function handle(RabbitMQService $rabbitMQ): int
    {
        $service = $this->argument('service') ?? 'rabbitmq';

        if ($service === 'rabbitmq') {
            $status = $rabbitMQ->getCircuitBreakerStatus();

            $this->info("Circuit Breaker Status for RabbitMQ:");
            $this->table(
                ['Metric', 'Value'],
                [
                    ['Service', $status['service']],
                    ['State', strtoupper($status['state'])],
                    ['Failure Count', $status['failure_count']],
                    ['Success Count', $status['success_count']],
                    ['Half-Open Attempts', $status['half_open_attempts']],
                    ['Opened At', $status['opened_at'] ? date('Y-m-d H:i:s', $status['opened_at']) : 'N/A'],
                ]
            );

            return Command::SUCCESS;
        }

        $this->error("Unknown service: {$service}");
        return Command::FAILURE;
    }
}

A second command resets the circuit breaker manually:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Services\Queue\RabbitMQService;

class CircuitBreakerReset extends Command
{
    protected $signature = 'circuit-breaker:reset {service}';
    protected $description = 'Manually reset a circuit breaker';

    public function handle(RabbitMQService $rabbitMQ): int
    {
        $service = $this->argument('service');

        if ($service === 'rabbitmq') {
            $rabbitMQ->resetCircuitBreaker();
            $this->info("Circuit breaker for RabbitMQ has been reset.");
            return Command::SUCCESS;
        }

        $this->error("Unknown service: {$service}");
        return Command::FAILURE;
    }
}

A third command retries the messages that were stored while RabbitMQ was unavailable:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Services\Queue\RabbitMQService;

class RetryFailedMessages extends Command
{
    protected $signature = 'queue:retry-failed {--limit=100}';
    protected $description = 'Retry failed queue messages';

    public function handle(RabbitMQService $rabbitMQ): int
    {
        $limit = (int) $this->option('limit');

        $this->info("Retrying failed messages (limit: {$limit})...");

        $retried = $rabbitMQ->retryFailedMessages($limit);

        $this->info("Successfully retried {$retried} messages.");

        return Command::SUCCESS;
    }
}
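The retry command is a natural fit for the scheduler. Here is a minimal sketch, assuming a Laravel version that still uses app/Console/Kernel.php (newer application skeletons register schedules in routes/console.php instead). The five-minute interval is an arbitrary choice:

```php
<?php

namespace App\Console;

use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    protected function schedule(Schedule $schedule): void
    {
        // Re-publish stored messages every five minutes; skip a run if the previous one is still going.
        $schedule->command('queue:retry-failed --limit=100')
            ->everyFiveMinutes()
            ->withoutOverlapping();
    }
}
```

While the breaker is open, `publish()` returns false and the rows stay in the table, so a scheduled run during an outage leaves the stored messages in place for the next run.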

Step 7: Usage Examples

Publishing Messages

Here's how to use the RabbitMQ service to publish messages:


<?php

namespace App\Http\Controllers;

use App\Services\Queue\RabbitMQService;
use Illuminate\Http\Request;

class OrderController extends Controller
{
    protected RabbitMQService $rabbitMQ;

    public function __construct(RabbitMQService $rabbitMQ)
    {
        $this->rabbitMQ = $rabbitMQ;
    }

    public function processOrder(Request $request)
    {
        $orderData = [
            'order_id' => $request->input('order_id'),
            'customer_id' => $request->input('customer_id'),
            'items' => $request->input('items'),
            'total' => $request->input('total'),
            'timestamp' => now()->toIso8601String(),
        ];

        try {
            $published = $this->rabbitMQ->publish('orders', $orderData);

            if ($published) {
                return response()->json([
                    'success' => true,
                    'message' => 'Order queued for processing'
                ]);
            } else {
                return response()->json([
                    'success' => false,
                    'message' => 'Order saved but queue is unavailable. Will retry later.'
                ], 202);
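            }
        } catch (\Exception $e) {
            // The original listing is cut off here; this catch block is an assumed completion.
            // publish() rethrows any failure other than an open breaker, so report it to the caller.
            return response()->json([
                'success' => false,
                'message' => 'Order could not be queued'
            ], 503);
        }
    }
}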
