Logo de Béjean Développement

Magento et les fils d'attente

Comment utiliser la file d'attente MySQL ?

L'utilisation de la file d'attente MySQL de Magento vous permettra de communiquer de manière asynchrone entre l'expéditeur et le destinataire du message.

Voici comment mettre rapidement en place un module de Message Queue MySQL et les opérations en bloc (Bulk Operations).

Objectif

Développer un module utilisant le Basic Message Queue System et Bulk Operations de Magento 2.

Le tutoriel est basé sur la version 2.3.5-p2 de Magento 2. Il n'a pas été testé sur la version 2.4.

Le module que nous allons développer va créer un message lors du lancement d'une commande CLI.

Tutoriel

Création du module

La première étape consiste à créer votre module. Pour cela, il faut créer les principaux fichiers, à savoir :

├── etc
|   └─ module.xml
├── CHANGELOG.md
├── composer.json
├── README.md
└── registration.php

Dans le fichier etc/module.xml, ajouter le module Magento_MysqlMq dans la séquence de chargement :

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="NicolasBejean_MySqlMq" setup_version="1.0.0">
        <sequence>
            <module name="Magento_MysqlMq"/>
        </sequence>
    </module>
</config>

Configuration de la file d'attente

Pour créer une nouvelle file d'attente, il faut créer plusieurs fichiers dans le dossier etc :

  • communication.xml : Définit les aspects de la file d'attente
  • queue_consumer.xml : Définit la relation entre la file d'attente et son consommateur (consumer)
  • queue_topology.xml : Définit les règles de routage des messages, déclare les files d'attentes et les échanges
  • queue_publisher.xml : Définir l'échange où un sujet est publié

Si vous souhaitez plus d'informations, je vous invite à consulter la documentation de Magento.

La création des 4 fichiers va permettre de créer la nouvelle file d'attente nicolasbejean.mysqlmq.consumer.default. Lorsqu'un message sera publiée dans cette file d'attente, elle sera automatiquement traitée par la méthode process de la classe PHP NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer.

Création du fichier communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="nicolasbejean.mysqlmq.consumer.default" request="Magento\AsynchronousOperations\Api\Data\OperationInterface">
        <handler name="NicolasBejeanMySqlMqConsumer" type="NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer" method="process" />
    </topic>
</config>

Création du fichier queue_consumer.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="NicolasBejeanMySqlMqConsumer"
              queue="nicolasbejean.mysqlmq.consumer.default"
              connection="db"
              maxMessages="500"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer::process"/>
</config>

Création du fichier queue_topology.xml

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="nicolasbejean-mysqlmq-exchange" type="topic" connection="db">
        <binding id="NicolasBejeanMySqlMqConsumer" topic="nicolasbejean.mysqlmq.consumer.default" destinationType="queue" destination="nicolasbejean.mysqlmq.consumer.default"/>
    </exchange>
</config>

Création du fichier queue_publisher.xml

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="nicolasbejean.mysqlmq.consumer.default">
        <connection name="db" exchange="nicolasbejean-mysqlmq-exchange" />
    </publisher>
</config>

Création de la classe PHP

Créer le fichier PHP DefaultConsumer.php dans le répertoire Model/Consumer :

├── Model
│   └── Consumer
│       └── DefaultConsumer.php

Dès qu'elle réceptionnera le message de type OperationInterface, la méthode process va récupérer les données JSON. Ces dernières pourront alors être traitées à votre guise.

<?php
declare(strict_types=1);

namespace NicolasBejean\MySqlMq\Model\Consumer;

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;
use Psr\Log\LoggerInterface;
use Exception;

/**
 * Class DefaultConsumer
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultConsumer
{
    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Create constructor.
     * 
     * @param JsonSerializer $jsonSerializer
     * @param LoggerInterface $logger
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        LoggerInterface $logger
    ) {
        $this->jsonSerializer = $jsonSerializer;
        $this->logger = $logger;
    }

    /**
     * @param OperationInterface $data
     */
    public function process(OperationInterface $data)
    {
        $time_start = microtime(true);

        $data = $this->jsonSerializer->unserialize($data->getSerializedData());

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $this->logger->info('[NicolasBejean_MySqlMq] DefaultConsumer::process | Total execution time in seconds: ' . $time);
    }
}

Création de la commande CLI

Base de la commande

Pour créer une commande CLI sous Magento, c'est très simple. Il faut déclarer la commande dans le fichier di.xml :

<?xml version="1.0" encoding="utf-8" ?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <type name="NicolasBejean\MySqlMq\Console\Command\DefaultMessage">
        <arguments>
            <argument name="name" xsi:type="string">nicolasbejean:mysqlmq:defaultmessage</argument>
        </arguments>
    </type>
    <type name="Magento\Framework\Console\CommandListInterface">
        <arguments>
            <argument name="commands" xsi:type="array">
                <item name="nicolasbejean_mysqlmq_defaultmessage" xsi:type="object">NicolasBejean\MySqlMq\Console\Command\DefaultMessage</item>
            </argument>
        </arguments>
    </type>
