State replication with Symfony Workflow, Messenger, and RabbitMQ

In this article, we’ll see how to replicate data between two applications that live in two different locations. The twist is that the network is not reliable! Yeah, the network is never reliable, but here, it’s really not 😁

The project has two applications:

  • the “Core”, that serves the website to the customer, hosted in the cloud. It’s a traditional e-commerce application built with Symfony.
  • the “Warehouse”, that manages the logistics, hosted in each warehouse. It’s an application built on top of Symfony, with a heavy use of the Workflow component.

Since the warehouses are located deep in the countryside, their connectivity can fail from time to time. Many people work there, and they can’t stop working when a warehouse loses its internet connection. That’s why we must host the application on premises. Finally, every warehouse must send the state of each article to the Core, in near real time.

The Architecture

In each zone, we have a Symfony application and a RabbitMQ instance, plus other services that are not relevant here. Each warehouse publishes messages to its local RabbitMQ. When the connectivity is up, RabbitMQ moves the messages from the warehouse to the Core RabbitMQ instance.

Symfony Workflow

The workflow lives in the Warehouse application, and is quite big.

When the article reaches certain places, we want to notify the Core application. So, in the workflow definition, we flag each of these places in its metadata:

framework:
    workflows:
        article:
            places:
                -   name: 'new'
                -   name: 'received'
                    metadata:
                        sync-to-core: true
                -   name: 'opened'

Finally, we need a listener that publishes a message with Messenger:

namespace App\Article\Workflow;

use App\Article\Messenger\Message\SyncArticleToCoreMessage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Workflow\Event\TransitionEvent;

class SyncListener
{
    public function __construct(
        private readonly MessageBusInterface $bus,
    ) {
    }

    #[AsEventListener('workflow.article.transition')]
    public function onTransition(TransitionEvent $e): void
    {
        // A state machine transition has exactly one destination place
        $to = $e->getTransition()->getTos()[0];
        // Only places flagged with the "sync-to-core" metadata are replicated
        $shouldSync = (bool) $e->getWorkflow()->getMetadataStore()->getMetadata('sync-to-core', $to);
        if (!$shouldSync) {
            return;
        }

        // SyncArticleToCoreMessage is a POPO
        $this->bus->dispatch(new SyncArticleToCoreMessage(
            $e->getSubject()->getId(),
            $to,
            $e->getContext(),
        ));
    }
}

From the workflow, we dispatch a Messenger message. But remember, this very application won’t consume the message. The message must be routed to another RabbitMQ, to be consumed by another application!
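
The listener only says that SyncArticleToCoreMessage is a POPO. Here is a minimal sketch of what such a class could look like. The constructor order mirrors the call in the listener, but the property names and types are assumptions, not the project's actual code.

```php
namespace App\Article\Messenger\Message;

/**
 * Immutable message describing the place an article has just moved to.
 * Property names are illustrative: only the argument order comes from the listener.
 */
final class SyncArticleToCoreMessage
{
    /**
     * @param int|string           $articleId value returned by the article's getId()
     * @param string               $place     destination place of the transition
     * @param array<string, mixed> $context   context given when the transition was applied
     */
    public function __construct(
        public readonly int|string $articleId,
        public readonly string $place,
        public readonly array $context = [],
    ) {
    }
}
```

Since the message leaves the application as JSON, the context should only hold values that survive a JSON round trip (scalars and arrays), not objects.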

How to Move Messages from one RabbitMQ to Another One?

We’ll use the RabbitMQ Federation plugin! This plugin connects one server (downstream, the Core in our case) to others (upstream, the warehouses). It creates the buffer queues, the exchanges, and everything else it needs to perform the replication. The plugin ships with the RabbitMQ server, so it’s only a matter of enabling it.

How to Configure RabbitMQ Federation

First we need to enable the plugin. On all RabbitMQ nodes, run:

rabbitmq-plugins enable rabbitmq_federation_management

If you want to reproduce it, we provide this docker-compose.yml file:

version: "3.8"

services:
    rabbitmq:
        build:
            target: rabbitmq
        container_name: rabbitmq

    rabbitmq-wh-1:
        build:
            target: rabbitmq
        container_name: rabbitmq-wh-1

    rabbitmq-wh-2:
        build:
            target: rabbitmq
        container_name: rabbitmq-wh-2

And the Dockerfile:

FROM rabbitmq:${RABBITMQ_VERSION:-management-alpine} as rabbitmq

ADD --chmod=0755 https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/v3.11.x/deps/rabbitmq_management/bin/rabbitmqadmin /usr/local/bin/rabbitmqadmin

RUN rabbitmq-plugins enable rabbitmq_federation_management

On the downstream server, aka the “Core” server, we declare an exchange, a queue, and bind them. This queue will receive all messages from all warehouses.

docker exec rabbitmq rabbitmqadmin declare exchange name=upstream-wh type=fanout
docker exec rabbitmq rabbitmqadmin declare queue name=sync-from-wh
docker exec rabbitmq rabbitmqadmin declare binding source=upstream-wh destination=sync-from-wh

On the same server, we add all upstream servers:

docker exec rabbitmq rabbitmqctl set_parameter federation-upstream upstream-wh1 '{"uri":"amqp://rabbitmq-wh-1", "exchange":"to-core"}'
# …

And we set up a policy that configures the exchange to fetch messages from the upstreams:

docker exec rabbitmq rabbitmqctl set_policy --apply-to exchanges upstream-wh "^upstream-wh" '{"federation-upstream-set":"all"}'

Finally, we need to declare an exchange in each warehouse. These exchanges will automatically forward messages to the downstream server. To be accurate, the exchange will route messages to a local “buffer” queue, and the downstream server will consume them. If the connectivity is down, no problem! The messages will stay in warehouse buffers until the connectivity is back.

docker exec rabbitmq-wh-1 rabbitmqadmin declare exchange name=to-core type=fanout

Now we need to tweak the Symfony Messenger configuration a bit, because our setup is not the default one.

The Messenger Configuration

First things first, we don’t want to send serialized PHP over the network. It’s not interoperable and can cause major crashes on the receiver side. We already wrote about Messenger and interoperability, so we won’t say much here about JSON, custom serializers, and the related Messenger configuration: read that blog post for more information. Anyway, we need to configure the transport in each application.
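
To give an idea of what travels over the wire, here is a sketch of a possible JSON payload, written as two plain functions. It is not the actual App\Messenger\Serializer\JsonSerializer: the function names and the JSON keys (article_id, place, context) are assumptions. In a real Messenger serializer, the same logic would sit in the encode() and decode() methods of Symfony\Component\Messenger\Transport\Serialization\SerializerInterface.

```php
/**
 * Builds the JSON body of an article sync message.
 * Hypothetical helper: the key names are illustrative only.
 */
function encodeArticleSync(int|string $articleId, string $place, array $context = []): string
{
    return json_encode([
        'article_id' => $articleId,
        'place' => $place,
        'context' => $context,
    ], JSON_THROW_ON_ERROR);
}

/**
 * Parses a JSON body and rejects payloads that miss a required key,
 * so that a malformed message never reaches the business code.
 *
 * @return array{article_id: int|string, place: string, context: array}
 */
function decodeArticleSync(string $json): array
{
    // Throws a JsonException when the body is not valid JSON
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data) || !isset($data['article_id'], $data['place'])) {
        throw new InvalidArgumentException('Invalid article sync payload.');
    }

    return [
        'article_id' => $data['article_id'],
        'place' => (string) $data['place'],
        'context' => (array) ($data['context'] ?? []),
    ];
}
```

Validating on decode matters here because the sender and the receiver are two separate code bases: the Core should not trust the shape of what it receives.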

Configuration of the Warehouse

The Messenger configuration looks like this:

framework:
    messenger:
        transports:
            to-core:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: App\Messenger\Serializer\JsonSerializer
                options:
                    auto_setup: false
                    exchange:
                        name: to-core
                    read_timeout: 5
                    write_timeout: 5
                    connect_timeout: 5
                    confirm_timeout: 5
                    rpc_timeout: 5

        routing:
            App\Article\Messenger\Message\SyncArticleToCoreMessage: to-core

A few notes on this configuration:

  1. we use a custom serializer to serialize the message as JSON;
  2. we disable auto_setup, since the exchange was already declared in the previous section;
  3. we only specify which exchange we want to use: to-core;
  4. we configure the routing to use this transport;
  5. we configure all the timeouts.

Fun fact: we don’t even need to create a handler, since handling is done by another application.

Configuration of the Core

The Messenger configuration looks like this:

framework:
    messenger:
        transports:
            from-warehouse:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: App\Messenger\Serializer\JsonSerializer
                options:
                    queues:
                        sync-from-wh: ~
                    read_timeout: 5
                    write_timeout: 5
                    connect_timeout: 5
                    confirm_timeout: 5
                    rpc_timeout: 5

        routing:
            App\Article\Messenger\Message\SyncFromWareHouseMessage: from-warehouse

The configuration is similar to the warehouse one, but here we only specify the queue. We also leave auto_setup enabled (the default), so Messenger creates whatever is missing and the retry mechanism can work.
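
On the Core side, something has to consume SyncFromWareHouseMessage. The sketch below shows one possible handler. The message properties, the handler name, and the ArticleStateStore interface are all hypothetical and only here to make the example complete; the two namespaces would normally live in separate files.

```php
// Both namespaces are shown in one listing for brevity.
namespace App\Article\Messenger\Message;

/** Hypothetical shape of the message decoded by the Core's JSON serializer. */
final class SyncFromWareHouseMessage
{
    public function __construct(
        public readonly int|string $articleId,
        public readonly string $place,
        public readonly array $context = [],
    ) {
    }
}

namespace App\Article\Messenger\Handler;

use App\Article\Messenger\Message\SyncFromWareHouseMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

/** Hypothetical port to the Core's storage; replace it with a real repository. */
interface ArticleStateStore
{
    public function updatePlace(int|string $articleId, string $place): void;
}

#[AsMessageHandler]
final class SyncFromWareHouseHandler
{
    public function __construct(
        private readonly ArticleStateStore $store,
    ) {
    }

    public function __invoke(SyncFromWareHouseMessage $message): void
    {
        // An exception thrown here lets Messenger retry the message
        // according to the transport's retry strategy.
        $this->store->updatePlace($message->articleId, $message->place);
    }
}
```

Keeping the handler this thin means a failure (database down, unknown article) simply bubbles up, and the retry mechanism takes over.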

Conclusion

This use case is not so common, but we are pretty sure some of you have a similar one. With the right tools (RabbitMQ, its federation plugin, and symfony/messenger), it’s straightforward to send updates from one application to another and to deal with network failures. To sum everything up, the following events occur:

  1. Someone applies a transition in the warehouse’s workflow;
  2. We check if the place must be synced via the place metadata;
  3. We send a message to a special exchange with Messenger;
  4. This exchange will route the message to a queue;
  5. When the connectivity is up, the Core’s RabbitMQ fetches all messages from the warehouse’s one, and places them in the sync-from-wh queue;
  6. A Core handler consumes and processes the message;
  7. If it fails, the retry system will automatically republish the message.

🎉 Everything is now in sync 🎉

We hope you have learnt more about the different components used!
