Danil Khaliullin

Asynchronous state machine with Symfony Workflows

A finite-state machine is an abstract machine that can be in exactly one of a finite number of states at any given time. The Symfony Workflow component lets us define and manage state machines. Let’s explore how it helps when building complex business logic.


Use-case

Let’s imagine the workflow: we initiate the creation of an order, send it to the order service, send it to the user’s email, and mark it as “sent” in the database.

All these actions should be executed successfully, but there are potential points of failure, such as invalid order data during creation, failures in the order service, or issues with the vendor’s email provider.

Use-case example

A Symfony workflow with retry logic lets us execute a complex business flow to the end even if errors occur along the way. Let’s examine the main points that make the workflow fault-tolerant and reliable:

  • Any business logic flow is divided into state-machine transitions. Every transition is executed transactionally.
  • If the execution of a transition fails, we can retry it later with a console command or asynchronously.
  • Every transition contains only business logic and doesn’t depend on the workflow implementation.

Let’s code!

First, we need to create a new Symfony workflow according to our workflow schema:

framework:
  workflows:
    order_send:
      type: state_machine
      supports:
        - App\Entity\WorkflowEntry
      marking_store:
        type: 'method'
        property: 'currentState'
      places:
        - initialised
        - verified
        - approved
        - sent_to_email
        - marked_as_sent
      transitions:
        verify_order:
          from: initialised
          to: verified
        approve_order:
          from: verified
          to: approved
        send_order_to_email:
          from: approved
          to: sent_to_email
        mark_order_as_sent:
          from: sent_to_email
          to: marked_as_sent
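To make the configuration concrete, here is a plain-PHP sketch of the transition table it describes. It does not use the Symfony component: the class name and its can/apply methods are illustrative stand-ins (the real Workflow methods also take the subject as their first argument), and only the place and transition names come from the YAML above.

```php
<?php
declare(strict_types=1);

// Plain-PHP stand-in for the order_send state machine; it does not use symfony/workflow.
final class OrderSendStateMachine
{
    /** Transition name => [from place, to place], copied from the YAML above. */
    private const TRANSITIONS = [
        'verify_order'        => ['initialised', 'verified'],
        'approve_order'       => ['verified', 'approved'],
        'send_order_to_email' => ['approved', 'sent_to_email'],
        'mark_order_as_sent'  => ['sent_to_email', 'marked_as_sent'],
    ];

    public function __construct(private string $place = 'initialised')
    {
    }

    public function getPlace(): string
    {
        return $this->place;
    }

    /** A transition is enabled only when the machine sits in its "from" place. */
    public function can(string $transition): bool
    {
        return isset(self::TRANSITIONS[$transition])
            && self::TRANSITIONS[$transition][0] === $this->place;
    }

    public function apply(string $transition): void
    {
        if (!$this->can($transition)) {
            throw new \LogicException(sprintf(
                'Transition "%s" is not enabled in place "%s".',
                $transition,
                $this->place,
            ));
        }

        // A state machine holds exactly one place, so applying simply replaces it.
        $this->place = self::TRANSITIONS[$transition][1];
    }
}

$machine = new OrderSendStateMachine();
$machine->apply('verify_order');
```

After the first transition the machine sits in the verified place, so approve_order is enabled while send_order_to_email is still rejected.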

So, what is App\Entity\WorkflowEntry? It is an entity that contains all the information about a running workflow: it keeps the current state and the next transition, and it stores the business logic data:

<?php
declare(strict_types=1);

namespace App\Entity;

...

#[ORM\Entity(repositoryClass: WorkflowEntryRepository::class)]
class WorkflowEntry implements WorkflowInterface
{
    #[ORM\Id]
    #[ORM\Column(type: "uuid", unique: true)]
    #[ORM\GeneratedValue(strategy: "CUSTOM")]
    #[ORM\CustomIdGenerator(class: UuidGenerator::class)]
    private Uuid $id;

    #[ORM\Column(name: "current_state", type: "string")]
    private string $currentState = 'initialised';

    #[ORM\Column(name: "workflow_type", length: 32, enumType: WorkflowType::class, options: ["default" => "default"])]
    private WorkflowType $workflowType = WorkflowType::DefaultType;

    #[ORM\Column(name: "next_transition", type: "string", nullable: true)]
    private ?string $nextTransition = null;

    #[ORM\Column(type: "json")]
    private array $stamps = [];

    #[ORM\Column(enumType: WorkflowStatus::class, options: ["default" => "started"])]
    private WorkflowStatus $status = WorkflowStatus::Started;

    #[ORM\Column(type: "smallint")]
    private int $retries = 0;

    #[ORM\Column(name: "created_at", type: "datetime_immutable")]
    private \DateTimeImmutable $createdAt;

    #[ORM\Column(name: "updated_at", type: "datetime_immutable")]
    private \DateTimeImmutable $updatedAt;

    public function __construct()
    {
        $this->createdAt = new \DateTimeImmutable();
        $this->updatedAt = new \DateTimeImmutable();
    }

    public static function create(
        WorkflowType $type,
        string $nextTransition,
        array $stamps,
    ): WorkflowEntry {
        $entry = new WorkflowEntry();
        $entry->setWorkflowType($type);
        $entry->setNextTransition($nextTransition);
        $entry->setStamps($stamps);

        return $entry;
    }

    ...
}

Business logic data can be stored in an “envelope” as stamps, which are classes implementing App\Service\Workflow\WorkflowStampInterface. For instance, we can store the order id in a stamp:

class OrderIdStamp implements WorkflowStampInterface
{
    private Uuid $orderId;

    public function getOrderId(): Uuid
    {
        return $this->orderId;
    }

    public function setOrderId(Uuid $orderId): void
    {
        $this->orderId = $orderId;
    }

    public static function createWithOrderId(Uuid $orderId): OrderIdStamp
    {
        $stamp = new OrderIdStamp();
        $stamp->setOrderId($orderId);

        return $stamp;
    }
}

Stamps travel inside the envelope App\Service\Workflow\Envelope\WorkflowEnvelope, which is normalized into the stamps column of the entry. The App\Entity\WorkflowEntry entity is stored in the database after every transition so that the process can be resumed after a failure.


Let’s see how this approach improves the workflow:

Workflow entry

  • We store data as the envelope in the WorkflowEntry that goes through all transitions.
  • Every transition is executed transactionally.
  • After the transition is done, we keep its result and any additional data in the envelope.
  • If one of the transitions fails, we can retry it after a temporary failure or fail the whole workflow.

The envelope itself is a small container that groups stamps by class:

<?php
declare(strict_types=1);

namespace App\Service\Workflow\Envelope;

use App\Service\Workflow\WorkflowStampInterface;

class WorkflowEnvelope
{
    // Initialised so that getStamps() also works on an envelope created without stamps.
    private array $stamps = [];

    /**
     * @param WorkflowStampInterface[] $stamps
     */
    public function __construct(array $stamps = [])
    {
        foreach ($stamps as $stamp) {
            $this->addStamp($stamp);
        }
    }

    public function addStamp(WorkflowStampInterface $stamp): void
    {
        $this->stamps[$stamp::class][] = $stamp;
    }

    /**
     * @return array<class-string, WorkflowStampInterface[]> stamps grouped by class name
     */
    public function getStamps(): array
    {
        return $this->stamps;
    }

    public function getStamp(string $stampFqcn): WorkflowStampInterface
    {
        $stamps = $this->stamps[$stampFqcn] ?? [];

        if (count($stamps) === 0) {
            throw new \RuntimeException(sprintf('Stamp with type %s is not found', $stampFqcn));
        }

        return reset($stamps);
    }

    public function hasStamp(string $stampFqcn): bool
    {
        return isset($this->stamps[$stampFqcn]);
    }
}
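The article relies on the Symfony Serializer to turn the envelope into an array and back, but that normalizer isn't shown here. As a rough idea of what such a round trip involves, here is a dependency-free sketch. The normalizeEnvelope and denormalizeEnvelope helpers are hypothetical, the classes are trimmed stand-ins for the ones above, and the order id is a plain string instead of a Uuid.

```php
<?php
declare(strict_types=1);

// Stand-ins so the snippet runs on its own; the real classes live under App\Service\Workflow.
interface WorkflowStampInterface
{
}

final class OrderIdStamp implements WorkflowStampInterface
{
    // The article uses a Uuid object; a plain string keeps this sketch dependency-free.
    public function __construct(public readonly string $orderId)
    {
    }
}

final class WorkflowEnvelope
{
    /** @var array<class-string, WorkflowStampInterface[]> */
    private array $stamps = [];

    public function addStamp(WorkflowStampInterface $stamp): void
    {
        $this->stamps[$stamp::class][] = $stamp;
    }

    public function getStamps(): array
    {
        return $this->stamps;
    }
}

/** Hypothetical helper: turns an envelope into a JSON-friendly array keyed by stamp class. */
function normalizeEnvelope(WorkflowEnvelope $envelope): array
{
    $data = [];
    foreach ($envelope->getStamps() as $class => $stamps) {
        foreach ($stamps as $stamp) {
            // Called from outside the class, so only public properties are exported.
            $data[$class][] = get_object_vars($stamp);
        }
    }

    return $data;
}

/** Hypothetical helper: rebuilds the envelope, passing stored fields as named constructor arguments. */
function denormalizeEnvelope(array $data): WorkflowEnvelope
{
    $envelope = new WorkflowEnvelope();
    foreach ($data as $class => $rows) {
        // Never instantiate arbitrary class names coming from storage.
        if (!is_a($class, WorkflowStampInterface::class, true)) {
            throw new \UnexpectedValueException(sprintf('"%s" is not a stamp class', $class));
        }
        foreach ($rows as $row) {
            $envelope->addStamp(new $class(...$row));
        }
    }

    return $envelope;
}

$envelope = new WorkflowEnvelope();
$envelope->addStamp(new OrderIdStamp('order-42'));

// JSON stands in for the entity's json column.
$stored   = json_encode(normalizeEnvelope($envelope), JSON_THROW_ON_ERROR);
$restored = denormalizeEnvelope(json_decode($stored, true, 512, JSON_THROW_ON_ERROR));
```

Keying the stored data by stamp class is what lets the restored envelope answer getStamp(OrderIdStamp::class) the same way the original did.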

How to manage it with Symfony Workflow?

Let’s use events to manage it.

Firstly, we separate every transition into a single class to decouple the business logic and follow the Single Responsibility Principle (SRP). For example:

class VerifyOrder implements WorkflowTransitionInterface
{
    public function __construct(
        private readonly OrderRepository $orderRepository,
    ) {
    }

    public function handle(WorkflowEnvelope $envelope): WorkflowEnvelope
    {
        /** @var OrderIdStamp $orderIdStamp */
        $orderIdStamp = $envelope->getStamp(OrderIdStamp::class);
        $orderId = $orderIdStamp->getOrderId();

        $order = $this->orderRepository->find($orderId);

        // Here we can make verification actions

        return $envelope;
    }

    public function getNextTransition(): ?string
    {
        return Transition::ApproveOrder->value;
    }

    public function getState(): ?string
    {
        return State::Verified->value;
    }
}
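VerifyOrder depends on a few types that aren't shown in this article. The sketch below is one plausible shape for them, inferred from how they are used: the interface has the three methods VerifyOrder implements, and the enum values are assumed to match the names in the order_send configuration. Apart from Transition::ApproveOrder and State::Verified, the case names are guesses, and MarkOrderAsSent is a hypothetical final step.

```php
<?php
declare(strict_types=1);

// Stand-in for App\Service\Workflow\Envelope\WorkflowEnvelope so the snippet runs on its own.
final class WorkflowEnvelope
{
}

// Assumed shape: backed enums whose values match the names in the order_send configuration.
enum State: string
{
    case Initialised = 'initialised';
    case Verified = 'verified';
    case Approved = 'approved';
    case SentToEmail = 'sent_to_email';
    case MarkedAsSent = 'marked_as_sent';
}

enum Transition: string
{
    case VerifyOrder = 'verify_order';
    case ApproveOrder = 'approve_order';
    case SendOrderToEmail = 'send_order_to_email';
    case MarkOrderAsSent = 'mark_order_as_sent';
}

// Contract inferred from the three methods VerifyOrder implements.
interface WorkflowTransitionInterface
{
    public function handle(WorkflowEnvelope $envelope): WorkflowEnvelope;

    public function getNextTransition(): ?string;

    public function getState(): ?string;
}

// Hypothetical last step: returning null as the next transition ends the workflow.
final class MarkOrderAsSent implements WorkflowTransitionInterface
{
    public function handle(WorkflowEnvelope $envelope): WorkflowEnvelope
    {
        // Business logic for marking the order as sent would go here.
        return $envelope;
    }

    public function getNextTransition(): ?string
    {
        return null;
    }

    public function getState(): ?string
    {
        return State::MarkedAsSent->value;
    }
}
```

Letting each step name its own successor keeps the chain in the transition classes, at the cost of repeating what the workflow configuration already says about the order of the steps.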

To apply a transition and update the current workflow state, we can use the Workflow::apply method. Let’s create a subscriber and subscribe to our custom WorkflowNextStateEvent to apply the next transition:

class WorkflowNextStateSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly ServiceLocator $workflows,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkflowNextStateEvent::class => 'applyNextState',
        ];
    }

    public function applyNextState(WorkflowNextStateEvent $event): void
    {
        $workflowEntry = $event->getWorkflowEntry();

        if (!$this->workflows->has($workflowEntry->getWorkflowType()->value)) {
            throw new \RuntimeException(
                sprintf(
                    'There is no workflow with type %s',
                    $workflowEntry->getWorkflowType()->value
                )
            );
        }

        $workflow = $this->workflows->get(
            $workflowEntry->getWorkflowType()->value
        );

        $workflow->apply(
            $workflowEntry,
            $workflowEntry->getNextTransition()
        );
    }
}

It gets the needed workflow by type and applies the next transition. To handle every transition transactionally, we can subscribe to workflow events and wrap each transition handle method in another subscriber:

class WorkflowTransitionSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly EntityManagerInterface $entityManager,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly ServiceLocator $transitions,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            'workflow.transition' => 'handleTransition',
        ];
    }

    public function handleTransition(Event $event): void
    {
        /** @var WorkflowEntry $workflowEntry */
        $workflowEntry = $event->getSubject();
        $this->entityManager->getConnection()->beginTransaction();

        try {
            $transitionKey = sprintf(
                '%s.%s',
                $workflowEntry->getWorkflowType()->value,
                $workflowEntry->getNextTransition(),
            );

            /** @var WorkflowTransitionInterface $transition */
            $transition = $this->transitions->get($transitionKey);

            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);
            $envelope = $transition->handle($envelope);

            /** @var array $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');

            $workflowEntry->setStamps($stamps);
            $workflowEntry->setCurrentState($transition->getState());
            $workflowEntry->setNextTransition($transition->getNextTransition());

            if ($workflowEntry->getNextTransition() === null) {
                $workflowEntry->setStatus(WorkflowStatus::Finished);
            }

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();

            $this->entityManager->getConnection()->commit();
        } catch (\Throwable $exception) {
            $this->entityManager->getConnection()->rollBack();

            throw $exception;
        }

        if ($workflowEntry->getNextTransition() !== null) {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        }
    }
}

Let’s go point by point:

  • To start the workflow, we dispatch the WorkflowNextStateEvent with the created WorkflowEntry object, which contains “stamps” (our order data).
  • The WorkflowNextStateSubscriber handles the event, determines which workflow should be applied, and calls the Workflow::apply method.
  • The WorkflowTransitionSubscriber subscribes to the workflow.transition event, which is dispatched while the WorkflowEntry is going through a transition. It begins the transaction, prepares the envelope with stamps, calls the WorkflowTransitionInterface::handle method, and commits or rolls back the transaction.
  • After that, the WorkflowTransitionSubscriber dispatches the WorkflowNextStateEvent again to apply the next transition, until the workflow is done.
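Stripped of events and Doctrine, the loop described above boils down to "run a step, save the result, move on". Here is a minimal in-memory sketch of that idea, under heavy assumptions: the entry is a plain array, runWorkflow and the step table are hypothetical, the flow is shortened to two steps, and the $save callback plays the role of flush() plus commit().

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory model of the loop: a step is "committed" by saving
 * the entry only after the step has returned without throwing.
 *
 * @param array{state: string, next: ?string, stamps: array} $entry
 * @param array<string, array{handler: callable, state: string, next: ?string}> $steps
 * @return array{state: string, next: ?string, stamps: array}
 */
function runWorkflow(array $entry, array $steps, callable $save): array
{
    while ($entry['next'] !== null) {
        $step = $steps[$entry['next']];

        // Work on a copy so a failing step cannot leave half-updated data behind.
        $candidate = $entry;
        $candidate['stamps'] = ($step['handler'])($candidate['stamps']);
        $candidate['state'] = $step['state'];
        $candidate['next'] = $step['next'];

        $save($candidate); // plays the role of flush() + commit()
        $entry = $candidate;
    }

    return $entry;
}

$emailAttempts = 0;
$steps = [
    'verify_order' => [
        'handler' => fn (array $stamps): array => $stamps + ['verified' => true],
        'state' => 'verified',
        'next' => 'send_order_to_email',
    ],
    'send_order_to_email' => [
        'handler' => function (array $stamps) use (&$emailAttempts): array {
            // Simulated outage: the first attempt fails, later attempts succeed.
            if (++$emailAttempts === 1) {
                throw new \RuntimeException('Email provider is temporarily unavailable');
            }

            return $stamps + ['emailed' => true];
        },
        'state' => 'sent_to_email',
        'next' => null,
    ],
];

$stored = ['state' => 'initialised', 'next' => 'verify_order', 'stamps' => ['orderId' => 'order-42']];
$save = function (array $entry) use (&$stored): void {
    $stored = $entry;
};

try {
    runWorkflow($stored, $steps, $save);
} catch (\RuntimeException) {
    // The first run stops at the email step; $stored still holds the verified state.
}

$stateAfterFailure = $stored['state'];
$finished = runWorkflow($stored, $steps, $save); // resumes from send_order_to_email
```

Because the entry is saved after every successful step, the second run does not repeat the verification: it picks up at the step that failed.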


What about failures?

Since we store the result of every transition, it’s easy to continue and finish the workflow from any state. Consider the case when the email service fails:

Failure scenario

In this case, we can handle the service exception and mark the workflow as failed. After that, we can retry the workflow using a cron job or even send it to a queue to finish it asynchronously. If the error is permanent or we exceed the retry limit, we can stop the workflow entirely.

<?php

class WorkflowHandler
{
    public function __construct(
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly LoggerInterface $logger,
        private readonly EntityManagerInterface $entityManager,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
        private readonly MessageBusInterface $bus,
    ) {
    }

    public function handle(WorkflowEntry $workflowEntry): void
    {
        try {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        } catch (StopWorkflowException $exception) {
            $this->logger->error(
                sprintf(
                    'A permanent internal error occurred during handling workflow "%s". Workflow state: %s. The workflow stopped.',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                [
                    // PSR-3 expects exceptions under the "exception" context key.
                    'exception' => $exception,
                ]
            );

            $workflowEntry->setStatus(WorkflowStatus::Stopped);

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        } catch (WorkflowInternalErrorException | \Throwable $exception) {
            $this->logger->error(
                sprintf(
                    'An internal error occurred during handling workflow "%s". Workflow state: %s',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                [
                    // PSR-3 expects exceptions under the "exception" context key.
                    'exception' => $exception,
                ]
            );

            $workflowEntry->setStatus(WorkflowStatus::Failed);
            /** @var WorkflowEnvelope $envelope */
            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);

            $envelope->addStamp(new WorkflowInternalErrorStamp(
                $exception->getMessage(),
            ));

            /** @var array<WorkflowStampInterface> $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');
            $workflowEntry->setStamps($stamps);

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        }
    }

    public function retry(WorkflowEntry $workflowEntry): void
    {
        $workflowEntry->addRetry();
        $workflowEntry->setStatus(WorkflowStatus::Started);

        $this->entityManager->persist($workflowEntry);
        $this->entityManager->flush();

        $this->handle($workflowEntry);
    }
}
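The retry method above doesn't enforce a retry limit by itself, so whatever calls it (a cron command or a queue worker) has to decide when to give up. Here is a rough sketch of such a worker pass. Everything in it is an assumption made for illustration: the Entry class and the drainRetryQueue function are stand-ins, the enum values other than "started" are guesses, the limit of 3 is arbitrary, and the handler throws on failure instead of catching errors itself the way WorkflowHandler::handle does.

```php
<?php
declare(strict_types=1);

// Stand-ins for the article's WorkflowStatus enum and WorkflowEntry entity.
enum WorkflowStatus: string
{
    case Started = 'started';
    case Failed = 'failed';
    case Stopped = 'stopped';
    case Finished = 'finished';
}

final class Entry
{
    public WorkflowStatus $status = WorkflowStatus::Failed;
    public int $retries = 0;
}

const MAX_RETRIES = 3; // assumed limit; the article does not state one

/**
 * Hypothetical worker pass: retries each failed entry, puts it back in the
 * queue if it fails again, and stops it once the retry budget is spent.
 *
 * @param callable(Entry): void $handle throws on failure
 */
function drainRetryQueue(\SplQueue $queue, callable $handle): void
{
    while (!$queue->isEmpty()) {
        /** @var Entry $entry */
        $entry = $queue->dequeue();

        if ($entry->retries >= MAX_RETRIES) {
            $entry->status = WorkflowStatus::Stopped;
            continue;
        }

        $entry->retries++;
        $entry->status = WorkflowStatus::Started;

        try {
            $handle($entry);
            $entry->status = WorkflowStatus::Finished;
        } catch (\Throwable) {
            $entry->status = WorkflowStatus::Failed;
            $queue->enqueue($entry); // goes to the back of the queue for another attempt
        }
    }
}

$recovering = new Entry(); // succeeds on its second attempt
$broken = new Entry();     // never succeeds

$queue = new \SplQueue();
$queue->enqueue($recovering);
$queue->enqueue($broken);

drainRetryQueue($queue, function (Entry $entry) use ($recovering): void {
    if ($entry !== $recovering || $entry->retries < 2) {
        throw new \RuntimeException('temporary failure');
    }
});
```

A real worker would usually wait between attempts instead of retrying immediately, and would persist the entry after every status change.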

Conclusion

A finite-state machine is a good tool for managing complex business logic: it splits the logic into discrete steps and makes failures manageable, which helps us build fault-tolerant systems. The Symfony Workflow component helps achieve this.

I hope you found this article helpful and that it provided some insights into working with Symfony Workflows.

You can check out the whole project on GitHub: https://github.com/bifidokk/symfony-asynchronous-workflows

Feel free to leave your feedback or questions in the comments section below.

Happy coding! 😌
