Comment utiliser la file d’attente MySQL ?

L’utilisation de la file d’attente MySQL de Magento vous permettra de communiquer de manière asynchrone entre l’expéditeur et le destinataire du message.

Voici comment mettre rapidement en place un module de Message Queue MySQL et les opérations en bloc (Bulk Operations).

Objectif

Développer un module utilisant le "Basic Message Queue System" et "Bulk Operations" de Magento 2.

Le tutoriel est basé sur la version 2.3.5-p2 de Magento 2. Il n’a pas été testé sur la version 2.4.

Le module que nous allons développer va créer un message lors du lancement d’une commande CLI.

Tutoriel

Création du module

La première étape consiste à créer votre module. Pour cela, il faut créer les principaux fichiers, à savoir :

├── etc
│   ├── module.xml
├── CHANGELOG.md
├── composer.json
├── README.md
└── registration.php

Dans le fichier etc/module.xml<code>, ajouter le module </code>Magento_MysqlMq dans la séquence de chargement :

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="NicolasBejean_MySqlMq" setup_version="1.0.0">
        <sequence>
            <module name="Magento_MysqlMq"/>
        </sequence>
    </module>
</config>

Configuration de la file d’attente

Pour créer une nouvelle file d’attente, il faut créer plusieurs fichiers dans le dossier etc :

  • communication.xml : Définit les aspects de la file d’attente
  • queue_consumer.xml : Définit la relation entre la file d’attente et son consommateur (consumer)
  • queue_topology.xml : Définit les règles de routage des messages, déclare les files d’attentes et les échanges
  • queue_publisher.xml : Définir l’échange où un sujet est publié

Si vous souhaitez plus d’informations, je vous invite à consulter la documentation de Magento.

La création des 4 fichiers va permettre de créer la nouvelle file d’attente nicolasbejean.mysqlmq.consumer.default<code>. Lorsqu'un message sera publiée dans cette file d'attente, elle sera automatiquement traitée par la méthode </code>process<code> de la classe PHP </code>NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer.

Création du fichier communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="nicolasbejean.mysqlmq.consumer.default" request="Magento\AsynchronousOperations\Api\Data\OperationInterface">
        <handler name="NicolasBejeanMySqlMqConsumer" type="NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer" method="process" />
    </topic>
</config>

Création du fichier queue_consumer.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="NicolasBejeanMySqlMqConsumer"
              queue="nicolasbejean.mysqlmq.consumer.default"
              connection="db"
              maxMessages="500"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="NicolasBejean\MySqlMq\Model\Consumer\DefaultConsumer::process"/>
</config>

Création du fichier queue_topology.xml

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="nicolasbejean-mysqlmq-exchange" type="topic" connection="db">
        <binding id="NicolasBejeanMySqlMqConsumer" topic="nicolasbejean.mysqlmq.consumer.default" destinationType="queue" destination="nicolasbejean.mysqlmq.consumer.default"/>
    </exchange>
</config>

Création du fichier queue_publisher.xml

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="nicolasbejean.mysqlmq.consumer.default">
        <connection name="db" exchange="nicolasbejean-mysqlmq-exchange" />
    </publisher>
</config>

Création de la classe PHP

Créer le fichier PHP DefaultConsumer.php<code> dans le répertoire </code>Model/Consumer :

├── Model
│   └── Consumer
│       └── DefaultConsumer.php

Dès qu’elle réceptionnera le message de type OperationInterface<code>, la méthode </code>process va récupérer les données JSON. Ces dernières pourront alors être traitées à votre guise.

<?php
declare(strict_types=1);

namespace NicolasBejean\MySqlMq\Model\Consumer;

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;
use Psr\Log\LoggerInterface;
use Exception;

/**
 * Class DefaultConsumer
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultConsumer
{
    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Create constructor.
     * 
     * @param JsonSerializer $jsonSerializer
     * @param LoggerInterface $logger
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        LoggerInterface $logger
    ) {
        $this->jsonSerializer = $jsonSerializer;
        $this->logger = $logger;
    }

    /**
     * @param OperationInterface $data
     */
    public function process(OperationInterface $data)
    {
        $time_start = microtime(true);

        $data = $this->jsonSerializer->unserialize($data->getSerializedData());

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $this->logger->info('[NicolasBejean_MySqlMq] DefaultConsumer::process | Total execution time in seconds: ' . $time);
    }
}

Création de la commande CLI

Base de la commande

Pour créer une commande CLI sous Magento, c’est très simple. Il faut déclarer la commande dans le fichier di.xml :

<?xml version="1.0" encoding="utf-8" ?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <type name="NicolasBejean\MySqlMq\Console\Command\DefaultMessage">
        <arguments>
            <argument name="name" xsi:type="string">nicolasbejean:mysqlmq:defaultmessage</argument>
        </arguments>
    </type>
    <type name="Magento\Framework\Console\CommandListInterface">
        <arguments>
            <argument name="commands" xsi:type="array">
                <item name="nicolasbejean_mysqlmq_defaultmessage" xsi:type="object">NicolasBejean\MySqlMq\Console\Command\DefaultMessage</item>
            </argument>
        </arguments>
    </type>
