jueves, 15 de diciembre de 2016

Colas en Python con RabbitMQ y Pika (parte 1)


La cola (queue) es un objeto clásico en programación (y, por desgracia, en la vida real también), que consiste en un conjunto de elementos ordenados que deben consumirse siguiendo el concepto FIFO (First In, First Out: primero en entrar, primero en salir).

Un ejemplo típico de esta estructura son las colas de trabajos. Un determinado dispositivo o recurso sólo puede atender un trabajo a la vez y organiza sus tareas en una cola que se va sirviendo por orden. Esto es lo que todos tenemos en nuestro ordenador para los trabajos de impresión.

En general, cuando tenemos uno o varios procesos que deben atender una gran carga de trabajo (no interactiva), no es buena idea que se sirvan las peticiones en paralelo, sino que es más factible organizar dichas tareas en una cola, de manera que los procesos no se colapsen.

Un ejemplo de eso podría ser un sistema de entrega de mensajes como Twitter. Cada vez que escribes un tuit, dicho mensaje entra en una o varias colas de trabajo desde las cuales unos procesos especializados van a a ir haciéndolo llegar a todos tus seguidores. El sistema es muy rápido, pero asíncrono al fin y al cabo.

Desde el punto de vista de la Ingeniería del Software, se trata de una solución del clásico problema del productor-consumidor.

Una cola de paso de mensajes

En este artículo vamos a dar un primer paso, haciendo un flujo de "mensajes" (es decir, fragmentos de información) basado en colas. Esto aún no se puede considerar una cola de trabajos, dado que habría que añadir mucha más funcionalidad, que veremos en futuros artículos. Lo llamaremos "cola de mensajes" para no confundir términos.

Generar un flujo de mensajes entre procesos en Python, es relativamente sencillo usando dos herramientas de software libre basadas en el protocolo AMPQ: el servicio de mensajería RabbitMQ y el cliente Pika.

RabbitMQ es un gestor de mensajes (message broker) desarrollado en el exótico lenguaje Erlang (que curiosamente lleva el nombre del inventor de la Teoría de Colas).

Su único trabajo consiste en aceptar y despachar "mensajes" (donde un mensaje es cualquier fragmento de información). En este sentido, RabbitMQ hace las veces de buzón, de oficina de correos y de cartero, todo en uno. La diferencia con los sistemas de correo electrónico es que no trata con personas, sino con procesos. Usaremos este servicio de bajo nivel como base para nuestro sistema de cola de trabajos.

Para el intercambio de mensajes, RabbitMQ implementa un protocolo estándar llamado AMQP (Advanced Message Queue Protocol). Dado que es un estándar, podemos interactuar con él usando cualquier lenguaje que tenga una librería cliente para dicho protocolo. En este artículo vamos a usar la librería Pika en Python, pero hay otras similares para montones de lenguajes, incluidos Java, PHP, Javascript, C# y un largo etcétera.

NOTA: En este artículo no voy a entrar en la instalación. Cuento con que tienes RabbitMQ instalado y corriendo en tu máquina local. Desde la página de RabbitMQ se puede descargar el software para Linux, Windows y Mac OS X. También se puede obtener mediante Docker. Por su parte, Pika está disponible en el repositorio estándar de Python. Los programas de ejemplo en este artículo están probados en Python 3.5.2 sobre Ubuntu Desktop 16.
Comprobar que el servicio está arriba

Ttanto el emisor como el receptor deberán conectar con el servicio RabbitMQ, que deberá estar corriendo en la máquina local. Para ver que está corriendo, podemos usar el siguiente comando:
$ sudo rabbitmqctl status
Si el servicio está corriendo, recibiremos una larga serie de información de depuración. Si no es así, obtendremos un error de conexión.

La instalación por defecto en Ubuntu nos levanta el servicio automáticamente, así que en condiciones normales debería estar arriba. En caso contrario, se puede usar el mismo comando rabbitmqctl para arrancar el servicio, pero es más recomendable delegar este trabajo en la gestión de servicios del sistema operativo. Así pues, por ejemplo en Ubuntu haríamos:
$ sudo service rabbitmq-server start
Una vez que estamos seguros que tenemos el servicio de mensajería arriba, podemos pasar a nuestro primer programita.

Prueba de concepto: envío de mensajes

Vamos a hacer la prueba de concepto mínima de envío y recepción de mensajes.

La idea es crear un proceso que espera la llegada de un mensaje, y otro que hace el envío. Así comprobaremos que la emisión y recepción funcionan correctamente.

Ambos procesos, el receptor (consumidor) y el emisor (productor) van a tener que hacer la conexión con el servicio de mensajería RabbitMQ de la máquina local. Para ello, instanciamos la clase pika.BlockingConnection. De los tipos de conexión que soporta Pika, el "blocking" es el más sencillo de utilizar, dado que nos resulta muy familiar y similar al paradigma clásico de apertura y cierre de un canal de comunicación, igual que si de una conexión a base de datos se tratara.

A esta clase (denominada adaptador), le pasamos una instancia de pika.ConnectionParameters, a la que definimos un único parámetro diciendo que el destino es la propia máquina local.

con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

Una vez instanciado, abrimos el canal llamando al método channel().

ch = con.channel()

Siempre que sea posible, nos cuidaremos de liberar recursos cerrando la conexión cuando ya no haga falta:

con.close()

