RabbitMQ Edge/Cloud APP

Agustin Bassi

Ago 18, 2021 ‧ 40 min estimados ‧ #rabbitmq #amqp #federation #shovel

Contenido

Objetivos

Contexto

Detalles de la aplicación

Preparación de entorno de desarrollo

Entorno cloud

Entorno edge

Configuración de las entidades comunes

Configuración de exchanges

Configuración de queues

Configuración de federation

Comunicación cloud->edge admin

Configuración de shovel

Comunicación edge->cloud admin

Comunicaciones

Comunicación cloud->edge

Comunicación edge/cloud

Exportar configuraciones

Conclusiones

Bibliografía

Licencia

Objetivos

Lo que vas a ver en este documento son los siguientes temas:

Contexto

En esta sección vamos a ver un breve contexto sobre los temas más importantes para entender el desarrollo de la aplicación.

AMQP

AMQP es un protocolo de mensajería que implementa un broker para comunicar aplicaciones/servicios en múltiples lenguajes de programación, tanto propios como de terceros, gracias a la definición de un Mensaje AMQP como lenguaje común para todos.

El protocolo propone la declaración de distintos tipos de exchanges donde los productores envían los mensajes. Así mismo, propone la declaración de queues donde se conectan los consumidores de los mensajes. Finalmente, los binding son reglas que configuran el enrutamiento de los mensajes desde los exchanges hacia las queues. Si querés saber más al respecto podés leer el artículo de Introducción a AMQP que se encuentra en nuestra web.

RabbitMQ

RabbitMQ es un broker que implementa la especificación 0-9-1 de AMQP, es muy completo y utilizado, y posee extensiones para personalizar diferentes comportamientos del broker que lo hacen especialmente útil para gran variedad de aplicaciones. Si querés saber más al respecto podés leer el artículo de Introducción a RabbitMQ que se encuentra en nuestra web.

RabbitMQ tiene diferentes mecanismos para distribuir mensajes entre brokers. Por un lado se puede crear un cluster de brokers y repartir la carga entre los brokers. Por otro lado, la federación permite replicar mensajes desde brokers remotos mediante la configuración de upstreams. Finalmente la funcionalidad shovel permite enviar datos desde un origen a un destino de una manera simple y efectiva. Si querés saber más al respecto podés leer el artículo de RabbitMQ Distribuido que se encuentra en nuestra web.

Connection AMQP

Debido a que consideramos que el protocolo AMQP - y especialmente RabbitMQ - es muy potente para el desarrollo de aplicaciones, armamos el proyecto Connection AMQP. Este proyecto es una plataforma integral que se compone de un broker RabbitMQ (integrado con un broker MQTT y un servidor web), un servicio con códigos de ejemplo para comunicarte con el broker y un cliente web MQTT, que sirve para enviar/recibir mensajes desde/hacia el ecosistema RabbitMQ mediante el broker MQTT integrado. Para el desarrollo de la aplicación vamos a utilizar este proyecto.

Detalles de la aplicación

La aplicación propuesta para demostrar el funcionamiento de la comunicación edge/cloud se compone de un broker RabbitMQ cloud donde se interconectan N brokers edge. Gracias a la funcionalidad que posee RabbitMQ, es posible realizar una única vez la configuración en la nube y luego, cada broker edge que se conecta realiza sus propias configuraciones sin necesidad de ningún trabajo extra en la nube.

Tanto al broker cloud como al broker edge, se conectarán diferentes clientes AMQP que vienen integrados en el proyecto Connection AMQP donde podremos ver el flujo de mensajes entre ambos brokers.

La aplicación que vamos a desarrollar se compone de una configuración edge/cloud que permite enviar configuraciones desde la nube hacia todos los dispositivos edge conectados, como así también enviar configuraciones hacia un dispositivo edge en particular. Por otro lado, permite que cada dispositivo edge pueda enviar hacia la nube datos periódicos, para mantener la información de cada dispositivo edge centralizada en la nube.

