lunes, 26 de diciembre de 2016

Colas en Python con RabbitMQ y Pika (parte 3)



Antes de este, debes leer los dos artículos anteriores de esta serie. En el primero, desarrollamos un simple flujo de mensajes y en el segundo, estructuramos dichos mensajes como trabajos.

Ahora vamos a afrontar otra de las funcionalidades que requiere cualquier cola de trabajos: debe haber un mecanismo para que el proceso consumidor pueda confirmar que un trabajo se ha completado, y en caso contrario, sea reasignado a otro proceso.

Se ruega acuse de recibo

La confirmación, acuse de recibo o acknowledgment, es un concepto usado en prácticamente todos los protocolos (incluso los no digitales) para aquellas operaciones en las que queremos asegurar que se completan correctamente.

En el programa que hemos hecho en esta serie de artículos, hasta ahora le estábamos indicando a la cola de RabbitMQ que no hiciera acuse de recibo, mediante la opción no_ack=True en la llamada a basic_consume(). Esto hacía que la cola no esperase ninguna confirmación y eliminase los trabajos tan pronto como eran despachados al consumidor. Esto es muy eficiente, pero evidentemente corremos el peligro de que algún trabajo quede sin hacer y sea silenciosamente olvidado.

Para hacer el acuse de recibo, eliminamos la opción no_ack (por defecto es False) y añadimos en la función callback una llamada a ch.basic_ack(). Esta llamada avisa a la cola que el trabajo se ha completado y ya puede ser retirado. Para identificar el trabajo al que nos referimos, se incluye el argumento delivery_tag, que a su vez hemos recibido a través del argumento method.

En resumen, el código cambia de esta manera:

# Definir la función callback
def procesar(ch, method, properties, body):
    datos = body.decode('utf-8')
    print("Se ha recibido un trabajo: %s" % datos)
    t = Trabajo.importar(datos)
    ejecutar(t)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Enganchar el callback
ch.basic_consume(procesar, queue='trabajos')

Si ejecutamos este consumidor, en principio no notaremos ningún cambio, a no ser que miremos en la cola de mensajes y nos demos cuenta que el mensaje se elimina después de pasar el tiempo de espera, no antes.

Desde la línea de comandos de rabbitmqctl, hay una forma para ver no solo los mensajes en espera, sino cuántos han sido despachados y están esperando confirmación:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

En nuestro ejemplo, los posibles valores arrojados por esta consulta pueden ser:

a) No hay trabajos.

Listing queues ...
trabajos 0 0

b) Se ha enviado un trabajo con productor.py.

Listing queues ...
trabajos 1 0

c) Se ha lanzado consumidor.py y está procesando el trabajo.

Listing queues ...
trabajos 0 1

d) El trabajo se ha terminado de procesar, y el acuse de recibo se ha completado.

Listing queues ...
trabajos 0 0

¿No es maravilloso? ¡Seguimos para bingo!

Competencia entre consumidores

El ejemplo anterior es muy trivial, debido a que sólo contamos con un consumidor. La cosa se pone más interesante cuando hay varios consumidores procesando los mensajes, lo que inevitablemente genera una "competencia" entre ellos. Veremos que RabbitMQ puede lidiar con esa competencia de forma brillante.

Para simular una competencia, vamos a modificar nuestros inocentes programas para que fuercen la máquina. Vamos a hacer lo siguiente:
  • Haremos que productor.py genere trabajos de manera constante.
  • Haremos que consumidor.py, además de ejecutar trabajos en espera simulando que trabaja, aleatoriamente falle lanzando una excepción. De esta manera comprobaremos si dicho trabajo se redirige a otro consumidor.
  • Lanzaremos varios productores y varios consumidores a la vez, en distintas terminales.
Nuestros programas se quedan de esta manera:

productor.py

import pika
from trabajo import Trabajo
from random import randint
import time

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='trabajos')

# Bucle infinito
while True:
    # Espera aleatoria antes de lanzar el siguiente trabajo
    time.sleep(randint(1,4))

    # Generar un trabajo
    t = Trabajo('esperar', randint(1,10))

    # Publicar el mensaje
    ch.basic_publish(exchange='', routing_key='trabajos', body=t.exportar().encode('utf-8'))
    print("Trabajo enviado: %s" % t.exportar())

consumidor.py

import pika
from trabajo import Trabajo
from proceso import ejecutar

# Establecer conexión
con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = con.channel()

# Declarar la cola
ch.queue_declare(queue='trabajos')


# Definir la función callback
def procesar(ch, method, properties, body):
    datos = body.decode('utf-8')
    print("Se ha recibido un trabajo: %s" % datos)
    t = Trabajo.importar(datos)
    ejecutar(t)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Enganchar el callback
ch.basic_consume(procesar, queue='trabajos')

# Poner en espera
print('Esperando trabajos...')
ch.start_consuming()

proceso.py

import time
import random

def ejecutar(trabajo):
    if trabajo.operacion == 'esperar':
        print('Esperando %d segundos...' % trabajo.entrada)
        time.sleep(trabajo.entrada)

        # Simular un fallo el 20% de las veces
        if random.random() < 0.2:
            raise Exception

        print('Hecho!')
    else:
        raise NotImplementedError('Operación "%s" no soportada.' % trabajo.operacion)

Si, con esta configuración, lanzamos, por ejemplo, dos productores y tres consumidores, veremos cómo los trabajos van fluyendo y cuando, eventualmente, uno de los consumidores aborta, los otros siguen procesando con normalidad, sin que se pierda ningún trabajo.

Dos productores y tres consumidores trabajando a la vez. Podemos observar que, a pesar de que los consumidores 1 y 2 han abortado, el segundo sigue en la brecha procesando trabajos.

No hay comentarios:

Publicar un comentario

Expresa tu opinión respetando a los demás y a unas mínimas reglas del lenguaje castellano.