Una vez tenemos acceso al servicio de mensajería, podemos empezar a usar una cola para enviar mensajes o recibirlos. Pero hay que ser precavidos, puesto que nada nos garantiza que la cola ya exista. Así que lo primero será declarar la cola, por si acaso no existiera:

ch.queue_declare(queue='prueba')

En caso de que la cola ya exista, esto no tendrá efecto alguno.

Para enviar un mensaje a la cola, hay también varios métodos. El más simple es basic_publish(), que recibe como mínimo dos parámetros:

  • exchange: El intercambiador es una abstracción que nos permite configurar la entrega de los mensajes como una caja negra, de manera que en principio no tendríamos que decir a qué cola o colas van los mensajes ni cómo se hace la entrega. Esto es muy potente, pero complejo. Para una simple prueba podemos obviar todo esto dejando vacío este parámetro. En ese caso, añadiremos el parámetro adicional routing_key diciendo directamente el nombre de la cola de destino.
  • body: El cuerpo del mensaje. Recordemos que puede ser cualquier trozo de información, ya sea binaria o de caracteres.
La orden queda de esta manera:

ch.basic_publish(exchange='', routing_key='prueba', body='Hola Mundo')


Así pues, ya podemos poner todo junto para nuestro envío de mensajes. Lo vamos a poner en un módulo llamado productor.py:


import pika

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='prueba')

# Publicar el mensaje
ch.basic_publish(exchange='', routing_key='prueba', body='Hola Mundo')
print("Mensaje enviado.")

# Cerrar conexión
con.close()


Si lanzamos este programa, terminará con éxito a pesar de que no hay ningún proceso esperando los mensajes:

$ python productor.py
Mensaje enviado.
$

Esto es así porque la entrega de mensajes es asíncrona. El mensaje quedará en la cola hasta que alguien lo consuma. En cualquier momento, podemos ver las colas que hay y cuántos mensajes tienen esperando, mediante este comando:

$ sudo rabbitmqctl list_queues
Listing queues ...
prueba 1
$

NOTA: El procedimiento basic_publish() no tiene mecanismos para comprobar si el mensaje realmente ha llegado a la cola. En nuestro ejemplo, el programa productor sólo sabe que se ha intentado el envío. El protocolo permite usar acuses de recibo, pero de momento obviaremos esa parte.

Prueba de concepto: recepción de mensajes

La recepción no es tan sencilla. Al ser un sistema asíncrono, necesitamos de alguna forma "suscribirnos" a la cola para recibir una llamada cada vez que haya un mensaje. Esto se hace mediante una función tipo callback. Por ejemplo, definimos lo siguiente:


def recepcion(ch, method, properties, body):
    print("Se ha recibido el siguiente mensaje: %s" % body)

Es obligatorio declarar estos parámetros, aunque por el momento vamos a ignorarlos a excepción del body.

Para enganchar esta función a la cola, usamos el método basic_consume().

ch.basic_consume(recepcion, queue='prueba', no_ack=True)

El primer parámetro es la referencia a la función que hemos definido. Recuerda que se trata de un callback, es decir, no usamos recepción() con paréntesis porque en este caso queremos usar la referencia (o puntero), y no el resultado de la función.

Los otros dos parámetros que hemos usado es la cola a la que nos queremos enganchar (queue) y un booleano (no_ack), que indica a la cola que borre el mensaje en cuanto sea servido, sin esperar confirmación del proceso consumidor. Esto simplifica la programación, y también hace el proceso más eficiente, a costa de un cierto riesgo de pérdida de datos.

Por último, nos ponemos en espera de mensajes;

ch.start_consuming()

Este comando dejará bloqueado el proceso eternamente hasta que se aborte con CTRL+C. Mientras tanto, la función recepcion() recibirá un callout cada vez que tenga un mensaje en cola.

El código completo de consumidor.py es el siguiente:


import pika

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='prueba')

# Definir la función callback
def recepcion(ch, method, properties, body):
    print("Se ha recibido el siguiente mensaje: %s" % body)

# Enganchar el callback
ch.basic_consume(recepcion, queue='prueba', no_ack=True)

# Poner en espera
print('Esperando mensajes...')
ch.start_consuming()

Lanzamos el consumidor:
$ python consumidor.py
Esperando mensajes...
Se ha recibido el siguiente mensaje: b'Hola Mundo'

El consumidor ha recibido inmediatamente el mensaje que estaba en la cola, y se ha quedado esperando más. Si en otro terminal lanzamos el productor seguirá recibiéndolos.

Obsérvese que nuestro "Hola Mundo" va precedido de una b, indicando que es de tipo byte. Es una confirmación de lo que dijimos antes: que lo que la cola maneja son datos binarios y podremos poner ahí cualquier cosa que se nos ocurra.

Si miramos en la cola, veremos que ya no hay mensajes en espera:

$ sudo rabbitmqctl list_queues
Listing queues ...
prueba 0
$

Hemos conseguido crear un flujo de mensajes entre dos procesos independientes en la misma máquina. Esto se podría hacer en distintas máquinas, simplemente cambiando el "localhost" por la dirección de la máquina donde reside la cola. Como ves, el potencial que nos ofrece esta herramienta y su capacidad de abstracción es muy grande, con la ventaja de que hay un protocolo subyacente del que no tenemos que preocuparnos.

En futuros artículos, iremos añadiendo más funcionalidad hasta convertir este simple programa en un completo gestor de colas de trabajos.



No hay comentarios:

Publicar un comentario

Expresa tu opinión respetando a los demás y a unas mínimas reglas del lenguaje castellano.