</config>

Puis, créer la classe correspondante. Les classes des commandes CLI doivent se situer dans le dossier Console/Command. Voici le contenu de base d'un fichier de commande :

<?php
namespace NicolasBejean\MySqlMq\Console\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Class DefaultMessage
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultMessage extends Command
{
    /**
     * Description de la commande
     */
    const DESCRIPTION = 'Crée un message dans le topic par défaut';

    /**
     * DefaultConsumer constructor.
     *
     * @param string|null $namee
     */
    public function __construct(
        ?string $name = null
    ) {
        parent::__construct($name);
    }

    /**
     * Configuration de la commande
     */
    protected function configure()
    {
        /* Options de la commande */
        $options = [];

        $this->setDefinition($options);
        $this->setDescription(self::DESCRIPTION);

        parent::configure();
    }

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {

    }
}

Avant de lancer la commande, il faut impérativement lancer les commandes suivantes : bin/magento setup:upgrade && bin/magento setup:di:compile. Puis vous pourrez lancer la commande bin/magento nicolasbejean:mysqlmq:defaultmessage.

Cependant, cette commande ne fera aucune action. Avant de continuer, mmodifions la méthode execute :

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $output->write('Start execution', true);
        $time_start = microtime(true);

        $output->write('Command ready!', true);

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $output->write('Total execution time in seconds: ' . $time, true);
    }

Une fois modifié, la commande affichera 3 messages dans le terminal.

Classes et méthodes pour publier un message

Pour publier un message en utilisant les opérations en bloc (Bull Operations), il faut instancier quelques classes.

Ajouter les classes suivantes :

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\Bulk\OperationInterface as BulkOperationInterface;
use Magento\Framework\DataObject\IdentityGeneratorInterface;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;

Créer les variables :

    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var int
     */
    private $bulkSize;

    /**
     * @var IdentityGeneratorInterface
     */
    private $identityService;

    /**
     * @var BulkManagementInterface
     */
    private $bulkManagement;

    /**
     * @var OperationInterfaceFactory
     */
    private $operationFactory;

Modifier le constructeur :

        /**
     * DefaultConsumer constructor.
     *
     * @param string|null $name
     * @param JsonSerializer $jsonSerializer
     * @param IdentityGeneratorInterface $identityService
     * @param BulkManagementInterface $bulkManagement
     * @param OperationInterfaceFactory $operationFactory
     * @param int $bulkSize
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        IdentityGeneratorInterface $identityService,
        BulkManagementInterface $bulkManagement,
        OperationInterfaceFactory $operationFactory,
        int $bulkSize = 100,
        ?string $name = null
    ) {
        $this->jsonSerializer = $jsonSerializer;
        $this->bulkSize = $bulkSize;
        $this->identityService = $identityService;
        $this->bulkManagement = $bulkManagement;
        $this->operationFactory = $operationFactory;

        parent::__construct($name);
    }

Ajouter les 2 méthodes suivantes :

    /**
     * Publie dans la base de données
     *
     * @param string $data
     * @throws LocalizedException
     */
    private function publish(string $data):void
    {
        $bulkUuid = $this->identityService->generateId();
        $bulkDescription = __('Insert Message: ' . $data);
        $operations = [];

        $operations[] = $this->makeOperation(
            $data,
            'nicolasbejean.mysqlmq.consumer.default',
            $bulkUuid
        );

        if (!empty($operations)) {
            $result = $this->bulkManagement->scheduleBulk(
                $bulkUuid,
                $operations,
                $bulkDescription
            );
            if (!$result) {
                throw new LocalizedException(
                    __('Something went wrong while processing the request.')
                );
            }
        }
    }

    /**
     * Crée le bulk dans Magento Bulk
     *
     * @param $dataToEncode
     * @param $queue
     * @param $bulkUuid
     * @return OperationInterface
     */
    private function makeOperation(
        $dataToEncode,
        $queue,
        $bulkUuid
    ):OperationInterface {
        $data = [
            'data' => [
                'bulk_uuid' => $bulkUuid,
                'topic_name' => $queue,
                'serialized_data' => $this->jsonSerializer->serialize($dataToEncode),
                'status' => BulkOperationInterface::STATUS_TYPE_OPEN,
            ]
        ];

        return $this->operationFactory->create($data);
    }

La méthode publish va permettre de créer un bulk et de le programmer. La méthode makeOperation prépare les données pour créer l'opération dans le bulk.