El exchange edge.config.all se encarga de enviar la configuración a todos los dispositivos edge. El exchange edge.config.target se encarga de enviar la configuración a un dispositivo edge en particular. El exchange edge.data se encarga de enviar los datos de cada dispositivo edge hacia la nube. En cada broker edge hay una cola llamada edge.config.queue que recibe los mensajes de configuraciones provenientes de la nube (ruteada a los exchange edge.config.X). En el broker cloud hay una cola llamada edge.data.queue que recibe los datos provenientes de cada broker edge. En esta figura podés ver la arquitectura de la solución propuesta.

Preparación de ambientes

Antes de comenzar con el desarrollo de la solución es necesario contar con un broker cloud como así también un broker local.

Entorno cloud

Para poner en marcha un broker cloud, lo más simple es utilizar la plataforma CloudAMQP, un servicio pre-configurado para correr RabbitMQ en la nube. Tiene diferentes planes de funcionamiento, y tiene una opción free que es suficiente para esta aplicación.

Lo primero es registrarte como usuario, luego crear una instancia, asignarle un nombre (Edge/Cloud por ejemplo), seleccionar el plan free y seguir los pasos hasta crearla. Una vez creada, en los detalles, deberías ver una imagen similar a la siguiente.

Accedé al administrador del broker cloud presionando el botón RabbitMQ Manager.

Mantené abierta esta pestaña o copia en algún lugar el valor del campo AMQP URL, ya que lo vamos a utilizar para realizar las conexiones al broker.

Entorno edge

Para poner en marcha el entorno edge vamos a hacer uso del proyecto Connection AMQP, que contiene todo lo que necesitamos para tal fin. En el README del proyecto están los detalles para hacer un fork del proyecto a tu cuenta de Github (si tenés), aunque para simplificar el proceso, en este caso vamos a realizar un clone con el siguiente comando.

git clone https://github.com/gotoiot/connection-amqp.git

Luego, ingresando dentro del directorio ejecuta los siguientes comandos para traer los submódulos del proyecto y  compilar la imagen de Docker con el código de ejemplo (puede demorar unos minutos).

git submodule update --init --recursive --remote && docker-compose build amqp-samples

 

Una vez que finalice el comando anterior, ejecuta el comando docker-compose up -d para poner en marcha el proyecto y luego de unos instantes podrás acceder al administrador de RabbitMQ accediendo a http://localhost:15672 con el usuario y contraseña definidos (por defecto gotoiot:gotoiot). En caso de estar corriendo el proyecto de manera remota, deberás cambiar la IP del host.

Con los brokers corriendo en la nube y de manera local, podés continuar con la configuración de las entidades comunes en cada broker.

Configuración de las entidades comunes

Ahora que ya contamos con el entorno necesario, lo siguiente es crear las entidades comunes que van a compartir tanto los broker edge como el broker cloud. Para ello es necesario crear los exchanges, las queues y los bindings.

Configuración de exchanges

Los exchanges necesitan crearse de igual manera tanto en el broker cloud como en los broker edge. Comenzá en el broker cloud creando un exchange llamado edge.config.all del tipo fanout donde se publicarán las configuraciones para todos los dispositivos edge con la configuración de esta imagen.

Luego, creá un exchange llamado edge.config.target del tipo topic que servirá para recibir una configuración destinada a cada dispositivo edge.

A continuación, creá un exchange llamado edge.data del tipo topic que servirá para publicar información relacionada con la base de datos de cada dispositivo edge.

Cuando tengas los exchanges declarados en el broker cloud, hacé lo mismo para el edge.

Configuración de queues

En el broker del dispositivo edge, crea una queue llamada edge.config.queue que tenga un binding con el exchange edge.config.all sin routing key (ya que es fanout) y con  edge.config.target con la routing key edge.config.001 (el 001 indica un posible ID del dispositivo edge). De esta manera, cuando se envíe desde la nube una configuración general (all) o bien una personalizada (target) sea recibida en la cola edge.config.queue. Luego de aplicar los binding con cada exchange, en los bindings de la queue deberías ver una lista como la siguiente.

