From 2434b5725b718830f2a29dc215839f0441f375c4 Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Thu, 27 Oct 2022 14:40:58 +0200 Subject: [PATCH 1/3] Adding a retain flag supported by mqtt in the gateway and status topics in mqtt. Some brokers (like AWS iot core) do not support retain messages. Add option to transport to specify if the retain is supported by broker. New MQTT topics can be used in the case where the mqtt broker does not support retained message and we would like to know the status of the gateways as soon as possible. The concerned topics in the mqtt broker are : gw-request/get_gw_status gw-response/get_gw_status// --- python_transport/tests/test_arguments.py | 12 +++++ .../wirepas_gateway/protocol/mqtt_wrapper.py | 12 ++++- .../wirepas_gateway/protocol/topic_helper.py | 8 ++++ .../wirepas_gateway/transport_service.py | 44 ++++++++++++++++--- .../wirepas_gateway/utils/argument_tools.py | 9 ++++ 5 files changed, 77 insertions(+), 8 deletions(-) diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 9d2ebc52..e44dc945 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -18,6 +18,7 @@ env_vars["WM_SERVICES_MQTT_PERSIST_SESSION"] = True env_vars["WM_SERVICES_MQTT_FORCE_UNSECURE"] = True env_vars["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] = True +env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] = True env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 @@ -60,11 +61,13 @@ file_vars["gateway_version"] = env_vars["WM_GW_VERSION"] file_vars["ignored_endpoints_filter"] = env_vars["WM_GW_IGNORED_ENDPOINTS_FILTER"] file_vars["whitened_endpoints_filter"] = env_vars["WM_GW_WHITENED_ENDPOINTS_FILTER"] +file_vars["mqtt_retain_flag_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] booleans = [ "WM_SERVICES_MQTT_PERSIST_SESSION", "WM_SERVICES_MQTT_FORCE_UNSECURE", "WM_SERVICES_MQTT_ALLOW_UNTRUSTED", + "WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", ] @@ -139,6 +142,14 @@ def content_tests(settings, vcopy): vcopy["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] == settings.mqtt_allow_untrusted ) + if "WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED" not in vcopy: + assert settings.mqtt_retain_flag_supported is False + else: + assert ( + vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] + == settings.mqtt_retain_flag_supported + ) + assert vcopy["WM_SERVICES_MQTT_RECONNECT_DELAY"] == settings.mqtt_reconnect_delay assert ( vcopy["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] @@ -199,6 +210,7 @@ def test_defaults(): assert settings.mqtt_persist_session is False assert settings.mqtt_force_unsecure is False assert settings.mqtt_allow_untrusted is False + assert settings.mqtt_retain_flag_supported is False assert settings.mqtt_reconnect_delay == 0 assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index cdf69756..6fbae8a1 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -43,6 +43,13 @@ def __init__( # Keep track of latest published packet self._publish_monitor = PublishMonitor() + # load special settings for broker compatibility + self.retain_supported = settings.mqtt_retain_flag_supported + self.logger.info( + "MQTT retain flag supported is set to %s", + settings.mqtt_retain_flag_supported + ) + if settings.mqtt_use_websocket: transport = "websockets" self._use_websockets = True @@ -233,7 +240,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=2, retain=self.retain_supported) def run(self): self.running = True @@ -278,6 +285,9 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain): retain: Is it a retain message """ + # Clear retain flag if not supported + retain = retain and self.retain_supported + mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid self._unpublished_mid_set.add(mid) diff --git a/python_transport/wirepas_gateway/protocol/topic_helper.py b/python_transport/wirepas_gateway/protocol/topic_helper.py index a5c26ae4..85f8a8fb 100644 --- a/python_transport/wirepas_gateway/protocol/topic_helper.py +++ b/python_transport/wirepas_gateway/protocol/topic_helper.py @@ -74,6 +74,10 @@ def make_otap_set_target_scratchpad_request_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_request_topic(gw_id): return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gw_status_request_topic(): + return TopicGenerator._make_request_topic("get_gw_status", []) + ################## # Response Part ################## @@ -125,6 +129,10 @@ def make_otap_set_target_scratchpad_response_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_response_topic(gw_id): return TopicGenerator._make_response_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gw_status_response_topic(gw_id): + return TopicGenerator._make_response_topic("get_gw_status", [str(gw_id)]) + ################## # Event Part ################## diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 3a0849e5..6ed72a49 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -183,9 +183,12 @@ def __init__(self, settings, **kwargs): self.gw_model = settings.gateway_model self.gw_version = settings.gateway_version + # Does broker support retain flag + self.retain_supported = settings.mqtt_retain_flag_supported + self.whitened_ep_filter = settings.whitened_endpoints_filter - last_will_topic = TopicGenerator.make_status_topic(self.gw_id) + self.status_topic = TopicGenerator.make_status_topic(self.gw_id) last_will_message = wmm.StatusEvent( self.gw_id, wmm.GatewayState.OFFLINE ).payload @@ -194,7 +197,7 @@ def __init__(self, settings, **kwargs): settings, self._on_mqtt_wrapper_termination_cb, self._on_connect, - last_will_topic, + self.status_topic, last_will_message, ) @@ -239,9 +242,9 @@ def _on_mqtt_wrapper_termination_cb(self): def _set_status(self): event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) - topic = TopicGenerator.make_status_topic(self.gw_id) - - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish( + self.status_topic, event_online.payload, qos=1, retain=self.retain_supported + ) def _on_connect(self): # Register for get gateway info @@ -283,13 +286,17 @@ def _on_connect(self): topic, self._on_otap_set_target_scratchpad_request_received ) + topic = TopicGenerator.make_get_gw_status_request_topic() + self.mqtt_wrapper.subscribe( + topic, self._on_get_gw_status_request_received + ) + # Register ourself to our status in case someone else (by mistake) # update our status. # It will work only if we are allowed to register for event topic # at broker level - topic = TopicGenerator.make_status_topic(self.gw_id) self.mqtt_wrapper.subscribe( - topic, self._on_own_status_received + self.status_topic, self._on_own_status_received ) self._set_status() @@ -746,6 +753,29 @@ def _on_otap_set_target_scratchpad_request_received( self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread + def _on_get_gw_status_request_received( + self, client, userdata, message + ): + # pylint: disable=unused-argument + res = wmm.GatewayResultCode.GW_RES_OK + self.logger.info("Get gateway status request received") + try: + request = wmm.GetGatewayStatusRequest.from_payload( + message.payload + ) + except wmm.GatewayAPIParsingException as e: + self.logger.error(str(e)) + return + + response = wmm.GetGatewayStatusResponse( + request.req_id, self.gw_id, res, wmm.GatewayState.ONLINE + ) + topic = TopicGenerator.make_get_gw_status_response_topic( + self.gw_id + ) + + self.mqtt_wrapper.publish(topic, response.payload, qos=1) def parse_setting_list(list_setting): """ This function parse ep list specified from setting file or cmd line diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 2f45b1ec..4e694cdf 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -363,6 +363,15 @@ def add_mqtt(self): ), ) + self.mqtt.add_argument( + "--mqtt_retain_flag_supported", + default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", True), + type=self.str2bool, + nargs="?", + const=True, + help=("Set to true if broker support retain flag"), + ) + def add_buffering_settings(self): """ Parameters used to avoid black hole case """ self.buffering.add_argument( From 3637f84d3402ef2eaabcc48de46c1531c69557cc Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Tue, 3 Jan 2023 09:46:38 +0100 Subject: [PATCH 2/3] fix of PR#237 --- .../wirepas_gateway/protocol/mqtt_wrapper.py | 4 ++-- python_transport/wirepas_gateway/transport_service.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 6fbae8a1..145a9045 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -45,9 +45,9 @@ def __init__( # load special settings for broker compatibility self.retain_supported = settings.mqtt_retain_flag_supported - self.logger.info( + logging.info( "MQTT retain flag supported is set to %s", - settings.mqtt_retain_flag_supported + self.retain_supported ) if settings.mqtt_use_websocket: diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 6ed72a49..505bde29 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -243,7 +243,10 @@ def _set_status(self): event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) self.mqtt_wrapper.publish( - self.status_topic, event_online.payload, qos=1, retain=self.retain_supported + self.status_topic, + event_online.payload, + qos=1, + retain=self.retain_supported ) def _on_connect(self): @@ -759,13 +762,13 @@ def _on_get_gw_status_request_received( ): # pylint: disable=unused-argument res = wmm.GatewayResultCode.GW_RES_OK - self.logger.info("Get gateway status request received") + logging.info("Get gateway status request received") try: request = wmm.GetGatewayStatusRequest.from_payload( message.payload ) except wmm.GatewayAPIParsingException as e: - self.logger.error(str(e)) + logging.error(str(e)) return response = wmm.GetGatewayStatusResponse( From 8cb79b2b8d6d62bd3236a8b2339231badf5f71ed Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Tue, 3 Jan 2023 14:44:01 +0100 Subject: [PATCH 3/3] Adding the thread to publish periodically gw status (from gateway_status branch to suppress it and simplify branches) + PR#236 --- python_transport/tests/test_arguments.py | 16 ++-- .../wirepas_gateway/protocol/mqtt_wrapper.py | 14 ++-- .../wirepas_gateway/transport_service.py | 83 +++++++++++++++++-- .../wirepas_gateway/utils/argument_tools.py | 22 ++++- 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index e44dc945..80cbf43c 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -18,7 +18,7 @@ env_vars["WM_SERVICES_MQTT_PERSIST_SESSION"] = True env_vars["WM_SERVICES_MQTT_FORCE_UNSECURE"] = True env_vars["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] = True -env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] = True +env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] = True env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 @@ -61,13 +61,13 @@ file_vars["gateway_version"] = env_vars["WM_GW_VERSION"] file_vars["ignored_endpoints_filter"] = env_vars["WM_GW_IGNORED_ENDPOINTS_FILTER"] file_vars["whitened_endpoints_filter"] = env_vars["WM_GW_WHITENED_ENDPOINTS_FILTER"] -file_vars["mqtt_retain_flag_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] +file_vars["mqtt_retain_flag_not_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] booleans = [ "WM_SERVICES_MQTT_PERSIST_SESSION", "WM_SERVICES_MQTT_FORCE_UNSECURE", "WM_SERVICES_MQTT_ALLOW_UNTRUSTED", - "WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", + "WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", ] @@ -142,12 +142,12 @@ def content_tests(settings, vcopy): vcopy["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] == settings.mqtt_allow_untrusted ) - if "WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED" not in vcopy: - assert settings.mqtt_retain_flag_supported is False + if "WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED" not in vcopy: + assert settings.mqtt_retain_flag_not_supported is False else: assert ( - vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] - == settings.mqtt_retain_flag_supported + vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] + == settings.mqtt_retain_flag_not_supported ) assert vcopy["WM_SERVICES_MQTT_RECONNECT_DELAY"] == settings.mqtt_reconnect_delay @@ -210,7 +210,7 @@ def test_defaults(): assert settings.mqtt_persist_session is False assert settings.mqtt_force_unsecure is False assert settings.mqtt_allow_untrusted is False - assert settings.mqtt_retain_flag_supported is False + assert settings.mqtt_retain_flag_not_supported is False assert settings.mqtt_reconnect_delay == 0 assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 145a9045..36d1adf4 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -43,12 +43,8 @@ def __init__( # Keep track of latest published packet self._publish_monitor = PublishMonitor() - # load special settings for broker compatibility - self.retain_supported = settings.mqtt_retain_flag_supported - logging.info( - "MQTT retain flag supported is set to %s", - self.retain_supported - ) + # Load special settings for broker compatibility + self.retain_supported = not settings.mqtt_retain_flag_not_supported if settings.mqtt_use_websocket: transport = "websockets" @@ -306,6 +302,12 @@ def publish(self, topic, payload, qos=1, retain=False) -> None: self._publish_monitor.on_publish_request() def subscribe(self, topic, cb, qos=2) -> None: + """ Method to subscribe to mqtt topic + Args: + topic: Topic to subscribe to + cb: Callback to call on message reception + qos: Qos to use. + """ logging.debug("Subscribing to: {}".format(topic)) self._client.subscribe(topic, qos) self._client.message_callback_add(topic, cb) diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 505bde29..4d2363c7 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -156,6 +156,67 @@ def initialize_sink(self, name): sink.cost = self.minimum_sink_cost +class SendGatewayStatusThread(Thread): + def __init__( + self, + period_s, + mqtt_wrapper, + gw_id, + ): + """ + Thread sending periodically Gateway Status to the MQTT broker. + Args: + period_s: the period to send the status + mqtt_wrapper: the mqtt wrapper to publish gateway status on + gw_id: the id of the gateway to be use for MQTT topic + """ + Thread.__init__(self) + + # Daemonize thread to exit with full process + self.daemon = True + + # How often to send status + self.period_s = period_s + self.mqtt_wrapper = mqtt_wrapper + + self.gw_id = gw_id + self.running = False + + def publish_status(self): + """ + Publish the gateway status in the MQTT Broker. + """ + event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) + status_topic = TopicGenerator.make_status_topic(self.gw_id) + logging.info("Publishing periodic gateway status!") + self.mqtt_wrapper.publish(status_topic, event_online.payload, qos=1) + + def is_connected(self): + """ + Check if the gateway is connected with the MQTT broker. + """ + return self.mqtt_wrapper.connected + + def run(self): + """ + Main loop that send periodically the status of the gateway to the MQTT Broker. + """ + self.running = True + + while self.running: + if self.is_connected(): + self.publish_status() + + # Wait for period_s seconds + sleep(self.period_s) + + def stop(self): + """ + Stop the periodical sending gateway status thread. + """ + self.running = False + + class TransportService(BusClient): """ Implementation of gateway to backend protocol @@ -184,11 +245,11 @@ def __init__(self, settings, **kwargs): self.gw_version = settings.gateway_version # Does broker support retain flag - self.retain_supported = settings.mqtt_retain_flag_supported + self.retain_supported = not settings.mqtt_retain_flag_not_supported self.whitened_ep_filter = settings.whitened_endpoints_filter - self.status_topic = TopicGenerator.make_status_topic(self.gw_id) + self.make_status_topic = TopicGenerator.make_status_topic(self.gw_id) last_will_message = wmm.StatusEvent( self.gw_id, wmm.GatewayState.OFFLINE ).payload @@ -197,7 +258,7 @@ def __init__(self, settings, **kwargs): settings, self._on_mqtt_wrapper_termination_cb, self._on_connect, - self.status_topic, + self.make_status_topic, last_will_message, ) @@ -230,6 +291,18 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None + if not self.retain_supported: + logging.info("Gateway status is sent every %d seconds", settings.mqtt_send_gateway_status_period_s) + + self.send_status_thread = SendGatewayStatusThread( + settings.mqtt_send_gateway_status_period_s, + self.mqtt_wrapper, + self.gw_id, + ) + self.send_status_thread.start() + else: + logging.info("Retain flag is supported by the gateway") + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -243,7 +316,7 @@ def _set_status(self): event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) self.mqtt_wrapper.publish( - self.status_topic, + self.make_status_topic, event_online.payload, qos=1, retain=self.retain_supported @@ -299,7 +372,7 @@ def _on_connect(self): # It will work only if we are allowed to register for event topic # at broker level self.mqtt_wrapper.subscribe( - self.status_topic, self._on_own_status_received + self.make_status_topic, self._on_own_status_received ) self._set_status() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 4e694cdf..417f7181 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -364,12 +364,28 @@ def add_mqtt(self): ) self.mqtt.add_argument( - "--mqtt_retain_flag_supported", - default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", True), + "--mqtt_retain_flag_not_supported", + default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False), type=self.str2bool, nargs="?", const=True, - help=("Set to true if broker support retain flag"), + help=( + "Set to true if broker support retain flag" + "Gateway is sending its status periodically only" + "if retain option is not supported" + "otherwise status is retained in the MQTT" + ), + ) + + self.mqtt.add_argument( + "--mqtt_send_gateway_status_period_s", + default=os.environ.get("WM_SERVICES_MQTT_SEND_GATEWAY_STATUS_PERIOD_S", 20), + action="store", + type=self.str2int, + help=( + "Period in seconds of a gateway to send its status" + "to the backend if retain option is not supported" + ), ) def add_buffering_settings(self):