Avant de terminer, ajouter $this->publish('Données de test'); à la place de $output->write('Command ready!', true); dans la méthode execute.

Contenu complet de la commande CLI

<?php
namespace NicolasBejean\MySqlMq\Console\Command;

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\Bulk\OperationInterface as BulkOperationInterface;
use Magento\Framework\DataObject\IdentityGeneratorInterface;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Class DefaultMessage
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultMessage extends Command
{
    /**
     * Description de la commande
     */
    const DESCRIPTION = 'Crée un message dans le topic par défaut';

    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var int
     */
    private $bulkSize;

    /**
     * @var IdentityGeneratorInterface
     */
    private $identityService;

    /**
     * @var BulkManagementInterface
     */
    private $bulkManagement;

    /**
     * @var OperationInterfaceFactory
     */
    private $operationFactory;

    /**
     * DefaultConsumer constructor.
     *
     * @param string|null $name
     * @param JsonSerializer $jsonSerializer
     * @param IdentityGeneratorInterface $identityService
     * @param BulkManagementInterface $bulkManagement
     * @param OperationInterfaceFactory $operationFactory
     * @param int $bulkSize
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        IdentityGeneratorInterface $identityService,
        BulkManagementInterface $bulkManagement,
        OperationInterfaceFactory $operationFactory,
        int $bulkSize = 100,
        ?string $name = null
    ) {
        $this->jsonSerializer = $jsonSerializer;

        $this->bulkSize = $bulkSize;
        $this->identityService = $identityService;
        $this->bulkManagement = $bulkManagement;
        $this->operationFactory = $operationFactory;

        parent::__construct($name);
    }

    /**
     * Configuration de la commande
     */
    protected function configure()
    {
        /* Options de la commande */
        $options = [];

        $this->setDefinition($options);
        $this->setDescription(self::DESCRIPTION);

        parent::configure();
    }

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $output->write('Start execution', true);
        $time_start = microtime(true);

        $this->publish('Données de test');

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $output->write('Total execution time in seconds: ' . $time, true);
    }

    /**
     * Publie dans la base de données
     *
     * @param string $data
     * @throws LocalizedException
     */
    private function publish(string $data):void
    {
        $bulkUuid = $this->identityService->generateId();
        $bulkDescription = __('Insert Message: ' . $data);
        $operations = [];

        $operations[] = $this->makeOperation(
            $data,
            'nicolasbejean.mysqlmq.consumer.default',
            $bulkUuid
        );

        if (!empty($operations)) {
            $result = $this->bulkManagement->scheduleBulk(
                $bulkUuid,
                $operations,
                $bulkDescription
            );
            if (!$result) {
                throw new LocalizedException(
                    __('Something went wrong while processing the request.')
                );
            }
        }
    }

    /**
     * Crée le bulk dans Magento Bulk
     *
     * @param $dataToEncode
     * @param $queue
     * @param $bulkUuid
     * @return OperationInterface
     */
    private function makeOperation(
        $dataToEncode,
        $queue,
        $bulkUuid
    ):OperationInterface {
        $data = [
            'data' => [
                'bulk_uuid' => $bulkUuid,
                'topic_name' => $queue,
                'serialized_data' => $this->jsonSerializer->serialize($dataToEncode),
                'status' => BulkOperationInterface::STATUS_TYPE_OPEN,
            ]
        ];

        return $this->operationFactory->create($data);
    }
}

Résultat

Lorsque la commande bin/magento setup:upgrade est exécutée, une nouvelle file d'attente est créée dans la table queue.

Dans une console de requête SQL, vous pouvez lancer la commande :

SELECT * 
FROM queue 
WHERE queue.name = 'nicolasbejean.mysqlmq.consumer.default';

Vous devriez avoir une entrée enregistrée dans la base de données avec un identifiant.

Dès lors qu'un message est publié, l'opération est inscrite dans la table magento_bulk. Dans la table queue_message, vous pouvez récupérer le message, en lançant la requête SQL :

SELECT *
FROM queue_message 
WHERE queue.topic_name = 'nicolasbejean.mysqlmq.consumer.default';

À cette requête, vous pouvez joindre la table queue_message_status pour obtenir le status du message :

SELECT *
FROM queue_message
LEFT JOIN queue_message_status on queue_message.id = queue_message_status.message_id
WHERE queue.topic_name = 'nicolasbejean.mysqlmq.consumer.default';

Traitement des messages

Les messages sont traités par les tâches automatisées (CRON), pour cela, vérifier que vos crons sont fonctionnels, puis vérifier que le fichier app/etc/env.php comporte la configuration suivante :

'cron_consumers_runner' => [
    'cron_run' => true,
    'max_messages' => 1000,
    'consumers' => []
]

Sans cela, vos crons ne lanceront jamais le traitement de vos messages.

Sources