Por otro lado, en el broker cloud será necesario que crees una queue llamada edge.data.queue que debe tener un binding con el exchange edge.data utilizando la routing key #, que indica que cualquier mensaje que se publique en el exchange edge.data vaya a parar a la queue. Luego de aplicarlo, en la queue deberías ver los bindings de esta manera.

 

Configuración de federation

Tal como vimos en el artículo de RabbitMQ Distribuido, la federación se encarga de tomar los datos remotos provenientes de queues o exchanges remotos y replicarlos de manera local.

La federación posibilita la comunicación - llamada upstream - desde el broker cloud a los brokers edge sin necesidad de configuración en el broker cloud. Esto permite que las federaciones sean creadas a demanda, a medida que nuevos dispositivos edge se unen a la red, y permitiendo que las conexiones puedan ser automatizadas.

Para la configuración de la federación es necesario que estén habilitados los plugins rabbitmq_federation y rabbitmq_federation_management en cada broker remoto. Esto puede ser realizado desde la línea de comandos del broker, o bien agregando ambos plugins a la lista en enable_plugins.

Una vez habilitados los plugins, en la sección de Admin -> Federation Upstream deberías cargar los datos del broker cloud. Para ello, copia la AMQP URL desde los detalles de la instancia del broker remoto, como vimos en la sección de Preparación de entornos de desarrollo. Sólo necesitas cargar la URL y el nombre edge.cloud.federation, los demás datos podés dejarlos en blanco, como podés ver en la figura siguiente.

Luego es necesario crear una policy dentro del broker edge para indicar que todos los exchanges que estén declarados en la nube y que coincidan con el patrón .*edge.config.* - es decir, los exchanges que sirven para enviar la configuración a los dispositivos edge - sean replicados en cada broker edge. En la figura siguiente podés ver la declaración de la policy edge.cloud.policy con el patrón .*edge.config.*, aplicado únicamente a exchanges y con la definición federation-upstream-set igual a all.

Una vez declarada la policy, en la sección Admin -> Federation Status deberías ver el estado de la conexión de la federación con una imagen similar a esta.

Comunicación cloud->edge admin

Ahora que ya realizamos la configuración de la federación vamos a realizar una prueba de comunicación desde el broker cloud hacia el broker edge. Para ello, vamos a publicar un mensaje en el exchange edge.config.all con el payload Config01, como vemos en esta imagen.

Así mismo, en el exchange edge.config.target del broker cloud publica un mensaje con la routing key edge.config.001 con el mensaje Config02, como en esta imagen.

Ahora que ya publicaste los mensajes, intentemos recibirlos en el broker edge. Para ello, accede a la queue edge.config.queue del broker edge y en la sección Get Messages setea 10 de cantidad y presiona el botón Get Messages. Deberías ver los mensajes publicados anteriormente.

Como podrás ver, los mensajes publicados en el broker cloud, ya sea para todos los dispositivos edge (all), como para uno en particular (target), son recibidos por el broker edge y pueden ser consumidos por servicios locales.

Configuración de shovel

La funcionalidad del plugin shovel es recibir mensajes de una fuente y publicarlos a un destino. Esta es una necesidad común a la hora de trabajar con brokers distribuidos, y el plugin de shovel cumple a la perfección con esta tarea, ya que es un cliente realizado por el core de RabbitMQ diseñado para tal fin.

Para la configuración del shovel es necesario que estén habilitados los plugins rabbitmq_shovel y rabbitmq_shovel_management en cada broker remoto. Esto puede ser realizado desde la línea de comandos del broker, o bien agregando ambos plugins a la lista en enable_plugins.

