lunes, 19 de diciembre de 2016

Colas en Python con RabbitMQ y Pika (parte 2)


En el anterior artículo de esta serie, aprendimos a crear un flujo de mensajes entre un proceso que denominamos productor y otro que llamamos consumidor. Pero eso dista mucho de lo que podríamos llamar una cola de trabajos. Para eso, necesitamos añadir ciertas funcionalidades.

Una de esas funcionalidades, que es imprescindible para nuestra cola de trabajo, es que los mensajes (que denominaremos trabajos a partir de ahora) deben tener un formato estructurado y un significado concreto, reconocido por todos los procesos implicados.

Dando formato a los mensajes

Vamos a convertir los simples mensajes que manejábamos hasta ahora en auténticas órdenes de trabajo. Para ello, hace falta estructurar la información. Tendremos que definir el objeto Trabajo y luego insertarlo en la cola como mensaje.

Dado que RabbitMQ acepta mensajes binarios, somos libres de elegir el formato que más nos plazca. En cualquier caso, ese formato deberá saber representar ciertos objetos de los que maneja el programa. En programación, la acción de convertir objetos de programa en bloques de datos almacenables o transmisibles se denomina serialización o marshalling. Algunos buenos candidatos para esto en Python son:
  • XML: Es un estándar muy utilizado para representar objetos, independientemente del lenguaje de programación.
  • JSON: Pronunciado "yeison", es similar a XML pero mucho más simple y legible para los humanos. Es menos estándar, pero está implementado en casi todos los lenguajes.
  • YAML: Tiene una filosofía parecida a JSON, pero más flexible. Hay quien considera a JSON un subconjunto de YAML.
  • Pickle: Es un mecanismo desarrollado específicamente para Python. Es complejo y muy potente, pero no es legible por humanos (son datos binarios), lo que dificulta bastante la depuración del código.
En la elección del formato, hay que tener en cuenta principalmente dos factores: la complejidad de los datos que vamos a serializar y la portabilidad de éstos a otros lenguajes. Este segundo punto no es trivial. Si realizamos un sistema de mensajes que esté atado a un lenguaje determinado, perdemos la libertad de maniobra que nos da una plataforma como RabbitMQ.

Otros factores que podemos tener en cuenta son la legibilidad (a efectos de depuración) y la eficiencia (que el coste de tiempo de proceso en la serialización y deserialización sea bajo).

En este caso, me voy a decantar por JSON, que es un formato muy utilizado en Python, que también tiene soporte en casi todos los lenguajes y además es muy legible y bastante eficiente en cuanto a rendimiento.

Su uso básico en Python es muy simple. Para serializar un objeto en formato cadena (legible) usamos el método json.dumps(), y para deserializar una cadena de vuelta al objeto original, json.loads(). El módulo json viene "de fábrica" en cualquier instalación de Python, y además maneja automáticamente todos los tipos nativos del lenguaje, como listas y diccionarios. Su uso es tan común en Python, que los conversores estándar str() y repr() son sospechosamente similares a él.

Define trabajo

Vamos a definir el objeto genérico Trabajo como una operación con unos datos (opcionales) de entrada. Una primera versión de trabajo.py puede ser algo así:

class Trabajo:
    def __init__(self, operacion, entrada=None):
        self.operacion = operacion
        self.entrada = entrada

En esta definición, hemos abusado sin piedad del duck typing de Python para dejar abierta la elección de qué consideramos una operación y una entrada.

La simple existencia de un objeto no lo hace serializable. Si ahora definimos una instancia de Trabajo, no podremos pasarla a JSON y nos lanzará un TypeError. Hay muchas maneras en Python de hacerlo serializable, por ejemplo usando un JSONEncoder. Aquí vamos a ir a lo más simple, que es definir métodos ad hoc para exportar e importar los objetos. La versión serializada será el resultado de exportar todas las variables en forma de diccionario:


import json


class Trabajo:
    def __init__(self, operacion, entrada=None):
        self.operacion = operacion
        self.entrada = entrada

    def exportar(self):
        return json.dumps(self.__dict__)

    @classmethod
    def importar(cls, datos):
        dic = json.loads(datos)
        return cls(dic['operacion'], dic['entrada'])

Con esta implementación, las operaciones posibles son:

Crear un trabajo:
t = Trabajo('hola', 'mundo')

Serializarlo:
cadena = t.exportar()

Deserializarlo:
t2 = Trabajo.importar(cadena)

Esta implementación tan simple sólo va a funcionar si los tipos que usamos para la operación y la entrada son a su vez serializables. Para nuestro ejemplo, nos vale perfectamente.

Con todo esto, vamos a modificar los procesos productor y consumidor del anterior artículo para que manejen el objeto que acabamos de definir.

Para simular un trabajo, definiremos la operación "esperar" con un número aleatorio de segundos.

productor.py

import pika
from trabajo import Trabajo
from random import randint

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='trabajos')

# Generar un trabajo
t = Trabajo('esperar', randint(1,10))

# Publicar el mensaje
ch.basic_publish(exchange='', routing_key='trabajos', body=t.exportar().encode('utf-8'))
print("Trabajo enviado.")

# Cerrar conexión
con.close()

consumidor.py

import pika
from trabajo import Trabajo
from proceso import ejecutar

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='trabajos')


# Definir la función callback
def procesar(ch, method, properties, body):
    datos = body.decode('utf-8')
    print("Se ha recibido un trabajo: %s" % datos)
    t = Trabajo.importar(datos)
    ejecutar(t)

# Enganchar el callback
ch.basic_consume(procesar, queue='trabajos', no_ack=True)

# Poner en espera
print('Esperando trabajos...')
ch.start_consuming()


Un par de detalles a destacar:

  • Dado que el body de los mensajes ha de ser binario, hacemos la correspondiente conversión desde/hacia UTF-8.
  • Para mantener el código debidamente modular, hemos separado la gestión de los trabajos por un lado y la propia ejecución de los mismos. Para ello hemos creado un módulo proceso.py con una función ejecutar().
proceso.py

import time


def ejecutar(trabajo):
    if trabajo.operacion == 'esperar':
        print('Esperando %d segundos...' % trabajo.entrada)
        time.sleep(trabajo.entrada)
        print('Hecho!')
    else:
        raise NotImplementedError('Operación "%s" no soportada.' % trabajo.operacion)

Con esto, tenemos un precioso consumidor de trabajos:


$ python consumidor.py
Esperando trabajos...
Se ha recibido un trabajo: {"entrada": 5, "operacion": "esperar"}
Esperando 5 segundos...
Hecho!






















No hay comentarios:

Publicar un comentario

Expresa tu opinión respetando a los demás y a unas mínimas reglas del lenguaje castellano.