</config>

Puis, créer la classe correspondante. Les classes des commandes CLI doivent se situer dans le dossier Console/Command. Voici le contenu de base d’un fichier de commande :

<?php
namespace NicolasBejean\MySqlMq\Console\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Class DefaultMessage
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultMessage extends Command
{
    /**
     * Description de la commande
     */
    const DESCRIPTION = 'Crée un message dans le topic par défaut';

    /**
     * DefaultConsumer constructor.
     *
     * @param string|null $namee
     */
    public function __construct(
        ?string $name = null
    ) {
        parent::__construct($name);
    }

    /**
     * Configuration de la commande
     */
    protected function configure()
    {
        /* Options de la commande */
        $options = [];

        $this->setDefinition($options);
        $this->setDescription(self::DESCRIPTION);

        parent::configure();
    }

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {

    }
}

Avant de lancer la commande, il faut impérativement lancer les commandes suivantes : bin/magento setup:upgrade && bin/magento setup:di:compile<code>. Puis vous pourrez lancer la commande </code>bin/magento nicolasbejean:mysqlmq:defaultmessage.

Cependant, cette commande ne fera aucune action. Avant de continuer, mmodifions la méthode execute :

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $output->write('Start execution', true);
        $time_start = microtime(true);

        $output->write('Command ready!', true);

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $output->write('Total execution time in seconds: ' . $time, true);
    }

Une fois modifié, la commande affichera 3 messages dans le terminal.

Classes et méthodes pour publier un message

Pour publier un message en utilisant les opérations en bloc (Bull Operations), il faut instancier quelques classes.

Ajouter les classes suivantes :

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\Bulk\OperationInterface as BulkOperationInterface;
use Magento\Framework\DataObject\IdentityGeneratorInterface;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;

Créer les variables :

    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var int
     */
    private $bulkSize;

    /**
     * @var IdentityGeneratorInterface
     */
    private $identityService;

    /**
     * @var BulkManagementInterface
     */
    private $bulkManagement;

    /**
     * @var OperationInterfaceFactory
     */
    private $operationFactory;

Modifier le constructeur :

        /**
     * DefaultConsumer constructor.
     *
     * @param string|null $name
     * @param JsonSerializer $jsonSerializer
     * @param IdentityGeneratorInterface $identityService
     * @param BulkManagementInterface $bulkManagement
     * @param OperationInterfaceFactory $operationFactory
     * @param int $bulkSize
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        IdentityGeneratorInterface $identityService,
        BulkManagementInterface $bulkManagement,
        OperationInterfaceFactory $operationFactory,
        int $bulkSize = 100,
        ?string $name = null
    ) {
        $this->jsonSerializer = $jsonSerializer;
        $this->bulkSize = $bulkSize;
        $this->identityService = $identityService;
        $this->bulkManagement = $bulkManagement;
        $this->operationFactory = $operationFactory;

        parent::__construct($name);
    }

Ajouter les 2 méthodes suivantes :

    /**
     * Publie dans la base de données
     *
     * @param string $data
     * @throws LocalizedException
     */
    private function publish(string $data):void
    {
        $bulkUuid = $this->identityService->generateId();
        $bulkDescription = __('Insert Message: ' . $data);
        $operations = [];

        $operations[] = $this->makeOperation(
            $data,
            'nicolasbejean.mysqlmq.consumer.default',
            $bulkUuid
        );

        if (!empty($operations)) {
            $result = $this->bulkManagement->scheduleBulk(
                $bulkUuid,
                $operations,
                $bulkDescription
            );
            if (!$result) {
                throw new LocalizedException(
                    __('Something went wrong while processing the request.')
                );
            }
        }
    }

    /**
     * Crée le bulk dans Magento Bulk
     *
     * @param $dataToEncode
     * @param $queue
     * @param $bulkUuid
     * @return OperationInterface
     */
    private function makeOperation(
        $dataToEncode,
        $queue,
        $bulkUuid
    ):OperationInterface {
        $data = [
            'data' => [
                'bulk_uuid' => $bulkUuid,
                'topic_name' => $queue,
                'serialized_data' => $this->jsonSerializer->serialize($dataToEncode),
                'status' => BulkOperationInterface::STATUS_TYPE_OPEN,
            ]
        ];

        return $this->operationFactory->create($data);
    }

La méthode publish<code> va permettre de créer un </code>bulk<code> et de le programmer. La méthode </code>makeOperation<code> prépare les données pour créer l'opération dans le </code>bulk.

Avant de terminer, ajouter $this->publish('Données de test');<code> à la place de </code>$output->write('Command ready!', true);<code> dans la méthode </code>execute.

Contenu complet de la commande CLI

<?php
namespace NicolasBejean\MySqlMq\Console\Command;

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\Bulk\OperationInterface as BulkOperationInterface;
use Magento\Framework\DataObject\IdentityGeneratorInterface;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\Serialize\Serializer\Json as JsonSerializer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Class DefaultMessage
 *
 * Crée un message dans le topic par défaut
 *
 * @author   Nicolas Béjean <[email protected]>
 * @link     https://www.bejean.eu
 */