Una vez habilitados los plugins, en la sección de Admin -> Shovel Management deberías cargar el nombre edge.cloud.shovel y los datos del source y destination. En el source deberías poner la URI del edge broker, el exchange edge.data y la routing key # (es decir todo lo que se publica en el exchange). Para la parte de destino deberías cargar los mismos datos del exchange y routing key, y solamente cambiar la URI del broker cloud. La figura siguiente muestra los datos que deberías cargar.

En la sección Admin -> Shovel Status deberías ver el estado de la conexión del plugin shovel con una imagen similar a esta.

Comunicación edge->cloud admin

Una vez que está realizada la configuración del shovel, vamos a publicar un mensaje en el broker edge y chequear si es recibido en el broker cloud. Para ello desde el broker edge, en el exchange edge.data publica el mensaje Data001 con la routing key edge.data.001 (nuevamente, el 001 podría indicar el ID del dispositivo edge), como vemos en esta imagen.

Luego, desde el broker cloud, accedé a la Queue edge.data.queue, y en la sección Get Messages configura la cantidad de 10,  y presioná el botón Get Messages. Deberías ver el mensaje enviado por el broker edge como vemos en la imagen siguiente.

Esto nos va a permitir tener un consumidor que se comunique al broker cloud y que reciba la información de todos los broker edge, procese la información y la termine impactando en el data center de la aplicación. Desde este punto se podría realizar toda la explotación de los datos desde la nube con la información correspondiente a cada dispositivo edge.

Comunicación mediante clientes

En las secciones anteriores nos encargamos de realizar las configuraciones de las entidades AMQP, tanto en el broker cloud como en el broker edge, realizamos la configuración de la federación para recibir datos desde el broker cloud como así también la configuración del plugin shovel para enviar datos al broker cloud. También hicimos pruebas de comunicación desde el admin en ambos sentidos.

En esta sección nos vamos a encargar de correr distintos clientes AMQP que se comunicarán con los broker cloud y edge, y que permitirán demostrar el funcionamiento de las comunicaciones en ambos sentidos a través de las configuraciones que realizamos anteriormente.

Para este propósito vamos a usar el servicio amqp-samples, que está integrado dentro del proyecto Connection AMQP, que tiene código de ejemplo para comunicarse con un broker AMQP de diferentes maneras. Si querés acceder a la información completa del servicio podés leer el README del proyecto.

También será necesario que cuentes con los datos de user, pass, host y port, tanto para el broker edge como para el broker cloud. Si estás usando la configuración por defecto del broker dentro del proyecto Connection AMQP debería ser amqp://gotoiot:gotoiot@localhost:5672 mientras que la URI de comunicación del broker remoto deberías obtenerla del campo AMQP URL de los detalles de la instancia.

Comunicación cloud->edge

La primera parte de la comunicación será publicar datos de configuración desde el broker cloud y recibirlos desde el broker edge mediante el uso de los exchanges edge.config.all y edge.config.target.

Usando el exchange edge.config.all

En este caso vamos a publicar un mensaje en el exchange edge.config.all del broker cloud, y esperamos recibir ese mensaje en la cola edge.config.queue del broker edge. Desde la raíz del proyecto Connection AMQP, abrí una terminal y ejecuta el siguiente comando para comenzar a consumir desde la cola edge.config.queue del broker edge mediante un binding con el exchange edge.config.all que es del tipo fanout.

docker-compose run amqp-samples python samples/generic_client/consumer.py -e edge.config.all -t fanout -q edge.config.queue

Luego, desde otra terminal vamos a publicar el mensaje Config01 en el exchange edge.config.all del broker cloud con el comando siguiente.

docker-compose run amqp-samples python samples/generic_client/producer.py -H CLOUD_HOSTNAME -u CLOUD_USER -P CLOUD_PASS -v CLOUD_VHOST -e edge.config.all -t fanout -m Config01

Como podrás notar, los mensajes enviados al exchange edge.config.all del broker cloud son replicados en el exchange edge.config.all del broker edge mediante la federación, que internamente tiene un binding hacia la cola edge.config.queue del broker edge. Esto se hará en cada broker edge federado al broker cloud, de manera que puede ser recibido por N brokers al mismo tiempo.

