Users today expect applications to be fast and responsive, which can be a challenge when dealing with resource-intensive tasks. Instead of making a user wait for a report to generate, a video to process, or a large file to be uploaded, you can offload these tasks to the background. This is where asynchronous processing comes in, and the Symfony Messenger component is the perfect tool for the job.
The Problem with Synchronous Tasks
In a traditional synchronous web application, a user’s request triggers a series of actions that must be completed before a response is sent back to the browser. If one of these actions, such as sending an email or generating a PDF, takes several seconds to complete, the user experience suffers. The browser tab is left in a “pending” state, and the user may assume the application has frozen.
This synchronous model also limits scalability. A single web server can only handle so many simultaneous requests, and if a few are tied up with long-running tasks, it can quickly exhaust the available resources.
The Solution: Symfony Messenger
Symfony Messenger provides a powerful and elegant way to decouple your application’s logic. It’s a message bus that allows you to dispatch “messages” (simple data objects) to be handled later by a separate process, instance, or workload. This approach is based on the Producer-Consumer pattern, where your web application acts as the producer, and a background worker acts as the consumer.
Key Concepts
Message: A simple PHP class that holds the data needed to perform a task. It should not contain any logic. For example, SendReportMessage might hold a userId and a reportType.
Message Handler: A service that contains the actual business logic for a specific message. When a message is received by the consumer, the corresponding handler is invoked to perform the task.
Message Bus: The central dispatcher that routes messages to their intended destination. Symfony Messenger provides a default bus, but you can configure multiple buses for different purposes (e.g., a command bus and an event bus).
Transport: The mechanism that stores and delivers messages. This is the heart of asynchronous processing. Symfony Messenger supports various transports out of the box, including Doctrine, RabbitMQ (AMQP), Redis, and Amazon SQS.
Consumer/Worker: A long-running command-line process that listens to a transport for new messages and dispatches them to their handlers.
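As a rough illustration of the first two concepts, a message and its handler might look like the sketch below. SendReportMessage and its two fields come from the example above; the handler name and the in-memory $log array are assumptions, used only so the snippet runs without a real report service. In an application, the bus (not your own code) would invoke the handler.

```php
<?php
declare(strict_types=1);

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// A message is a plain data object: no logic, only what the task needs.
final class SendReportMessage
{
    public function __construct(
        private int $userId,
        private string $reportType,
    ) {}

    public function getUserId(): int { return $this->userId; }

    public function getReportType(): string { return $this->reportType; }
}

// The handler holds the business logic for exactly one message type.
#[AsMessageHandler]
final class SendReportMessageHandler
{
    /** @var list<string> Stand-in for real work (hypothetical). */
    public array $log = [];

    public function __invoke(SendReportMessage $message): void
    {
        $this->log[] = sprintf(
            '%s report for user %d',
            $message->getReportType(),
            $message->getUserId()
        );
    }
}

// Calling the handler directly, only to show the contract between the two classes.
$handler = new SendReportMessageHandler();
$handler(new SendReportMessage(42, 'monthly'));
```

Keeping the message free of logic is what allows it to be serialized, stored in a transport, and handled later by a different process.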
To handle more sophisticated video workflows, such as compressing a video and transcribing its audio from the same source file at the same time, the same message can be delivered to several independent groups of consumers. How you set this up depends on the transport: for example, a fanout exchange bound to one queue per task in RabbitMQ, or one consumer group per task on a Redis stream.
With that in place, each group receives its own copy of the message and starts its specific task in parallel. This is useful for building scalable, fault-tolerant video processing services that handle diverse workloads efficiently.
The Challenge: A Synchronous Bottleneck
Imagine a user uploads a 4K video to our application. If we try to process it in a single, synchronous request, the user’s browser will be stuck waiting for the entire operation to finish. This not only creates a terrible user experience but also ties up your web server, making it unable to serve other requests.
A better approach is to break the video into smaller pieces and process each piece concurrently. Symfony Messenger, combined with multiple workers, is the ideal tool for this.
The Solution: Asynchronous, Parallel Video Processing
Symfony Messenger lets you dispatch “messages” to a queue, where separate background processes, called “workers,” pick them up and handle them. For video processing, we’ll create a single ProcessVideoMessage that triggers the main process and then dispatches multiple ProcessVideoChunkMessage messages, each picked up by whichever worker is free.
Step 1: Install the Messenger Component
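First, install the Messenger component and its AMQP bridge. The bridge lets Messenger talk to a message broker such as RabbitMQ, which suits this kind of high-volume, parallel processing. The broker itself must be running separately, and the bridge requires the PHP amqp extension.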
composer require symfony/messenger symfony/amqp-messenger
Step 2: Configure the Transport
In your config/packages/messenger.yaml, set up your RabbitMQ transport and define your message routing.
framework:
    messenger:
        transports:
            async_video: '%env(MESSENGER_RABBITMQ_DSN)%'
        routing:
            'App\Message\ProcessVideoMessage': async_video
            'App\Message\ProcessVideoChunkMessage': async_video
In your .env file, configure the RabbitMQ DSN (the trailing %2f is the URL-encoded name of the default vhost, /):
MESSENGER_RABBITMQ_DSN=amqp://guest:guest@localhost:5672/%2f
Step 3: Create the Messages and Handlers
We’ll need two messages and two handlers: one for the initial video split and one for each chunk.
The ProcessVideoMessage starts the process. Its handler will split the video and dispatch a message for each chunk.
// src/Message/ProcessVideoMessage.php
namespace App\Message;

final class ProcessVideoMessage
{
    public function __construct(private string $filePath) {}

    public function getFilePath(): string { return $this->filePath; }
}
// src/MessageHandler/ProcessVideoMessageHandler.php
namespace App\MessageHandler;

use App\Message\ProcessVideoChunkMessage;
use App\Message\ProcessVideoMessage;
use App\Service\VideoSplitter;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
final class ProcessVideoMessageHandler
{
    public function __construct(
        private MessageBusInterface $messageBus,
        private VideoSplitter $videoSplitter
    ) {}

    public function __invoke(ProcessVideoMessage $message): void
    {
        // Split the video into 10 chunks and get their paths
        $chunkPaths = $this->videoSplitter->split($message->getFilePath(), 10);

        // Dispatch a message for each chunk
        foreach ($chunkPaths as $chunkPath) {
            $this->messageBus->dispatch(new ProcessVideoChunkMessage($chunkPath));
        }
    }
}
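The VideoSplitter service is not shown in this article. One possible way to implement it, sketched here under assumptions, is FFmpeg's segment muxer, which cuts a file into pieces without re-encoding. Both function names below are hypothetical, and the video duration is assumed to be known already (for instance from ffprobe). Because stream copy can only cut at keyframes, the real segments may be slightly longer or shorter than requested, so the resulting chunk count is not guaranteed to be exactly 10.

```php
<?php
declare(strict_types=1);

/**
 * Segment length (in whole seconds) needed to cut a video of
 * $durationSeconds into about $chunkCount pieces (hypothetical helper).
 */
function segmentLength(float $durationSeconds, int $chunkCount): int
{
    if ($durationSeconds <= 0 || $chunkCount < 1) {
        throw new InvalidArgumentException('Duration and chunk count must be positive.');
    }

    return max(1, (int) ceil($durationSeconds / $chunkCount));
}

/**
 * Builds the FFmpeg argument list for splitting without re-encoding.
 *
 * @return list<string>
 */
function buildSplitCommand(string $ffmpegBinary, string $inputPath, string $outputPattern, int $segmentSeconds): array
{
    return [
        $ffmpegBinary,
        '-i', $inputPath,
        '-map', '0',              // keep every stream of the input
        '-c', 'copy',             // copy streams, no re-encoding
        '-f', 'segment',          // use the segment muxer
        '-segment_time', (string) $segmentSeconds,
        '-reset_timestamps', '1', // each chunk starts at timestamp zero
        $outputPattern,           // e.g. chunk_%03d.mp4
    ];
}

$seconds = segmentLength(605.0, 10);
$command = buildSplitCommand('ffmpeg', '/videos/in.mp4', '/videos/chunks/chunk_%03d.mp4', $seconds);
echo implode(' ', $command), PHP_EOL;
```

The returned array can be passed as-is to Symfony's Process class, the same way the VideoCombiner later in this article runs FFmpeg.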
The ProcessVideoChunkMessage represents a single video chunk. Its handler contains the actual processing logic (e.g., encoding or watermarking).
// src/Message/ProcessVideoChunkMessage.php
namespace App\Message;

final class ProcessVideoChunkMessage
{
    public function __construct(private string $chunkPath) {}

    public function getChunkPath(): string { return $this->chunkPath; }
}
// src/MessageHandler/ProcessVideoChunkMessageHandler.php
namespace App\MessageHandler;

use App\Message\ProcessVideoChunkMessage;
use App\Service\VideoProcessor;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class ProcessVideoChunkMessageHandler
{
    public function __construct(private VideoProcessor $videoProcessor) {}

    public function __invoke(ProcessVideoChunkMessage $message): void
    {
        // Process the individual video chunk
        $this->videoProcessor->encodeChunk($message->getChunkPath());
    }
}
Step 4: Dispatch the Initial Message
In your controller or service, dispatch the ProcessVideoMessage as soon as the video is uploaded.
// src/Controller/VideoController.php
namespace App\Controller;

use App\Message\ProcessVideoMessage;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class VideoController
{
    public function __construct(private MessageBusInterface $messageBus) {}

    public function upload(): Response
    {
        $videoFilePath = 'path/to/uploaded/video.mp4';

        // This is a fast operation; the heavy work will happen in the background.
        $this->messageBus->dispatch(new ProcessVideoMessage($videoFilePath));

        return new Response('Video processing started in the background!');
    }
}
Step 5: Run the Workers
Now for the magic. We’ll run multiple workers to handle the messages from the queue. You can run 10 separate processes, each listening to the same queue. This is where your application scales horizontally.
# Run 10 workers, each in its own terminal or managed by a process supervisor
php bin/console messenger:consume async_video
php bin/console messenger:consume async_video
php bin/console messenger:consume async_video
# ... and so on for 10 workers
Each of these workers will independently grab a ProcessVideoChunkMessage from the RabbitMQ queue and process it. As one worker finishes a chunk, it immediately pulls the next one, ensuring all 10 workers are busy until the entire video is processed.
Of course, no one starts consumers by hand in a production environment. A process manager such as Supervisor (supervisord) can launch, monitor, and restart multiple (here, 10) Symfony Messenger consumers, including inside a single Docker container. If you use the Redis transport, each consumer must also have a unique consumer name, set through the consumer option of the transport DSN in your messenger.yaml file. The RabbitMQ transport used in this article does not need one.
With Redis, that unique name is what lets each message be tracked and acknowledged by the consumer that received it within the consumer group, so the same message is not handled by several workers.
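A Supervisor program definition for the 10 workers could look roughly like this. The file location, application path, and time limit are placeholders to adapt; the commented-out environment line is only relevant to the Redis transport, where the variable would then be referenced in the transport DSN.

```ini
; /etc/supervisor/conf.d/messenger-worker.conf (location is an assumption)
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async_video --time-limit=3600
numprocs=10
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
startretries=10
; Redis transport only: give every process its own consumer name.
;environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
```

The --time-limit option makes each worker exit after an hour so that Supervisor restarts it with fresh memory and freshly deployed code.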
The Strategy: A Final Consolidation Step
Once you’ve processed all the video chunks, you need to combine them back into a single, complete video file. This final step is also a long-running task that should be handled asynchronously. The best way to manage this is with another message, sent after all chunks have been processed.
Introduce a VideoChunkProcessedEvent: Instead of directly dispatching a final message from each worker, use an “event.” An event is a message that signals something has happened. In this case, each time a ProcessVideoChunkMessage is handled successfully, its handler dispatches a VideoChunkProcessedEvent.
A service or dedicated repository will keep a count of how many chunks have been processed for a given video. This service listens for the VideoChunkProcessedEvent.
When the service detects that the processed chunk count has reached the total number of chunks (e.g., 10), it dispatches a new CombineVideoChunksMessage. This message triggers the final step.
Create the CombineVideoChunksMessage and Handler: This message holds the video’s ID or file path, and its handler contains the logic to merge the chunks.
Step 1: The VideoChunkProcessedEvent
After the VideoProcessor completes its work in the ProcessVideoChunkMessageHandler, a new event is dispatched.
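// src/MessageHandler/ProcessVideoChunkMessageHandler.php
use App\Message\VideoChunkProcessedEvent;
use Symfony\Component\Messenger\MessageBusInterface;
// ... (other imports and #[AsMessageHandler] as before)

final class ProcessVideoChunkMessageHandler
{
    // The bus is now injected alongside the processor so the handler can dispatch.
    public function __construct(
        private VideoProcessor $videoProcessor,
        private MessageBusInterface $messageBus
    ) {}

    public function __invoke(ProcessVideoChunkMessage $message): void
    {
        // ... (existing processing logic)

        // Dispatch an event to signal that this chunk is done.
        $this->messageBus->dispatch(new VideoChunkProcessedEvent($message->getChunkPath()));
    }
}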
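The VideoChunkProcessedEvent class itself is not shown in this article. A minimal version consistent with how it is used here (constructed with a chunk path, read back with getChunkPath()) might look like the following; anything beyond that single field would be an assumption.

```php
<?php
declare(strict_types=1);

namespace App\Message;

// An event is still a plain message object; only its name signals
// that it describes something that has already happened.
final class VideoChunkProcessedEvent
{
    public function __construct(private string $chunkPath) {}

    public function getChunkPath(): string { return $this->chunkPath; }
}

$event = new VideoChunkProcessedEvent('/videos/chunks/chunk_003.mp4');
echo $event->getChunkPath(), PHP_EOL;
```

In practice it may be convenient to also carry the video's ID in the event, so the tracker does not have to look the video up by chunk path.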
Step 2: A Service to Track Progress
A dedicated service listens for the event and decides when the final consolidation can begin. This service uses a database table or a cache to track the progress.
// src/Service/ChunkTrackerService.php
namespace App\Service;

use App\Entity\Video; // Assuming a Video entity
use App\Message\CombineVideoChunksMessage;
use App\Message\VideoChunkProcessedEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class ChunkTrackerService
{
    private EntityManagerInterface $entityManager;
    private MessageBusInterface $messageBus;

    public function __construct(EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
    {
        $this->entityManager = $entityManager;
        $this->messageBus = $messageBus;
    }

    public function onChunkProcessed(VideoChunkProcessedEvent $event): void
    {
        // Criteria keys are entity field names; adjust 'chunk_path' to your mapping.
        $video = $this->entityManager->getRepository(Video::class)
            ->findOneBy(['chunk_path' => $event->getChunkPath()]);

        if (!$video) {
            return; // Or handle error
        }

        // NOTE: with several workers this read-modify-write can race.
        // In production, increment atomically or lock the row first.
        $video->incrementProcessedChunks();
        $this->entityManager->flush();

        if ($video->getProcessedChunks() === $video->getTotalChunks()) {
            // All chunks are done, trigger the final combine process.
            $this->messageBus->dispatch(new CombineVideoChunksMessage($video->getId()));
        }
    }
}
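Because VideoChunkProcessedEvent is dispatched through the message bus rather than the EventDispatcher, you would register this service as a message handler, for example with #[AsMessageHandler(method: 'onChunkProcessed')] on the class or with a messenger.message_handler tag in services.yaml.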
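One detail worth sketching is how to decide that "all chunks are done" exactly once. A message can be handled more than once (for example after a retry), so counting distinct chunk paths is safer than blindly incrementing a number. The ChunkProgress class below is a hypothetical, in-memory illustration of that rule only; a real tracker would keep this state in a database or cache shared by all workers and update it atomically.

```php
<?php
declare(strict_types=1);

final class ChunkProgress
{
    /** @var array<string, true> Distinct chunk paths seen so far. */
    private array $done = [];

    private bool $combineRequested = false;

    public function __construct(private int $totalChunks) {}

    /**
     * Records a processed chunk. Returns true exactly once: at the moment
     * the last distinct chunk is recorded. Duplicates are ignored.
     */
    public function markProcessed(string $chunkPath): bool
    {
        $this->done[$chunkPath] = true;

        if (!$this->combineRequested && count($this->done) === $this->totalChunks) {
            $this->combineRequested = true;

            return true; // The caller would dispatch CombineVideoChunksMessage here.
        }

        return false;
    }
}

$progress = new ChunkProgress(3);
$results = [
    $progress->markProcessed('chunk_000.mp4'),
    $progress->markProcessed('chunk_000.mp4'), // redelivered duplicate
    $progress->markProcessed('chunk_001.mp4'),
    $progress->markProcessed('chunk_002.mp4'), // last distinct chunk
    $progress->markProcessed('chunk_002.mp4'), // duplicate after completion
];
```

Only the fourth call returns true, so the combine step would be triggered a single time even though two events arrived twice.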
Step 3: The CombineVideoChunksMessage and Handler
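This handler delegates to a VideoCombiner service, which uses FFmpeg’s concat demuxer to join the video files without re-encoding them. That is very fast, but it assumes all chunks share the same codecs and encoding parameters.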
// src/Message/CombineVideoChunksMessage.php
namespace App\Message;

final class CombineVideoChunksMessage
{
    public function __construct(private int $videoId) {}

    public function getVideoId(): int { return $this->videoId; }
}
// src/MessageHandler/CombineVideoChunksMessageHandler.php
namespace App\MessageHandler;

use App\Message\CombineVideoChunksMessage;
use App\Repository\VideoRepository;
use App\Service\VideoCombiner;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class CombineVideoChunksMessageHandler
{
    public function __construct(
        private VideoCombiner $videoCombiner,
        private VideoRepository $videoRepository
    ) {}

    public function __invoke(CombineVideoChunksMessage $message): void
    {
        $video = $this->videoRepository->find($message->getVideoId());

        if (!$video) {
            return; // Or handle error
        }

        $chunkPaths = $video->getChunkPaths(); // Assuming this returns an array of paths
        $outputFilePath = $video->getOutputFilePath();

        $this->videoCombiner->combine($chunkPaths, $outputFilePath);

        // Update video status to 'completed'
        $video->setStatus('completed');
        $this->videoRepository->save($video);
    }
}
Step 4: The VideoCombiner Service
This service encapsulates the FFmpeg command for concatenation. It will write a list of files to a temporary file and then use FFmpeg to read that list and merge them.
// src/Service/VideoCombiner.php
namespace App\Service;

use Symfony\Component\Process\Process;

class VideoCombiner
{
    private string $ffmpegBinaryPath;

    public function __construct(string $ffmpegBinaryPath)
    {
        $this->ffmpegBinaryPath = $ffmpegBinaryPath;
    }

    public function combine(array $chunkPaths, string $outputPath): void
    {
        // Create a temporary file list for FFmpeg
        $listFilePath = sys_get_temp_dir() . '/' . uniqid('chunk_list_') . '.txt';
        $fileListContent = '';
        foreach ($chunkPaths as $path) {
            $fileListContent .= "file '" . $path . "'\n";
        }
        if (file_put_contents($listFilePath, $fileListContent) === false) {
            throw new \RuntimeException('Could not write the chunk list file.');
        }

        // Build the FFmpeg command
        $command = [
            $this->ffmpegBinaryPath,
            '-f', 'concat',
            '-safe', '0',
            '-i', $listFilePath,
            '-c', 'copy', // copy streams without re-encoding, this is very fast!
            $outputPath,
        ];

        $process = new Process($command);
        $process->setTimeout(3600); // Give up if FFmpeg runs longer than one hour

        try {
            $process->mustRun();
        } catch (\Exception $e) {
            throw new \RuntimeException('Failed to combine video chunks.', 0, $e);
        } finally {
            // Clean up the temporary file whether or not FFmpeg succeeded
            unlink($listFilePath);
        }
    }
}
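Two details of the list file are easy to get wrong: the chunks must be listed in playback order, and a path containing a single quote must be escaped or FFmpeg will misread the line. The helper below is a hypothetical sketch of how the list content could be built with both in mind. It assumes chunk file names sort naturally into playback order (chunk_2 before chunk_10), which depends on how your splitter names its output.

```php
<?php
declare(strict_types=1);

/**
 * Builds the content of an FFmpeg concat list file (hypothetical helper).
 *
 * @param list<string> $chunkPaths
 */
function buildConcatList(array $chunkPaths): string
{
    // Natural sort keeps chunk_2 before chunk_10.
    sort($chunkPaths, SORT_NATURAL);

    $lines = [];
    foreach ($chunkPaths as $path) {
        // Inside single quotes, a literal ' is written as '\''
        // (close the quote, add an escaped quote, reopen the quote).
        $lines[] = "file '" . str_replace("'", "'\\''", $path) . "'";
    }

    return implode("\n", $lines) . "\n";
}

echo buildConcatList(['/tmp/chunk_10.mp4', '/tmp/chunk_2.mp4']);
```

If your chunk names are zero-padded (chunk_002, chunk_010), a plain sort works as well; the natural sort is only a safeguard.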
Conclusion
Embracing this asynchronous programming pattern with Symfony Messenger provides two major advantages.
- Effectively decouples our application components, leading to a more modular and maintainable architecture.
- Allows us to achieve high-performance data processing for large volumes of data by breaking down a massive dataset into smaller parts and leveraging parallel processing.
This approach scales well: the number of parallel workers is limited mainly by your servers’ available resources, the throughput of the message broker, and a sensible operational strategy.
Stay tuned — and let’s keep the conversation going.