Batching Symfony Messenger messages
Sometimes we want to make messages in Symfony Messenger that are consumed as a batch and not one by one. Recently we came across a situation where we send updated translations for our entities through Messenger and then send them to our translation provider.
But since there is a strong rate limitation on this translation provider, we can’t send them one by one. We must send them to store all received ones by a given consumer and send all the messages if we wait more than 10 seconds without a new message or if we have more than 100 messages stored.
Now let’s show how we made it works:
// Symfony Messenger Message:
class TranslationUpdate
{
public function __construct(
public string $locale,
public string $key,
public string $value,
) {
}
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
private const BUFFER_TIMER = 10; // in seconds
private const BUFFER_LIMIT = 100;
private array $buffer = [];
public function __construct(
private MessageBusInterface $messageBus,
) {
pcntl_async_signals(true);
pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
}
public function __invoke(TranslationUpdate $message): void
{
$this->buffer[] = $message;
if (\count($this->buffer) >= self::BUFFER_LIMIT) {
$this->batchBuffer();
} else {
pcntl_alarm(self::BUFFER_TIMER);
}
}
private function batchBuffer(): void
{
if (0 === \count($this->buffer)) {
return;
}
$translationBatch = new TranslationBatch($this->buffer);
$this->messageBus->dispatch($translationBatch);
$this->buffer = [];
}
}
Here we can see the Messenger message, basically we dispatch it every time, we have a translation update but this could apply to any message we need.
Then we have the message handler which will get all the messages and put them in an array buffer. When the buffer hits 100 elements or if we have no new elements during 10s we will trigger the batchBuffer
method.
For the 10s timer, we are using pcntl_alarm
which allows us to make asynchronous call of the batchBuffer
method if needed.
PCNTL is a way to handle system signals in our PHP code, you can read more about it in the PHP documentation or if you can read french we made a blog post about it. We will set a timer that will send a SIGALRM signal to the process after the given number of seconds. Then when the signal is received by the process it will call the callback function we gave as the second argument of pcntl_signal
. The callback is set for our entire application, so we can use this batching trick only once.
Then in the batchBuffer
method we are using a new Messenger dispatch because we want to keep track of the messages if there are issues, and if we hit the method with pcntl we won’t have Messenger retry handling if there is an exception.
class TranslationBatch
{
/**
* @param TranslationUpdate[] $notifications
*/
public function __construct(
private array $notifications,
) {
}
}
class TranslationBatchHandler implements MessageHandlerInterface
{
public function __invoke(TranslationBatch $message): void
{
// handle all our messages
}
}
And finally we have the batch handler which will always get a list of messages to send! With that we can batch our Messenger messages with ease and avoid using a cron!
Edit: This method is only a proof of work, if you want to use it in production I recommend to use some more persistent storage for the buffer like Redis.
Commentaires et discussions
Symfony Messenger et l’interopérabilité
Le composant Messenger a été mergé dans Symfony 4.1, sorti en mai 2018. Il ajoute une couche d’abstraction entre un producteur de données (publisher) et son consommateur de données (consumer). Symfony est ainsi capable d’envoyer des messages (la donnée) dans un bus, le plus souvent…
Lire la suite de l’article Symfony Messenger et l’interopérabilité
Nos articles sur le même sujet
Ces clients ont profité de notre expertise
Afin de poursuivre son déploiement sur le Web, Arte a souhaité être accompagné dans le développement de son API REST “OPA” (API destinée à exposer les programmes et le catalogue vidéo de la chaine). En collaboration avec l’équipe technique Arte, JoliCode a mené un travail spécifique à l’amélioration des performances et de la fiabilité de l’API. Ces…
Nous avons accompagné Paris Dauphine dans la conception d’une application Symfony2 permettant la gestion complète de trois masters de l’université. Notre intervention a porté sur la conception de l’application, son architecture, et l’aide au développement, par les équipes de l’Université Paris Dauphine, de l’application.
JoliCode accompagne l’équipe technique Dayuse dans l’optimisation des performances de sa plateforme. Nous sommes intervenus sur différents sujets : La fonctionnalité de recherche d’hôtels, en remplaçant MongoDB et Algolia par Redis et Elasticsearch. La mise en place d’un workflow de réservation, la migration d’un site en Twig vers une SPA à base de…