Usando el exchange edge.config.target

Para este caso, abriremos una nueva terminal y ejecutaremos este comando para consumir desde la cola edge.config.queue, que se va a vincular al exchange edge.config.target del tipo topic usando la routing key edge.config.001. Esto quiere decir que vamos a recibir una configuración específica para cada dispositivo edge.

docker-compose run amqp-samples python samples/generic_client/consumer.py -e edge.config.target -t topic -q edge.config.queue -r edge.config.001

Luego, desde otra terminal vamos a publicar un mensaje en el exchange edge.config.target del tipo topic del broker cloud utilizando la routing key edge.config.001 y con el payload SpecificConfig01 con el siguiente comando.

docker-compose run amqp-samples python samples/generic_client/producer.py -H CLOUD_HOSTNAME -u CLOUD_USER -P CLOUD_PASS -v CLOUD_VHOST -e edge.config.target -t topic -r edge.config.001 -m SpecificConfig01

En este caso, los mensajes que van a parar a la cola edge.config.queue del broker edge, llegan a la cola mediante el exchange edge.config.target replicado entre el broker cloud y los N broker edge. Esto permite recibir datos específicos de un dispositivo.

Intentá publicar un mensaje con la routing key edge.config.002 y verás que no es recibido por el consumidor, ya que no coincide la routing key.

Comunicación edge/cloud

Ahora que ya probamos la comunicación desde la nube hacia el edge, vamos a realizar el envío de datos desde el edge hacia la nube.

Para ello abrí una terminal para consumir datos de la queue edge.data.queue del broker cloud, que tiene un binding con el exchange edge.data (del tipo topic) utilizando la routing key #, con el siguiente comando.

docker-compose run amqp-samples python samples/generic_client/consumer.py -H CLOUD_HOSTNAME -u CLOUD_USER -P CLOUD_PASS -v CLOUD_VHOST -e edge.data -t topic -r '#' -q edge.data.queue

Luego, desde otra terminal vamos a enviar el mensaje Data001 en el exchange edge.data, con la routing key edge.data.001 en el broker edge.

docker-compose run amqp-samples python samples/generic_client/producer.py -e edge.data -t topic -r edge.config.001 -m Data001

Como podrás notar, el mensaje publicado desde el broker edge es recibido por el consumidor del broker cloud, demostrando así que los datos enviados por cualquier cliente que publique en el exchange edge.config.data de cada broker edge será enviado hacia la queue edge.data.queue del broker cloud, haciendo uso de la funcionalidad shovel configurada.

Exportar configuraciones

Las configuraciones realizadas a lo largo de estas secciones fueron extensas y realizar esto por cada broker edge puede resultar completamente inescalable. Para ello, lo más conveniente es exportar las configuraciones realizadas hasta el momento en el broker edge y añadirlas al control de versiones que estemos usando, de manera tal que puedas replicar esta configuración en cada broker edge que despliegues.

Para ello, desde la pestaña Overview del broker edge, realiza la exportación de las configuraciones en la sección Export definitions. Para lo que estuvimos realizando en esta aplicación, deberías tener una configuración similar a la siguiente.

