Crear y utilizar colas de mensajes MQ en Magento

por David Abad
Colas de mensajes MQ en Magento

Una cola de mensajes MQ en Magento funciona como un sistema de gestión de tareas asíncronas que permite separar la ejecución de procesos pesados del resto de tareas. De este modo, la ejecución de estos procesos no penaliza el rendimiento general del sitio.

Cómo funciona una cola de mensajes

Las colas de mensajes son una estructura de programación para el almacenamiento de tareas asíncronas mediante el patrón publish-subscribe.

Colas de mensajes

En estas colas de mensajes intervienen varios elementos:

  • Publisher: Componente que emite los mensajes (en un formato definido) hacia la cola de mensajes MQ 
  • Exchange: Componente que permite enrutar cada mensaje a las colas de mensajes apropiadas.
  • Queue: Cola que almacena los mensajes.
  • Consumer: Componente que obtiene los mensajes almacenados en las colas y ejecuta el proceso asociado. 
  • Broker: Tecnología central que actúa de intermediaria entre publishers y consumers (p.ej. RabbitMQ, Apache Kafka, etc.)

Este tipo de sistemas de colas aporta numerosas ventajas a nuestro sistema:

  • Desacoplamiento: Los componentes publisher y consumer son totalmente independientes entre ellos y puedes modificarse sin necesidad de afectar a la otra parte.
  • Escalabilidad: Estos sistemas son facilmente escalables, simplemente incrementando los componentes publisher y/o consumer según sea necesario.
  • Tolerancia a errores: Los sistemas MQ tienen una gran estabilidad ya que, si un consumer falla al ejecutar un mensaje, éste no afectará al resto de consumers. Además el mensaje erróneo quedará almacenado en la cola para posibles reintentos.
  • Registro y auditoría: Las colas de mensajes facilitan enormemente el registro de las acciones realizadas, pudiendo revisar los mensajes ejecutados satisfactoriamente, los posibles errores producidos, tareas pendientes, etc. 

Crear colas de mensajes personalizadas en Magento

1. Crear fichero <Modulo>/etc/communication.xml:
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="mimodulo.queue.mitarea" is_synchronous="false" request="string">
        <handler name="miTarea" type="MiVendor\MiModulo\Model\Queue\Consumer" method="process" />
    </topic>
</config>
2. Crear clase handler del consumidor:
<?php

declare(strict_types=1);

namespace MiVendor\MiModulo\Model\Queue;

class Consumer
{
    public function process(string $data): void
    {
        [...]
    }
}
3. Crear fichero <Modulo>/etc/queue_consumer.xml:
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="mimodulo.queue.mitarea"
              queue="mimodulo.queue.mitarea"
              connection="db"
              consumerInstance="Magento\Framework\MessageQueue\Consumer" />
</config>
4. Crear fichero <Modulo>/etc/queue_topology.xml:
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="db">
        <binding id="mimodulo.queue.mitarea"
                topic="mimodulo.queue.mitarea"
                destinationType="queue"
                destination="mimodulo.queue.mitarea" />
    </exchange>
</config>
5. Crear fichero <Modulo>/etc/queue_publisher.xml:
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <publisher topic="mimodulo.queue.mitarea">
        <connection name="db" exchange="magento" />
    </publisher>
</config>
6. [Ejemplo] Publicar un mensaje en la nueva cola:
<?php

declare(strict_types=1);

namespace MiVendor\MiModulo\Model\Queue;

use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Framework\Serialize\Serializer\Json;

class Publisher
{
	protected Json $_json;
	protected PublisherInterface $_publisher;

	public function __construct(
    	PublisherInterface $publisher,
    	Json $json
	){
    	$this->_publisher = $publisher;
    	$this->_json = $json;
	}
	
	public function publish(array $data): void
	{
	    $this->_publisher->publish('mimodulo.queue.mitarea', $this->_json->serialize($data));
	}
}

Ejecutar colas de mensajes

La ejecución de los consumidores de colas de mensajes se realiza automáticamente mediante una tarea periódica nativa de Magento:

  • Cron job: consumers_runner
  • Instance: Magento\MessageQueue\Model\Cron\ConsumersRunner
  • Schedule: * * * * *

Sin embargo, Magento también cuenta con una serie de comandos específicos para la gestión de los consumidores:

Listar consumidores de colas de mensajes
bin/magento queue:consumers:list
Ejecutar consumidor de colas de mensajes
bin/magento queue:consumers:start [--max-messages=<value>] <consumidor> 

El parámetro opcional max-messages índica el nº de mensajes que serán consumidos por el proceso antes de finalizar. Si la cola de mensajes se vacía antes de alcanzar este número, el proceso consumidor permanecerá activo a la espera de nuevos mensajes.

Detener un consumidor de colas de mensajes
ps -aux | grep "bin/magento" | grep "queue:consumers:start"  # Para obtener ID del proceso
kill -9 <PROCESO>

Colas de mensajes con MySQL

Por defecto, Magento utiliza su base de datos MySQL/MariaDB como broker y almacenamiento para sus colas de mensajes.

Las tablas de base de datos más importantes que intervienen en el almacenamiento y procesamiento de mensajes son:

  • queue: Colas de mensajes disponibles.
  • queue_message: Mensajes almacenados en las diferentes colas.
  • queue_message_status: Estado de los mensajes (2 = NEW; 3 = IN PROGRESS; 4 = COMPLETE; 5 = RETRY REQUIRED; 6 = ERROR; 7 = TO BE DELETED).
Utilizar la base de datos como broker de colas de mensajes es muy cómodo ya que no es necesario instalar y mantener servicios adicionales. Sin embargo, lo más recomendable es el uso del software RabbitMQ como broker.

Colas de mensajes con RabbitMQ

Magento incluye soporte nativo para utilizar el software RabbitMQ como broker para trabajar con colas de mensajes, permitiendo aliviar la carga de la base de datos y mejorando el rendimiento general del sistema.

Para utilizar RabbitMQ como broker es necesario indicar los datos de conexión al servicio en el fichero app/etc/env.php de Magento:

'queue' => [
    'amqp' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'user' => 'user',
        'password' => 'password',
        'virtualhost' => '/'
    ],
    'consumers_wait_for_messages' => 1
],

Una vez configurado, las colas serán gestionadas por RabbitMQ y podrán consultarse los detalles de las colas, estados de los mensajes, etc. a través de la interfaz web que RabbitMQ expone en el puerto 15672 (por defecto):

RabbitMQ WebUI

Déjanos tu email para recibir contenido interesante en tu bandeja de entrada, cada mes.

¡No hacemos spam!

Otros artículos