DEV Community

Nacho Colomina Torregrosa
Nacho Colomina Torregrosa

Posted on • Updated on

Monitoring symfony messenger listening to the worker events

Symfony messenger is a great bundle which allows developers to easily send tasks execution to the background so that the user do not have to wait until they finish. Common tasks we can send to the background are sending an email, processing a payment and, in general, those tasks that can take some time to finish.
In this post we are going to use the events that messenger dispatches while it is handling a message to monitor it.

Requirements

To follow this post we will need the following elements installed:

  • A ready symfony project: To create a symfony project you can use the symfony-cli.
  • Composer: This is the most commonly used php package manager. Symfony manages its vendors using it. You can install composer from its site.
  • Docker and docker compose: We will use docker as a container manager and docker-compose as a tool to configure and start a redis container. If you have not used them so far, refer to the links to install them.
  • Symfony messenger: We also need to install symfony messenger using composer. After having composer installed, install messenger using the bellow command:
composer require symfony/messenger
Enter fullscreen mode Exit fullscreen mode
  • Redis bundle: We will use redis to hit messenger events. To install redis bundle use the following command:
composer require snc/redis-bundle
Enter fullscreen mode Exit fullscreen mode

In the next section we're going to configure redis bundle to use phpredis library. Install it before continuing

Installing redis server

We are going to install a redis server using a docker-compose file. Create a file named docker-compose.yaml in your project root folder and paste the following content:

version: '3'

services:
  redis:
    image: redis
    ports:
      - '6401:6379'
Enter fullscreen mode Exit fullscreen mode

In the above example, we're telling compose to build our docker container using the last redis image and linking the redis container port to the external port 6401.

Now, to start the container, go to your project root folder and use the following command:

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

The last command will download the redis image and start our container. After doing it we will have a redis server running and listening to the port 6401.

Configuring the redis bundle

Open the redis bundle package configuration file (config/packages/snc_redis.yaml) and paste the following content:

snc_redis:
    clients:
       default:
           type: phpredis
           alias: default
           dsn: "http://localhost:6401"
Enter fullscreen mode Exit fullscreen mode

When the redis bundle reads the above configuration, it will create a service with the following id: snc_redis.default. This service is an instance of \Redis class which receives localhost as a host and 6401 as a port.

Now, go to your services.yaml file and bind a variable which will hold the snc_redis.default service:

services:
    _defaults:
        autowire: true
        autoconfigure: true 
        bind:
            $redis: '@snc_redis.default'
Enter fullscreen mode Exit fullscreen mode

After doing it, we can inject the $redis variable in any of our services constructor.

The Messenger events

Symfony messenger dispatches the following events while it is handling a message:

  • WorkerMessageReceivedEvent: When a message is received
  • WorkerMessageHandledEvent: When a message is handled
  • WorkerMessageFailedEvent: When a message handling fails
  • WorkerMessageRetriedEvent: When a message handling is retried.

In the next sections we will listen to these events and will increment the corresponding redis key according to the event received.

The keys

We are going to use two types of keys:

  • The general key: This key will hold the hits for all messages. It will be created following the next format:
"messenger:monitor:<time_event>:<messenger_event>"
Enter fullscreen mode Exit fullscreen mode

There will be a general key for every time / event combination.

  • The message key: This key will hold the hits for an specific message. It will be created following the next format:
"messenger:<message>:monitor:<time_event>:<messenger_event>"
Enter fullscreen mode Exit fullscreen mode

There will be a message key for every message / time / event combination.

The service to hit redis

Before creating the service, let's create a couple of enums which will contain time events and messenger events.

enum MessengerEvents: string
{
    case RECEIVED_EVENT  = 'received';
    case HANDLED_EVENT   = 'handled';
    case FAILED_EVENT    = 'failed';
    case RETRIED_EVENT   = 'retried';
}

enum MessengerTimeEvents: string
{
    case DAY_TIME_EVENT  = 'day';
    case HOUR_TIME_EVENT = 'hour';
}
Enter fullscreen mode Exit fullscreen mode

We are going to hit redis when the following events occur:

  • received: When WorkerMessageReceivedEvent is received
  • handled: When WorkerMessageHandledEvent is received
  • failed: When WorkerMessageFailedEvent is received
  • retried: When WorkerMessageRetriedEvent is received

and, for every event received we are going to hit the general key and the message key twice: One for the day and the other one for the hour.

class MessengerHandler
{
    public function __construct(
        private readonly \Redis $redis
    ){}

    /**
     * @throws \RedisException
     */
    public function hitEvent(string $message, MessengerEvents $event): void
    {
        $pipeline = $this->redis->pipeline();
        $pipeline->hincrby($this->getMessageKey($message, $event, MessengerTimeEvents::DAY_TIME_EVENT), date('Ymd'), 1);
        $pipeline->hincrby($this->getMessageKey($message, $event, MessengerTimeEvents::HOUR_TIME_EVENT), date('Ymdh'), 1);
        $pipeline->hincrby($this->getGeneralKey($event, MessengerTimeEvents::DAY_TIME_EVENT), date('Ymd'), 1);
        $pipeline->hincrby($this->getGeneralKey($event, MessengerTimeEvents::HOUR_TIME_EVENT), date('Ymdh'), 1);
        $pipeline->exec();
        $pipeline->close();
    }

    private function getMessageKey(string $message, MessengerEvents $event, MessengerTimeEvents $timeEvent): string
    {
        return "messenger:{$message}:monitor:{$timeEvent->value}:{$event->value}";
    }

