Modern app starts simple, then the business logic grows, gets tangled in the infrastructure, and before you know it, you’re wrestling an unmaintainable monolith. For complex, multi-step processes — like video processing — this trajectory is a nightmare.
That’s why Domain-Driven Design (DDD) isn’t just a pattern; it’s a lifeline. By placing the core business — the domain — at the center, we build robust, scalable, and most importantly, understandable applications. Let’s explore how DDD, coupled with Symfony’s power, can tame a simple video processing app.
The Power of DDD: More than Just Code
DDD isn’t about a specific technology or framework. Instead, it’s a design philosophy that puts the business at the very heart of the software development process. It’s about modeling our code to precisely reflect the real-world processes and language of a specific business domain. When we do this, we unlock a host of advantages:
A Common Language for Everyone
Forget the disconnect between developers and business stakeholders. With DDD, we establish a “Ubiquitous Language” — a shared vocabulary that everyone, from the domain expert to the junior developer, uses consistently. This language isn’t just for meetings; it’s the foundation of your code. If the business talks about a “Customer” and an “Order,” your code will have Customer and Order classes. This clarity eliminates misunderstandings and ensures that the software truly solves the business problem.Clarity, Modularity, and Scalability
DDD encourages us to break down a large, complex system into smaller, more manageable parts called “Bounded Contexts.” Each Bounded Context is a self-contained domain with its own model and rules. This approach leads to a modular architecture that is easier to understand, develop, and maintain. In the context of a Symfony application, each Bounded Context can be a separate bundle, allowing for clear separation of concerns. This modularity also paves the way for a microservices architecture, where each service is a Bounded Context, allowing for independent scaling and deployment.A Robust and Flexible Core
By separating the business logic (the domain) from the technical implementation (infrastructure), DDD helps us create a flexible and framework-independent core. Your domain model — containing entities, value objects, and business logic — is a pure expression of the business rules. This means you can change your database, switch to a different message queue, or even migrate to a new framework without rewriting the core business logic. This separation is a perfect match for Symfony’s layered architecture and its emphasis on decoupling.Clean and Testable Code
DDD naturally leads to clean, well-structured code. By isolating business logic in the domain layer, it becomes easier to write focused, high-quality unit and acceptance tests. You can test your core business rules without needing to spin up a database or a web server. This leads to more reliable applications and a more confident development team. Symfony’s testing tools like PHPUnit and Behat integrate perfectly with this approach.Alignment with Business Goals
At its core, DDD ensures that the software you’re building delivers tangible business value. By prioritizing the core domain and working in close collaboration with domain experts, you’re building a solution that is closely aligned with the business’s needs and challenges. The result is a high-quality, long-lasting application that can easily adapt to evolving business requirements.
In a world of increasing application complexity, DDD provides a structured, thoughtful way to build software that is not only technically sound but also strategically aligned with the business it serves.
The Domain-Driven Symfony Blueprint: Video Processing Context
This structure enforces a strict separation between the core business logic (Domain), the use cases (Application), and external details like the database or third-party tools (Infrastructure).
Core App Structure
.
├── src/
│ ├── Domain/
│ │ └── Video/
│ │ ├── Model/
│ │ │ ├── Video.php # The Aggregate Root (contains business rules)
│ │ │ └── ValueObject/
│ │ │ ├── VideoId.php
│ │ │ └── VideoStatus.php
│ │ ├── Repository/
│ │ │ └── VideoRepositoryInterface.php # The Interface for persistence
│ │ └── Service/
│ │ └── VideoChunkingService.php # Splits the file (stateless logic)
│ │
│ ├── Application/
│ │ ├── Command/
│ │ │ ├── ProcessVideoChunk.php # Command: Intention to process a chunk
│ │ │ └── CombineVideoChunks.php # Command: Intention to finalize the video
│ │ ├── Event/
│ │ │ └── VideoChunkProcessedEvent.php # Event: Fact that a chunk is done
│ │ └── Handler/
│ │ ├── ProcessVideoChunkHandler.php # Handles the ProcessVideoChunk command
│ │ ├── CombineVideoChunksHandler.php # Handles the CombineVideoChunks command
│ │ └── Reactor/
│ │ └── ChunkTrackerReactor.php # Listens to VideoChunkProcessedEvent
│ │
│ ├── Infrastructure/
│ │ ├── Persistence/
│ │ │ └── Doctrine/
│ │ │ └── DoctrineVideoRepository.php # Implements VideoRepositoryInterface
│ │ └── External/
│ │ └── FFMpeg/
│ │ └── FFMpegChunkingTool.php # Concrete implementation of video splitting/combining
│ │
│ └── Controller/
│ └── VideoUploadController.php # Entry point - Dispatches the initial command
└── config/
└── packages/
└── messenger.yaml # Routing Commands/Events
Component Responsibilities (The DDD Mapping)
The Domain Layer (src/Domain/Video)
This is the heart of the application and must not have any dependencies on Symfony, Doctrine, or external libraries.
- Video.php: Aggregate Root Encapsulates the complete state of a video. It contains methods like markChunkAsProcessed(ChunkId) and isReadyForCombination(). All business rules regarding the video lifecycle live here.
- VideoId.php: Value Object A wrapper around a UUID or integer to ensure the ID is always valid and self-describing.
- VideoRepositoryInterface.php: Repository Defines the contract for fetching and saving Video aggregates. Crucial for decoupling the domain from the database technology.
- VideoChunkingService.php: Domain Service Contains the business rule logic for how a video should be broken down (e.g., based on length, resolution, etc.), but delegates the actual technical work to the Infrastructure layer.
The Application Layer (src/Application)
This layer orchestrates the use cases and defines the flow of the asynchronous process. It manages the lifecycle using Commands and Events via Symfony Messenger.
- Command/*: Command. Represents the intent of the user or system (e.g., “Please process this chunk”). Dispatched by Controllers or other Handlers.
- Event/*: Domain Event. Represents a fact that has occurred (e.g., “This chunk is now processed”). Dispatched by a Handler after the aggregate state changes.
- Handler/*: Command Handler. The entry point for a use case. It loads the Video Aggregate via the Repository and calls a behavior method on it. It acts as the transaction boundary.
- Handler/Reactor/ChunkTrackerReactor.php: Event Listener / Reactor Listens to the VideoChunkProcessedEvent. It uses the VideoRepository to check the aggregate state and, if ready, dispatches the next Command (CombineVideoChunks).
The Infrastructure Layer (src/Infrastructure)
This layer deals with all the technical details: persistence, external APIs, and file system operations.
- DoctrineVideoRepository.php: Implementation of VideoRepositoryInterface using Doctrine ORM. Translates between the Domain’s Video object and Doctrine’s database mapping (e.g., XML/Attributes).
- FFMpegChunkingTool.php: Concrete implementation of the technical splitting and combining operations using the FFmpeg binary. A technical service that implements the contracts needed by the Domain or Application services (like VideoChunkingService).
What You Have to Change
Stop putting logic in Handlers: Business rules like “increment the chunk count” or “check if all chunks are done” must be moved from the ProcessVideoChunkHandler into the Video Aggregate root.
Define clear Interfaces: Create the VideoRepositoryInterface first. Your infrastructure class (DoctrineVideoRepository) must implement this contract. This allows you to swap out Doctrine for Redis or something else without touching your Application or Domain code.
Separate Command and Event Flow: Explicitly configure two flows in messenger.yaml:
- Command Bus: For synchronous commands (rarely used for heavy tasks) or internal commands.
- Event Bus: Dedicated solely to dispatching VideoChunkProcessedEvent and other Domain Events. This makes monitoring the system’s reaction to facts much clearer.
This structure gives us a robust foundation where the complexities of asynchronous processing are isolated, and the heart of our application — the Video domain — is clean and protected.
The Domain Layer
These files define the core entities, value objects, and contracts of our domain. They are technology-agnostic.
namespace App\Domain\Video\Model;
use App\Domain\Video\ValueObject\VideoId;
use App\Domain\Video\ValueObject\VideoStatus;
use App\Application\Event\VideoChunkProcessedEvent;
// The Aggregate Root. It manages its own state and records Domain Events.
class Video
{
private VideoId $id;
private string $path; // Source file path
private VideoStatus $status;
private int $totalChunks;
private int $processedChunks;
private array $chunkPaths = [];
private array $recordedEvents = []; // DDD: Event recorder pattern
private function __construct(VideoId $id, string $path, int $totalChunks, array $chunkPaths)
{
$this->id = $id;
$this->path = $path;
$this->status = VideoStatus::CHUNKING;
$this->totalChunks = $totalChunks;
$this->processedChunks = 0;
$this->chunkPaths = $chunkPaths;
}
public static function create(VideoId $id, string $path, int $totalChunks, array $chunkPaths): self
{
return new self($id, $path, $totalChunks, $chunkPaths);
}
// --- BEHAVIOR METHODS (Business Logic) ---
public function startProcessing(): void
{
if ($this->status !== VideoStatus::CHUNKING) {
throw new \LogicException('Cannot start processing. Video is in status: ' . $this->status->value);
}
$this->status = VideoStatus::PROCESSING;
}
public function markChunkAsProcessed(string $chunkPath): void
{
if ($this->status !== VideoStatus::PROCESSING) {
throw new \LogicException('Video is not being processed.');
}
// Domain Rule: Ensure this chunk belongs to this video
if (!in_array($chunkPath, $this->chunkPaths)) {
throw new \InvalidArgumentException('Chunk does not belong to this video.');
}
$this->processedChunks++;
// Record the fact as a Domain Event
$this->recordEvent(new VideoChunkProcessedEvent($this->id, $chunkPath));
}
public function startCombining(): void
{
if (!$this->isReadyForCombination()) {
throw new \LogicException('Not all chunks are processed yet.');
}
$this->status = VideoStatus::COMBINING;
}
public function isReadyForCombination(): bool
{
return $this->processedChunks === $this->totalChunks;
}
// --- EVENT RECORDING ---
private function recordEvent(object $event): void
{
$this->recordedEvents[] = $event;
}
public function releaseEvents(): array
{
$events = $this->recordedEvents;
$this->recordedEvents = [];
return $events;
}
// --- GETTERS ---
public function getId(): VideoId
{
return $this->id;
}
public function getPath(): string
{
return $this->path;
}
public function getStatus(): VideoStatus
{
return $this->status;
}
public function getTotalChunks(): int
{
return $this->totalChunks;
}
public function getChunkPaths(): array
{
return $this->chunkPaths;
}
}
namespace App\Domain\Video\ValueObject;
use Symfony\Component\Uid\Uuid;
// Represents a unique identifier for a Video aggregate.
// Using a simple wrapper around a string (UUID).
final class VideoId
{
private string $id;
private function __construct(string $id)
{
if (!Symfony\Component\Uid\Uuid::isValid($id)) {
throw new \InvalidArgumentException("Invalid VideoId format.");
}
$this->id = $id;
}
public static function fromString(string $id): self
{
return new self($id);
}
public function __toString(): string
{
return $this->id;
}
public function equals(VideoId $other): bool
{
return $this->id === $other->id;
}
}
namespace App\Domain\Video\ValueObject;
// Defines the allowed states for a Video lifecycle.
enum VideoStatus: string
{
case UPLOADED = 'uploaded';
case CHUNKING = 'chunking';
case PROCESSING = 'processing';
case CHUNKS_PROCESSED = 'chunks_processed';
case COMBINING = 'combining';
case COMPLETED = 'completed';
case FAILED = 'failed';
}
namespace App\Domain\Video\Repository;
use App\Domain\Video\Model\Video;
use App\Domain\Video\ValueObject\VideoId;
// The contract for persistence. Infrastructure must implement this.
interface VideoRepositoryInterface
{
public function get(VideoId $id): Video;
public function save(Video $video): void;
}
namespace App\Domain\Video\Service;
use App\Domain\Video\Model\Video;
// This service contains the business logic for deciding how to chunk a video.
// It is stateless and does not execute the technical splitting (that's Infrastructure's job).
final class VideoChunkingService
{
private const DEFAULT_CHUNK_COUNT = 10;
// This method decides how many chunks are needed based on video properties (not shown).
// It returns the intended chunk breakdown.
public function determineChunkBreakdown(Video $video): array
{
// For simplicity, we just return a fixed number of chunk IDs/paths.
$chunkPaths = [];
for ($i = 1; $i <= self::DEFAULT_CHUNK_COUNT; $i++) {
$chunkPaths[] = $video->getPath() . '.chunk_' . $i . '.temp';
}
return $chunkPaths;
}
}
Application Layer (Use Cases and Flow)
This layer contains the Commands, Events, and the Handlers/Reactors that orchestrate the process.
namespace App\Application\Command;
use App\Domain\Video\ValueObject\VideoId;
// Command: The intention to process a single video chunk.
final class ProcessVideoChunk
{
public function __construct(
public readonly VideoId $videoId,
public readonly string $chunkPath
) {}
}
namespace App\Application\Command;
use App\Domain\Video\ValueObject\VideoId;
// Command: The intention to combine all processed chunks.
final class CombineVideoChunks
{
public function __construct(
public readonly VideoId $videoId
) {}
}
namespace App\Application\Event;
use App\Domain\Video\ValueObject\VideoId;
// Domain Event: A statement of fact that a chunk finished processing.
final class VideoChunkProcessedEvent
{
public function __construct(
public readonly VideoId $videoId,
public readonly string $chunkPath
) {}
}
namespace App\Application\Handler;
use App\Application\Command\ProcessVideoChunk;
use App\Domain\Video\Repository\VideoRepositoryInterface;
use App\Infrastructure\External\FFMpeg\FFMpegChunkingTool;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
// Handler for the ProcessVideoChunk Command. This is an Application Service.
#[AsMessageHandler]
final class ProcessVideoChunkHandler
{
public function __construct(
private readonly VideoRepositoryInterface $videoRepository,
private readonly FFMpegChunkingTool $ffmpegTool,
private readonly MessageBusInterface $eventBus // Use a dedicated bus for events
) {}
public function __invoke(ProcessVideoChunk $command): void
{
// 1. Load the Aggregate
$video = $this->videoRepository->get($command->videoId);
// 2. Perform the technical task (Infrastructure Layer)
// This is where the 10x faster encoding/watermarking happens
$this->ffmpegTool->processChunk($command->chunkPath);
// 3. Delegate the business logic back to the Aggregate Root
$video->markChunkAsProcessed($command->chunkPath);
// 4. Persist the state change
$this->videoRepository->save($video);
// 5. Dispatch ALL recorded Domain Events
foreach ($video->releaseEvents() as $event) {
$this->eventBus->dispatch($event);
}
}
}
namespace App\Application\Handler;
use App\Application\Command\CombineVideoChunks;
use App\Domain\Video\Repository\VideoRepositoryInterface;
use App\Infrastructure\External\FFMpeg\FFMpegChunkingTool;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
// Handler for the CombineVideoChunks Command.
#[AsMessageHandler]
final class CombineVideoChunksHandler
{
public function __construct(
private readonly VideoRepositoryInterface $videoRepository,
private readonly FFMpegChunkingTool $ffmpegTool
) {}
public function __invoke(CombineVideoChunks $command): void
{
// 1. Load the Aggregate
$video = $this->videoRepository->get($command->videoId);
// 2. Perform the technical consolidation (Infrastructure Layer)
$outputFilePath = str_replace('.temp', '.final.mp4', $video->getPath());
$this->ffmpegTool->combineChunks($video->getChunkPaths(), $outputFilePath);
// Note: The Video Aggregate is not modified here as the final state is complex
// In a real app, Video would have a markAsCompleted() method, which we skip for brevity.
// 3. Clean up and finalize the status (e.g., mark as completed)
// $video->markAsCompleted($outputFilePath);
// $this->videoRepository->save($video);
}
}
namespace App\Application\Handler\Reactor;
use App\Application\Command\CombineVideoChunks;
use App\Application\Event\VideoChunkProcessedEvent;
use App\Domain\Video\Repository\VideoRepositoryInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
// Reactor: Listens to Domain Events and reacts by dispatching new Commands (orchestration).
// Note: In Symfony, this is often implemented using an Event Subscriber/Listener,
// but using the Messenger as the Event Bus is a clean alternative for async reaction.
#[AsMessageHandler]
final class ChunkTrackerReactor
{
public function __construct(
private readonly VideoRepositoryInterface $videoRepository,
private readonly MessageBusInterface $commandBus // Use a dedicated bus for commands
) {}
public function __invoke(VideoChunkProcessedEvent $event): void
{
// 1. Load the Aggregate to check the state
$video = $this->videoRepository->get($event->videoId);
if ($video->isReadyForCombination()) {
// 2. Update the state transition
$video->startCombining();
$this->videoRepository->save($video);
// 3. Dispatch the next Command to trigger the final step
$this->commandBus->dispatch(
new CombineVideoChunks($video->getId())
);
}
}
}
Infrastructure Layer (Technical Details)
This layer handles external systems like the database and FFmpeg.
namespace App\Infrastructure\Persistence\Doctrine;
use App\Domain\Video\Model\Video;
use App\Domain\Video\Repository\VideoRepositoryInterface;
use App\Domain\Video\ValueObject\VideoId;
// Infrastructure: Implements the Domain contract using Doctrine.
// This is a minimal mock for demonstration.
final class DoctrineVideoRepository implements VideoRepositoryInterface
{
private array $storage = []; // Mock storage
public function get(VideoId $id): Video
{
if (!isset($this->storage[(string) $id])) {
throw new \RuntimeException(sprintf("Video with ID %s not found.", $id));
}
// In a real application, Doctrine would hydrate the Video entity here.
return $this->storage[(string) $id];
}
public function save(Video $video): void
{
// In a real application, Doctrine's EntityManager would persist and flush here.
$this->storage[(string) $video->getId()] = $video;
// For debugging the flow
echo "LOG: Video Aggregate " . $video->getId() . " saved. Status: " . $video->getStatus()->value . "\n";
}
}
namespace App\Infrastructure\External\FFMpeg;
// Infrastructure: Handles the technical execution of FFmpeg commands.
final class FFMpegChunkingTool
{
// The FFMpeg binary path would be injected here.
private string $ffmpegPath = '/usr/bin/ffmpeg';
public function split(string $filePath, int $totalChunks): array
{
// Actual FFmpeg command to split the video would run here.
// Process is slow, but this is handled by the initial synchronous command dispatch.
echo "LOG: FFMpeg splitting video {$filePath} into {$totalChunks} chunks.\n";
$chunkPaths = [];
for ($i = 1; $i <= $totalChunks; $i++) {
$chunkPaths[] = $filePath . '.chunk_' . $i . '.temp';
}
return $chunkPaths;
}
public function processChunk(string $chunkPath): void
{
// Actual FFmpeg command for encoding, watermarking, etc. (the parallel heavy lifting).
// This runs in a worker process.
usleep(rand(50000, 500000)); // Simulate async work (50ms - 500ms)
echo "LOG: FFMpeg processed chunk: {$chunkPath}\n";
}
public function combineChunks(array $chunkPaths, string $outputFilePath): void
{
// Actual FFmpeg concat demuxer command (fast operation).
echo "LOG: FFMpeg combining " . count($chunkPaths) . " chunks into {$outputFilePath}\n";
}
}
While we are currently leveraging a mock repository for demonstration purposes, our application is architected for seamless persistence layer flexibility.
In a production environment, switching to a more robust data storage solution is straightforward and requires minimal code changes.
Thanks to the principles of Dependency Injection and Domain-Driven Design (DDD), our system’s core logic remains decoupled from the specific data store. To integrate a powerful ORM or ODM solution, such as Doctrine/ORM for relational databases or Doctrine/ODM for document databases, all we need to do is:
- Provide the relevant Doctrine service (e.g., EntityManager or DocumentManager) as a dependency.
- Pass this service into the constructor of our repository or aggregate root, allowing it to handle all data persistence operations.
This approach ensures our domain remains clean and free from infrastructure concerns, making the system highly adaptable and maintainable. This design pattern not only simplifies development but also makes it easy to scale or migrate your data persistence strategy in the future.
Entry Point (Controller)
The web entry point, which initiates the entire process by dispatching the initial command.
namespace App\Controller;
use App\Application\Command\ProcessVideoChunk;
use App\Domain\Video\Model\Video;
use App\Domain\Video\Repository\VideoRepositoryInterface;
use App\Domain\Video\Service\VideoChunkingService;
use App\Domain\Video\ValueObject\VideoId;
use App\Infrastructure\External\FFMpeg\FFMpegChunkingTool;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;
use Ramsey\Uuid\Uuid;
final class VideoUploadController extends AbstractController
{
public function __construct(
private readonly MessageBusInterface $commandBus,
private readonly VideoRepositoryInterface $videoRepository,
private readonly VideoChunkingService $chunkingService,
private readonly FFMpegChunkingTool $ffmpegTool
) {}
#[Route('/video/upload', name: 'video_upload', methods: ['POST'])]
public function upload(): Response
{
// 1. Simulate file upload and path determination
$uploadedFilePath = '/tmp/uploads/' . Uuid::uuid4()->toString() . '.mp4';
$videoId = VideoId::fromString(Uuid::uuid4()->toString());
// 2. Domain Service determines the chunking breakdown
$dummyVideo = Video::create($videoId, $uploadedFilePath, 1, [$uploadedFilePath]);
$chunkPaths = $this->chunkingService->determineChunkBreakdown($dummyVideo);
$totalChunks = count($chunkPaths);
// 3. Create and save the initial Aggregate Root
$video = Video::create($videoId, $uploadedFilePath, $totalChunks, $chunkPaths);
$video->startProcessing(); // Transition state
$this->videoRepository->save($video);
// 4. Dispatch Commands for each chunk (this is the fast, synchronous part)
foreach ($chunkPaths as $chunkPath) {
$this->commandBus->dispatch(
new ProcessVideoChunk($videoId, $chunkPath)
);
}
return $this->json([
'status' => 'Video processing initiated.',
'videoId' => (string) $videoId,
'message' => 'The heavy lifting is offloaded to background workers.'
]);
}
}
In this example, we’re not uploading an actual file through the controller. However, it’s quite simple to adapt our example to handle real file uploads. This is a common practice in Domain-Driven Design (DDD), where we focus on the core business logic and use mocks or simplified inputs during development.
The current setup uses a simplified approach for demonstration, but our architecture is designed for seamless integration with real-world file handling. To enable actual file uploads, the changes required are minimal. The core of the system — the Domain Model — remains untouched. You would simply:
- Modify the Controller: Adjust the controller to accept a file upload request. Instead of receiving a simple data structure, it will handle a file object.
- Update the Command: Pass the file data (e.g., file path or stream) to the command that orchestrates the video processing pipeline.
- Integrate a Storage Service: Use a dedicated service to handle the file storage, whether it’s on a local disk, an S3 bucket, or another cloud storage provider. This separation of concerns ensures the domain remains clean and unaware of the infrastructure details.
By following this DDD approach, we maintain a clear separation between the application’s infrastructure (the controller and file storage) and its core business logic (the video processing). This design not only simplifies the development process but also ensures the system is scalable, robust, and easy to maintain in the long run.
Configuration
We configure the messenger.yaml file to define two separate logical buses: one for Commands (which should be handled synchronously or immediately) and one for Events (which are typically dispatched asynchronously).
framework:
messenger:
# Define the transport for asynchronous messages
transports:
async_command_bus:
dsn: '%env(MESSENGER_RABBITMQ_DSN)%'
async_event_bus:
dsn: '%env(MESSENGER_RABBITMQ_DSN)%'
# Define custom buses for clear separation
buses:
command.bus:
# Default to synchronous dispatch if no transport is configured
middleware:
- 'App\Infrastructure\Messenger\CommandLoggerMiddleware'
event.bus:
# Default to asynchronous dispatch for all Domain Events
middleware:
- 'App\Infrastructure\Messenger\EventLoggerMiddleware'
# Routing: Messages are routed to their appropriate bus/transport
routing:
# Commands are routed to the command bus, which is asynchronous
'App\Application\Command\ProcessVideoChunk': async_command_bus
'App\Application\Command\CombineVideoChunks': async_command_bus
# Events are routed to the event bus, which is also asynchronous
'App\Application\Event\VideoChunkProcessedEvent': async_event_bus
When architecting scalable and maintainable systems for video processing with Domain-Driven Design (DDD), message visibility and reliability are paramount. To address this, we’re implementing a robust logging strategy using a middleware pattern.
To ensure our codebase adheres to the DRY (Don’t Repeat Yourself) principle, we’ve designed an elegant solution:
- First, we’ll define a foundational abstract class, AbstractLoggerMiddleware. This class will contain the common logging logic, ensuring consistency and preventing code duplication.
- Next, we will extend this base class to create two concrete, domain-specific classes: CommandLoggerMiddleware and EventLoggerMiddleware.
This structured approach allows us to efficiently log all inbound commands and outbound messages within our video processing pipeline, providing critical insights for debugging and performance monitoring. By implementing this clean, reusable architecture, we enhance our system’s observability and future-proof its maintainability.
namespace App\Infrastructure\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
// Base class to provide shared logging functionality for both Commands and Events.
abstract class AbstractLoggerMiddleware implements MiddlewareInterface
{
public function __construct(
protected readonly LoggerInterface $logger
) {}
// Subclasses MUST implement this method to specify the type being logged (e.g., 'Command' or 'Domain Event').
abstract protected function getLogType(): string;
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$logType = $this->getLogType();
// 1. Log the incoming message before processing
$this->logger->info(
sprintf('Dispatching %s: %s', $logType, $message::class),
[
'message_data' => $message,
'stamps' => array_keys($envelope->all()),
]
);
// 2. Pass the envelope to the next middleware in the stack or the handler
$envelope = $stack->next()->handle($envelope, $stack);
// 3. Log success after processing
$this->logger->info(
sprintf('%s handled successfully: %s', $logType, $message::class)
);
return $envelope;
}
}
namespace App\Infrastructure\Messenger;
// Middleware to log all commands dispatched through the command bus.
// It extends the abstract logger for reuse and only specifies the log type.
final class CommandLoggerMiddleware extends AbstractLoggerMiddleware
{
// Override the required method to specify the type for logging
protected function getLogType(): string
{
return 'Command';
}
}
namespace App\Infrastructure\Messenger;
// Middleware to log all Domain Events dispatched through the event bus.
// It extends the abstract logger for reuse and only specifies the log type.
final class EventLoggerMiddleware extends AbstractLoggerMiddleware
{
/**
* @return string The log type identifier for the abstract logger.
*/
protected function getLogType(): string
{
return 'Domain Event';
}
}
Conclusion
While Symfony Messenger provided the essential queues and workers, it was the application of Domain-Driven Design (DDD) that transformed a collection of technical handlers into a strategic, resilient system built for the long haul.
Our refactoring didn’t just move code; it established a clear Bounded Context for “Video Processing.” This crucial step allowed us to define a ubiquitous language where concepts like the Video Aggregate Root and the VideoChunkProcessedEvent are the authoritative source of truth.
The code no longer describes how to save data, but what the business is doing.
The Video aggregate, now rich with behavior, guards its own state, preventing impossible transitions and ensuring data integrity regardless of how many parallel workers are running.
This discipline — forcing all changes through the aggregate’s methods — is the bedrock of reliable asynchronous processing.
Perhaps the most significant victory lies in the decoupling achieved across the layers. Our Application Handlers are now beautifully thin, acting purely as orchestrators that load the aggregate, trigger the business behavior, and dispatch the resulting Domain Events.
They are no longer bogged down by the concerns of Infrastructure, which we successfully isolated in files like the DoctrineVideoRepository and the FFMpegChunkingTool.
This separation means when your team decides to swap RabbitMQ for AWS SQS, or move from Doctrine to a document database, only the Infrastructure layer needs modification. The core business logic in the Domain layer remains untouched.
Furthermore, the specialized Logger Middlewares we created demonstrate how technical concerns are cleanly intercepted, providing observability without polluting the critical path.
The introduction of dedicated Commands (intentions) and Domain Events (facts) also delivers immense long-term value. This explicit messaging strategy creates a reactive system. The moment a VideoChunkProcessedEvent is fired, our ChunkTrackerReactor listens and decides the next steps — an example of a clear, single-purpose Domain Service driving the flow.
This pattern future-proofs the application. Imagine needing to add a new requirement, like running AI-based content analysis only after the video is combined. You don’t alter the existing process; you simply create a new Reactor that listens to the final VideoCompletedEvent and dispatches a new command, say, StartAiAnalysisCommand. The system extends effortlessly without requiring intrusive changes.
Ultimately, by embracing DDD, we haven’t just made video processing fast; we’ve made the codebase predictable, scalable, and a precise reflection of the business requirements.
This strategic architecture ensures that the efficiency gained today isn’t eroded by complexity tomorrow. As you scale to handle ten thousand concurrent videos, the structure we’ve implemented guarantees that your application remains flexible, understandable, and highly performant.
It’s an investment that pays continuous dividends.
Continuing the Conversation
I’m keen to help you continue building out this application. What would you like to explore next?
- Security and Authorization: Where should we place the logic to check if a user is allowed to process a video chunk?
- Testing Strategy: How can we write isolated tests for the Video Aggregate Root without needing Doctrine or Symfony Messenger?
- Full Entity Mapping: Generate the Doctrine mapping for the Video Aggregate Root.
I’d love to hear your thoughts in comments!
Stay tuned — and let’s keep the conversation going.
Top comments (0)