class DefaultMessage extends Command
{
    /**
     * Description de la commande
     */
    const DESCRIPTION = 'Crée un message dans le topic par défaut';

    /**
     * @var JsonSerializer
     */
    protected $jsonSerializer;

    /**
     * @var int
     */
    private $bulkSize;

    /**
     * @var IdentityGeneratorInterface
     */
    private $identityService;

    /**
     * @var BulkManagementInterface
     */
    private $bulkManagement;

    /**
     * @var OperationInterfaceFactory
     */
    private $operationFactory;

    /**
     * DefaultConsumer constructor.
     *
     * @param string|null $name
     * @param JsonSerializer $jsonSerializer
     * @param IdentityGeneratorInterface $identityService
     * @param BulkManagementInterface $bulkManagement
     * @param OperationInterfaceFactory $operationFactory
     * @param int $bulkSize
     */
    public function __construct(
        JsonSerializer $jsonSerializer,
        IdentityGeneratorInterface $identityService,
        BulkManagementInterface $bulkManagement,
        OperationInterfaceFactory $operationFactory,
        int $bulkSize = 100,
        ?string $name = null
    ) {
        $this->jsonSerializer = $jsonSerializer;

        $this->bulkSize = $bulkSize;
        $this->identityService = $identityService;
        $this->bulkManagement = $bulkManagement;
        $this->operationFactory = $operationFactory;

        parent::__construct($name);
    }

    /**
     * Configuration de la commande
     */
    protected function configure()
    {
        /* Options de la commande */
        $options = [];

        $this->setDefinition($options);
        $this->setDescription(self::DESCRIPTION);

        parent::configure();
    }

    /**
     * Méthode d'exécution de la commande
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return null
     * @throws LocalizedException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $output->write('Start execution', true);
        $time_start = microtime(true);

        $this->publish('Données de test');

        $time_end = microtime(true);
        $time = $time_end - $time_start;
        $output->write('Total execution time in seconds: ' . $time, true);
    }

    /**
     * Publie dans la base de données
     *
     * @param string $data
     * @throws LocalizedException
     */
    private function publish(string $data):void
    {
        $bulkUuid = $this->identityService->generateId();
        $bulkDescription = __('Insert Message: ' . $data);
        $operations = [];

        $operations[] = $this->makeOperation(
            $data,
            'nicolasbejean.mysqlmq.consumer.default',
            $bulkUuid
        );

        if (!empty($operations)) {
            $result = $this->bulkManagement->scheduleBulk(
                $bulkUuid,
                $operations,
                $bulkDescription
            );
            if (!$result) {
                throw new LocalizedException(
                    __('Something went wrong while processing the request.')
                );
            }
        }
    }

    /**
     * Crée le bulk dans Magento Bulk
     *
     * @param $dataToEncode
     * @param $queue
     * @param $bulkUuid
     * @return OperationInterface
     */
    private function makeOperation(
        $dataToEncode,
        $queue,
        $bulkUuid
    ):OperationInterface {
        $data = [
            'data' => [
                'bulk_uuid' => $bulkUuid,
                'topic_name' => $queue,
                'serialized_data' => $this->jsonSerializer->serialize($dataToEncode),
                'status' => BulkOperationInterface::STATUS_TYPE_OPEN,
            ]
        ];

        return $this->operationFactory->create($data);
    }
}

Résultat

Lorsque la commande bin/magento setup:upgrade<code> est exécutée, une nouvelle file d'attente est créée dans la table </code>queue :

id name
10 nicolasbejean.mysqlmq.consumer.default

Dès lors qu’un message est publié, l’opération est inscrite dans la table magento_bulk :

id uuid user_id user_type description operation_count start_time
1 0x63366439313662642D373964322D343866302D383935302D336639393963343533633435 NULL 2 Insert Message: Données de test 1 2020-08-17 14:39:57

Dans la table queue_message :

id topic_name body
1 nicolasbejean.mysqlmq.consumer.default {"id":null,"bulk_uuid":"c6d916bd-79d2-48f0-8950-3f999c453c45","topic_name":"nicolasbejean.mysqlmq.consumer.default","serialized_data":"\"Donn\\u00e9es de test\"","result_serialized_data":null,"status":4,"result_message":null,"error_code":null}

Et pour terminer, dans la table queue_message_status :

id queue_id message_id updated_at status number_of_trials
1 10 1 2020-08-17 14:43:15 4 0

Traitement des messages

Les messages sont traités par les tâches automatisées (CRON), pour cela, vérifier que vos crons sont fonctionnels, puis vérifier que le fichier app/etc/env.php comporte la configuration suivante :

    'cron_consumers_runner' => [
        'cron_run' => true,
        'max_messages' => 1000,
        'consumers' => []
    ]

Sans cela, vos crons ne lanceront jamais le traitement de vos messages.

Sources

Tutoriel publié le 17/08/2020.

Dernière modification: 17 août 2020

Auteur