Batching Symfony Messenger messages
Sometimes we want Symfony Messenger messages to be consumed as a batch rather than one by one. We recently ran into this situation: we send updated translations for our entities through Messenger, then forward them to our translation provider.
But this translation provider enforces a strict rate limit, so we can’t send the updates one by one. Instead, a consumer has to store every message it receives and send them all at once, either when 10 seconds pass without a new message or when 100 messages are stored.
Here is how we made it work:
// Symfony Messenger message:
class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
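use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;

    /** @var TranslationUpdate[] */
    private array $buffer = [];

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        // Run signal handlers as signals arrive, without calling pcntl_signal_dispatch().
        pcntl_async_signals(true);
        // Flush the buffer whenever the alarm set in __invoke() fires.
        pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
    }

    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (\count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();
        } else {
            // (Re)start the countdown: a new alarm replaces any pending one.
            pcntl_alarm(self::BUFFER_TIMER);
        }
    }

    private function batchBuffer(): void
    {
        if (0 === \count($this->buffer)) {
            return;
        }

        // Hand the whole buffer over to Messenger as a single message.
        $translationBatch = new TranslationBatch($this->buffer);
        $this->messageBus->dispatch($translationBatch);
        $this->buffer = [];
    }
}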
First comes the Messenger message. We dispatch one every time a translation is updated, but the same approach applies to any message that needs batching.
Then comes the message handler, which receives every message and stores it in an array buffer. When the buffer reaches 100 elements, or when no new element arrives for 10 seconds, the batchBuffer method is triggered.
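To look at the size-based half of that rule on its own, here is a minimal, framework-free sketch. The BatchBuffer class and its onFlush callback are hypothetical names used for illustration only; they are not part of Symfony or of the handler above, and the final flush() call simply stands in for the timer firing.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of Symfony: collects items and hands them
 * to a callback in one go once the limit is reached or flush() is called.
 */
final class BatchBuffer
{
    /** @var list<mixed> */
    private array $items = [];

    /** @var callable(list<mixed>): void */
    private $onFlush;

    public function __construct(private int $limit, callable $onFlush)
    {
        $this->onFlush = $onFlush;
    }

    public function add(mixed $item): void
    {
        $this->items[] = $item;

        if (\count($this->items) >= $this->limit) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ([] === $this->items) {
            return;
        }

        // Empty the buffer before calling out, so the same items
        // are never handed to the callback twice.
        $batch = $this->items;
        $this->items = [];

        ($this->onFlush)($batch);
    }
}

$batches = [];
$buffer = new BatchBuffer(100, function (array $batch) use (&$batches): void {
    $batches[] = $batch;
});

for ($i = 0; $i < 250; ++$i) {
    $buffer->add($i);
}

// Stands in for the timer firing after 10 seconds of silence.
$buffer->flush();
```

With 250 items and a limit of 100, the callback receives two full batches during the loop, and the manual flush delivers the remaining 50 items.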
For the 10-second timer, we use pcntl_alarm, which lets us call the batchBuffer method asynchronously when needed.
PCNTL lets us handle system signals in PHP code; you can read more about it in the PHP documentation or, if you read French, in the blog post we wrote about it. pcntl_alarm sets a timer that sends a SIGALRM signal to the process after the given number of seconds. When the process receives the signal, it calls the callback we passed as the second argument of pcntl_signal. Both the callback and the timer are set for the whole process, so only one handler per worker process can use this batching trick.
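The timer behaviour can be observed outside Messenger with a few lines of plain PHP. This is only a sketch: it assumes the pcntl extension is loaded (PHP CLI on a Unix-like system), and the $fired counter and the durations are made up for the demonstration.

```php
<?php
declare(strict_types=1);

// Requires the pcntl extension (PHP CLI on Unix-like systems).
pcntl_async_signals(true);

$fired = 0;
pcntl_signal(SIGALRM, function (int $signal) use (&$fired): void {
    ++$fired;
});

// Schedule SIGALRM in 5 seconds...
pcntl_alarm(5);
// ...then re-arm: the new call replaces the pending alarm and
// returns the number of seconds that were left on it.
$remaining = pcntl_alarm(1);

// sleep() is interrupted when the signal arrives, so it returns
// after about one second instead of three.
sleep(3);

echo "left on replaced alarm: {$remaining}s, handler calls: {$fired}\n";
```

Because the second pcntl_alarm call replaces the first one, the handler runs only once. This is the property the message handler relies on: every new message pushes the deadline back instead of stacking up timers.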
In the batchBuffer method, we dispatch a new Messenger message instead of processing the buffer directly. We want to keep track of the messages if something goes wrong, and when the method is triggered by pcntl, an exception would not go through Messenger’s retry handling.
class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications
     */
    public function __construct(
        // Public so that the batch handler can read the buffered messages.
        public array $notifications,
    ) {
    }
}
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class TranslationBatchHandler implements MessageHandlerInterface
{
    public function __invoke(TranslationBatch $message): void
    {
        // handle all our messages
    }
}
And finally we have the batch handler, which always receives a list of messages to send! With that, we can batch our Messenger messages with ease and avoid using a cron!
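What the batch handler does with that list depends on the provider. As one possible sketch, the updates could be collapsed into a single payload grouped by locale. The buildPayload function and the payload shape are assumptions made for this example, not the API of any real translation provider; TranslationUpdate is redeclared only to keep the snippet self-contained.

```php
<?php
declare(strict_types=1);

// Same shape as the TranslationUpdate message shown earlier.
final class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}

/**
 * Hypothetical helper: groups updates by locale. When a key appears
 * several times in a batch, the most recent value wins.
 *
 * @param TranslationUpdate[] $updates
 *
 * @return array<string, array<string, string>> locale => [key => value]
 */
function buildPayload(array $updates): array
{
    $payload = [];
    foreach ($updates as $update) {
        $payload[$update->locale][$update->key] = $update->value;
    }

    return $payload;
}

$payload = buildPayload([
    new TranslationUpdate('fr', 'title', 'Bonjour'),
    new TranslationUpdate('en', 'title', 'Hello'),
    new TranslationUpdate('fr', 'title', 'Salut'),
]);
```

Collapsing repeated keys this way means a translation edited several times within the same window is only sent once, which helps with a rate-limited provider.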
Edit: This method is only a proof of concept. If you want to use it in production, I recommend using more persistent storage for the buffer, such as Redis.