
About Symfony Messenger and Interoperability


The Messenger component was merged into Symfony 4.1, released in May 2018. It adds an abstraction layer between a data producer (or publisher) and its data consumer.

Symfony is thus able to send messages (the data) over a bus, usually asynchronously. In concrete terms: our controller creates an email and sends it on a bus (backed by RabbitMQ, Redis, Doctrine, etc.), and a consumer then sends the email synchronously. The advantage is that heavy, time-consuming or sensitive tasks can be delegated to background workers.

When data producers and consumers are in the same application, this system works very well and is totally transparent. However, if they are two different Symfony applications, or two applications in two different languages, you’re going to have to prepare the groundwork.

Messenger architecture

[Figure: Messenger architecture]

The overall architecture of the component is as follows:

  1. A publisher (controller, service, command, etc.) dispatches a message to the bus;
  2. If the bus is synchronous, the message is consumed by a handler;
  3. If the bus is asynchronous, the message is sent via a transport to a queue system (RabbitMQ, Redis, Doctrine, …);
  4. A daemon (also called a worker) fetches messages in real time from the queue system via the transport;
  5. It dispatches the message back to the bus;
  6. Now that the bus is synchronous, the message is consumed by a handler.
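
The six steps above can be modeled in a few lines of plain PHP. This is only a toy sketch, not how Messenger is implemented: the `SendEmail` class, the closures and the in-memory `SplQueue` standing in for the queue system are all assumptions made for the illustration.

```php
<?php

declare(strict_types=1);

// Hypothetical message class, used only for this illustration.
final class SendEmail
{
    public function __construct(public readonly string $recipient)
    {
    }
}

// Stand-in for the queue system (RabbitMQ, Redis, Doctrine, ...).
$queue = new SplQueue();

// Steps 1 and 3: the publisher dispatches, the "transport" serializes and enqueues.
$dispatchAsync = static function (object $message) use ($queue): void {
    $queue->enqueue(serialize($message));
};

// Steps 2 and 6: the handler consumes the message.
$handled = [];
$handler = static function (SendEmail $message) use (&$handled): void {
    $handled[] = $message->recipient;
};

// Steps 4 and 5: the worker fetches each message and hands it over for handling.
$runWorker = static function () use ($queue, $handler): void {
    while (!$queue->isEmpty()) {
        $handler(unserialize($queue->dequeue()));
    }
};

$dispatchAsync(new SendEmail('alice@example.com'));

// Nothing has been handled yet: the message is only sitting in the queue.
$pendingBeforeWorker = count($queue);

$runWorker();
```

The point to notice is that the message crosses the queue as a string: the publisher and the worker only share whatever `serialize()` produced, which is why the serialization format matters as soon as the two sides are different applications.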

When a bus is synchronous, everything happens naturally. However, if the transport is asynchronous, then your message must be serialized. Symfony natively supports two serialization modes:

  • PHP-native serialization;
  • Serialization via the Serializer component.

Message serialization

By default, a message is serialized with PHP and looks like this:

O:36:\"Symfony\\Component\\Messenger\\Envelope\":2:{s:44:\"\0Symfony\\Component\\Messenger\\Envelope\0stamps\";a:1:{s:46:\"Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\";a:1:{i:0;O:46:\"Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\":1:{s:55:\"\0Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\0busName\";s:21:\"messenger.bus.default\";}}}s:45:\"\0Symfony\\Component\\Messenger\\Envelope\0message\";O:18:\"App\\Message\\Foobar\":1:{s:24:\"\0App\\Message\\Foobar\0name\";s:6:\"coucou\";}}

We can see App\Message\Foobar, which represents the class of the object contained in the message.

From one application to another, this class may not exist. It’s even very unlikely to exist. And if the other application is in another language, it’s impossible (or almost impossible 😈 ) to deserialize PHP!
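
Here is a minimal sketch of what happens on the receiving side when the class is missing, even in PHP. The payload is rebuilt by hand for the illustration (it is the `Foobar` part of the envelope above), and `App\Message\Foobar` is deliberately not defined in the script.

```php
<?php

declare(strict_types=1);

// Payload as another application would have produced it. The class
// App\Message\Foobar is deliberately NOT defined in this script.
$class = 'App\\Message\\Foobar';
$privateNameKey = "\0" . $class . "\0name"; // how PHP encodes a private property name
$payload = sprintf(
    'O:%d:"%s":1:{s:%d:"%s";s:6:"coucou";}',
    strlen($class),
    $class,
    strlen($privateNameKey),
    $privateNameKey
);

$object = unserialize($payload);

// PHP cannot rebuild the object, so it falls back to a placeholder class.
$isPlaceholder = $object instanceof __PHP_Incomplete_Class;
var_dump($isPlaceholder);
```

As long as the class cannot be loaded, the consumer gets a `__PHP_Incomplete_Class` instance instead of a usable `Foobar`, so no handler can be matched to it.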

We’re going to use a more traditional exchange format. We’ve chosen JSON, but we could have used XML, Protobuf, or any other interoperable serialization format.

A custom serializer

We need to implement the component’s SerializerInterface:

namespace Symfony\Component\Messenger\Transport\Serialization;

use Symfony\Component\Messenger\Envelope;

interface SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope;

    public function encode(Envelope $envelope): array;
}

  • The decode() method deserializes what comes from our transport;
  • The encode() method serializes our business object for the transport.

In order to use a single serializer for all messages passing between our two applications, we’re going to use a type key which will identify which class/data we’re manipulating. We’ll create an interface to enforce this:

namespace App\Messenger\Serializer;

interface JsonSerializableInterface
{
    public function getJsonType(): string;
}

Let’s take the example of a Foobar object:

namespace App\Message;

use App\Messenger\Serializer\JsonSerializableInterface;

final class Foobar implements JsonSerializableInterface
{
    public function __construct(
        public readonly string $name,
    ) {
    }

    public function getJsonType(): string
    {
        return 'foobar';
    }
}

Once serialized, it will look like this:

{"name":"coucou"}
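
This can be checked in plain PHP, without Symfony. The sketch below uses a trimmed copy of `Foobar` (the interface is left out so the script runs on its own) and builds the body-plus-headers array that an `encode()` implementation could hand to the transport. The `Content-Type` header is an assumption of this example.

```php
<?php

declare(strict_types=1);

// Trimmed copy of the Foobar class (interface omitted so the snippet
// runs on its own).
final class Foobar
{
    public function __construct(
        public readonly string $name,
    ) {
    }

    public function getJsonType(): string
    {
        return 'foobar';
    }
}

$message = new Foobar('coucou');

// json_encode() exports the public properties of a plain object.
$body = json_encode($message, JSON_THROW_ON_ERROR);

// A body plus headers: the "type" header tells the consumer what the body is.
$encoded = [
    'body' => $body,
    'headers' => [
        'type' => $message->getJsonType(),
        'Content-Type' => 'application/json',
    ],
];

echo $body, PHP_EOL; // {"name":"coucou"}
```

Because the class name never appears in the payload, the consuming application only needs to agree on the `foobar` type and on the JSON fields, not on any PHP class.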

And here is the code for our serializer:

namespace App\Messenger\Serializer;

use App\Message\Foobar;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class JsonSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // The transport hands over the body as a raw JSON string.
        $body = json_decode($encodedEnvelope['body'], true, 512, \JSON_THROW_ON_ERROR);

        // The "type" header carries the value returned by getJsonType().
        $message = match ($encodedEnvelope['headers']['type'] ?? null) {
            'foobar' => new Foobar($body['name']),
            default => throw new \LogicException('The message type is not supported.'),
        };

        // Sum the "count" of every "x-death" entry to get the number of retries.
        $stamps = [];
        $retryCount = 0;
        foreach ($encodedEnvelope['headers']['x-death'] ?? [] as ['count' => $count]) {
            $retryCount += $count;
        }
        if ($retryCount) {
            $stamps[] = new RedeliveryStamp($retryCount);
        }

        return new Envelope($message, $stamps);
    }

    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        if (!$message instanceof JsonSerializableInterface) {
            throw new \LogicException(sprintf('The message must implement "%s".', JsonSerializableInterface::class));
        }

        return [
            // Public properties of the message are exported as a JSON object.
            'body' => json_encode($message, \JSON_THROW_ON_ERROR),
            'headers' => [
                'type' => $message->getJsonType(),
                'Content-Type' => 'application/json',
            ],
        ];
    }
}

And that’s it! You may wonder about the RedeliveryStamp code. It restores the retry count on the envelope, so that the message can be retried a bounded number of times when the application cannot process it. You may want to add support for other stamps, but it’s not mandatory.
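
To see the same header handling from the point of view of an application that does not use Messenger, here is a minimal sketch in plain PHP. `readRawMessage()` is a hypothetical helper, not part of any library, and the shape of the `x-death` header (a list of entries, each with a `count`) is an assumption based on how RabbitMQ reports dead-lettered messages.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical consumer-side helper: turns a raw transport message
 * (body + headers) into a type, a payload and a retry count.
 *
 * @return array{type: string, payload: array, retryCount: int}
 */
function readRawMessage(array $encodedEnvelope): array
{
    $headers = $encodedEnvelope['headers'] ?? [];
    $type = $headers['type'] ?? throw new InvalidArgumentException('Missing "type" header.');

    // Invalid JSON raises a JsonException instead of silently returning null.
    $payload = json_decode($encodedEnvelope['body'] ?? '', true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($payload)) {
        throw new InvalidArgumentException('The body must be a JSON object.');
    }

    // Assumption: each "x-death" entry carries a "count" of dead-letterings.
    $retryCount = 0;
    foreach ($headers['x-death'] ?? [] as $death) {
        $retryCount += (int) ($death['count'] ?? 0);
    }

    return ['type' => $type, 'payload' => $payload, 'retryCount' => $retryCount];
}

$result = readRawMessage([
    'body' => '{"name":"coucou"}',
    'headers' => [
        'type' => 'foobar',
        'x-death' => [
            ['count' => 2, 'queue' => 'async'],
            ['count' => 1, 'queue' => 'retry'],
        ],
    ],
]);
```

With this input, the helper reports the `foobar` type, the decoded `name` field and a retry count of 3, which is the same information the Symfony serializer uses to build the message and its RedeliveryStamp.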

Now, all that remains is to plug this serializer into the messenger’s configuration:

framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: App\Messenger\Serializer\JsonSerializer

Conclusion

Getting two PHP or non-PHP applications to communicate via a data bus is a simple thing to do with Symfony. As is often the case, by implementing an interface and with a little configuration, we can replace a part of Symfony to adapt it to our needs.

There is room for improvement in the code we’ve provided. We’d need to better validate the data entering the application (i.e. validate the body). We could also use Symfony’s Serializer to convert our PHP objects to and from JSON, but that’s not the point of this article. You can also use Happyr/message-serializer, for example.

Anyway, be creative, do your best, and if possible do a good job. But remember: Keep it simple!