    private function getGeneralKey(MessengerEvents $event, MessengerTimeEvents $timeEvent): string
    {
        return "messenger:monitor:{$timeEvent->value}:{$event->value}";
    }
}
Enter fullscreen mode Exit fullscreen mode

Let's explain the code starting by the private methods:

  • getMessageKey: This method receives the message, the messenger event and the time event. It uses these parameters to build the key, using the format we saw in the last section, and returns it.

  • getGeneralKey: It behaves as getMessageKey but without using the message string.

Now, let's see the hitEvent method. As we can see, it receives the message (we will se later how to get the message class from the envelope) and a MessengerEvents enum instance holding the event received.
Then, the method opens a redis pipeline, makes the hit for every key and executes it.

We use a pipeline to send all hits to redis at once. This allows us to sent 1 request to redis instead of 4.

Notice that we are using a redis HASH type. We use dates as a keys (format "Ymd" for days and "YmdH" for hours). and the hit counts as a values.

The event subscriber

Now, let's create the subscriber which will listen to the messenger events and will use our recently created MessengerHandler service.

class MessengerSubscriber implements EventSubscriberInterface
{

    public function __construct(
        private readonly MessengerHandler $messengerHandler
    ){}

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerMessageReceivedEvent::class => 'onMessageReceived',
            WorkerMessageHandledEvent::class  => 'onMessageHandled',
            WorkerMessageFailedEvent::class   => 'onMessageFailed',
            WorkerMessageRetriedEvent::class  => 'onMessageRetried'
        ];
    }

    /**
     * @throws \RedisException
     */
    public function onMessageReceived(WorkerMessageReceivedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::RECEIVED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageHandled(WorkerMessageHandledEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::HANDLED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageFailed(WorkerMessageFailedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::FAILED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageRetried(WorkerMessageRetriedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::RETRIED_EVENT);

    }
}
Enter fullscreen mode Exit fullscreen mode

The subscriber is pretty straightforward. It links a method for each messenger event using the getSubscribedEvents function. The four subscriber methods perform the same logic. They create a \ReflectionClass from the envelope message and use $messengerhandler to hit the event. To get the message class name, they rely on the \ReflectionClass getShortName method. As a second parameter, each method passes its corresponding event from the enum.

Cheking the work

To check that all of this works, we are going to create a message and its handler and route it to a transport. Since we've installed redis, we will use the built-in messenger redis transport. Install redis-messenger using the following command:

composer require symfony/redis-messenger
Enter fullscreen mode Exit fullscreen mode

If you get the following error: Invalid configuration for path "snc_redis.clients.default": You must install "ocramius/proxy-manager" or "friendsofphp/proxy-manager-lts" in order to enable logging for phpredis client, simply install proxy-manager-lts using composer too:

composer require friendsofphp/proxy-manager-lts
Enter fullscreen mode Exit fullscreen mode

Now, let's create the message and the message handler:

namespace App\Messenger\Message;

class HelloWorldMessage
{
    public function __construct(
        public readonly string $name
    ){}
}
Enter fullscreen mode Exit fullscreen mode
namespace App\Messenger\Message;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class HelloWorldMessageHandler
{
    public function __invoke(HelloWorldMessage $message): void
    {
        echo 'Hello, my name is ' . $message->name;
    }
}
Enter fullscreen mode Exit fullscreen mode

The message holds a parameter name and the handler simply writes an introducing sentence using the message's parameter.
Now, let's configure the transport in the messenger package file (config/packages/messenger.yaml)

framework:
    messenger:
        transports:
            async: "redis://localhost:6401"
        routing:
            'App\Messenger\Message\HelloWorldMessage': async
Enter fullscreen mode Exit fullscreen mode

The above configuration creates the async transport which will use the redis server we installed before. Then, it configures our HelloWorldMessage to be routed to the async transport.

Now, create a symfony command, inject the Symfony\Component\Messenger\MessageBusInterface and queue a HelloWorldMessage message.

public function execute(InputInterface $input, OutputInterface $output): int
{

   $this->bus->dispatch(new HelloWorldMessage('Peter Parker'));
   return Command::SUCCESS;
}
Enter fullscreen mode Exit fullscreen mode

After executing the command you will have a new enqueued message. Now, execute a worker so you can consume it:

bin/console messenger:consume async -vv --limit=1
Enter fullscreen mode Exit fullscreen mode

If everything went fine, the general and message keys should have been created. To check it, install redis-cli and enter into the server shell using the following command:

redis-cli localhost:<your_port>
Enter fullscreen mode Exit fullscreen mode

Once you are into shell, use the keys command to check that the keys exist:

Messenger keys

To check the keys data, use the hgetall command

Let's check now that the failed and retried keys are also created. We will have to force the message handler to fail. To do it, comment the echo line and throw an exception instead:

public function __invoke(HelloWorldMessage $message): void
{
    //echo 'Hello, my name is ' . $message->name;
    throw new \Exception('I am failed :(');
}
Enter fullscreen mode Exit fullscreen mode

If we queue another message and start a worker, we will se that it fails and symfony retries it 3 times (which is the default retry policy).

Let's check now that the failed and retry keys have been created:

Failed and Retry keys

Conclusion

In this article we have learned how to use redis and the symfony features to monitor messenger. This can show us how our message queueing system is working in order to detect anomalies and possible failures

I hope you found this post useful and interesting. If you enjoy my content and like the Symfony framework, consider reading my book: Building an Operation-Oriented Api using PHP and the Symfony Framework: A step-by-step guide

Top comments (0)