From d93f0680920b81d968663b8420c4ec2672a79ab9 Mon Sep 17 00:00:00 2001 From: knokko Date: Thu, 5 May 2022 13:50:13 +0200 Subject: [PATCH] Use persistent RabbitMQ connection --- binder/inter_process_producers_consumer.py | 160 +++++++++++++++++++++ binder/rabbitmq.py | 108 ++++++++++++++ binder/websocket.py | 25 ++-- ci-requirements.txt | 2 + project/packages.pip | 4 +- setup.py | 2 + 6 files changed, 286 insertions(+), 15 deletions(-) create mode 100644 binder/inter_process_producers_consumer.py create mode 100644 binder/rabbitmq.py diff --git a/binder/inter_process_producers_consumer.py b/binder/inter_process_producers_consumer.py new file mode 100644 index 00000000..67a852bc --- /dev/null +++ b/binder/inter_process_producers_consumer.py @@ -0,0 +1,160 @@ +import os +import socket +import time + +# This file provides functions to create an inter-process producers-consumer system. The basic +# concept is that the *produce* method can be called to ensure that a given 'payload' is +# 'consumed' in another process: the consumer process. If the consumer process is not yet +# running, it will also be started by the *produce* method. After consuming the message, it +# will wait for payloads of future producers and it will quit when it hasn't received anything +# for a certain amount of time. +# +# This system is used to maintain our RabbitMQ connection: the consumer will create a RabbitMQ +# connection and publish all payloads it gets from the producers to RabbitMQ. This avoids the +# need to create and destroy a RabbitMQ connection for each payload. 
+ + +# The producers and the consumer will run on the same machine, so we should use localhost +""" +The host at which the consumer will listen for incoming producer connections +""" +HOST = '127.0.0.1' + +# We might want to stop hardcoding the port at some point and use some environment variable instead, +# but this is not as easy as it looks: the consumer will run in a different process and thus won't +# be able to read the environment variables, so it will have to be propagated some other way. +""" +The port at which the consumer will listen for incoming producer connections +""" +PORT = 22102 + +""" +The consumer will stop when it hasn't received anything for *MAX_CONSUMER_WAIT_TIME* seconds long +""" +MAX_CONSUMER_WAIT_TIME = 5 + +""" +Retries the given *task* *num_attempts* times until it succeeds (returns True). + +After each failed attempt, it will wait between *min_wait_time* and *max_wait_time* seconds +before doing the next attempt. The time between the first attempt and the second attempt is +the shortest, and the time between consecutive attempts will take longer and longer (but never +longer than *max_wait_time*). + +This behavior is nice because it ensures that the first failure is retried quickly while decreasing +the system load when retrying many times. +""" +def _retry(task, min_wait_time, max_wait_time, num_attempts): + for attempt_counter in range(num_attempts): + if task(): + return + else: + time.sleep(min_wait_time + (max_wait_time - min_wait_time) * (attempt_counter / num_attempts)) + raise RuntimeError('Reached maximum number of retries') + +""" +The number of characters (well... bytes) used to encode the payload length of the producers. + +The producer will write the payload length in the first 8 bytes and write the actual payload +thereafter. This is needed to let the consumer know how long each payload is. 
+ +The value is currently 8 characters long, which allows payloads of at most 10^8 bytes, which +should be more than we will ever need. +""" +NUM_LENGTH_CHARS = 8 + +""" +Tries to start the consumer. + +This method will try to open a TCP server socket at port *PORT*. If that succeeds, it will +call *consumer_setup()*. The result of *consumer_setup()* is the consumer. + +Then, it will listen for incoming socket connections from the producers. For each incoming +connection, it will call *consume(consumer, payload)* where *payload* is the payload received +from the incoming connection. (Currently, it allows only 1 payload per producer connection.) + +When no incoming connections have been made for *MAX_CONSUMER_WAIT_TIME* seconds, the server +socket will be closed and it will call *consumer_shutdown(consumer)*. +""" +def _run_consumer(consumer_setup, consume, consumer_shutdown): + from struct import pack + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + + # This sockopt will cause the server socket port to be freed instantly after the server + # socket is closed. If we would NOT do this, the entire producer-consumer system would + # hang when a payload is produced soon after a consumer has stopped. + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, pack('ii', 1, 0)) + server_socket.bind((HOST, PORT)) + server_socket.settimeout(MAX_CONSUMER_WAIT_TIME) + consumer = consumer_setup() + server_socket.listen() + + try: + while True: + client_connection, _ = server_socket.accept() + with client_connection: + payload_length = int(str(client_connection.recv(NUM_LENGTH_CHARS), 'utf-8')) + payload = str(client_connection.recv(payload_length), 'utf-8') + consume(consumer, payload) + except socket.timeout: + consumer_shutdown(consumer) + +""" +Calls _run_consumer and catches any OSError it may throw. 
+""" +def try_run_consumer(consumer_setup, consume, consumer_shutdown): + try: + _run_consumer(consumer_setup, consume, consumer_shutdown) + except OSError: + pass + + +""" +Performs a single attempt to produce the given *payload*. If the consumer can NOT be reached, it +will try to start a new consumer. +""" +def _try_produce(payload, consumer_path, consumer_parameters): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: + client_socket.connect((HOST, PORT)) + payload_length = str(len(payload)) + if len(payload_length) > NUM_LENGTH_CHARS: + raise RuntimeError('Payload is too large') + while len(payload_length) < NUM_LENGTH_CHARS: + payload_length = '0' + payload_length + client_socket.sendall(bytes(payload_length, 'utf-8')) + client_socket.sendall(bytes(payload, 'utf-8')) + return True + except (ConnectionRefusedError, TimeoutError): + + # If the connection failed, the consumer is probably not running, so we should try to + # start it (in a different process). + consumer_process = os.popen('python3 ' + str(consumer_path.absolute()) + ' ' + consumer_parameters, mode="w") + consumer_process.detach() + return False + + +""" +Ensures that the given *payload* is 'consumed' in another process. The *consumer_path* should +point to a .py file that will call the *try_run_consumer* method of this module. If the consumer +is not yet running, this method will basically execute "python3 consumer_path consumer_parameters" +in a new process. +""" +def produce(payload, consumer_path, consumer_parameters): + + # Since the _try_produce method is rather fragile, we may need to retry it a couple of times. + # It is fragile because: + # + # (1) The consumer may or may not be running. If it is not running, the _try_produce will + # obviously fail, but it will try to create a new consumer, so there is a big chance the + # next attempt will succeed. 
+ # + # (2) When a new consumer is created, the OS scheduler might not start it immediately, so + # the second and third attempt may also fail. + # + # (3) When the existing consumer is quitting, the produce attempt will also fail, and so will + # its attempt to create a new consumer (since the port is still claimed by the quitting + # consumer). This will cause the second attempt to fail as well, and maybe also the third and + # fourth attempt. + _retry(lambda: _try_produce(payload, consumer_path, consumer_parameters), 0.01, 1.0, 10) diff --git a/binder/rabbitmq.py b/binder/rabbitmq.py new file mode 100644 index 00000000..ba87a234 --- /dev/null +++ b/binder/rabbitmq.py @@ -0,0 +1,108 @@ +import pathlib +import pika + +from threading import Semaphore, Thread +from time import sleep + + +""" +The absolute path to this file. This is needed for the inter-process producers-consumer system. +""" +RABBITMQ_CONSUMER_PATH = pathlib.Path(__file__).absolute() + + +""" +This method will be executed on the RabbitMQ thread of the consumer. It will create a RabbitMQ +connection and publish all payloads it receives from the consumer. It will communicate with the +main consumer thread using semaphores. + +The RabbitMQ connection is managed in a separate thread to keep the connection responsive while +the main thread is busy interacting with potentially slow producers. 
+""" +def _rabbitmq_thread_function(consumer): + from sys import argv + + (produced_payload_semaphore, consumed_payload_semaphore, produced_payload) = consumer + + rabbitmq_username = argv[1] + rabbitmq_password = argv[2] + rabbitmq_host = argv[3] + + connection_credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password) + connection_parameters = pika.ConnectionParameters(rabbitmq_host, credentials=connection_credentials) + + connection = pika.BlockingConnection(parameters=connection_parameters) + channel = connection.channel() + + # To communicate a stop message, the length of produced_payload will be set to 0 + while len(produced_payload) == 1: + + # To prevent the connection from being closed due to not responding, we don't wait + # on the semaphore indefinitely, but instead process data events every 0.01 seconds + # or after each publish. + has_task = produced_payload_semaphore.acquire(timeout=0.01) + if has_task: + channel.basic_publish('hightemplar', routing_key='*', body=produced_payload[0]) + consumed_payload_semaphore.release() + connection.process_data_events(0) + + connection.close() + +""" +Creates the consumer and starts the RabbitMQ thread. It will use semaphores to manage +the synchronization and it will use an array of length 1 to communicate the payload +(the array can be shared between threads and the payload can be communicated by changing +its first and only element). +""" +def _consumer_setup(): + produced_payload_semaphore = Semaphore(0) + consumed_payload_semaphore = Semaphore(0) + produced_payload = [None] + + consumer = (produced_payload_semaphore, consumed_payload_semaphore, produced_payload) + + rabbitmq_thread = Thread(target=lambda: _rabbitmq_thread_function(consumer)) + # Make it a daemon thread to prevent it from outliving the main thread in case of + # unexpected errors. (But it should close the RabbitMQ connection properly in normal + # circumstances.) 
rabbitmq_thread.daemon = True + rabbitmq_thread.start() + + return consumer + +""" +Consumes a payload from a producer and propagates it to the RabbitMQ thread. It will block until +the payload has been published by the RabbitMQ thread. +""" +def _consume(consumer, payload): + (produced_payload_semaphore, consumed_payload_semaphore, produced_payload) = consumer + + produced_payload[0] = payload + produced_payload_semaphore.release() + consumed_payload_semaphore.acquire() + +""" +Sends the stop signal to the RabbitMQ thread +""" +def _consumer_shutdown(consumer): + (_, _, produced_payload) = consumer + + # This will change the length of produced_payload to zero, which is the stop signal for + # the RabbitMQ thread + produced_payload.pop() + + # Give the RabbitMQ thread some time to observe the signal and cleanly close the connection + sleep(0.1) + +""" +Starts the RabbitMQ consumer +""" +def main(): + from inter_process_producers_consumer import try_run_consumer + + try_run_consumer(_consumer_setup, _consume, _consumer_shutdown) + +# IMPORTANT: only call main() in the actual consumer process. 
NOT when the main process just +# tries to access *RABBITMQ_CONSUMER_PATH* +if __name__ == '__main__': + main() diff --git a/binder/websocket.py b/binder/websocket.py index 10c9c108..705a8fd6 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -1,6 +1,8 @@ from django.conf import settings from .json import jsondumps +from binder.inter_process_producers_consumer import produce +from binder.rabbitmq import RABBITMQ_CONSUMER_PATH import requests from requests.exceptions import RequestException @@ -29,23 +31,18 @@ def list_rooms_for_user(self, user): return rooms +# WARNING: When this is non-empty, the entire RabbitMQ connection will be mocked +mock_trigger_listeners = [] def trigger(data, rooms): if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}): - import pika - from pika import BlockingConnection - - connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], - settings.HIGH_TEMPLAR['rabbitmq']['password']) - connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], - credentials=connection_credentials) - connection = BlockingConnection(parameters=connection_parameters) - channel = connection.channel() - - channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({ - 'data': data, - 'rooms': rooms, - })) + global mock_trigger_listeners + if len(mock_trigger_listeners) > 0: + for trigger_listener in mock_trigger_listeners: + trigger_listener(jsondumps({ 'data': data, 'rooms': rooms })) + else: + rabbitmq_consumer_args = settings.HIGH_TEMPLAR['rabbitmq']['username'] + ' ' + settings.HIGH_TEMPLAR['rabbitmq']['password'] + ' ' + settings.HIGH_TEMPLAR['rabbitmq']['host'] + produce(jsondumps({ 'data': data, 'rooms': rooms }), RABBITMQ_CONSUMER_PATH, rabbitmq_consumer_args) if getattr(settings, 'HIGH_TEMPLAR_URL', None): url = getattr(settings, 'HIGH_TEMPLAR_URL') try: diff --git a/ci-requirements.txt b/ci-requirements.txt index f02a7f22..7dfd9b03 100644 --- a/ci-requirements.txt 
+++ b/ci-requirements.txt @@ -8,3 +8,5 @@ codecov coverage django-hijack<3.0.0 openpyxl +fasteners +pika diff --git a/project/packages.pip b/project/packages.pip index 88f0be8c..9bbe012f 100644 --- a/project/packages.pip +++ b/project/packages.pip @@ -3,4 +3,6 @@ Pillow django-request-id psycopg2 requests -openpyxl \ No newline at end of file +openpyxl +fasteners +pika \ No newline at end of file diff --git a/setup.py b/setup.py index 55196440..c7631183 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,8 @@ 'Pillow >= 3.2.0', 'django-request-id >= 1.0.0', 'requests >= 2.13.0', + 'fasteners >= 0.17.3', + 'pika >= 1.1.0' ], tests_require=[ 'django-hijack >= 2.1.10, < 3.0.0',