{
   
"users": [
       {
           
"name": "guest",
           
"password": "guest",
           
"tags": "administrator"
       },
       {
           
"name": "gotoiot",
           
"password": "gotoiot",
           
"tags": "administrator"
       }
   ],
   
"vhosts": [
       {
           
"name": "/"
       }
   ],
   
"permissions": [
       {
           
"user": "guest",
           
"vhost": "/",
           
"configure": ".*",
           
"write": ".*",
           
"read": ".*"
       },
       {
           
"user": "gotoiot",
           
"vhost": "/",
           
"configure": ".*",
           
"write": ".*",
           
"read": ".*"
       }
   ],
   
"parameters": [
       {
           
"value": {
               
"ack-mode": "on-confirm",
               
"trust-user-id": false,
               
"uri": "amqps://USER:PASS@HOSTNAME:PORT/VHOST"
           },
           
"vhost": "/",
           
"component": "federation-upstream",
           
"name": "edge.cloud.federation"
       },
       {
           
"value": {
               
"ack-mode": "on-confirm",
               
"dest-add-forward-headers": false,
               
"dest-exchange": "edge.data",
               
"dest-exchange-key": "#",
               
"dest-protocol": "amqp091",
               
"dest-uri": "amqps://USER:PASS@HOSTNAME:PORT/VHOST",
               
"src-delete-after": "never",
               
"src-exchange": "edge.data",
               
"src-exchange-key": "#",
               
"src-protocol": "amqp091",
               
"src-uri": "amqp://USER:PASS@HOSTNAME:PORT/VHOST"
           },
           
"vhost": "/",
           
"component": "shovel",
           
"name": "edge.cloud.shovel"
       }
   ],
   
"policies": [
       {
           
"vhost": "/",
           
"name": "edge.cloud.fed.policy",
           
"pattern": ".*edge.config.*",
           
"apply-to": "exchanges",
           
"definition": {
               
"federation-upstream-set": "all"
           },
           
"priority": 0
       }
   ],
   
"queues": [
       {
           
"name": "edge.config.queue",
           
"vhost": "/",
           
"durable": true,
           
"auto_delete": false,
           
"arguments": {
               
"x-queue-type": "classic"
           }
       }
   ],
   
"exchanges": [
       {
           
"name": "edge.config.target",
           
"vhost": "/",
           
"type": "topic",
           
"durable": true,
           
"auto_delete": false,
           
"internal": false,
           
"arguments": {}
       },
       {
           
"name": "edge.config.all",
           
"vhost": "/",
           
"type": "fanout",
           
"durable": true,
           
"auto_delete": false,
           
"internal": false,
           
"arguments": {}
       },
       {
           
"name": "edge.data",
           
"vhost": "/",
           
"type": "topic",
           
"durable": true,
           
"auto_delete": false,
           
"internal": false,
           
"arguments": {}
       }
   ],
   
"bindings": [
       {
           
"source": "edge.config.all",
           
"vhost": "/",
           
"destination": "edge.config.queue",
           
"destination_type": "queue",
           
"routing_key": "",
           
"arguments": {}
       },
       {
           
"source": "edge.config.target",
           
"vhost": "/",
           
"destination": "edge.config.queue",
           
"destination_type": "queue",
           
"routing_key": "edge.config.001",
           
"arguments": {}
       }
   ]
}

Para la configuración anterior únicamente sería necesario que edites correctamente las URIs del broker cloud y edge en las configuraciones de la federación y shovel.

Conclusiones

A lo largo de esta guía pudimos poner en funcionamiento una configuración que permite comunicar un broker cloud con N brokers edge. Esta es una parte clave en el desarrollo de aplicaciones, ya que permite la vinculación de servicios y aplicaciones corriendo en diferentes locaciones. Incluso, y gracias al funcionamiento nativo de RabbitMQ, están preparados para soportar intermitencia en la comunicación, que es un problema muy frecuente de aplicaciones productivas.

A partir de la demostración de comunicación, tanto desde el administrador del broker como de los clientes que se comunicaron mediante amqp-samples, te queda un marco de trabajo funcionando sobre el cual poder plantear una arquitectura. Si bien el código de muestra presentado está implementado en Python, AMQP es un protocolo estándar y vas a encontrar clientes para múltiples lenguajes y plataformas, por lo que el desarrollo de la aplicación se puede montar prácticamente en cualquier lado.

Para hacer un resumen de todos los temas que vimos a lo largo de estas secciones, podemos nombrar los siguientes:

Bibliografía

Licencia

Este material es distribuido bajo licencia Creative Commons BY-SA 4.0. Podés encontrar detalles sobre el uso del material en este link.