In the article “Turbocharging AI Agents with Symfony’s Async Approach” we significantly sped up our AI Agent by using an asynchronous, message-bus, and CQRS-based approach.
We’re now ready to take the next step: making our application completely autonomous from human intervention for polling incoming information channels. This opens up a wealth of exciting future development opportunities.
Enhancing State Management and Scalability
In our previous guide, “Crafting Your Own AI Agent with Symfony: A 11-Minute Guide” we polled a mailbox without any state persistence, which could lead to reprocessing emails. That example was intentionally simple. Now, we’re upgrading our application and introducing state management without a database (Doctrine Component). This enhancement will allow us to run multiple parallel instances of our application, each with multi-threaded consumers. This approach not only improves hardware resource utilization but also ensures the seamless, reliable processing of incoming data.
We could, of course, retrieve only UNSEEN emails from a mailbox. However, this approach has several limitations. For example, another bot or human might have accessed the mailbox, or a previous error might have prevented successful processing. Moreover, this method is only suitable for IMAP and not for messages from platforms like messengers or SMS providers.
To overcome these limitations, we’ll start by learning how to store the state of the last successful polling of our incoming message channels. While we could use a full-fledged database for this, we’ll avoid it for now since we don’t need that level of complexity and performance gain just yet. This approach will allow us to maintain state efficiently without the overhead of a traditional database, making our application more flexible and scalable.
We can store our application’s current state in a file on the system. However, if we want to use a stateless approach for deploying application instances, we’ll lose the latest state with every container restart. While this might not be critical, it’s not an ideal solution. Additionally, with a file-based approach, multiple application instances will face significant issues with concurrent access and file locking.
The Redis Solution
So what’s the solution? In the article “Turbocharging AI Agents with Symfony’s Async Approach” we already integrated Redis for the transport layer of our asynchronous application. Why not use Redis to store our state as well? It’s a robust, distributed key-value store that’s perfect for this use case, allowing us to maintain a shared state across multiple instances without the file-locking problems. This approach ensures our application remains both scalable and stateless, providing a clean and efficient architecture.
Using Dedicated Cache Pools in Symfony
We can leverage the Symfony Cache Component to store data. However, it’s crucial that this data isn’t deleted when the standard cache:clear command is executed.
This is where unique cache pools come in. They are a core feature of the Symfony Cache Component that allows us to create separate, isolated storage areas. By configuring a dedicated pool for our state data, we ensure that it remains persistent and is not affected by the clearing of the application’s primary cache. This approach provides a clean and reliable way to manage persistent application state.
Installing the Symfony Cache Component
First, let’s install the Symfony Cache Component and its dependencies. We can do this easily using Composer. Run the following command in your terminal:
composer require symfony/cache
This command will install the necessary packages and integrate the component into your Symfony application, allowing us to start implementing our persistent cache pools.
We’ll add environment variable to our .env file to connect to the Redis Cache Database.
REDIS_DSN_CACHE=redis://localhost:6379/11
/11: This is a crucial part of the DSN. It specifies that the connection should use database number 11. Redis supports 16 databases (numbered 0–15) by default, which allows you to logically separate your data within the same Redis instance.
For this example, we’ll make some changes in config/packages/cache.yaml file:
framework:
cache:
app: cache.adapter.redis
default_redis_provider: '%env(REDIS_DSN_CACHE)%'
pools:
inbound.state.cache:
public: true
We’ve now successfully added a dedicated cache pool to store the state of our data-fetching operations from external sources. This is a crucial step towards building a truly autonomous application.
Why a Separate Cache Pool Matters
By creating a unique cache pool, we’ve achieved two key goals:
Persistence: The data in this pool will not be cleared when you run bin/console cache:clear. This ensures that our application will always remember the state of the last successful operation, even after a standard cache reset or a new deployment.
Isolation: This pool is a self-contained unit, isolated from the rest of the application’s cache. This prevents any other part of the system from accidentally modifying or deleting our state data, guaranteeing its integrity.
This approach is highly effective for building stateless, scalable applications. By storing the state in a shared, external system like Redis, multiple instances of our application can access and update the same state information seamlessly. This eliminates the file-locking issues we discussed earlier and enables parallel processing, significantly improving performance and hardware utilization.
Clearing the Cache Pool
Simply run the following command in your terminal:
./bin/console cache:pool:clear inbound.state.cache
This command specifically targets and clears the inbound.state.cache pool, leaving all other cache data untouched. It’s a useful tool for debugging or for a full application state reset when necessary, giving you full control over your cache management.
Abstracting the Channel State
Let’s create a DTO (Data Transfer Object) to manage the state of our inbound information channels. To prepare for future expansion and different types of channels, we’ll design a flexible and scalable structure.
First, we’ll create an abstract base class, ChannelState.php. This class will define the common properties that all our channels will share, such as the message ID and the date of creation or reception.
This abstract class will act as a blueprint, ensuring that every new channel we add in the future will have these essential parameters, creating a consistent data structure across our application.
namespace App\DTO\State;
abstract class ChannelState {
public function __construct(
protected ?string $lastMessageKey = null,
protected ?\DateTimeImmutable $lastMessageDate = null
)
{}
public function getLastMessageKey(): ?string
{
return $this->lastMessageKey;
}
public function setLastMessageKey(?string $lastMessageKey): void
{
$this->lastMessageKey = $lastMessageKey;
}
public function getLastMessageDate(): ?\DateTimeImmutable
{
return $this->lastMessageDate;
}
public function setLastMessageDate(?\DateTimeImmutable $lastMessageDate): void
{
$this->lastMessageDate = $lastMessageDate;
}
}
Implementing a Specific Channel
Next, we’ll extend this base class to create a concrete DTO for our email channel, MailChannelState.php. This class will inherit the common properties and add specific ones, such as the mailbox name and folder, which are unique to email processing.
namespace App\DTO\State\Channel;
use App\DTO\State\ChannelState;
class MailChannelState extends ChannelState
{
public function __construct(
protected ?string $lastMessageKey = null,
protected ?\DateTimeImmutable $lastMessageDate = null,
protected ?string $email = null,
protected ?string $folder = null,
)
{
parent::__construct($lastMessageKey, $lastMessageDate);
}
public function getEmail(): ?string
{
return $this->email;
}
public function setEmail(?string $email): void
{
$this->email = $email;
}
public function getFolder(): ?string
{
return $this->folder;
}
public function setFolder(?string $folder): void
{
$this->folder = $folder;
}
}
This approach allows us to easily add new channel types in the future (e.g., for SMS or messenger platforms) by simply creating new classes that extend our ChannelState abstract class. This design promotes code reuse and maintains a clean, organized architecture.
Creating the ChannelStateCacheService
We need to create a service to handle all our interactions with the cache pool. This service will be responsible for saving and retrieving state data from the cache, while intelligently handling the specific class type and additional parameters of our ChannelState implementations.
namespace App\Service;
use App\DTO\State\Channel\MailChannelState;
use App\DTO\State\ChannelState;
use Symfony\Contracts\Cache\CacheInterface;
readonly class ChannelStateService{
private const string STATE_CACHE_KEY_PREFIX = 'state_';
public function __construct(private CacheInterface $inboundStateCache){}
protected function getCacheKey(ChannelState $state): string
{
$reflection = new \ReflectionClass($state);
return self::STATE_CACHE_KEY_PREFIX.$reflection->getShortName().'_'.
hash('sha256',
match(get_class($state)){
MailChannelState::class => $state->getEmail() . $state->getFolder(),
default => ''
});
}
public function setChannelState(ChannelState $state): bool{
$cacheItem = $this->inboundStateCache->getItem($this->getCacheKey($state));
$cacheItem->set($state);
$this->inboundStateCache->save($cacheItem);
return true;
}
public function getChannelState(ChannelState $state): null|ChannelState{
$cacheItem = $this->inboundStateCache->getItem($this->getCacheKey($state));
if ($cacheItem->isHit()) {
return $cacheItem->get();
}
return null;
}
}
By centralizing this logic in a single service, we make our code cleaner, more manageable, and easier to test. This also abstracts the underlying cache implementation, so if we ever decide to switch from Redis to something else (like Memcached), we only need to update the configuration and this single service, without changing the rest of our application’s code.
Why Use a Hash?
A hash is a useful tool for generating a checksum from a list of parameters to create a unique cache key.
When we have multiple parameters that determine a unique cache key (for example, a mailbox name, folder, and an offset), simply concatenating them can be messy and lead to very long, unwieldy keys. A hash function takes all these parameters as input and produces a fixed-length string of characters, or a digest.
This digest acts as a unique fingerprint for that specific set of parameters. Even a minor change in any of the input values will result in a completely different hash, which is perfect for ensuring cache key uniqueness.
Step 1: Update the ImapMailService
Now we just need to update the ImapMailService to make its properties and methods protected. This will allow us to inherit from it and create a new service, ImapMailStateService, which will handle the logic for saving and retrieving the current state.
First, open ImapMailService.php and change the visibility of the properties and methods you want to be accessible to child classes from private to protected. This makes it possible for the new ImapMailStateService to interact with them directly.
Step 2: Create the ImapMailStateService
Next, create the new service ImapMailStateService.php and make it extend the ImapMailService you just modified. This new class will contain the logic for saving and retrieving the state using our previously created ChannelStateCacheService.
This is an example of the Decorator Pattern, where you extend a class’s functionality without changing its core behavior. Our ImapMailStateService will inherit all the original mail-fetching capabilities and simply “decorate” them with state management logic.
By doing this, we create a clean separation of concerns. The base ImapMailService remains focused on its primary task of interacting with IMAP, while the new service handles the state persistence. This makes the code more modular, easier to maintain, and more flexible for future development.
<?php
namespace App\Service;
use App\DTO\DataCollection;
use App\DTO\MailMessage;
use App\DTO\State\Channel\MailChannelState;
use App\DTO\State\ChannelState;
class ImapMailStateService extends ImapMailService
{
public function __construct(
private ChannelStateService $channelStateService,
protected string $host,
protected string $username,
protected string $password,
protected string $mailbox = 'INBOX',
)
{
parent::__construct($this->host, $this->username, $this->password, $this->mailbox);
}
public function fetchEmails(int $limit = 10): DataCollection
{
$collection = new DataCollection();
$connection = $this->connect();
/** @var null|MailChannelState $lastState */
$lastState = $this->channelStateService->getChannelState(new MailChannelState());
$criteria = SORTARRIVAL;
if (($lastState instanceof MailChannelState) && ($lastState->getLastMessageDate() instanceof \DateTimeImmutable)) {
$criteria = ' SINCE "' . $lastState->getLastMessageDate()->format('d-M-Y') . '"';
}
$emails = imap_sort($connection, $criteria, 1, SE_UID);
if (!$emails) {
return $collection;
}
$emails = array_slice($emails, 0, $limit);
$stateMailMessage = null;
foreach ($emails as $emailUid) {
$overview = imap_fetch_overview($connection, $emailUid, FT_UID);
$headers = imap_fetchheader($connection, $emailUid, FT_UID);
if (is_null($stateMailMessage)){
$stateMailMessage = new MailMessage(
$overview[0]->subject,
$overview[0]->from,
$overview[0]->to,
null,
$overview[0]->date,
$emailUid);
}
if (
$this->isCurrentMailboxState($lastState)
&&
(
$lastState->getLastMessageKey() === $emailUid
||
$lastState->getLastMessageDate()<=$overview[0]->date
)
) {
continue;
}
if ($this->isTextContentType($headers)) {
$body = imap_body($connection, $emailUid, FT_UID);
$mailMessage = new MailMessage(
$overview[0]->subject,
$overview[0]->from,
$overview[0]->to,
$body,
$overview[0]->date,
$emailUid
);
$collection->add($mailMessage);
}
}
imap_close($connection);
if ($stateMailMessage instanceof MailMessage) {
$this->saveCurrentState($stateMailMessage);
}
return $collection;
}
protected function saveCurrentState(MailMessage $message, ?ChannelState $currentState = null): void{
if (
$this->isCurrentMailboxState($currentState)
&&
($currentState->getLastMessageKey() <= $message->getId()
||
$currentState->getLastMessageDate() <= new \DateTime($message->getDate())
)
) {
return;
}
$channelState = new MailChannelState(
$message->getId(),
new \DateTimeImmutable($message->getDate()),
$this->username,
$this->mailbox
);
$this->channelStateService->setChannelState($channelState);
}
protected function isCurrentMailboxState(?ChannelState $state = null): bool{
return $state instanceof MailChannelState
&&
$state->getEmail()===$this->getUsername()
&&
$state->getFolder()===$this->getMailbox();
}
}
}
services:
...
App\Service\ImapMailStateService:
arguments:
$host: '%env(IMAP_HOST)%'
$username: '%env(IMAP_USERNAME)%'
$password: '%env(IMAP_PASSWORD)%'
With the core logic for state management and future scalability in place, we can now move on to the next crucial step: scheduling our application to check for new messages. Since we don’t have push notifications or incoming webhooks yet, we’ll need to initiate the checks ourselves.
Introducing the Symfony Scheduler Component
It provides a robust, built-in solution for running tasks at specific intervals. It is far more reliable and versatile than a simple crontab and integrates seamlessly with Symfony’s Messenger and dependency injection systems.
Using the Scheduler, we can:
- Define our task: We’ll create a task that runs our ImapMailStateService to check for new emails.
- Set a schedule: We can specify exactly when and how often this task should run (e.g., every minute, every 5 minutes, or on a specific day of the week).
- Decouple the process: The Scheduler will handle the execution of our task as a background process, ensuring our main application remains responsive.
Let’s install the Symfony Scheduler Component and its dependencies. We can do this easily using Composer. Run the following command in your terminal:
composer require symfony/scheduler
By implementing the Scheduler, we’re taking a significant step toward making our application truly autonomous, as it will now be able to poll for new messages without any manual intervention.
Cron Expression Triggers
We’ll use cron expressions to define our schedule. They provide a powerful and flexible way to specify recurring time intervals. Before we can use them, we need to install the necessary dependency.
composer require dragonmantank/cron-expression
This command will install the component, giving us the tools to define complex schedules for our application’s tasks, such as polling for new emails every minute.
Creating the Scheduler Message
Now, let’s create the message that the scheduler will use to trigger our email-fetching process. This is a crucial step in connecting our scheduler to our application’s business logic.
We’ll create a simple class named CheckInboundChannelsMessage. This message class doesn’t need to contain any data; its sole purpose is to serve as a signal for our message bus that a scheduled task needs to be executed.
Here’s the code for the message:
namespace App\Message\Schedule;
class CheckInboundChannelsMessage
{
}
This simple class acts as the “trigger” for our automated process. In the next steps, we’ll configure the Symfony Scheduler to dispatch this message at a specific interval CheckInboundChannelsProvider.php (e.g., using a cron expression). We will then create a handler CheckInboundChannelsMessageHandler.php that “listens” for this message and executes our ImapMailStateService to fetch new mail, completing the automated workflow.
namespace App\Scheduler;
use App\Message\Schedule\CheckInboundChannelsMessage;
use Symfony\Component\Scheduler\Attribute\AsSchedule;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
use Symfony\Component\Scheduler\Schedule;
#[AsSchedule('default')]
class CheckInboundChannelsProvider implements ScheduleProviderInterface
{
public function getSchedule(): Schedule
{
return (new Schedule())
->add(
// This task will be triggered every day by UTC
RecurringMessage::cron('@daily', new CheckInboundChannelsMessage(),new \DateTimeZone('UTC'))
);
}
}
namespace App\Handler\Schedule;
use App\Message\Command\AIAgentSummarizeMessage;
use App\Message\Schedule\CheckInboundChannelsMessage;
use App\Service\ImapMailService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
readonly class CheckInboundChannelsMessageHandler
{
public function __construct(
private ImapMailStateService $imapMailStateService,
private MessageBusInterface $messageBus,
)
{
}
public function __invoke(CheckInboundChannelsMessage $checkInboundChannelsMessage){
$emailCollection = $this->imapMailStateService->fetchEmails(2);
$this->messageBus->dispatch(
new Envelope(
new AIAgentSummarizeMessage(
$emailCollection,
'I have emails. Please summarize them into a concise overview (100-150 words) focusing on key decisions, action items, and deadlines. Use bullet points to organize the summary by email or theme, whichever is clearer. Here’s the email content:'
)
)
);
}
}
Now that all the necessary components are in place, we can launch our Symfony Messenger consumer. This single command will start the engine for our entire automated system.
Running the Consumer
The consumer will listen for messages from all configured transports, including a new one called scheduler_default. This scheduler_default transport is created automatically by Symfony thanks to the #[AsSchedule] attribute, which tells the framework to set up a dedicated pipeline for scheduled messages.
To start the consumer and have it listen to our scheduled tasks, simply run the following command in your terminal:
./bin/console messenger:consume --all -vv
Once this consumer is running, it will automatically receive and process the CheckInboundChannelsMessage every time the scheduler dispatches it according to your cron expression.
Your application is now fully autonomous, capable of checking for new messages on its own without any manual intervention.
Conclusion
By building on our asynchronous foundation, we have successfully transformed our AI agent from a manually-triggered tool into a fully autonomous, self-managing application.
By using Redis and a dedicated Symfony cache pool, we created a reliable system for storing the last successful operation state. This method ensures that our application can be deployed in a stateless manner, and that multiple instances can run in parallel without the risk of data loss or file-locking conflicts.
By leveraging Symfony Scheduler and Symfony Messenger components, we were able to detach the message-gathering process from manual triggers.
Our application now operates completely on its own, maximizing hardware utilization and ensuring that no incoming message is ever missed. This robust, event-driven architecture is a powerful blueprint for building intelligent, scalable, and fully autonomous applications.
All that’s left is to containerize our application into a Docker image and launch as many instances as we need, but we’ll cover that in future articles.
Stay tuned — and let’s keep the conversation going.
Top comments (0)